123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256 |
- from pypdevs.DEVS import AtomicDEVS, CoupledDEVS
- from pypdevs.simulator import Simulator
- from pypdevs.infinity import INFINITY
- import random
- import math
- import os
- import Tkinter as tk
class Canvas:
    """Plain holder for the dimensions of the area the balls move in."""

    def __init__(self, width, height):
        # Both extents are stored exactly as given; nothing is validated
        # or converted here.
        self.width, self.height = width, height
class Ball(AtomicDEVS):
    """Atomic DEVS model of one ball moving across a rectangular canvas.

    The ball keeps its bookkeeping in plain attributes. ``curr_state`` is one
    of ``'bouncing'``, ``'spawning'``, ``'selected'`` or ``'deleting'``.

    Ports:
        colision_detect (in): any input received while bouncing flags a
            collision.
        position_out (out): ``(ball_id, pos, r)`` updates, or
            ``("delete", ball_id)`` once a selected ball has waited out its
            selection period.
        ball_spawner (out): ``"spawn_ball"`` while a collision is flagged.
        color_out (out): ``(ball_id, color)`` whenever the color changed.

    Args:
        ball_id: Identifier used in the model name and in every message.
        canvas: Object exposing ``width`` and ``height``.
        framerate: Time between two movement steps.
    """

    def __init__(self, ball_id, canvas, framerate):
        AtomicDEVS.__init__(self, "Ball[%i]" % ball_id)

        # Ports. The misspelled "colision_detect" port name is a runtime
        # string and is kept as is on purpose.
        self.collision_detect = self.addInPort("colision_detect")
        self.position_out = self.addOutPort("position_out")
        self.ball_spawner_comm = self.addOutPort("ball_spawner")
        self.color_out = self.addOutPort("color_out")

        # Parameters
        self.framerate = framerate
        self.canvas = canvas
        self.ball_id = ball_id

        # State initialization. The order of the random draws is kept so a
        # seeded run produces the same balls as before.
        self.r = random.uniform(5.0, 20.0)
        self.vel = {'x': random.random() * 5.0 - 1.0, 'y': random.random() * 5.0 - 1.0}
        self.curr_state = "bouncing"  # stateset = ('bouncing', 'spawning', 'selected', 'deleting')
        self.color = "red"
        self.prev_color = "red"
        self.pos = None  # assigned by the first internal transition
        self.time_passed = 0
        self.collision_detected = False
        self.remaining = self.framerate
        self.initialized = False

    def timeAdvance(self):
        """Return the delay until the next internal transition."""
        # A pending color change is reported immediately.
        if self.prev_color != self.color:
            return 0.0
        # A ball waiting for removal never fires again.
        if self.curr_state == "deleting":
            return INFINITY
        # Frame period, or the remaining waiting time while selected.
        # Returning the timer for every other state avoids the implicit
        # ``None`` the previous if/elif chain fell through to.
        return self.remaining

    def outputFnc(self):
        """Build the output bag for the upcoming internal transition.

        Returns:
            dict mapping output ports to message lists.
        """
        output = {}
        if self.pos and self.curr_state in ("bouncing", "spawning"):
            output[self.position_out] = [(self.ball_id, self.pos, self.r)]
        # Fix: this guard used to read ``self.state``, an attribute this class
        # never assigns; the ball's mode is tracked in ``curr_state``.
        if self.collision_detected and self.curr_state != "spawning":
            output[self.ball_spawner_comm] = ["spawn_ball"]
        # Selection period is over and the color was already reported:
        # announce the removal (this replaces any position message).
        if self.curr_state == "selected" and self.prev_color == self.color:
            output[self.position_out] = [("delete", self.ball_id)]
        if self.color != self.prev_color:
            output[self.color_out] = [(self.ball_id, self.color)]
        return output

    def intTransition(self):
        """Advance the ball by one internal step."""
        if not self.initialized:
            # First step: pick a start position that keeps the 2r-wide
            # bounding box inside the canvas.
            x = random.uniform(0, self.canvas.width - (self.r * 2 + 1))
            y = random.uniform(0, self.canvas.height - (self.r * 2 + 1))
            self.pos = (x, y)
            self.initialized = True
            return

        # The random draw happens on every step after initialization, also
        # when one of the later branches would otherwise apply.
        if random.random() <= 0.008 and not self.curr_state == "selected":
            self.color = "yellow"
            self.curr_state = "selected"
            self.remaining = 2.5
        elif self.curr_state == "selected" and self.prev_color == self.color:
            self.curr_state = "deleting"
        elif self.prev_color != self.color:
            # The color change has just been output; acknowledge it.
            self.prev_color = self.color
        else:
            self._move()

    def _move(self):
        """Perform one regular movement step (bouncing or spawning)."""
        if self.time_passed < 1 and self.curr_state in ("bouncing", "spawning"):
            self.time_passed += self.framerate
            if self.time_passed >= 1:
                # One time unit has accumulated: back to a black, bouncing ball.
                self.color = "black"
                self.curr_state = "bouncing"
                self.time_passed = 0

        x, y = self.pos
        diameter = self.r * 2
        # Reflect the velocity when the bounding box touches a border.
        if x <= 0 or x + diameter >= self.canvas.width:
            self.vel['x'] = -self.vel['x']
        if y <= 0 or y + diameter >= self.canvas.height:
            self.vel['y'] = -self.vel['y']
        self.pos = (x + self.vel['x'], y + self.vel['y'])

        if self.collision_detected:
            self.curr_state = "spawning"
            self.color = "blue"
        # Restart the frame timer from the configured framerate instead of a
        # hard-coded 0.03 (the value the spawner passes in).
        self.remaining = self.framerate
        self.collision_detected = False

    def extTransition(self, inputs):
        """Account for elapsed time and record collision notifications.

        Args:
            inputs: dict mapping input ports to received message lists.
        """
        # Clamp at zero so float rounding cannot produce a negative delay.
        # NOTE(review): presumably the simulator rejects negative time
        # advances -- confirm against the PythonPDEVS documentation.
        self.remaining = max(0.0, self.remaining - self.elapsed)
        if self.collision_detect in inputs and self.curr_state == "bouncing":
            self.collision_detected = True

    def modelTransition(self, state):
        """Request removal of this ball once it is in the deleting state.

        Args:
            state: dict shared with the parent's ``modelTransition``.

        Returns:
            True when a removal request was written to ``state``, else False.
        """
        if self.curr_state == "deleting":
            # NOTE(review): a single slot is used, so a second ball writing
            # here before the parent reads it would overwrite this request.
            state['to_delete'] = self
            return True
        return False
-
class BallSpawner(AtomicDEVS):
    """Atomic DEVS model that creates new ``Ball`` instances.

    A ball is created on every internal transition (time advance is always 1)
    and whenever a message arrives on the ``request`` port. The newest ball is
    handed to the parent through ``modelTransition``.
    """

    def __init__(self, canvas):
        AtomicDEVS.__init__(self, "BallSpawner")

        self.request = self.addInPort("request")

        self.canvas = canvas
        self.id_ctr = 0        # id given to the next ball
        self.new_ball = None   # most recently created ball
        self.framerate = 0.03  # forwarded to every ball

    def timeAdvance(self):
        """Return the fixed delay between two internal transitions."""
        return 1

    def intTransition(self):
        """Create a ball because the timer fired."""
        self._create_ball()

    def extTransition(self, inputs):
        """Create a ball when a request message was received."""
        if self.request not in inputs:
            return
        self._create_ball()

    def modelTransition(self, state):
        """Publish the latest ball under ``state['new_ball']``.

        Returns:
            True if a ball was published, False otherwise.
        """
        pending = self.new_ball
        if not pending:
            return False
        state['new_ball'] = pending
        return True

    def _create_ball(self):
        """Instantiate the next ball and then advance the id counter."""
        self.new_ball = Ball(self.id_ctr, self.canvas, self.framerate)
        self.id_ctr += 1
class PositionManager(AtomicDEVS):
    """Atomic DEVS model that tracks ball positions and detects overlaps.

    Ports:
        balls_in (in): ``(ball_id, pos, r)`` updates or ``("delete", ball_id)``.
        positions_out (out): the current ``positions`` dict.
        balls_out[...] (out): per-ball ports stored in ``self.balls_out``
            (filled in from outside this class), used to send
            ``"collision_detected"``.

    Args:
        framerate: Period between two position broadcasts. Defaults to 0.03,
            the value that used to be hard-coded.
    """

    def __init__(self, framerate=0.03):
        AtomicDEVS.__init__(self, "PositionManager")

        self.balls_in = self.addInPort("balls_in")
        self.balls_out = {}    # ball_id -> output port towards that ball
        self.positions_out = self.addOutPort("positions_out")
        self.collisions = set()  # ids of balls to notify at the next output
        self.positions = {}      # ball_id -> (pos, r)
        self.framerate = framerate
        self.remaining = framerate

    def timeAdvance(self):
        """Fire immediately while collisions are pending, else wait out the frame."""
        return 0 if self.collisions else self.remaining

    def extTransition(self, inputs):
        """Apply position/delete messages and recompute colliding balls.

        Args:
            inputs: dict mapping input ports to received message lists.
        """
        # Clamp at zero so float rounding cannot produce a negative delay.
        # NOTE(review): presumably the simulator rejects negative time
        # advances -- confirm against the PythonPDEVS documentation.
        self.remaining = max(0.0, self.remaining - self.elapsed)
        if self.balls_in not in inputs:
            return

        for message in inputs[self.balls_in]:
            if message[0] == "delete":
                # Forget the ball; unknown ids are ignored.
                self.positions.pop(message[1], None)
            else:
                ball_id, pos, radius = message[0], message[1], message[2]
                self.positions[ball_id] = (pos, radius)

        # Test each unordered pair once (the previous nested loop tested every
        # pair twice); both ids of an overlapping pair are recorded.
        entries = list(self.positions.items())
        for index, (id_a, (pos_a, radius_a)) in enumerate(entries):
            for id_b, (pos_b, radius_b) in entries[index + 1:]:
                distance = math.hypot(pos_a[0] - pos_b[0], pos_a[1] - pos_b[1])
                if distance < radius_a + radius_b:
                    self.collisions.add(id_a)
                    self.collisions.add(id_b)

    def intTransition(self):
        """Clear the reported collisions and restart the frame timer."""
        self.collisions = set()
        self.remaining = self.framerate

    def outputFnc(self):
        """Notify every colliding ball and broadcast all positions.

        Returns:
            dict mapping output ports to message lists.
        """
        # An empty collision set simply yields an empty dict here.
        output = {self.balls_out[ball_id]: ["collision_detected"]
                  for ball_id in self.collisions}
        output[self.positions_out] = [self.positions]
        return output
class MyTracer(AtomicDEVS):
    """Atomic DEVS model that appends received data to the file ``sim_output``.

    It never schedules an internal transition; all work happens when input
    arrives on ``positions_in`` or ``color_in``.
    """

    def __init__(self):
        AtomicDEVS.__init__(self, "MyTracer")
        self.positions_in = self.addInPort("positions_in")
        self.color_in = self.addInPort("color_in")
        self.current_time = 0.0  # sum of the elapsed times seen so far

    def timeAdvance(self):
        """Stay passive forever."""
        return INFINITY

    def extTransition(self, inputs):
        """Append one timestamped record for the received inputs.

        Args:
            inputs: dict mapping input ports to received message lists.
        """
        self.current_time = self.current_time + self.elapsed
        with open('sim_output', 'a') as log:
            log.write('time %f\n' % self.current_time)

            if self.positions_in in inputs:
                log.write('positions\n')
                # Only the first message of the bag is traced.
                snapshot = inputs[self.positions_in][0]
                log.writelines(
                    '%i %f %f %f\n' % (ball_id, entry[0][0], entry[0][1], entry[1])
                    for ball_id, entry in snapshot.iteritems()
                )
                log.write('endpositions\n')

            if self.color_in in inputs:
                log.write('color\n')
                log.writelines(
                    '%i %s\n' % (change[0], change[1])
                    for change in inputs[self.color_in]
                )
                log.write('endcolor\n')
class Field(CoupledDEVS):
    """Coupled DEVS model holding the spawner, position manager, tracer and balls.

    Balls are added and removed at run time through ``modelTransition``.

    Args:
        width: Canvas width.
        height: Canvas height.
    """

    def __init__(self, width, height):
        CoupledDEVS.__init__(self, "Field")

        self.canvas = Canvas(width, height)

        # Sub-models are registered in this fixed order.
        self.ball_spawner = self.addSubModel(BallSpawner(self.canvas))
        self.position_manager = self.addSubModel(PositionManager())
        self.tracer = self.addSubModel(MyTracer())
        self.connectPorts(self.position_manager.positions_out,
                          self.tracer.positions_in)

    def modelTransition(self, state):
        """Apply structural changes requested by the sub-models.

        Args:
            state: dict that may hold a ``'new_ball'`` to add and/or a
                ``'to_delete'`` model to remove; handled keys are deleted.

        Returns:
            Always False.
        """
        if 'new_ball' in state:
            self._attach_ball(state['new_ball'])
            del state['new_ball']
        if 'to_delete' in state:
            self.removeSubModel(state['to_delete'])
            del state['to_delete']
        return False

    def _attach_ball(self, ball):
        """Add ``ball`` as a sub-model and wire it to the other components."""
        manager = self.position_manager
        self.addSubModel(ball)
        notify_port = manager.addOutPort("balls_out[%s]" % ball.ball_id)
        # TODO: can I just change the state like that here? -> Probably move to Ball
        manager.balls_out[ball.ball_id] = notify_port
        self.connectPorts(ball.position_out, manager.balls_in)
        self.connectPorts(notify_port, ball.collision_detect)
        self.connectPorts(ball.ball_spawner_comm, self.ball_spawner.request)
        self.connectPorts(ball.color_out, self.tracer.color_in)
if __name__ == '__main__':
    # The tracer appends to 'sim_output', so remove any file left by a
    # previous run. Only OSError (e.g. the file does not exist) is ignored;
    # the former bare ``except`` also swallowed KeyboardInterrupt and
    # programming errors.
    try:
        os.remove('sim_output')
    except OSError:
        pass

    # Fixed seed so every run draws the same random numbers.
    random.seed(1)

    # instantiate DEVS model
    model = Field(800, 600)

    # simulator setup
    sim = Simulator(model)
    sim.setDSDEVS(True)
    sim.setTerminationTime(60)
    # sim.setVerbose("myOutputFile")

    # run simulation + visualization
    sim.simulate()
|