"""Particle-interaction example: a dynamic-structure DEVS model with a Tk view.

The model consists of a ``Field`` coupled model that contains a
``ParticleSpawner``, a ``PositionManager`` and a changing set of ``Particle``
atomic models.  A ``Visualizer`` window draws the particles from the messages
the field emits on its ``POS_OUT`` and ``COLOR_OUT`` ports.
"""

import getopt
import math
import random
import sys

try:
    import Tkinter as tk
    from Queue import Empty  # unused in this file; kept from the original imports
except ImportError:  # Python 3 renamed both modules
    import tkinter as tk
    from queue import Empty  # unused in this file; kept from the original imports

from pypdevs.DEVS import AtomicDEVS, CoupledDEVS
from pypdevs.infinity import INFINITY
from pypdevs.simulator import Simulator


class Canvas:
    """Plain holder for the drawing area size used by the model."""

    def __init__(self, resolution):
        """Store ``resolution`` (a ``(width, height)`` pair) as attributes."""
        self.width = resolution[0]
        self.height = resolution[1]


class Particle(AtomicDEVS):
    """Atomic model of a single circular particle moving inside the canvas.

    ``curr_state`` is one of: 'initializing', 'initialized', 'bouncing',
    'spawning', 'selected', 'deleting', 'clicked'.
    """

    def __init__(self, particle_id, canvas, framerate, spawn_chance, die_chance):
        """Create the particle with a random radius, velocity and position.

        ``framerate`` is used directly as the time between two movement
        steps.  ``spawn_chance`` and ``die_chance`` are compared against
        ``random.random()``.
        """
        AtomicDEVS.__init__(self, "Particle[%i]" % particle_id)
        self.particle_id = particle_id
        self.canvas = canvas
        self.framerate = framerate
        self.spawn_chance = spawn_chance
        self.die_chance = die_chance

        self.COLLISION_DETECT = self.addInPort("COLLISION_DETECT")
        self.POS_OUT = self.addOutPort("POS_OUT")
        self.SPAWNER_COMM = self.addOutPort("SPAWNER_COMM")
        self.COLOR_OUT = self.addOutPort("COLOR_OUT")

        # The order of the random draws below is kept as in the original so
        # that a fixed seed keeps producing the same particles.
        self.r = random.uniform(5.0, 30.0)
        self.vel = {'x': 50 + (random.random() * 25.0),
                    'y': 50 + (random.random() * 25.0)}
        x = random.uniform(self.r, self.canvas.width - self.r)
        y = random.uniform(self.r, self.canvas.height - self.r)
        self.pos = (x, y)

        # stateset = ('initializing', 'initialized', 'bouncing', 'spawning',
        #             'selected', 'deleting', 'clicked')
        self.curr_state = "initializing"
        self.color = "red"
        self.prev_color = "red"
        self.time_passed = 0.0
        self.collision_detected = False
        self.clicked = False
        self.remaining = 0.0

    def timeAdvance(self):
        """Return the time until the next internal transition.

        'deleting' and 'clicked' are passive (``INFINITY``); every other
        state waits for ``self.remaining``.
        """
        if self.curr_state in ("deleting", "clicked"):
            return INFINITY
        return self.remaining

    def outputFnc(self):
        """Build the output messages for the current state.

        * ``POS_OUT``: ``(id, pos, r, vel)`` while bouncing/spawning,
          ``("delete", id)`` when a selected particle is about to be removed,
          ``("created", id, pos, r, vel)`` while initializing.
        * ``SPAWNER_COMM``: ``"spawn_particle"`` with probability
          ``spawn_chance`` after a collision.
        * ``COLOR_OUT``: ``(id, color)`` whenever the color changed.
        """
        output = {}
        if self.curr_state in ("bouncing", "spawning"):
            output[self.POS_OUT] = (self.particle_id, self.pos, self.r, self.vel)
            # Fixed: the original tested ``self.state``, an attribute this
            # model never assigns; the particle's phase lives in
            # ``self.curr_state``.
            if (self.collision_detected
                    and self.curr_state != "spawning"
                    and random.random() < self.spawn_chance):
                output[self.SPAWNER_COMM] = "spawn_particle"
        if self.curr_state == "selected" and self.prev_color == self.color:
            output[self.POS_OUT] = ("delete", self.particle_id)
        if self.color != self.prev_color:
            output[self.COLOR_OUT] = (self.particle_id, self.color)
        if self.curr_state == "initializing":
            output[self.POS_OUT] = ("created", self.particle_id, self.pos,
                                    self.r, self.vel)
        return output

    def intTransition(self):
        """Advance the particle's state machine by one internal step."""
        if self.curr_state == "initializing":
            self.curr_state = "initialized"
            self.remaining = 0.0
        elif self.curr_state == "initialized":
            self.curr_state = "bouncing"
            self.remaining = self.framerate
        elif self.curr_state == "selected" and self.prev_color == self.color:
            self.curr_state = "deleting"
            self.remaining = 0.0
        elif self.curr_state == "selected":
            # The new color was just sent out; stay selected for 2.5 time
            # units before the delete message is emitted.
            self.prev_color = self.color
            self.remaining = 2.5
        elif self.prev_color != self.color:
            self.prev_color = self.color
            self.remaining = 0.0
        elif self.clicked and self.color == "orange":
            self.curr_state = "clicked"
        elif random.random() < self.die_chance:
            self.color = "yellow"
            self.curr_state = "selected"
            self.remaining = 0.0
        else:
            self._move_one_frame()
            if self.collision_detected:
                self.curr_state = "spawning"
                self.color = "blue"
            elif self.clicked:
                self.color = "orange"
            self.remaining = self.framerate
            # NOTE(review): the source file had lost its indentation; this
            # reset is kept in the same branch as the statement before it.
            # Confirm against the upstream model that it is not meant to run
            # after every branch.
            self.collision_detected = False

    def _move_one_frame(self):
        """Update the frame timer, bounce off the walls and move one step."""
        if self.time_passed < 1 and self.curr_state in ("bouncing", "spawning"):
            self.time_passed += self.framerate
        if self.time_passed >= 1:
            self.color = "black"
            self.curr_state = "bouncing"
            self.time_passed = 0

        x, y = self.pos
        # Force the velocity to point back into the canvas instead of blindly
        # negating it: a particle that still overlaps the wall on the next
        # frame would otherwise be flipped outward again.
        if x - self.r <= 0:
            self.vel['x'] = abs(self.vel['x'])
        elif x + self.r >= self.canvas.width:
            self.vel['x'] = -abs(self.vel['x'])
        if y - self.r <= 0:
            self.vel['y'] = abs(self.vel['y'])
        elif y + self.r >= self.canvas.height:
            self.vel['y'] = -abs(self.vel['y'])

        self.pos = (x + self.vel['x'] * self.framerate,
                    y + self.vel['y'] * self.framerate)

    def extTransition(self, inputs):
        """Handle a message from the position manager on ``COLLISION_DETECT``.

        Accepted messages: ``"delete_if_selected"``, ``"clicked"`` or a tuple
        whose second element is the velocity change ``(dx, dy)`` caused by a
        collision (only applied while bouncing).
        """
        # Clamp at zero, as PositionManager does, so that a slightly too
        # large ``elapsed`` can never produce a negative time advance.
        self.remaining = max(0.0, self.remaining - self.elapsed)
        if self.COLLISION_DETECT in inputs:
            msg = inputs[self.COLLISION_DETECT]
            if msg == "delete_if_selected":
                if self.curr_state == "clicked":
                    self.curr_state = "selected"
                    self.remaining = 0.0
            elif msg == "clicked":
                self.clicked = True
            elif self.curr_state == "bouncing":
                self.collision_detected = True
                deltap = msg[1]
                self.vel = {'x': self.vel['x'] + deltap[0],
                            'y': self.vel['y'] + deltap[1]}

    def modelTransition(self, passed_values):
        """Request a structural change from the parent via ``passed_values``.

        Returns ``True`` when the particle asks to be deleted ('deleting') or
        to have its input port connected ('initialized'), ``False`` otherwise.
        """
        if self.curr_state == "deleting":
            passed_values['to_delete'] = self
            return True
        elif self.curr_state == "initialized":
            passed_values['connect_particle'] = (self.particle_id,
                                                 self.COLLISION_DETECT)
            return True
        return False


class ParticleSpawner(AtomicDEVS):
    """Atomic model that creates new ``Particle`` instances.

    A particle is created at every internal transition (the first one fires
    immediately, later ones 1 time unit apart) and whenever a message arrives
    on ``REQUEST``.
    """

    def __init__(self, canvas, framerate, spawn_chance, die_chance):
        """Store the parameters forwarded to every created ``Particle``."""
        AtomicDEVS.__init__(self, "ParticleSpawner")
        self.canvas = canvas
        self.framerate = framerate
        self.spawn_chance = spawn_chance
        self.die_chance = die_chance
        self.REQUEST = self.addInPort("REQUEST")
        self.id_ctr = 0
        self.new_particle = None
        self.remaining = 0.0

    def timeAdvance(self):
        """Return the time until the next scheduled spawn."""
        return self.remaining

    def _spawn(self):
        """Create the next particle and advance the id counter."""
        self.new_particle = Particle(self.id_ctr, self.canvas, self.framerate,
                                     self.spawn_chance, self.die_chance)
        self.id_ctr += 1

    def intTransition(self):
        """Spawn a particle and schedule the next spawn 1 time unit later."""
        self._spawn()
        self.remaining = 1

    def extTransition(self, inputs):
        """Spawn an extra particle when a request arrives."""
        # Clamp at zero, as PositionManager does, to avoid a negative time
        # advance.
        self.remaining = max(0.0, self.remaining - self.elapsed)
        if self.REQUEST in inputs:
            self._spawn()

    def modelTransition(self, passed_values):
        """Hand the most recently created particle to the parent model."""
        if self.new_particle:
            passed_values['new_particle'] = self.new_particle
            return True
        return False


class PositionManager(AtomicDEVS):
    """Atomic model that tracks particle positions and detects collisions.

    ``positions`` maps a particle id to ``(pos, r, vel)``; ``PARTICLES_OUT``
    maps a particle id to the output port used to reach that particle.
    """

    def __init__(self, framerate):
        """Start in the 'detecting' state with one frame until the first check."""
        AtomicDEVS.__init__(self, "PositionManager")
        self.PARTICLES_OUT = {}
        self.POS_IN = self.addInPort("POS_IN")
        self.INTERRUPT = self.addInPort("INTERRUPT")
        self.positions = {}
        self.collisions = set([])
        self.framerate = framerate
        self.remaining = self.framerate
        self.curr_state = "detecting"
        self.clicked = None
        self.delete_selected = False

    def timeAdvance(self):
        """Return 0 right after a detection pass, else the remaining time."""
        return 0 if self.curr_state == "detected" else self.remaining

    def outputFnc(self):
        """Build the messages for the particles.

        Collision results and a pending click are combined; a pending
        ``delete_selected`` replaces everything with a ``"delete_if_selected"``
        broadcast to all known particles.
        """
        output = {}
        if self.collisions:
            # Skip ids whose port is gone (particle deleted in the meantime)
            # instead of raising KeyError.
            output = {self.PARTICLES_OUT[particle_id]: ("collision_detected", deltap)
                      for (particle_id, deltap) in self.collisions
                      if particle_id in self.PARTICLES_OUT}
        # A click is an asynchronous user event and may refer to a particle
        # that was already removed; ignore it in that case.
        if self.clicked is not None and self.clicked in self.PARTICLES_OUT:
            output[self.PARTICLES_OUT[self.clicked]] = "clicked"
        if self.delete_selected:
            output = {self.PARTICLES_OUT[particle_id]: "delete_if_selected"
                      for particle_id in self.positions}
        return output

    def intTransition(self):
        """Clear a handled interrupt or run/finish a collision-detection pass."""
        if self.clicked is not None:
            self.clicked = None
            self.remaining = self.framerate
        elif self.delete_selected:
            self.delete_selected = False
            self.remaining = self.framerate
        elif self.curr_state == "detecting":
            self._detect_collisions()
            self.curr_state = "detected"
        elif self.curr_state == "detected":
            self.collisions = set([])
            self.curr_state = "detecting"
            self.remaining = self.framerate

    def _detect_collisions(self):
        """Fill ``self.collisions`` with ``(particle_id, deltap)`` entries.

        For every overlapping pair, ``deltap`` is the relative velocity
        projected on the unit vector between the two centres; the first
        particle receives ``deltap`` and the second its negation.
        """
        # The original additionally tested ``k1 not in self.collisions or
        # k2 not in self.collisions``; the set holds (id, deltap) tuples, so
        # that test was always true and has been dropped.
        for k1, v1 in self.positions.items():
            for k2, v2 in self.positions.items():
                if k1 == k2:
                    continue
                dx = v2[0][0] - v1[0][0]
                dy = v2[0][1] - v1[0][1]
                distance = math.sqrt(dx ** 2 + dy ** 2)
                # ``0 <`` guards the division below: coincident centres have
                # no defined collision direction.
                if 0 < distance < (v1[1] + v2[1]):
                    u = [dx / distance, dy / distance]
                    vab = {'x': v2[2]['x'] - v1[2]['x'],
                           'y': v2[2]['y'] - v1[2]['y']}
                    vu_mult = vab['x'] * u[0] + vab['y'] * u[1]
                    vu = [u[0] * vu_mult, u[1] * vu_mult]
                    deltap_mult = 1
                    deltap = (vu[0] * deltap_mult, vu[1] * deltap_mult)
                    self.collisions.add((k1, deltap))
                    self.collisions.add((k2, (-deltap[0], -deltap[1])))

    def extTransition(self, inputs):
        """Process a particle message (``POS_IN``) or a user ``INTERRUPT``.

        ``POS_IN`` messages are ``("created", id, pos, r, vel)``,
        ``("delete", id)`` or ``(id, pos, r, vel)``.  An interrupt carries
        either ``"delete_selected"`` or the id of the clicked particle and
        makes the model fire immediately.
        """
        if self.POS_IN in inputs:
            self.remaining -= min(self.remaining, self.elapsed)
            msg = inputs[self.POS_IN]
            if msg[0] == "created":
                new_particle_id = msg[1]
                self.PARTICLES_OUT[new_particle_id] = self.addOutPort(
                    "PARTICLES_OUT[%s]" % new_particle_id)
                self.positions[new_particle_id] = (msg[2], msg[3], msg[4])
            elif msg[0] == "delete":
                del self.PARTICLES_OUT[msg[1]]
                del self.positions[msg[1]]
            else:
                particle_id = msg[0]
                self.positions[particle_id] = (msg[1], msg[2], msg[3])
        elif self.INTERRUPT in inputs:
            msg = inputs[self.INTERRUPT][0]
            if msg == "delete_selected":
                self.delete_selected = True
            else:
                self.clicked = int(msg)
            self.remaining = 0.0


class Field(CoupledDEVS):
    """Coupled model holding the spawner, the position manager and particles."""

    def __init__(self, canvas, framerate, spawn_chance, die_chance):
        """Build the static part of the model.

        ``framerate`` is given in frames per time unit and converted to the
        duration of one frame before being passed to the sub-models.
        """
        CoupledDEVS.__init__(self, "Field")
        self.canvas = canvas
        self.framerate = 1.0 / framerate
        self.INTERRUPT = self.addInPort("INTERRUPT")
        self.POS_OUT = self.addOutPort("POS_OUT")
        self.COLOR_OUT = self.addOutPort("COLOR_OUT")
        self.particle_spawner = self.addSubModel(
            ParticleSpawner(self.canvas, self.framerate, spawn_chance, die_chance))
        self.position_manager = self.addSubModel(PositionManager(self.framerate))
        self.connectPorts(self.INTERRUPT, self.position_manager.INTERRUPT)

    def modelTransition(self, passed_values):
        """Apply the structural changes requested by the sub-models.

        Handles (and removes) the keys 'new_particle', 'to_delete' and
        'connect_particle' of ``passed_values``.  Always returns ``False``.
        """
        if 'new_particle' in passed_values:
            new_particle = passed_values['new_particle']
            self.addSubModel(new_particle)
            self.connectPorts(new_particle.POS_OUT, self.position_manager.POS_IN)
            self.connectPorts(new_particle.POS_OUT, self.POS_OUT)
            self.connectPorts(new_particle.COLOR_OUT, self.COLOR_OUT)
            self.connectPorts(new_particle.SPAWNER_COMM,
                              self.particle_spawner.REQUEST)
            del passed_values['new_particle']
        if 'to_delete' in passed_values:
            self.removeSubModel(passed_values['to_delete'])
            del passed_values['to_delete']
        if 'connect_particle' in passed_values:
            particle_id, collision_port = passed_values['connect_particle']
            self.connectPorts(self.position_manager.PARTICLES_OUT[particle_id],
                              collision_port)
            del passed_values['connect_particle']
        return False


class ParticleVisualization:
    """Canvas items of one particle plus the click handler bound to them."""

    def __init__(self, particle_id, circle_id, middle_id, canvas, sim):
        """Remember the canvas item ids and bind mouse clicks on both items."""
        self.particle_id = particle_id
        self.circle_id = circle_id
        self.middle_id = middle_id
        self.canvas = canvas
        self.sim = sim
        # Fixed: the event sequence was an empty string, which is not a valid
        # Tk binding.  A left mouse click is the assumed trigger, going by
        # the handler name.
        self.canvas.tag_bind(self.circle_id, "<Button-1>", self.on_click)
        self.canvas.tag_bind(self.middle_id, "<Button-1>", self.on_click)

    def on_click(self, event):
        """Send the clicked particle's id to the model's INTERRUPT port."""
        self.sim.realtime_interrupt("INTERRUPT %i" % self.particle_id)


# TODO: Model this in SCCD
class Visualizer(tk.Toplevel):
    """Tk window that draws the particles reported by the simulation."""

    def __init__(self, resolution, framerate, sim):
        """Create the window and canvas sized according to ``resolution``."""
        tk.Toplevel.__init__(self)
        self.framerate = int((1.0 / framerate) * 1000)
        self.sim = sim
        self.geometry('{}x{}'.format(resolution[0], resolution[1]))
        canvas_size = (0, 0, self.winfo_screenwidth(), self.winfo_screenheight())
        self.canvas = tk.Canvas(self, relief=tk.RIDGE, scrollregion=canvas_size)
        self.canvas.pack(expand=True, fill=tk.BOTH)
        self.text = self.canvas.create_text(5, 5, anchor='nw',
                                            text="TIME: %s" % 0.0)
        self.title('ParticleInteraction')
        self.particles = {}
        self.protocol("WM_DELETE_WINDOW", lambda: sys.exit(0))
        # Fixed: the event sequence was an empty string; the Delete key is
        # the assumed trigger, going by the handler name.
        self.bind("<Delete>", self.delete_pressed)

    def delete_pressed(self, event):
        """Ask the model to delete the currently selected particles."""
        self.sim.realtime_interrupt("INTERRUPT delete_selected")

    def check_output_pos(self, msgs):
        """Apply position messages: delete, create or move canvas items."""
        for msg_contents in msgs:
            if msg_contents[0] == "delete":
                particle_id = msg_contents[1]
                self.canvas.delete(self.particles[particle_id].circle_id)
                self.canvas.delete(self.particles[particle_id].middle_id)
                del self.particles[particle_id]
            elif msg_contents[0] == "created":
                # The items are created lazily on the first position update.
                pass
            else:
                particle_id = msg_contents[0]
                x = msg_contents[1][0]
                y = msg_contents[1][1]
                r = msg_contents[2]
                if particle_id not in self.particles:
                    circle_id = self.canvas.create_oval(
                        x - r, y - r, x + r, y + r, fill="red")
                    middle_id = self.canvas.create_oval(
                        x - 4, y - 4, x + 4, y + 4, fill="orange")
                    self.particles[particle_id] = ParticleVisualization(
                        particle_id, circle_id, middle_id, self.canvas, self.sim)
                else:
                    circle_id = self.particles[particle_id].circle_id
                    middle_id = self.particles[particle_id].middle_id
                    # ``move`` is relative, so shift each item by the
                    # difference between its new and current top-left corner.
                    curr_pos = self.canvas.coords(circle_id)
                    self.canvas.move(circle_id,
                                     x - r - curr_pos[0], y - r - curr_pos[1])
                    curr_pos_middle = self.canvas.coords(middle_id)
                    self.canvas.move(middle_id,
                                     x - 4 - curr_pos_middle[0],
                                     y - 4 - curr_pos_middle[1])

    def check_output_color(self, msgs):
        """Apply ``(particle_id, color)`` messages to the particle circles."""
        for msg_contents in msgs:
            particle_id = msg_contents[0]
            color = msg_contents[1]
            circle_id = self.particles[particle_id].circle_id
            self.canvas.itemconfig(circle_id, fill=color)


def main():
    """Parse the command line, build the model and run the real-time simulation.

    Options: ``-w/--width``, ``-h/--height``, ``-f/--framerate``,
    ``-s/--spawn_chance``, ``-d/--die_chance``, ``-r/--random_seed``.
    """
    root = tk.Tk()
    root.withdraw()
    try:
        opts, args = getopt.getopt(
            sys.argv[1:], "w:h:f:s:d:r:",
            ["width=", "height=", "framerate=", "spawn_chance=",
             "die_chance=", "random_seed="])
    except getopt.GetoptError as e:
        print(e)
        sys.exit(2)

    width, height, framerate, spawn_chance, die_chance, random_seed = [None] * 6
    for opt, arg in opts:
        if opt in ("--width", "-w"):
            width = int(arg)
        elif opt in ("--height", "-h"):
            height = int(arg)
        elif opt in ("--framerate", "-f"):
            framerate = int(arg)
        elif opt in ("--spawn_chance", "-s"):
            spawn_chance = float(arg)
        elif opt in ("--die_chance", "-d"):
            die_chance = float(arg)
        elif opt in ("--random_seed", "-r"):
            random_seed = int(arg)

    resolution = (width or 800, height or 600)
    framerate = framerate or 60
    spawn_chance = 0.2 if spawn_chance is None else spawn_chance
    die_chance = 0.01 if die_chance is None else die_chance
    # ``is not None`` so that an explicit seed of 0 is honoured as well.
    if random_seed is not None:
        random.seed(random_seed)

    model = Field(Canvas(resolution), framerate, spawn_chance, die_chance)
    sim = Simulator(model)
    sim.setRealTime(True)
    sim.setRealTimeInputFile(None)
    sim.setRealTimePorts({'INTERRUPT': model.INTERRUPT})
    sim.setRealTimePlatformTk(root)
    sim.setDSDEVS(True)
    sim.setClassicDEVS(True)

    visualizer = Visualizer(resolution, framerate, sim)
    sim.setListenPorts(model.POS_OUT, visualizer.check_output_pos)
    sim.setListenPorts(model.COLOR_OUT, visualizer.check_output_color)
    sim.simulate()
    root.mainloop()


if __name__ == '__main__':
    main()