from DEVS import * from infinity import INFINITY import random, math, sys, getopt class Canvas: def __init__(self, resolution): self.width = resolution[0] self.height = resolution[1] class ParticleState: def __init__(self, canvas): self.r = random.uniform(5.0, 30.0) x = random.uniform(self.r, canvas.width - self.r) y = random.uniform(self.r, canvas.height - self.r) self.pos = (x, y) self.vel = {'x': 50 + (random.random() * 25.0), 'y': 50 + (random.random() * 25.0)} self.phase = "initializing" # phases: ('initializing', 'initialized', 'bouncing', 'spawning', 'selected', 'deleting', 'clicked') self.color = "red" self.prev_color = "red" self.collision_detected = False self.clicked = False self.frames_passed = 0 self.frames_remaining = 0 self.remaining = 0 class Particle(AtomicDEVS): def __init__(self, particle_id, canvas, framerate, spawn_chance, die_chance): AtomicDEVS.__init__(self, "Particle[%i]" % particle_id) self.particle_id = particle_id self.canvas = canvas self.framerate = framerate self.spawn_chance = spawn_chance self.die_chance = die_chance self.COLLISION_DETECT = self.addInPort("COLLISION_DETECT") self.POS_OUT = self.addOutPort("POS_OUT") self.SPAWNER_COMM = self.addOutPort("SPAWNER_COMM") self.COLOR_OUT = self.addOutPort("COLOR_OUT") self.state = ParticleState(canvas) def timeAdvance(self): return self.state.remaining def outputFnc(self): output = {} if self.state.phase in ("bouncing", "spawning", "selected") or self.state.clicked: output[self.POS_OUT] = (self.particle_id, self.state.pos, self.state.r, self.state.vel) if self.state.collision_detected and self.state.phase != "spawning" and random.random() < self.spawn_chance: output[self.SPAWNER_COMM] = "spawn_particle" if self.state.phase == "selected" and self.state.prev_color == self.state.color: output[self.POS_OUT] = ("delete", self.particle_id) if self.state.color != self.state.prev_color: output[self.COLOR_OUT] = (self.particle_id, self.state.color) if self.state.phase == "initializing": output[self.POS_OUT] = ("created", self.particle_id, self.state.pos, self.state.r, self.state.vel) return output def intTransition(self): if self.state.phase == "initializing": self.state.phase = "initialized" self.state.remaining = 0 elif self.state.phase == "initialized": self.state.phase = "bouncing" self.state.remaining = 1 elif self.state.phase == "selected": if self.state.prev_color == self.state.color: self.state.phase = "deleting" self.state.remaining = INFINITY else: self.state.prev_color = self.state.color self.state.remaining = int(2.5 * self.framerate) elif self.state.prev_color != self.state.color: self.state.prev_color = self.state.color self.state.remaining = 0 elif self.state.phase == "clicked": self.state.remaining = INFINITY elif random.random() < self.die_chance: self.state.vel = {'x': 0, 'y': 0} self.state.color = "yellow" self.state.phase = "selected" self.state.remaining = 0 else: if self.state.frames_passed < self.framerate and self.state.phase in ("bouncing", "spawning") and self.state.color != "black": self.state.frames_passed += 1 if self.state.frames_passed >= self.framerate and self.state.phase == "bouncing": self.state.color = "black" self.state.phase = "bouncing" self.state.frames_passed = 0 if self.state.frames_passed >= 1 * self.framerate and self.state.phase == "spawning": self.state.color = "black" self.state.phase = "bouncing" self.state.frames_passed = 0 x = self.state.pos[0] y = self.state.pos[1] if (x - self.state.r) <= 0 or x + self.state.r >= self.canvas.width: self.state.vel['x'] = -self.state.vel['x'] if (y - self.state.r) <= 0 or y + self.state.r >= self.canvas.height: self.state.vel['y'] = -self.state.vel['y'] self.state.pos = (self.state.pos[0] + (self.state.vel['x'] / self.framerate), self.state.pos[1] + (self.state.vel['y'] / self.framerate)) if self.state.collision_detected: self.state.phase = "spawning" self.state.color = "blue" elif self.state.clicked: self.state.vel = {'x': 0, 'y': 0} self.state.phase = "clicked" self.state.color = "orange" self.state.clicked = False self.state.remaining = 1 self.state.collision_detected = False return self.state def extTransition(self, inputs): if self.COLLISION_DETECT in inputs: if inputs[self.COLLISION_DETECT] == "delete_if_selected": if self.state.phase == "clicked": self.state.phase = "selected" self.state.remaining = 0 elif inputs[self.COLLISION_DETECT] == "clicked": if self.state.phase == "bouncing": self.state.clicked = True self.state.remaining = 0 elif self.state.phase in ("bouncing", "clicked", "deleting"): self.state.phase = "bouncing" self.state.collision_detected = True new_speed = inputs[self.COLLISION_DETECT][1] self.state.vel = {'x': new_speed[0], 'y': new_speed[1]} self.state.remaining = 0 else: self.state.remaining -= self.elapsed return self.state def modelTransition(self, passed_values): if self.state.phase == "deleting": passed_values['to_delete'] = self return True elif self.state.phase == "initialized": passed_values['connect_particle'] = (self.particle_id, self.COLLISION_DETECT) return True return False class ParticleSpawnerState: def __init__(self): self.id_ctr = 0 self.new_particle = None self.remaining = 0 class ParticleSpawner(AtomicDEVS): def __init__(self, canvas, framerate, spawn_chance, die_chance): AtomicDEVS.__init__(self, "ParticleSpawner") self.canvas = canvas self.framerate = framerate self.spawn_chance = spawn_chance self.die_chance = die_chance self.REQUEST = self.addInPort("REQUEST") self.state = ParticleSpawnerState() def timeAdvance(self): return self.state.remaining def intTransition(self): self.state.new_particle = Particle(self.state.id_ctr, self.canvas, self.framerate, self.spawn_chance, self.die_chance) self.state.id_ctr += 1 self.state.remaining = self.framerate return self.state def extTransition(self, inputs): self.state.remaining -= self.elapsed if self.REQUEST in inputs: self.state.new_particle = Particle(self.state.id_ctr, self.canvas, self.framerate, self.spawn_chance, self.die_chance) self.state.id_ctr += 1 return self.state def modelTransition(self, passed_values): if self.state.new_particle: passed_values['new_particle'] = self.state.new_particle return True return False class PositionManagerState: def __init__(self): self.positions = {} self.collisions = set([]) self.phase = "detecting" self.clicked = None self.delete_selected = False self.frames = 0 self.remaining = 0 # self.PARTICLES_OUT = {} def __str__(self): return str((self.phase, self.frames, self.remaining, len(self.PARTICLES_OUT))) ''' def __getstate__(self): odict = self.__dict__.copy() del odict['PARTICLES_OUT'] return odict def __setstate__(self, dict): self.__dict__.update(dict) self.PARTICLES_OUT = {port.host_DEVS.particle_id: port for port in self.OPorts if port.getName().startswith('PARTICLES_OUT')} ''' class PositionManager(AtomicDEVS): def __init__(self, framerate): AtomicDEVS.__init__(self, "PositionManager") self.framerate = framerate self.TIME_OUT = self.addOutPort("TIME_OUT") self.POS_IN = self.addInPort("POS_IN") self.INTERRUPT = self.addInPort("INTERRUPT") self.state = PositionManagerState() def timeAdvance(self): return self.state.remaining def outputFnc(self): output = {} if self.state.collisions: #output = {self.state.PARTICLES_OUT[particle_id]: ("collision_detected", deltap) for (particle_id, deltap) in self.state.collisions} output = {self.ports['PARTICLES_OUT[%i]' % particle_id]: ("collision_detected", deltap) for (particle_id, deltap) in self.state.collisions} if self.state.clicked is not None: #if self.state.clicked in self.state.PARTICLES_OUT: #output[self.state.PARTICLES_OUT[self.state.clicked]] = "clicked" portname = 'PARTICLES_OUT[%i]' % particle_id if portname in self.ports: output[self.ports[portname]] = "clicked" if self.state.delete_selected: #output = {self.state.PARTICLES_OUT[particle_id]: "delete_if_selected" for particle_id in self.state.positions} output = {self.ports['PARTICLES_OUT[%i]']: "delete_if_selected" for particle_id in self.state.positions} output[self.TIME_OUT] = self.state.frames return output def intTransition(self): self.state.frames += self.timeAdvance() if self.state.clicked is not None: self.state.clicked = None self.state.remaining = 1 elif self.state.delete_selected: self.state.delete_selected = False self.state.remaining = 1 elif self.state.phase == "detecting": for k1, v1 in self.state.positions.iteritems(): for k2, v2 in self.state.positions.iteritems(): if k1 != k2: dx = v2[0][0] - v1[0][0] dy = v2[0][1] - v1[0][1] distance = math.sqrt(dx**2 + dy**2) if (distance < (v1[1] + v2[1])) and (k1 not in self.state.collisions or k2 not in self.state.collisions): ''' u = [dx / distance, dy / distance] vab = {'x': v2[2]['x'] - v1[2]['x'], 'y': v2[2]['y'] - v1[2]['y']} vu_mult = vab['x'] * u[0] + vab['y'] * u[1] vu = [u[0] * vu_mult, u[1] * vu_mult] deltap_mult = 1 deltap = (vu[0] * deltap_mult, vu[1] * deltap_mult) self.state.collisions.add((k1, deltap)) self.state.collisions.add((k2, (-deltap[0], -deltap[1]))) ''' new_speed_1 = (v2[2]['x'], v2[2]['y']) new_speed_2 = (v1[2]['x'], v1[2]['y']) self.state.collisions.add((k1, new_speed_1)) self.state.collisions.add((k2, new_speed_2)) self.state.phase = "detected" self.state.remaining = 0 elif self.state.phase == "detected": self.state.collisions = set([]) self.state.phase = "detecting" self.state.remaining = 1 return self.state def extTransition(self, inputs): self.state.remaining -= self.elapsed self.state.frames += self.elapsed if self.POS_IN in inputs: msg = inputs[self.POS_IN] if msg[0] == "created": new_particle_id = msg[1] #self.state.PARTICLES_OUT[new_particle_id] = self.addOutPort("PARTICLES_OUT[%s]" % new_particle_id) self.addOutPort("PARTICLES_OUT[%s]" % new_particle_id) self.state.positions[new_particle_id] = (msg[2], msg[3], msg[4]) elif msg[0] == "delete": #del self.state.PARTICLES_OUT[msg[1]] del self.state.positions[msg[1]] else: particle_id = msg[0] r = msg[2] self.state.positions[particle_id] = (msg[1], msg[2], msg[3]) # TODO: Fix this, needs to be rounded to integer. elif self.INTERRUPT in inputs: msg = inputs[self.INTERRUPT][0] if msg == "delete_selected": self.state.delete_selected = True else: self.state.clicked = int(msg) return self.state class Root(CoupledDEVS): def __init__(self, canvas=Canvas((800, 600)), framerate=30.0, spawn_chance=0.2, die_chance=0.01): CoupledDEVS.__init__(self, "Field") self.INTERRUPT = self.addInPort("INTERRUPT") self.POS_OUT = self.addOutPort("POS_OUT") self.COLOR_OUT = self.addOutPort("COLOR_OUT") self.TIME_OUT = self.addOutPort("TIME_OUT") self.particle_spawner = self.addSubModel(ParticleSpawner(canvas, framerate, spawn_chance, die_chance)) self.position_manager = self.addSubModel(PositionManager(framerate)) self.connectPorts(self.INTERRUPT, self.position_manager.INTERRUPT) self.connectPorts(self.position_manager.TIME_OUT, self.TIME_OUT) def modelTransition(self, passed_values): if 'new_particle' in passed_values: new_particle = passed_values['new_particle'] self.addSubModel(new_particle) self.connectPorts(new_particle.POS_OUT, self.position_manager.POS_IN) self.connectPorts(new_particle.POS_OUT, self.POS_OUT) self.connectPorts(new_particle.COLOR_OUT, self.COLOR_OUT) self.connectPorts(new_particle.SPAWNER_COMM, self.particle_spawner.REQUEST) del passed_values['new_particle'] if 'to_delete' in passed_values: self.removeSubModel(passed_values['to_delete']) del passed_values['to_delete'] if 'connect_particle' in passed_values: particle_id, COLLISION_DETECT = passed_values['connect_particle'] #self.connectPorts(self.position_manager.state.PARTICLES_OUT[particle_id], COLLISION_DETECT) self.connectPorts(self.position_manager.ports['PARTICLES_OUT[%s]' % particle_id], COLLISION_DETECT) del passed_values['connect_particle'] return False def select(self, imm_children): pos_mgr, particle_spawner = [None] * 2 for c in imm_children: if c.getModelName() == "PositionManager": pos_mgr = c elif c.getModelName() == "ParticleSpawner": particle_spawner = c if particle_spawner: return particle_spawner elif pos_mgr: return imm_children[0] #pos_mgr else: return imm_children[0] ''' class ParticleVisualization: def __init__(self, particle_id, circle_id, middle_id, canvas, sim): self.particle_id = particle_id self.circle_id = circle_id self.middle_id = middle_id self.canvas = canvas self.sim = sim self.canvas.tag_bind(self.circle_id, "", self.on_click) self.canvas.tag_bind(self.middle_id, "", self.on_click) def on_click(self, event): self.sim.realtime_interrupt("INTERRUPT %i" % self.particle_id) # TODO: Model this in SCCD class Visualizer(tk.Toplevel): def __init__(self, root, resolution, framerate, sim): tk.Toplevel.__init__(self) self.framerate = int((1.0 / framerate) * 1000) self.sim = sim self.root = root self.geometry('{}x{}'.format(resolution[0], resolution[1])) CANVAS_SIZE_TUPLE = (0, 0, self.winfo_screenwidth(), self.winfo_screenheight()) self.canvas = tk.Canvas(self, relief=tk.RIDGE, scrollregion=CANVAS_SIZE_TUPLE) self.canvas.pack(expand = True, fill=tk.BOTH) self.text = self.canvas.create_text(5, 5, anchor='nw', text="TIME: %s" % 0.0) self.particles = {} self.protocol("WM_DELETE_WINDOW", lambda: self.root.destroy()) self.bind("", self.delete_pressed) self.after(self.framerate, self.check_output) def delete_pressed(self, event): self.sim.realtime_interrupt("INTERRUPT delete_selected") def check_output(self): while True: try: msg = self.sim.model.out_msg_queue.get(False) self.canvas.itemconfig(self.text, text=msg[0][0]) port = msg[1] if port == self.sim.model.POS_OUT: msg_contents = msg[2][0] if msg_contents[0] == "delete": particle_id = msg_contents[1] self.canvas.delete(self.particles[particle_id].circle_id) self.canvas.delete(self.particles[particle_id].middle_id) del self.particles[particle_id] elif msg_contents[0] == "created": pass else: particle_id = msg_contents[0] x = msg_contents[1][0] y = msg_contents[1][1] r = msg_contents[2] if not particle_id in self.particles: circle_id = self.canvas.create_oval(x - r, y - r, x + r, y + r, fill="red") middle_id = self.canvas.create_oval(x - 4, y - 4, x + 4, y + 4, fill="orange") self.particles[particle_id] = ParticleVisualization(particle_id, circle_id, middle_id, self.canvas, self.sim) else: circle_id = self.particles[particle_id].circle_id middle_id = self.particles[particle_id].middle_id curr_pos = self.canvas.coords(circle_id) self.canvas.move(circle_id, x - r - curr_pos[0], y - r - curr_pos[1]) curr_pos_middle = self.canvas.coords(middle_id) self.canvas.move(middle_id, x - 4 - curr_pos_middle[0], y - 4 - curr_pos_middle[1]) elif port == self.sim.model.COLOR_OUT: msg_contents = msg[2][0] particle_id = msg_contents[0] circle_id = self.particles[particle_id].circle_id middle_id = self.particles[particle_id].middle_id color = msg_contents[1] self.canvas.itemconfig(circle_id, fill=color) except Empty: break self.after(self.framerate, self.check_output) if __name__ == '__main__': root = tk.Tk() root.withdraw() try: opts, args = getopt.getopt(sys.argv[1:], "w:h:f:s:d:r:m", ["width=", "height=", "framerate=", "spawn_chance=", "die_chance=", "random_seed=", "main_loop"]) except getopt.GetoptError, e: print e sys.exit(2) width, height, framerate, spawn_chance, die_chance, random_seed, ml = [None] * 7 for opt, arg in opts: if opt in ("--width", "-w"): width = int(arg) elif opt in ("--height", "-h"): height = int(arg) elif opt in ("--framerate", "-f"): framerate = int(arg) elif opt in ("--spawn_chance", "-s"): spawn_chance = float(arg) elif opt in ("--die_chance", "-d"): die_chance = float(arg) elif opt in ("--random_seed", "-r"): random_seed = int(arg) elif opt in ("--main_loop", "-m"): ml = True resolution = (width or 800, height or 600) framerate = framerate or 60 spawn_chance = 0.2 if spawn_chance is None else spawn_chance die_chance = 0.01 if die_chance is None else die_chance if random_seed: random.seed(random_seed) model = Field(Canvas(resolution), framerate, spawn_chance, die_chance) sim = Simulator(model) sim.setRealTime(True) sim.setRealTimeInputFile(None) sim.setRealTimePorts({'INTERRUPT': model.INTERRUPT}) sim.setRealTimePlatformTk(root) sim.setDSDEVS(True) sim.setClassicDEVS(True) sim.setTerminationTime(60) visualizer = Visualizer(root, resolution, framerate, sim) visualizer.title('ParticleInteraction %s' % ml) import cProfile pr = cProfile.Profile() pr.enable() sim.simulate() if ml: root.mainloop() else: while True: try: root.update_idletasks() root.update() except: break pr.disable() pr.print_stats(sort='tottime') '''