123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110 |
- from pypdevs.DEVS import AtomicDEVS, CoupledDEVS
- from pypdevs.simulator import Simulator
- class Root(CoupledDEVS):
- def __init__(self):
- CoupledDEVS.__init__(self, "Root")
- self.listener_A = self.addInPort("listener_A")
- self.output_A = self.addOutPort("output_A")
- self.mini = self.addSubModel(Mini())
- self.connectPorts(self.listener_A, self.mini.listener_B)
- self.connectPorts(self.mini.output_B, self.output_A)
- class Mini(CoupledDEVS):
- def __init__(self):
- CoupledDEVS.__init__(self, "Mini")
- self.listener_B = self.addInPort("listener_B")
- self.output_B = self.addOutPort("output_B")
- self.model_one = self.addSubModel(Proc("C"))
- self.model_two = self.addSubModel(Proc("D"))
- self.connectPorts(self.listener_B, self.model_one.inport)
- self.connectPorts(self.listener_B, self.model_two.inport)
- self.connectPorts(self.model_one.outport, self.output_B)
- class Proc(AtomicDEVS):
- def __init__(self, name):
- AtomicDEVS.__init__(self, "Proc_%s" % name)
- self.inport = self.addInPort("listener_%s" % name)
- self.outport = self.addOutPort("output_%s" % name)
- self.state = None
- def intTransition(self):
- return None
- def extTransition(self, inputs):
- return inputs[self.inport][0]
- def outputFnc(self):
- return {self.outport: [self.state]}
- def timeAdvance(self):
- return 0.0 if self.state else float('inf')
- model = Root()
- sim = Simulator(model)
- sim.setRealTime(True)
- sim.setRealTimePorts({"input_A": model.listener_A,
- "input_B": model.mini.listener_B,
- "input_C": model.mini.model_one.inport,
- "input_D": model.mini.model_two.inport,
- "output_A": model.output_A,
- "output_B": model.mini.output_B,
- "output_C": model.mini.model_one.outport,
- "output_D": model.mini.model_two.outport})
- sim.setRealTimePlatformThreads()
- def output_on_A(evt):
- global on_A
- on_A = evt[0]
- def output_on_B(evt):
- global on_B
- on_B = evt[0]
- sim.setListenPorts(model.output_A, output_on_A)
- sim.setListenPorts(model.mini.output_B, output_on_B)
- sim.simulate()
- import time
- on_A = None
- on_B = None
- sim.realtime_interrupt("input_A 1")
- time.sleep(1)
- if not (on_A == "1" and on_B == "1"):
- raise Exception("Expected input on A or B output port")
- on_A = None
- on_B = None
- sim.realtime_interrupt("input_B 2")
- time.sleep(1)
- if not (on_A == "2" and on_B == "2"):
- print(on_A)
- print(type(on_A))
- print(on_B)
- print(type(on_B))
- raise Exception("Expected input on A and B output port")
- on_A = None
- on_B = None
- sim.realtime_interrupt("input_C 3")
- time.sleep(1)
- if not (on_A == "3" and on_B == "3"):
- print(on_A)
- print(on_B)
- raise Exception("Expected input on A or B output port")
- on_A = None
- on_B = None
- sim.realtime_interrupt("input_D 4")
- time.sleep(1)
- if not (on_A == None and on_B == None):
- raise Exception("Didn't expect input on A or B output port")
|