# Bouncing-balls demo: a dynamic-structure DEVS model (PythonPDEVS) drawn on a Tkinter canvas.
- from pypdevs.DEVS import AtomicDEVS, CoupledDEVS
- from pypdevs.simulator import Simulator
- from pypdevs.infinity import INFINITY
- import random
- import Tkinter as tk
- import math
class Ball(AtomicDEVS):
    """Atomic DEVS model of one ball, drawn as an oval on a Tk canvas.

    The model keeps its state in plain instance attributes. ``curr_state``
    is one of:

    * ``"bouncing"``  - moving; reports its position every step.
    * ``"spawning"``  - moving after a reported collision (drawn blue).
    * ``"selected"``  - picked for removal (drawn yellow); waits
      ``SELECTED_DELAY`` and then announces its deletion.
    * ``"to_delete"`` - passive; picked up by ``modelTransition``.

    Ports:

    * ``collision_detect`` (in)   - any message marks a collision.
    * ``position_out`` (out)      - ``(ball_id, pos, r)`` while moving, or
      ``("delete", ball_id)`` when leaving the "selected" state.
    * ``ball_spawner_comm`` (out) - ``"spawn_ball"`` after a collision.
    """

    # Time between two movement steps.
    TICK = 0.03
    # Time a ball stays in the "selected" state before it is deleted.
    SELECTED_DELAY = 2.5
    # Per-step probability that a ball becomes "selected".
    SELECT_CHANCE = 0.008
    # Accumulated step time after which the ball is recoloured black and
    # put back into the "bouncing" state.
    RECOLOR_AFTER = 1

    def __init__(self, ball_id, canvas):
        """Create the ball and draw it at a random position on ``canvas``.

        :param ball_id: integer used in the model name and in messages.
        :param canvas: Tk canvas the oval is created on.
        """
        AtomicDEVS.__init__(self, "Ball[%i]" % ball_id)

        # NOTE: the port name keeps its original spelling; it is a runtime
        # string and is left untouched on purpose.
        self.collision_detect = self.addInPort("colision_detect")
        self.position_out = self.addOutPort("position_out")
        self.ball_spawner_comm = self.addOutPort("ball_spawner")

        self.ball_id = ball_id
        self.canvas = canvas
        # The order of the random draws below is significant when a fixed
        # seed is used: radius, x velocity, y velocity, x, y.
        self.r = random.uniform(5.0, 20.0)
        self.vel = {
            'x': random.random() * 5.0 - 1.0,
            'y': random.random() * 5.0 - 1.0,
        }
        self.initialized = False
        self.event = None
        self.curr_state = "bouncing"
        self.pos = None
        self.time_passed = 0
        self.collision_detected = False

        diameter = self.r * 2
        max_x = canvas.canvasx(canvas.winfo_width()) - (diameter + 1)
        max_y = canvas.canvasy(canvas.winfo_height()) - (diameter + 1)
        x = random.uniform(0, max_x)
        y = random.uniform(0, max_y)
        self.id = canvas.create_oval(x, y, x + diameter, y + diameter,
                                     fill="red")
        # TODO (original author): not legal
        canvas.tag_bind(self.id, "<Button>", self.buttonPress)
        self.initialized = True
        # Top-left corner of the oval's bounding box.
        self.pos = (x, y)
        self.remaining = self.TICK

    def buttonPress(self, event):
        """Tk callback: remember the last mouse event on this ball."""
        self.event = event

    def timeAdvance(self):
        """Return the time until the next internal transition."""
        if self.curr_state in ("bouncing", "spawning", "selected"):
            return self.remaining
        # "to_delete" (and, defensively, anything unexpected) is passive;
        # falling through used to return None.
        return INFINITY

    def outputFnc(self):
        """Build the output bag emitted just before the internal transition."""
        output = {}
        if self.pos and self.curr_state in ("bouncing", "spawning"):
            output[self.position_out] = [(self.ball_id, self.pos, self.r)]
        # Fixed: this used to test ``self.state``, an attribute this model
        # never sets, so the "spawning" guard could never take effect.
        if self.collision_detected and self.curr_state != "spawning":
            output[self.ball_spawner_comm] = ["spawn_ball"]
        if self.curr_state == "selected":
            output[self.position_out] = [("delete", self.ball_id)]
        return output

    def intTransition(self):
        """Advance the ball by one step (or progress its removal)."""
        if not self.initialized:
            return
        # The random draw happens on every step, whatever the state is.
        if random.random() <= self.SELECT_CHANCE:
            # Previously also: self.event and self.event.num == 1
            self.canvas.itemconfig(self.id, fill="yellow")
            self.curr_state = "selected"
            self.event = None
            self.remaining = self.SELECTED_DELAY
        elif self.curr_state == "selected":
            self.curr_state = "to_delete"
        else:
            self._step()

    def _step(self):
        """Regular movement step: recolour, bounce, move, handle collision."""
        if (self.time_passed < self.RECOLOR_AFTER
                and self.curr_state in ("bouncing", "spawning")):
            self.time_passed += self.TICK
        if self.time_passed >= self.RECOLOR_AFTER:
            self.canvas.itemconfig(self.id, fill="black")
            self.curr_state = "bouncing"
            self.time_passed = 0
        self._bounce_and_move()
        if self.collision_detected:
            self.curr_state = "spawning"
            self.canvas.itemconfig(self.id, fill="blue")
        self.remaining = self.TICK
        self.collision_detected = False

    def _bounce_and_move(self):
        """Reflect the velocity at the canvas borders and move the oval."""
        canvas = self.canvas
        diameter = self.r * 2
        coords = canvas.coords(self.id)
        x = canvas.canvasx(coords[0])
        y = canvas.canvasy(coords[1])
        if x <= 0 or x + diameter >= canvas.canvasx(canvas.winfo_width()):
            self.vel['x'] = -self.vel['x']
        if y <= 0 or y + diameter >= canvas.canvasy(canvas.winfo_height()):
            self.vel['y'] = -self.vel['y']
        canvas.move(self.id, self.vel['x'], self.vel['y'])
        self.pos = canvas.coords(self.id)

    def extTransition(self, inputs):
        """Record a collision message; keep the pending step on schedule."""
        # Clamp at zero so float round-off can never yield a negative
        # time advance.
        self.remaining = max(0.0, self.remaining - self.elapsed)
        if self.collision_detect in inputs and self.curr_state == "bouncing":
            self.collision_detected = True

    def modelTransition(self, state):
        """Ask the parent to remove this ball once it is "to_delete".

        NOTE(review): the single ``'to_delete'`` key holds one ball only; two
        balls reaching this state in the same round would overwrite each
        other - confirm whether that can happen.
        """
        if self.curr_state == "to_delete":
            state['to_delete'] = self
            return True
        return False
class BallSpawner(AtomicDEVS):
    """Atomic DEVS model that creates new ``Ball`` models.

    A ball is created on every internal transition (every 0.5 time units)
    and whenever a message arrives on the ``request`` port. The created ball
    is handed to the parent through the ``'new_ball'`` key of the dictionary
    passed to ``modelTransition``.
    """

    def __init__(self, canvas):
        """
        :param canvas: Tk canvas passed on to every created ``Ball``.
        """
        AtomicDEVS.__init__(self, "BallSpawner")

        self.request = self.addInPort("request")

        self.canvas = canvas
        # Next ball id to hand out.
        self.id_ctr = 0
        # Ball created by the latest transition and not yet handed off.
        self.new_ball = None

    def timeAdvance(self):
        """Return the fixed spawning period."""
        return 0.5

    def _spawn_ball(self):
        """Create the next ball and remember it until it is handed off."""
        orphan = self.new_ball
        if orphan is not None:
            # A previously created ball was never handed to the parent
            # (e.g. both transitions fired for the same instant). Its oval
            # already exists on the canvas, so remove it rather than
            # leaving a dead shape behind.
            orphan.canvas.delete(orphan.id)
        self.new_ball = Ball(self.id_ctr, self.canvas)
        self.id_ctr += 1

    def intTransition(self):
        """Periodic spawn."""
        self._spawn_ball()

    def extTransition(self, inputs):
        """Spawn on request; any message on ``request`` counts once."""
        if self.request in inputs:
            self._spawn_ball()

    def modelTransition(self, state):
        """Hand the pending ball (if any) to the parent model.

        :param state: shared dictionary; receives the ball under 'new_ball'.
        :return: True when a ball was handed off, False otherwise.
        """
        if self.new_ball is None:
            return False
        state['new_ball'] = self.new_ball
        # Forget the ball once handed off so it can never be offered twice.
        self.new_ball = None
        return True
class PositionManager(AtomicDEVS):
    """Atomic DEVS model that tracks ball positions and reports overlaps.

    Messages on ``balls_in`` are either ``(ball_id, pos, radius)`` or
    ``("delete", ball_id)``. After each input bag all known balls are
    compared pairwise; every ball that overlaps another one receives
    ``"collision_detected"`` on its own output port (``balls_out[ball_id]``,
    filled in from outside this class).
    """

    def __init__(self):
        AtomicDEVS.__init__(self, "PositionManager")

        self.balls_in = self.addInPort("balls_in")
        # ball_id -> output port towards that ball.
        self.balls_out = {}
        # Ids of balls to notify at the next output.
        self.collisions = set()
        # ball_id -> (pos, radius); pos[0], pos[1] is the top-left corner
        # of the ball's bounding box.
        self.positions = {}
        # (id_a, id_b) -> last overlapping centre distance (both orders).
        self.distances = {}
        self.curr_time = 0.0

    def timeAdvance(self):
        """Fire immediately while notifications are pending, else wait."""
        return 0 if self.collisions else INFINITY

    def extTransition(self, inputs):
        """Apply position/delete messages and recompute the collisions."""
        self.curr_time += self.elapsed
        if self.balls_in not in inputs:
            return
        for message in inputs[self.balls_in]:
            if message[0] == "delete":
                self._forget(message[1])
            else:
                self.positions[message[0]] = (message[1], message[2])
        self._detect_collisions()

    def _forget(self, ball_id):
        """Drop every record kept for a deleted ball."""
        self.positions.pop(ball_id, None)
        self.collisions.discard(ball_id)
        self.distances = dict(
            (pair, dist) for pair, dist in self.distances.items()
            if ball_id not in pair
        )

    def _detect_collisions(self):
        """Mark every pair of balls whose circles overlap."""
        # ``items()`` works on both Python 2 and 3 (``iteritems`` does not).
        balls = list(self.positions.items())
        for index, (id_a, (pos_a, r_a)) in enumerate(balls):
            # Each unordered pair is visited once.
            for id_b, (pos_b, r_b) in balls[index + 1:]:
                # Positions are bounding-box corners, so shift by the radius
                # to compare circle centres; comparing the corners directly
                # is wrong whenever the two radii differ.
                dx = (pos_a[0] + r_a) - (pos_b[0] + r_b)
                dy = (pos_a[1] + r_a) - (pos_b[1] + r_b)
                distance = math.hypot(dx, dy)
                if distance < r_a + r_b:
                    self.collisions.add(id_a)
                    self.collisions.add(id_b)
                    # Keep both orderings, as the original double loop did.
                    self.distances[(id_a, id_b)] = distance
                    self.distances[(id_b, id_a)] = distance

    def intTransition(self):
        """Clear the pending notifications after they were output."""
        self.curr_time += self.timeAdvance()
        self.collisions = set()

    def outputFnc(self):
        """Send "collision_detected" to every colliding ball with a port."""
        return {
            self.balls_out[ball_id]: ["collision_detected"]
            for ball_id in self.collisions
            # Skip ids without a registered port instead of raising KeyError.
            if ball_id in self.balls_out
        }
class Window(CoupledDEVS, tk.Toplevel):
    """Top-level coupled DEVS model that is also the Tk window.

    It owns the canvas, the ``BallSpawner`` and the ``PositionManager``, and
    adds/removes ``Ball`` sub-models in ``modelTransition``.
    """

    def __init__(self):
        tk.Toplevel.__init__(self)
        CoupledDEVS.__init__(self, "Window")
        self.title('BouncingBalls')
        self.geometry('{}x{}'.format(800, 600))

        # Scroll region spans the whole screen, not just the window.
        scroll_region = (0, 0,
                         self.winfo_screenwidth(), self.winfo_screenheight())
        self.canvas = tk.Canvas(self, relief=tk.RIDGE,
                                scrollregion=scroll_region)
        self.canvas.pack(expand=True, fill=tk.BOTH)

        self.ball_spawner = self.addSubModel(BallSpawner(self.canvas))
        self.position_manager = self.addSubModel(PositionManager())
        self.INTERRUPT = self.addInPort("INTERRUPT")
        self.connectPorts(self.INTERRUPT, self.ball_spawner.request)

        self.attributes("-topmost", True)

    def modelTransition(self, state):
        """Apply the structural changes requested by the sub-models.

        :param state: shared dictionary; ``'new_ball'`` holds a ball to add
            and ``'to_delete'`` a ball to remove. Handled keys are removed.
        :return: always False.
        """
        new_ball = state.pop('new_ball', None)
        if new_ball is not None:
            self._add_ball(new_ball)
        doomed = state.pop('to_delete', None)
        if doomed is not None:
            self._remove_ball(doomed)
        return False

    def _add_ball(self, ball):
        """Add ``ball`` as a sub-model and wire it to the other models."""
        manager = self.position_manager
        self.addSubModel(ball)
        port = manager.addOutPort("balls_out[%s]" % ball.ball_id)
        # TODO (original author): can I just change the state like that here?
        manager.balls_out[ball.ball_id] = port
        self.connectPorts(ball.position_out, manager.balls_in)
        self.connectPorts(port, ball.collision_detect)
        self.connectPorts(ball.ball_spawner_comm, self.ball_spawner.request)

    def _remove_ball(self, ball):
        """Remove ``ball`` from the model and erase its oval."""
        self.removeSubModel(ball)
        ball.canvas.delete(ball.id)
        # Drop the manager's port entry too; it used to be kept forever for
        # every removed ball.
        self.position_manager.balls_out.pop(ball.ball_id, None)
-
if __name__ == '__main__':
    # Fixed seed, so the random draws are the same on every run.
    random.seed(1337)

    # Tkinter initialization: create the root and hide it; the model
    # itself is a Toplevel window.
    tk_root = tk.Tk()
    tk_root.withdraw()

    # Instantiate the DEVS model.
    window = Window()

    # Simulator setup: real-time, driven by Tk, with dynamic structure
    # enabled and no external input file or ports.
    simulator = Simulator(window)
    simulator.setRealTime(True)
    simulator.setRealTimeInputFile(None)
    simulator.setRealTimePorts({})
    simulator.setRealTimePlatformTk(tk_root)
    simulator.setDSDEVS(True)
    simulator.setTerminationTime(30)

    # Run simulation + visualization.
    simulator.simulate()
    tk_root.mainloop()
|