injection.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. # Copyright 2014 Modelling, Simulation and Design Lab (MSDL) at
  2. # McGill University and the University of Antwerp (http://msdl.cs.mcgill.ca/)
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. # Import code for DEVS model representation:
  16. from pypdevs.infinity import *
  17. from pypdevs.DEVS import *
  18. # ====================================================================== #
  19. class TrafficLightMode:
  20. """Encapsulates the system's state
  21. """
  22. ###
  23. def __init__(self, current="red"):
  24. """Constructor (parameterizable).
  25. """
  26. self.set(current)
  27. def set(self, value="red"):
  28. self.__colour=value
  29. def get(self):
  30. return self.__colour
  31. def __str__(self):
  32. return self.get()
  33. class TrafficLight(AtomicDEVS):
  34. """A traffic light
  35. """
  36. ###
  37. def __init__(self, name=None):
  38. """Constructor (parameterizable).
  39. """
  40. # Always call parent class' constructor FIRST:
  41. AtomicDEVS.__init__(self, name)
  42. # STATE:
  43. # Define 'state' attribute (initial sate):
  44. self.state = TrafficLightMode("red")
  45. # PORTS:
  46. # Declare as many input and output ports as desired
  47. # (usually store returned references in local variables):
  48. self.INTERRUPT = self.addInPort(name="INTERRUPT")
  49. self.OBSERVED = self.addOutPort(name="OBSERVED")
  50. ###
  51. def extTransition(self, inputs):
  52. """External Transition Function."""
  53. # Compute the new state 'Snew' based (typically) on current
  54. # State, Elapsed time parameters and calls to 'self.peek(self.IN)'.
  55. #input = self.peek(self.INTERRUPT)
  56. input = inputs[self.INTERRUPT][0]
  57. state = self.state.get()
  58. if input == "toManual":
  59. if state == "manual":
  60. # staying in manual mode
  61. return TrafficLightMode("manual")
  62. if state in ("red", "green", "yellow"):
  63. return TrafficLightMode("manual")
  64. else:
  65. raise DEVSException(\
  66. "unknown state <%s> in TrafficLight external transition function"\
  67. % state)
  68. if input == "toAutonomous":
  69. if state == "manual":
  70. return TrafficLightMode("red")
  71. else:
  72. raise DEVSException(\
  73. "unknown state <%s> in TrafficLight external transition function"\
  74. % state)
  75. raise DEVSException(\
  76. "unknown input <%s> in TrafficLight external transition function"\
  77. % input)
  78. ###
  79. def intTransition(self):
  80. """Internal Transition Function.
  81. """
  82. state = self.state.get()
  83. if state == "red":
  84. return TrafficLightMode("green")
  85. elif state == "green":
  86. return TrafficLightMode("yellow")
  87. elif state == "yellow":
  88. return TrafficLightMode("red")
  89. else:
  90. raise DEVSException(\
  91. "unknown state <%s> in TrafficLight internal transition function"\
  92. % state)
  93. ###
  94. def outputFnc(self):
  95. """Output Funtion.
  96. """
  97. # A colourblind observer sees "grey" instead of "red" or "green".
  98. # BEWARE: ouput is based on the OLD state
  99. # and is produced BEFORE making the transition.
  100. # We'll encode an "observation" of the state the
  101. # system will transition to !
  102. # Send messages (events) to a subset of the atomic-DEVS'
  103. # output ports by means of the 'poke' method, i.e.:
  104. # The content of the messages is based (typically) on current State.
  105. state = self.state.get()
  106. if state == "red":
  107. return {self.OBSERVED: ["grey"]}
  108. elif state == "green":
  109. return {self.OBSERVED: ["yellow"]}
  110. # NOT return {self.OBSERVED: ["grey"]}
  111. elif state == "yellow":
  112. return {self.OBSERVED: ["grey"]}
  113. # NOT return {self.OBSERVED: ["yellow"]}
  114. else:
  115. raise DEVSException(\
  116. "unknown state <%s> in TrafficLight external transition function"\
  117. % state)
  118. ###
  119. def timeAdvance(self):
  120. """Time-Advance Function.
  121. """
  122. # Compute 'ta', the time to the next scheduled internal transition,
  123. # based (typically) on current State.
  124. state = self.state.get()
  125. if state == "red":
  126. return 3
  127. elif state == "green":
  128. return 2
  129. elif state == "yellow":
  130. return 1
  131. elif state == "manual":
  132. return INFINITY
  133. else:
  134. raise DEVSException(\
  135. "unknown state <%s> in TrafficLight time advance transition function"\
  136. % state)
  137. # ====================================================================== #
  138. class TrafficLightSystem(CoupledDEVS):
  139. def __init__(self):
  140. CoupledDEVS.__init__(self, "System")
  141. self.light = self.addSubModel(TrafficLight("Light"))
  142. self.observed = self.addOutPort(name="observed")
  143. self.connectPorts(self.light.OBSERVED, self.observed)
  144. def my_function(event):
  145. print("Observed the following event: " + str(event))
  146. from pypdevs.simulator import Simulator
  147. model = TrafficLightSystem()
  148. sim = Simulator(model)
  149. sim.setRealTime()
  150. sim.setListenPorts(model.observed, my_function)
  151. sim.simulate()
  152. import time
  153. while 1:
  154. time.sleep(0.1)