"""Bouncing-balls simulation expressed as a dynamic-structure DEVS model.

A ``Field`` coupled model owns a ``BallSpawner``, a ``PositionManager`` and a
``MyTracer``.  ``Ball`` atomic models are added to and removed from the field
at run time through the ``modelTransition`` hooks.  Balls report their
position to the position manager, which detects overlaps and notifies the
balls involved; the tracer appends positions and colour changes to a text
file.
"""
import itertools
import math
import os
import random

try:
    import Tkinter as tk  # Python 2 module name
except ImportError:
    import tkinter as tk  # Python 3 module name

from pypdevs.DEVS import AtomicDEVS, CoupledDEVS
from pypdevs.infinity import INFINITY
from pypdevs.simulator import Simulator

# Time between two frame updates, shared by the spawner (which hands it to
# every new ball) and the position manager.
FRAME_PERIOD = 0.03

# File the tracer appends to and the script removes before each run.
SIM_OUTPUT_PATH = 'sim_output'


class Canvas:
    """Plain holder for the dimensions of the area the balls move in."""

    def __init__(self, width, height):
        self.width = width
        self.height = height


class Ball(AtomicDEVS):
    """A single ball that moves, bounces off the canvas edges and changes phase.

    ``curr_state`` is one of ``'bouncing'``, ``'spawning'``, ``'selected'``
    or ``'deleting'``.  A colour change is always emitted through a
    zero-delay step: the transition that changes ``color`` leaves
    ``prev_color`` untouched, ``timeAdvance`` then returns 0, ``outputFnc``
    emits the new colour and the following transition synchronises
    ``prev_color``.
    """

    def __init__(self, ball_id, canvas, framerate):
        """Create a ball.

        :param ball_id: integer identifier, also used in the model name.
        :param canvas: object exposing ``width`` and ``height``.
        :param framerate: despite the name, the *time between frames*; it is
            used directly as the time advance between two position updates.
        """
        AtomicDEVS.__init__(self, "Ball[%i]" % ball_id)

        # Ports
        self.collision_detect = self.addInPort("colision_detect")
        self.position_out = self.addOutPort("position_out")
        self.ball_spawner_comm = self.addOutPort("ball_spawner")
        self.color_out = self.addOutPort("color_out")

        # Parameters
        self.framerate = framerate
        self.canvas = canvas
        self.ball_id = ball_id

        # State initialisation (the order of the random draws is kept so a
        # seeded run stays reproducible).
        self.r = random.uniform(5.0, 20.0)
        self.vel = {'x': random.random() * 5.0 - 1.0,
                    'y': random.random() * 5.0 - 1.0}
        self.curr_state = "bouncing"
        self.color = "red"
        self.prev_color = "red"
        self.pos = None  # assigned by the first internal transition
        self.time_passed = 0
        self.collision_detected = False
        self.remaining = self.framerate
        self.initialized = False

    def timeAdvance(self):
        """Return the time until the next internal transition."""
        # A pending colour change is flushed immediately.
        if self.prev_color != self.color:
            return 0.0
        # Frame tick, or the waiting time while selected.
        if self.curr_state in ("bouncing", "spawning", "selected"):
            return self.remaining
        if self.curr_state == "deleting":
            return INFINITY

    def outputFnc(self):
        """Build the output bag for the current phase.

        :returns: dict mapping output ports to message lists.
        """
        output = {}
        if self.pos and self.curr_state in ("bouncing", "spawning"):
            output[self.position_out] = [(self.ball_id, self.pos, self.r)]
        # Fixed: the phase lives in ``curr_state``; the original compared
        # ``self.state``, which this model never assigns.
        if self.collision_detected and self.curr_state != "spawning":
            output[self.ball_spawner_comm] = ["spawn_ball"]
        if self.curr_state == "selected" and self.prev_color == self.color:
            # The selected wait is over: ask for this ball's position to be
            # dropped.
            output[self.position_out] = [("delete", self.ball_id)]
        if self.color != self.prev_color:
            output[self.color_out] = [(self.ball_id, self.color)]
        return output

    def intTransition(self):
        """Advance the ball by one step (placement, phase change or frame)."""
        if not self.initialized:
            # First step: pick a position that keeps the ball on the canvas.
            x = random.uniform(0, self.canvas.width - (self.r * 2 + 1))
            y = random.uniform(0, self.canvas.height - (self.r * 2 + 1))
            self.pos = (x, y)
            self.initialized = True
            return

        # The random draw comes first on purpose, so it is consumed on every
        # step regardless of the current phase.
        if random.random() <= 0.008 and self.curr_state != "selected":
            self.color = "yellow"
            self.curr_state = "selected"
            self.remaining = 2.5
        elif self.curr_state == "selected" and self.prev_color == self.color:
            self.curr_state = "deleting"
        elif self.prev_color != self.color:
            # Zero-delay step that follows a colour change.
            self.prev_color = self.color
        else:
            self._advance_frame()

    def _advance_frame(self):
        """Perform one regular frame: age, bounce, move, react to a collision."""
        # Start a fresh frame period.  Without this reset, a period that
        # ``extTransition`` shortened was reused for every later frame.
        self.remaining = self.framerate

        if self.time_passed < 1 and self.curr_state in ("bouncing", "spawning"):
            self.time_passed += self.framerate
        if self.time_passed >= 1:
            self.color = "black"
            self.curr_state = "bouncing"
            self.time_passed = 0

        x, y = self.pos[0], self.pos[1]
        diameter = self.r * 2
        if x <= 0 or x + diameter >= self.canvas.width:
            self.vel['x'] = -self.vel['x']
        if y <= 0 or y + diameter >= self.canvas.height:
            self.vel['y'] = -self.vel['y']
        self.pos = (x + self.vel['x'], y + self.vel['y'])

        if self.collision_detected:
            self.curr_state = "spawning"
            self.color = "blue"
            self.collision_detected = False

    def extTransition(self, inputs):
        """Account for elapsed time and record a collision notification.

        :param inputs: dict mapping input ports to message lists.
        """
        # Clamp so float rounding can never yield a negative time advance.
        self.remaining = max(0.0, self.remaining - self.elapsed)
        if self.collision_detect in inputs and self.curr_state == "bouncing":
            self.collision_detected = True

    def modelTransition(self, state):
        """Request removal of this ball once it reached the deleting phase.

        :param state: dict handed over by the simulator; presumably shared
            with the parent's ``modelTransition`` -- verify against the
            PythonPDEVS documentation.
        :returns: True when a structural change is requested.
        """
        if self.curr_state != "deleting":
            return False
        # Accumulate instead of overwriting, so several balls finishing in
        # the same step are all removed.
        state.setdefault('to_delete', []).append(self)
        return True


class BallSpawner(AtomicDEVS):
    """Creates a new ball once per time unit and on every spawn request."""

    def __init__(self, canvas):
        """:param canvas: canvas handed to every ball this spawner creates."""
        AtomicDEVS.__init__(self, "BallSpawner")
        self.request = self.addInPort("request")
        self.canvas = canvas
        self.id_ctr = 0
        self.new_ball = None
        self.framerate = FRAME_PERIOD

    def timeAdvance(self):
        """Spawn periodically, once per time unit."""
        return 1

    def _create_ball(self):
        """Instantiate the next ball and advance the id counter."""
        self.new_ball = Ball(self.id_ctr, self.canvas, self.framerate)
        self.id_ctr += 1

    def intTransition(self):
        """Periodic spawn."""
        self._create_ball()

    def extTransition(self, inputs):
        """Spawn on request.

        :param inputs: dict mapping input ports to message lists.
        """
        if self.request in inputs:
            self._create_ball()

    def modelTransition(self, state):
        """Publish the freshly created ball for the parent to insert.

        :param state: dict handed over by the simulator.
        :returns: True when a ball is waiting to be added.
        """
        if self.new_ball is None:
            return False
        state['new_ball'] = self.new_ball
        return True


class PositionManager(AtomicDEVS):
    """Tracks ball positions, detects overlaps and notifies colliding balls."""

    def __init__(self):
        AtomicDEVS.__init__(self, "PositionManager")
        self.balls_in = self.addInPort("balls_in")
        # ball_id -> output port; filled in by Field.modelTransition.
        self.balls_out = {}
        self.positions_out = self.addOutPort("positions_out")
        self.collisions = set()
        # ball_id -> (position, radius)
        self.positions = {}
        self.remaining = FRAME_PERIOD

    def timeAdvance(self):
        """Report collisions immediately, otherwise wait out the frame."""
        return 0 if self.collisions else self.remaining

    def extTransition(self, inputs):
        """Update the stored positions and recompute the colliding balls.

        :param inputs: dict mapping input ports to message lists; messages
            are either ``("delete", ball_id)`` or ``(ball_id, pos, radius)``.
        """
        # Clamp so float rounding can never yield a negative time advance.
        self.remaining = max(0.0, self.remaining - self.elapsed)
        if self.balls_in not in inputs:
            return

        for message in inputs[self.balls_in]:
            if message[0] == "delete":
                self.positions.pop(message[1], None)
            else:
                self.positions[message[0]] = (message[1], message[2])

        # Visit every unordered pair once instead of both orderings.
        # NOTE(review): the distance is taken between the reported positions
        # while Ball's wall check treats them as a bounding-box corner; for
        # balls of different radii that is not the centre distance --
        # confirm this is intended.
        for (id_a, ball_a), (id_b, ball_b) in itertools.combinations(
                self.positions.items(), 2):
            distance = math.hypot(ball_a[0][0] - ball_b[0][0],
                                  ball_a[0][1] - ball_b[0][1])
            if distance < ball_a[1] + ball_b[1]:
                self.collisions.add(id_a)
                self.collisions.add(id_b)

    def intTransition(self):
        """Clear the reported collisions and start a new frame period."""
        self.collisions = set()
        self.remaining = FRAME_PERIOD

    def outputFnc(self):
        """Notify colliding balls and publish all known positions.

        :returns: dict mapping output ports to message lists.
        """
        output = {}
        if self.collisions:
            output = {self.balls_out[ball_id]: ["collision_detected"]
                      for ball_id in self.collisions}
        output[self.positions_out] = [self.positions]
        return output


class MyTracer(AtomicDEVS):
    """Passive model that appends positions and colour changes to a file."""

    def __init__(self, output_path=SIM_OUTPUT_PATH):
        """:param output_path: file to append the trace to."""
        AtomicDEVS.__init__(self, "MyTracer")
        self.positions_in = self.addInPort("positions_in")
        self.color_in = self.addInPort("color_in")
        self.current_time = 0.0
        self.output_path = output_path

    def timeAdvance(self):
        """Never schedule an internal transition; only inputs wake this model."""
        return INFINITY

    def extTransition(self, inputs):
        """Append one timestamped record for the received messages.

        :param inputs: dict mapping input ports to message lists.
        """
        self.current_time += self.elapsed
        with open(self.output_path, 'a') as f:
            f.write('time %f\n' % self.current_time)
            if self.positions_in in inputs:
                f.write('positions\n')
                positions = inputs[self.positions_in][0]
                for ball_id, (pos, radius) in positions.items():
                    f.write('%i %f %f %f\n' % (ball_id, pos[0], pos[1], radius))
                f.write('endpositions\n')
            if self.color_in in inputs:
                f.write('color\n')
                for color in inputs[self.color_in]:
                    f.write('%i %s\n' % (color[0], color[1]))
                f.write('endcolor\n')


class Field(CoupledDEVS):
    """Top-level coupled model; balls are added and removed at run time."""

    def __init__(self, width, height):
        """:param width: canvas width.  :param height: canvas height."""
        CoupledDEVS.__init__(self, "Field")
        self.canvas = Canvas(width, height)
        self.ball_spawner = self.addSubModel(BallSpawner(self.canvas))
        self.position_manager = self.addSubModel(PositionManager())
        self.tracer = self.addSubModel(MyTracer())
        self.connectPorts(self.position_manager.positions_out,
                          self.tracer.positions_in)

    def modelTransition(self, state):
        """Apply the structural changes requested by the child models.

        :param state: dict filled by the children's ``modelTransition``;
            ``'new_ball'`` holds a ball to insert, ``'to_delete'`` a list of
            balls to remove.  Both keys are consumed here.
        :returns: False, so the change is not propagated further up.
        """
        new_ball = state.pop('new_ball', None)
        if new_ball is not None:
            self.addSubModel(new_ball)
            pos_mgr_port = self.position_manager.addOutPort(
                "balls_out[%s]" % new_ball.ball_id)
            # TODO: this mutates the position manager's state from outside;
            # consider moving the registration into Ball.
            self.position_manager.balls_out[new_ball.ball_id] = pos_mgr_port
            self.connectPorts(new_ball.position_out,
                              self.position_manager.balls_in)
            self.connectPorts(pos_mgr_port, new_ball.collision_detect)
            self.connectPorts(new_ball.ball_spawner_comm,
                              self.ball_spawner.request)
            self.connectPorts(new_ball.color_out, self.tracer.color_in)

        for ball in state.pop('to_delete', []):
            self.removeSubModel(ball)
        return False


def main():
    """Run the simulation for 60 time units with a fixed random seed."""
    try:
        os.remove(SIM_OUTPUT_PATH)
    except OSError:
        # No trace left over from a previous run; nothing to clean up.
        pass

    random.seed(1)

    # Instantiate the DEVS model.
    model = Field(800, 600)

    # Simulator setup; enable sim.setVerbose("myOutputFile") for a trace.
    sim = Simulator(model)
    sim.setDSDEVS(True)
    sim.setTerminationTime(60)

    # Run the simulation.
    sim.simulate()


if __name__ == '__main__':
    main()