experiment_realtime.py 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. from pypdevs.DEVS import AtomicDEVS, DEVSException
  2. from pypdevs.infinity import INFINITY
  3. from pypdevs.simulator import Simulator
  4. class TrafficLightMode:
  5. def __init__(self, current="red"):
  6. self.set(current)
  7. def set(self, value="red"):
  8. self.__colour=value
  9. def get(self):
  10. return self.__colour
  11. def __str__(self):
  12. return self.get()
  13. class TrafficLight(AtomicDEVS):
  14. def __init__(self, name):
  15. AtomicDEVS.__init__(self, name)
  16. self.state = TrafficLightMode("red")
  17. self.INTERRUPT = self.addInPort(name="INTERRUPT")
  18. self.OBSERVED = self.addOutPort(name="OBSERVED")
  19. def extTransition(self, inputs):
  20. input = inputs[self.INTERRUPT][0]
  21. state = self.state.get()
  22. if input == "toManual":
  23. if state == "manual":
  24. # staying in manual mode
  25. return TrafficLightMode("manual")
  26. if state in ("red", "green", "yellow"):
  27. return TrafficLightMode("manual")
  28. elif input == "toAutonomous":
  29. if state == "manual":
  30. return TrafficLightMode("red")
  31. raise DEVSException("Unkown input in TrafficLight")
  32. def intTransition(self):
  33. state = self.state.get()
  34. if state == "red":
  35. return TrafficLightMode("green")
  36. elif state == "green":
  37. return TrafficLightMode("yellow")
  38. elif state == "yellow":
  39. return TrafficLightMode("red")
  40. else:
  41. raise DEVSException("Unkown state in TrafficLight")
  42. def outputFnc(self):
  43. state = self.state.get()
  44. if state == "red":
  45. return {self.OBSERVED: ["grey"]}
  46. elif state == "green":
  47. return {self.OBSERVED: ["yellow"]}
  48. elif state == "yellow":
  49. return {self.OBSERVED: ["grey"]}
  50. else:
  51. raise DEVSException("Unknown state in TrafficLight")
  52. def timeAdvance(self):
  53. state = self.state.get()
  54. if state == "red":
  55. return 60
  56. elif state == "green":
  57. return 50
  58. elif state == "yellow":
  59. return 10
  60. elif state == "manual":
  61. return INFINITY
  62. else:
  63. raise DEVSException("Unknown state in TrafficLight")
  64. model = TrafficLight(name="trafficLight")
  65. refs = {"INTERRUPT": model.INTERRUPT}
  66. sim = Simulator(model)
  67. sim.setRealTime(True)
  68. sim.setRealTimeInputFile(None)
  69. sim.setRealTimePorts(refs)
  70. sim.setVerbose(None)
  71. sim.setRealTimePlatformThreads()
  72. sim.simulate()
  73. while 1:
  74. sim.realtime_interrupt(input('Enter some input:'))
  75. # INTERRUPT toManual
  76. # INTERRUPT toAutonomous