123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484 |
- from pypdevs.DEVS import AtomicDEVS, CoupledDEVS
- from pypdevs.simulator import Simulator
- from pypdevs.infinity import INFINITY
- import random, math, sys, getopt
- import Tkinter as tk
- from Queue import Empty
class Canvas:
    """Holds the dimensions of the area particles move in.

    ``resolution`` is any indexable whose element 0 is the width and
    element 1 is the height.
    """

    def __init__(self, resolution):
        self.width, self.height = resolution[0], resolution[1]
-
class ParticleState:
    """Mutable state record of one particle.

    The radius, the start position and the start velocity are drawn from the
    module-level ``random`` generator; the position is chosen so that the
    whole circle lies inside ``canvas``.
    """

    def __init__(self, canvas, particle_id):
        # Random draws happen in a fixed order: radius, x, y, then sign and
        # magnitude for each velocity component.
        radius = random.uniform(5.0, 30.0)
        x = random.uniform(radius, canvas.width - radius)
        y = random.uniform(radius, canvas.height - radius)
        vel_x = random.choice([1, -1]) * (50 + (random.random() * 25.0))
        vel_y = random.choice([1, -1]) * (50 + (random.random() * 25.0))

        self.particle_id = particle_id
        self.r = radius
        self.pos = (x, y)
        self.vel = {'x': vel_x, 'y': vel_y}

        # phases: ('initializing', 'initialized', 'bouncing', 'spawning',
        #          'selected', 'deleting', 'clicked')
        self.phase = "initializing"
        self.color = "red"
        self.prev_color = "red"

        # Flags raised by external events and consumed by internal transitions.
        self.collision_detected = False
        self.clicked = False

        # Counters and the time left until the next internal transition.
        self.frames_passed = 0
        self.frames_remaining = 0
        self.remaining = 0
class Particle(AtomicDEVS):
    """Atomic DEVS model of a single particle moving inside the canvas.

    ``state.phase`` is one of 'initializing', 'initialized', 'bouncing',
    'spawning', 'selected', 'deleting' or 'clicked'.

    Ports:
        COLLISION_DETECT (in): receives the strings "clicked" or
            "delete_if_selected", or a sequence whose element 1 is the new
            ``(vx, vy)`` velocity.
        POS_OUT (out): ``("created", id, pos, r, vel)``, ``("delete", id)``
            or ``(id, pos, r, vel)``.
        SPAWNER_COMM (out): the string "spawn_particle".
        COLOR_OUT (out): ``(id, color)``.
    """

    def __init__(self, particle_id, canvas, framerate, spawn_chance, die_chance):
        """Create the particle, its ports and a randomly initialised state.

        Args:
            particle_id: identifier used in the model name and in messages.
            canvas: object with ``width`` and ``height`` attributes.
            framerate: divisor applied to the velocity on every frame.
            spawn_chance: probability of emitting "spawn_particle" when a
                collision has been flagged.
            die_chance: per-frame probability of entering the 'selected'
                phase on its own.
        """
        AtomicDEVS.__init__(self, "Particle[%i]" % particle_id)

        self.particle_id = particle_id
        self.canvas = canvas
        self.framerate = framerate
        # Use the probabilities supplied by the caller; they were previously
        # overwritten with 0.0, which disabled spawning and dying entirely.
        self.spawn_chance = spawn_chance
        self.die_chance = die_chance

        self.COLLISION_DETECT = self.addInPort("COLLISION_DETECT")
        self.POS_OUT = self.addOutPort("POS_OUT")
        self.SPAWNER_COMM = self.addOutPort("SPAWNER_COMM")
        self.COLOR_OUT = self.addOutPort("COLOR_OUT")

        self.state = ParticleState(canvas, particle_id)

    def timeAdvance(self):
        """Return the time left until the next internal transition."""
        return self.state.remaining

    def outputFnc(self):
        """Build the output sent just before the internal transition.

        The checks run in a fixed order because later ones may overwrite the
        POS_OUT entry written by earlier ones ("delete" and "created" take
        precedence over a plain position update).
        """
        state = self.state
        output = {}
        if state.phase in ("bouncing", "spawning", "selected") or state.clicked:
            output[self.POS_OUT] = (self.particle_id, state.pos, state.r, state.vel)
        if state.collision_detected and state.phase != "spawning" and random.random() < self.spawn_chance:
            output[self.SPAWNER_COMM] = "spawn_particle"
        if state.phase == "selected" and state.prev_color == state.color:
            output[self.POS_OUT] = ("delete", self.particle_id)
        if state.color != state.prev_color:
            output[self.COLOR_OUT] = (self.particle_id, state.color)
        if state.phase == "initializing":
            output[self.POS_OUT] = ("created", self.particle_id, state.pos, state.r, state.vel)
        return output

    def intTransition(self):
        """Advance the phase machine by one internal step and return the state."""
        state = self.state
        if state.phase == "initializing":
            state.phase = "initialized"
            state.remaining = 0
        elif state.phase == "initialized":
            state.phase = "bouncing"
            state.remaining = 1
        elif state.phase == "selected":
            if state.prev_color == state.color:
                # The colour change was already published: go to 'deleting'.
                state.phase = "deleting"
                state.remaining = INFINITY
            else:
                state.prev_color = state.color
                state.remaining = int(2.5 * self.framerate)
        elif state.prev_color != state.color:
            # A colour change was just emitted; record it and continue at once.
            state.prev_color = state.color
            state.remaining = 0
        elif state.phase == "clicked":
            state.remaining = INFINITY
        elif random.random() < self.die_chance:
            state.vel = {'x': 0, 'y': 0}
            state.color = "yellow"
            state.phase = "selected"
            state.remaining = 0
        else:
            self._advance_frame()
        # NOTE(review): the source's indentation was lost; this reset is
        # assumed to apply to every internal transition - confirm.
        state.collision_detected = False
        return state

    def _advance_frame(self):
        """Run one regular frame: colour timer, wall bounce, move, pending flags."""
        state = self.state
        if state.frames_passed < self.framerate and state.phase in ("bouncing", "spawning") and state.color != "black":
            state.frames_passed += 1
        if state.frames_passed >= self.framerate and state.phase in ("bouncing", "spawning"):
            state.color = "black"
            state.phase = "bouncing"
            state.frames_passed = 0

        x = state.pos[0]
        y = state.pos[1]
        # Reverse a velocity component when the circle touches a canvas edge.
        if (x - state.r) <= 0 or x + state.r >= self.canvas.width:
            state.vel['x'] = -state.vel['x']
        if (y - state.r) <= 0 or y + state.r >= self.canvas.height:
            state.vel['y'] = -state.vel['y']
        state.pos = (x + (state.vel['x'] / self.framerate), y + (state.vel['y'] / self.framerate))

        if state.collision_detected:
            state.phase = "spawning"
            state.color = "blue"
        elif state.clicked:
            state.vel = {'x': 0, 'y': 0}
            state.phase = "clicked"
            state.color = "orange"
            state.clicked = False
        state.remaining = 1

    def extTransition(self, inputs):
        """React to a message on COLLISION_DETECT and return the state.

        The elapsed time is subtracted up front, as the other atomic models in
        this file do, so a message that triggers no reaction does not restart
        the full waiting time. Reactions that apply set ``remaining`` to 0.
        """
        state = self.state
        state.remaining -= self.elapsed
        if self.COLLISION_DETECT in inputs:
            message = inputs[self.COLLISION_DETECT]
            if message == "delete_if_selected":
                if state.phase == "clicked":
                    state.phase = "selected"
                    state.remaining = 0
            elif message == "clicked":
                if state.phase == "bouncing":
                    state.clicked = True
                    state.remaining = 0
            elif state.phase in ("bouncing", "clicked", "deleting"):
                # Any other message is treated as a collision report whose
                # element 1 holds the velocity to adopt.
                state.phase = "bouncing"
                state.collision_detected = True
                new_speed = message[1]
                state.vel = {'x': new_speed[0], 'y': new_speed[1]}
                state.remaining = 0
        return state

    def modelTransition(self, passed_values):
        """Request a structural change from the parent model.

        Stores ``'to_delete'`` when the particle is in phase 'deleting', or
        ``'connect_particle'`` when it is in phase 'initialized'.

        Returns:
            True when a request was stored in ``passed_values``, else False.
        """
        if self.state.phase == "deleting":
            passed_values['to_delete'] = self
            return True
        elif self.state.phase == "initialized":
            passed_values['connect_particle'] = (self.particle_id, self.COLLISION_DETECT)
            return True
        return False
-
class ParticleSpawnerState:
    """State of the particle spawner.

    Attributes are the id counter for created particles, a flag telling that
    a particle should be created, and the time until the next internal
    transition.
    """

    def __init__(self):
        self.remaining = 0
        self.new_particle = False
        self.id_ctr = 0
class ParticleSpawner(AtomicDEVS):
    """Atomic model that asks its parent to add new Particle models.

    A particle is requested on every internal transition (rescheduled every
    ``framerate`` time units) and whenever a message arrives on REQUEST.
    """

    def __init__(self, canvas, framerate, spawn_chance, die_chance):
        AtomicDEVS.__init__(self, "ParticleSpawner")

        # Settings handed on unchanged to every particle that is created.
        self.canvas, self.framerate = canvas, framerate
        self.spawn_chance, self.die_chance = spawn_chance, die_chance

        self.REQUEST = self.addInPort("REQUEST")

        self.state = ParticleSpawnerState()

    def timeAdvance(self):
        """Return the time left until the next scheduled spawn."""
        return self.state.remaining

    def _flag_new_particle(self):
        """Mark that a particle must be created and reserve the next id."""
        self.state.new_particle = True
        self.state.id_ctr += 1

    def intTransition(self):
        """Flag a new particle and schedule the next spawn ``framerate`` later."""
        self._flag_new_particle()
        self.state.remaining = self.framerate
        return self.state

    def extTransition(self, inputs):
        """Account for the elapsed time and flag a particle on a REQUEST message."""
        self.state.remaining -= self.elapsed
        if self.REQUEST in inputs:
            self._flag_new_particle()
        return self.state

    def modelTransition(self, passed_values):
        """Hand a freshly built Particle to the parent via ``'new_particle'``.

        Returns:
            True when a particle was stored in ``passed_values``, else False.
        """
        if not self.state.new_particle:
            return False
        passed_values['new_particle'] = Particle(
            self.state.id_ctr, self.canvas, self.framerate, self.spawn_chance, self.die_chance)
        return True
class PositionManagerState:
    """State of the position manager."""

    def __init__(self):
        # Scheduling: current phase, accumulated time, time to next transition.
        self.phase = "detecting"
        self.frames = 0
        self.remaining = 0

        # Per-particle bookkeeping, keyed by particle id.
        self.positions = {}
        self.PARTICLES_OUT = {}
        self.collisions = set()

        # Pending user requests.
        self.clicked = None
        self.delete_selected = False

    def __str__(self):
        summary = (self.phase, self.frames, self.remaining)
        return str(summary)
class PositionManager(AtomicDEVS):
    """Atomic model that tracks particle positions and reports collisions.

    Ports:
        POS_IN (in): ``("created", id, pos, r, vel)``, ``("delete", id)`` or
            ``(id, pos, r, vel)``.
        INTERRUPT (in): a sequence whose element 0 is "delete_selected" or a
            value convertible with ``int`` (the clicked particle id).
        TIME_OUT (out): the accumulated ``frames`` value.
        PARTICLES_OUT[<id>] (out): one port per known particle, created when
            its "created" message arrives.
    """

    def __init__(self, framerate):
        AtomicDEVS.__init__(self, "PositionManager")

        self.framerate = framerate

        self.TIME_OUT = self.addOutPort("TIME_OUT")
        self.POS_IN = self.addInPort("POS_IN")
        self.INTERRUPT = self.addInPort("INTERRUPT")

        self.state = PositionManagerState()

    def timeAdvance(self):
        """Return the time left until the next internal transition."""
        return self.state.remaining

    def outputFnc(self):
        """Build the messages for the particles plus the current time.

        A pending "delete_selected" request replaces any collision or click
        output built before it.
        """
        state = self.state
        ports = state.PARTICLES_OUT
        output = {}
        if state.collisions:
            # Skip particles whose port is gone (removed after the collision
            # was recorded); the "clicked" path below guards the same way.
            output = {ports[particle_id]: ("collision_detected", new_speed)
                      for (particle_id, new_speed) in state.collisions
                      if particle_id in ports}
        if state.clicked is not None:
            if state.clicked in ports:
                output[ports[state.clicked]] = "clicked"
        if state.delete_selected:
            output = {ports[particle_id]: "delete_if_selected" for particle_id in state.positions}
        output[self.TIME_OUT] = state.frames
        return output

    def intTransition(self):
        """Serve pending requests or alternate between detecting and reporting."""
        state = self.state
        state.frames += self.timeAdvance()
        if state.clicked is not None:
            state.clicked = None
            state.remaining = 1
        elif state.delete_selected:
            state.delete_selected = False
            state.remaining = 1
        elif state.phase == "detecting":
            self._detect_collisions()
            # Report immediately: 'detected' is left again after zero time.
            state.phase = "detected"
            state.remaining = 0
        elif state.phase == "detected":
            state.collisions = set([])
            state.phase = "detecting"
            state.remaining = 1
        return state

    def _detect_collisions(self):
        """Record a velocity swap for every pair of overlapping particles.

        Each position entry is ``(pos, r, vel)``. Two particles overlap when
        the distance between their centres is below the sum of their radii.
        A disabled alternative that exchanged the projection of the relative
        velocity onto the centre line used to be sketched here; the live
        behaviour is a plain swap of velocities.
        """
        positions = self.state.positions
        collisions = self.state.collisions
        # The former extra clause "k1 not in collisions or k2 not in
        # collisions" tested bare ids against (id, velocity) tuples and so
        # was always true (assuming ids are never tuples); it is dropped.
        for k1, v1 in positions.items():
            for k2, v2 in positions.items():
                if k1 == k2:
                    continue
                dx = v2[0][0] - v1[0][0]
                dy = v2[0][1] - v1[0][1]
                distance = math.sqrt(dx**2 + dy**2)
                if distance < (v1[1] + v2[1]):
                    collisions.add((k1, (v2[2]['x'], v2[2]['y'])))
                    collisions.add((k2, (v1[2]['x'], v1[2]['y'])))

    def extTransition(self, inputs):
        """Update bookkeeping from a particle message or a user interrupt."""
        state = self.state
        state.remaining -= self.elapsed
        state.frames += self.elapsed
        if self.POS_IN in inputs:
            msg = inputs[self.POS_IN]
            if msg[0] == "created":
                new_particle_id = msg[1]
                state.PARTICLES_OUT[new_particle_id] = self.addOutPort("PARTICLES_OUT[%s]" % new_particle_id)
                state.positions[new_particle_id] = (msg[2], msg[3], msg[4])
            elif msg[0] == "delete":
                del state.PARTICLES_OUT[msg[1]]
                del state.positions[msg[1]]
            else:
                particle_id = msg[0]
                state.positions[particle_id] = (msg[1], msg[2], msg[3])
        # TODO: Fix this, needs to be rounded to integer.
        elif self.INTERRUPT in inputs:
            msg = inputs[self.INTERRUPT][0]
            if msg == "delete_selected":
                state.delete_selected = True
            else:
                state.clicked = int(msg)
        return state
class Field(CoupledDEVS):
    """Coupled model holding the spawner, the position manager and the particles.

    Particles are added, wired and removed at run time through
    ``modelTransition``.
    """

    def __init__(self, canvas, framerate, spawn_chance, die_chance):
        CoupledDEVS.__init__(self, "Field")

        self.INTERRUPT = self.addInPort("INTERRUPT")
        self.POS_OUT = self.addOutPort("POS_OUT")
        self.COLOR_OUT = self.addOutPort("COLOR_OUT")
        self.TIME_OUT = self.addOutPort("TIME_OUT")

        spawner = ParticleSpawner(canvas, framerate, spawn_chance, die_chance)
        self.particle_spawner = self.addSubModel(spawner)
        manager = PositionManager(framerate)
        self.position_manager = self.addSubModel(manager)

        self.connectPorts(self.INTERRUPT, self.position_manager.INTERRUPT)
        self.connectPorts(self.position_manager.TIME_OUT, self.TIME_OUT)

    def modelTransition(self, passed_values):
        """Apply the structural requests the children left in ``passed_values``.

        Handles, in this order, 'new_particle' (add and wire a particle),
        'to_delete' (remove a sub-model) and 'connect_particle' (wire the
        position manager's per-particle port to the particle's input port).
        Each handled key is removed from ``passed_values``. Always returns
        False.
        """
        if 'new_particle' in passed_values:
            particle = passed_values['new_particle']
            self.addSubModel(particle)
            links = (
                (particle.POS_OUT, self.position_manager.POS_IN),
                (particle.POS_OUT, self.POS_OUT),
                (particle.COLOR_OUT, self.COLOR_OUT),
                (particle.SPAWNER_COMM, self.particle_spawner.REQUEST),
            )
            for source, target in links:
                self.connectPorts(source, target)
            del passed_values['new_particle']

        if 'to_delete' in passed_values:
            self.removeSubModel(passed_values['to_delete'])
            del passed_values['to_delete']

        if 'connect_particle' in passed_values:
            particle_id, collision_port = passed_values['connect_particle']
            manager_port = self.position_manager.state.PARTICLES_OUT[particle_id]
            self.connectPorts(manager_port, collision_port)
            del passed_values['connect_particle']

        return False

    def select(self, imm_children):
        """Pick which imminent child goes first.

        The particle spawner wins whenever it is imminent; otherwise the
        first entry of ``imm_children`` is returned.
        """
        spawner = None
        for child in imm_children:
            if child.getModelName() == "ParticleSpawner":
                spawner = child
        return spawner if spawner else imm_children[0]
- '''
- class ParticleVisualization:
- def __init__(self, particle_id, circle_id, middle_id, canvas, sim):
- self.particle_id = particle_id
- self.circle_id = circle_id
- self.middle_id = middle_id
- self.canvas = canvas
- self.sim = sim
-
- self.canvas.tag_bind(self.circle_id, "<Button-1>", self.on_click)
- self.canvas.tag_bind(self.middle_id, "<Button-1>", self.on_click)
-
- def on_click(self, event):
- self.sim.realtime_interrupt("INTERRUPT %i" % self.particle_id)
- # TODO: Model this in SCCD
- class Visualizer(tk.Toplevel):
- def __init__(self, root, resolution, framerate, sim):
- tk.Toplevel.__init__(self)
-
- self.framerate = int((1.0 / framerate) * 1000)
- self.sim = sim
- self.root = root
-
- self.geometry('{}x{}'.format(resolution[0], resolution[1]))
- CANVAS_SIZE_TUPLE = (0, 0, self.winfo_screenwidth(), self.winfo_screenheight())
- self.canvas = tk.Canvas(self, relief=tk.RIDGE, scrollregion=CANVAS_SIZE_TUPLE)
- self.canvas.pack(expand = True, fill=tk.BOTH)
- self.text = self.canvas.create_text(5, 5, anchor='nw', text="TIME: %s" % 0.0)
- self.particles = {}
-
- self.protocol("WM_DELETE_WINDOW", lambda: self.root.destroy())
- self.bind("<Delete>", self.delete_pressed)
-
- self.after(self.framerate, self.check_output)
-
- def delete_pressed(self, event):
- self.sim.realtime_interrupt("INTERRUPT delete_selected")
-
- def check_output(self):
- while True:
- try:
- msg = self.sim.model.out_msg_queue.get(False)
- self.canvas.itemconfig(self.text, text=msg[0][0])
- port = msg[1]
- if port == self.sim.model.POS_OUT:
- msg_contents = msg[2][0]
- if msg_contents[0] == "delete":
- particle_id = msg_contents[1]
- self.canvas.delete(self.particles[particle_id].circle_id)
- self.canvas.delete(self.particles[particle_id].middle_id)
- del self.particles[particle_id]
- elif msg_contents[0] == "created":
- pass
- else:
- particle_id = msg_contents[0]
- x = msg_contents[1][0]
- y = msg_contents[1][1]
- r = msg_contents[2]
- if not particle_id in self.particles:
- circle_id = self.canvas.create_oval(x - r, y - r, x + r, y + r, fill="red")
- middle_id = self.canvas.create_oval(x - 4, y - 4, x + 4, y + 4, fill="orange")
- self.particles[particle_id] = ParticleVisualization(particle_id, circle_id, middle_id, self.canvas, self.sim)
- else:
- circle_id = self.particles[particle_id].circle_id
- middle_id = self.particles[particle_id].middle_id
- curr_pos = self.canvas.coords(circle_id)
- self.canvas.move(circle_id, x - r - curr_pos[0], y - r - curr_pos[1])
- curr_pos_middle = self.canvas.coords(middle_id)
- self.canvas.move(middle_id, x - 4 - curr_pos_middle[0], y - 4 - curr_pos_middle[1])
- elif port == self.sim.model.COLOR_OUT:
- msg_contents = msg[2][0]
- particle_id = msg_contents[0]
- circle_id = self.particles[particle_id].circle_id
- middle_id = self.particles[particle_id].middle_id
- color = msg_contents[1]
- self.canvas.itemconfig(circle_id, fill=color)
- except Empty:
- break
- self.after(self.framerate, self.check_output)
- if __name__ == '__main__':
- root = tk.Tk()
- root.withdraw()
- try:
- opts, args = getopt.getopt(sys.argv[1:], "w:h:f:s:d:r:m", ["width=", "height=", "framerate=", "spawn_chance=", "die_chance=", "random_seed=", "main_loop"])
- except getopt.GetoptError, e:
- print e
- sys.exit(2)
- width, height, framerate, spawn_chance, die_chance, random_seed, ml = [None] * 7
- for opt, arg in opts:
- if opt in ("--width", "-w"):
- width = int(arg)
- elif opt in ("--height", "-h"):
- height = int(arg)
- elif opt in ("--framerate", "-f"):
- framerate = int(arg)
- elif opt in ("--spawn_chance", "-s"):
- spawn_chance = float(arg)
- elif opt in ("--die_chance", "-d"):
- die_chance = float(arg)
- elif opt in ("--random_seed", "-r"):
- random_seed = int(arg)
- elif opt in ("--main_loop", "-m"):
- ml = True
- resolution = (width or 800, height or 600)
- framerate = framerate or 60
- spawn_chance = 0.2 if spawn_chance is None else spawn_chance
- die_chance = 0.01 if die_chance is None else die_chance
-
- if random_seed:
- random.seed(random_seed)
-
- model = Field(Canvas(resolution), framerate, spawn_chance, die_chance)
-
- sim = Simulator(model)
- sim.setRealTime(True)
- sim.setRealTimeInputFile(None)
- sim.setRealTimePorts({'INTERRUPT': model.INTERRUPT})
- sim.setRealTimePlatformTk(root)
- sim.setDSDEVS(True)
- sim.setClassicDEVS(True)
- sim.setTerminationTime(60)
-
- visualizer = Visualizer(root, resolution, framerate, sim)
- visualizer.title('ParticleInteraction %s' % ml)
-
- import cProfile
- pr = cProfile.Profile()
- pr.enable()
-
- sim.simulate()
- if ml:
- root.mainloop()
- else:
- while True:
- try:
- root.update_idletasks()
- root.update()
- except:
- break
-
- pr.disable()
- pr.print_stats(sort='tottime')
- '''
|