injecting.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. from pypdevs.DEVS import AtomicDEVS, CoupledDEVS
  2. from pypdevs.simulator import Simulator
  3. class Root(CoupledDEVS):
  4. def __init__(self):
  5. CoupledDEVS.__init__(self, "Root")
  6. self.listener_A = self.addInPort("listener_A")
  7. self.output_A = self.addOutPort("output_A")
  8. self.mini = self.addSubModel(Mini())
  9. self.connectPorts(self.listener_A, self.mini.listener_B)
  10. self.connectPorts(self.mini.output_B, self.output_A)
  11. class Mini(CoupledDEVS):
  12. def __init__(self):
  13. CoupledDEVS.__init__(self, "Mini")
  14. self.listener_B = self.addInPort("listener_B")
  15. self.output_B = self.addOutPort("output_B")
  16. self.model_one = self.addSubModel(Proc("C"))
  17. self.model_two = self.addSubModel(Proc("D"))
  18. self.connectPorts(self.listener_B, self.model_one.inport)
  19. self.connectPorts(self.listener_B, self.model_two.inport)
  20. self.connectPorts(self.model_one.outport, self.output_B)
  21. class Proc(AtomicDEVS):
  22. def __init__(self, name):
  23. AtomicDEVS.__init__(self, "Proc_%s" % name)
  24. self.inport = self.addInPort("listener_%s" % name)
  25. self.outport = self.addOutPort("output_%s" % name)
  26. self.state = None
  27. def intTransition(self):
  28. return None
  29. def extTransition(self, inputs):
  30. return inputs[self.inport][0]
  31. def outputFnc(self):
  32. return {self.outport: [self.state]}
  33. def timeAdvance(self):
  34. return 0.0 if self.state else float('inf')
  35. model = Root()
  36. sim = Simulator(model)
  37. sim.setRealTime(True)
  38. sim.setRealTimePorts({"input_A": model.listener_A,
  39. "input_B": model.mini.listener_B,
  40. "input_C": model.mini.model_one.inport,
  41. "input_D": model.mini.model_two.inport,
  42. "output_A": model.output_A,
  43. "output_B": model.mini.output_B,
  44. "output_C": model.mini.model_one.outport,
  45. "output_D": model.mini.model_two.outport})
  46. sim.setRealTimePlatformThreads()
  47. def output_on_A(evt):
  48. global on_A
  49. on_A = evt[0]
  50. def output_on_B(evt):
  51. global on_B
  52. on_B = evt[0]
  53. sim.setListenPorts(model.output_A, output_on_A)
  54. sim.setListenPorts(model.mini.output_B, output_on_B)
  55. sim.simulate()
  56. import time
  57. on_A = None
  58. on_B = None
  59. sim.realtime_interrupt("input_A 1")
  60. time.sleep(1)
  61. if not (on_A == "1" and on_B == "1"):
  62. raise Exception("Expected input on A or B output port")
  63. on_A = None
  64. on_B = None
  65. sim.realtime_interrupt("input_B 2")
  66. time.sleep(1)
  67. if not (on_A == "2" and on_B == "2"):
  68. print(on_A)
  69. print(type(on_A))
  70. print(on_B)
  71. print(type(on_B))
  72. raise Exception("Expected input on A and B output port")
  73. on_A = None
  74. on_B = None
  75. sim.realtime_interrupt("input_C 3")
  76. time.sleep(1)
  77. if not (on_A == "3" and on_B == "3"):
  78. print(on_A)
  79. print(on_B)
  80. raise Exception("Expected input on A or B output port")
  81. on_A = None
  82. on_B = None
  83. sim.realtime_interrupt("input_D 4")
  84. time.sleep(1)
  85. if not (on_A == None and on_B == None):
  86. raise Exception("Didn't expect input on A or B output port")