trafficLightModel.py 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312
  1. # Copyright 2014 Modelling, Simulation and Design Lab (MSDL) at
  2. # McGill University and the University of Antwerp (http://msdl.cs.mcgill.ca/)
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. # Import code for DEVS model representation:
  16. from pypdevs.infinity import *
  17. from pypdevs.DEVS import *
  18. # ====================================================================== #
  19. class TrafficLightMode:
  20. """Encapsulates the system's state
  21. """
  22. ###
  23. def __init__(self, current="red"):
  24. """Constructor (parameterizable).
  25. """
  26. self.set(current)
  27. def set(self, value="red"):
  28. self.__colour=value
  29. def get(self):
  30. return self.__colour
  31. def __str__(self):
  32. return self.get()
  33. class TrafficLight(AtomicDEVS):
  34. """A traffic light
  35. """
  36. ###
  37. def __init__(self, name=None):
  38. """Constructor (parameterizable).
  39. """
  40. # Always call parent class' constructor FIRST:
  41. AtomicDEVS.__init__(self, name)
  42. # STATE:
  43. # Define 'state' attribute (initial sate):
  44. self.state = TrafficLightMode("red")
  45. # PORTS:
  46. # Declare as many input and output ports as desired
  47. # (usually store returned references in local variables):
  48. self.INTERRUPT = self.addInPort(name="INTERRUPT")
  49. self.OBSERVED = self.addOutPort(name="OBSERVED")
  50. ###
  51. def extTransition(self, inputs):
  52. """External Transition Function."""
  53. # Compute the new state 'Snew' based (typically) on current
  54. # State, Elapsed time parameters and calls to 'self.peek(self.IN)'.
  55. #input = self.peek(self.INTERRUPT)
  56. input = inputs[self.INTERRUPT][0]
  57. state = self.state.get()
  58. if input == "toManual":
  59. if state == "manual":
  60. # staying in manual mode
  61. return TrafficLightMode("manual")
  62. if state in ("red", "green", "yellow"):
  63. return TrafficLightMode("manual")
  64. else:
  65. raise DEVSException(\
  66. "unknown state <%s> in TrafficLight external transition function"\
  67. % state)
  68. if input == "toAutonomous":
  69. if state == "manual":
  70. return TrafficLightMode("red")
  71. else:
  72. raise DEVSException(\
  73. "unknown state <%s> in TrafficLight external transition function"\
  74. % state)
  75. raise DEVSException(\
  76. "unknown input <%s> in TrafficLight external transition function"\
  77. % input)
  78. ###
  79. def intTransition(self):
  80. """Internal Transition Function.
  81. """
  82. state = self.state.get()
  83. if state == "red":
  84. return TrafficLightMode("green")
  85. elif state == "green":
  86. return TrafficLightMode("yellow")
  87. elif state == "yellow":
  88. return TrafficLightMode("red")
  89. else:
  90. raise DEVSException(\
  91. "unknown state <%s> in TrafficLight internal transition function"\
  92. % state)
  93. ###
  94. def outputFnc(self):
  95. """Output Funtion.
  96. """
  97. # A colourblind observer sees "grey" instead of "red" or "green".
  98. # BEWARE: ouput is based on the OLD state
  99. # and is produced BEFORE making the transition.
  100. # We'll encode an "observation" of the state the
  101. # system will transition to !
  102. # Send messages (events) to a subset of the atomic-DEVS'
  103. # output ports by means of the 'poke' method, i.e.:
  104. # The content of the messages is based (typically) on current State.
  105. state = self.state.get()
  106. if state == "red":
  107. return {self.OBSERVED: ["grey"]}
  108. #self.poke(self.OBSERVED, "grey")
  109. # NOT self.poke(self.OBSERVED, "grey")
  110. elif state == "green":
  111. return {self.OBSERVED: ["yellow"]}
  112. #self.poke(self.OBSERVED, "yellow")
  113. # NOT self.poke(self.OBSERVED, "grey")
  114. elif state == "yellow":
  115. return {self.OBSERVED: ["grey"]}
  116. #self.poke(self.OBSERVED, "grey")
  117. # NOT self.poke(self.OBSERVED, "yellow")
  118. else:
  119. raise DEVSException(\
  120. "unknown state <%s> in TrafficLight external transition function"\
  121. % state)
  122. ###
  123. def timeAdvance(self):
  124. """Time-Advance Function.
  125. """
  126. # Compute 'ta', the time to the next scheduled internal transition,
  127. # based (typically) on current State.
  128. state = self.state.get()
  129. if state == "red":
  130. return 3
  131. elif state == "green":
  132. return 2
  133. elif state == "yellow":
  134. return 1
  135. elif state == "manual":
  136. return INFINITY
  137. else:
  138. raise DEVSException(\
  139. "unknown state <%s> in TrafficLight time advance transition function"\
  140. % state)
  141. # ====================================================================== #
  142. class PolicemanMode:
  143. """Encapsulates the Policeman's state
  144. """
  145. ###
  146. def __init__(self, current="idle"):
  147. """Constructor (parameterizable).
  148. """
  149. self.set(current)
  150. def set(self, value="idle"):
  151. self.__mode=value
  152. def get(self):
  153. return self.__mode
  154. def __str__(self):
  155. return self.get()
  156. class Policeman(AtomicDEVS):
  157. """A policeman producing "toManual" and "toAutonomous" events:
  158. "toManual" when going from "idle" to "working" mode
  159. "toAutonomous" when going from "working" to "idle" mode
  160. """
  161. ###
  162. def __init__(self, name=None):
  163. """Constructor (parameterizable).
  164. """
  165. # Always call parent class' constructor FIRST:
  166. AtomicDEVS.__init__(self, name)
  167. # STATE:
  168. # Define 'state' attribute (initial sate):
  169. self.state = PolicemanMode("idle")
  170. # ELAPSED TIME:
  171. # Initialize 'elapsed time' attribute if required
  172. # (by default, value is 0.0):
  173. self.elapsed = 0
  174. # PORTS:
  175. # Declare as many input and output ports as desired
  176. # (usually store returned references in local variables):
  177. self.OUT = self.addOutPort(name="OUT")
  178. ###
  179. # Autonomous system (no input ports),
  180. # so no External Transition Function required
  181. #
  182. ###
  183. def intTransition(self):
  184. """Internal Transition Function.
  185. The policeman works forever, so only one mode.
  186. """
  187. state = self.state.get()
  188. if state == "idle":
  189. return PolicemanMode("working")
  190. elif state == "working":
  191. return PolicemanMode("idle")
  192. else:
  193. raise DEVSException(\
  194. "unknown state <%s> in Policeman internal transition function"\
  195. % state)
  196. ###
  197. def outputFnc(self):
  198. """Output Funtion.
  199. """
  200. # Send messages (events) to a subset of the atomic-DEVS'
  201. # output ports by means of the 'poke' method, i.e.:
  202. # The content of the messages is based (typically) on current State.
  203. state = self.state.get()
  204. if state == "idle":
  205. return {self.OUT: ["toManual"]}
  206. #self.poke(self.OUT, "toManual")
  207. elif state == "working":
  208. return {self.OUT: ["toAutonomous"]}
  209. #self.poke(self.OUT, "toAutonomous")
  210. else:
  211. raise DEVSException(\
  212. "unknown state <%s> in Policeman output function"\
  213. % state)
  214. ###
  215. def timeAdvance(self):
  216. """Time-Advance Function.
  217. """
  218. # Compute 'ta', the time to the next scheduled internal transition,
  219. # based (typically) on current State.
  220. state = self.state.get()
  221. if state == "idle":
  222. return 200
  223. elif state == "working":
  224. return 100
  225. else:
  226. raise DEVSException(\
  227. "unknown state <%s> in Policeman time advance function"\
  228. % state)
  229. # ====================================================================== #
  230. class TrafficSystem(CoupledDEVS):
  231. def __init__(self, name=None):
  232. """ A simple traffic system consisting of a Policeman and a TrafficLight.
  233. """
  234. # Always call parent class' constructor FIRST:
  235. CoupledDEVS.__init__(self, name)
  236. # Declare the coupled model's output ports:
  237. # Autonomous, so no output ports
  238. #self.OUT = self.addOutPort(name="OUT")
  239. # Declare the coupled model's sub-models:
  240. # The Policeman generating interrupts
  241. self.policeman = self.addSubModel(Policeman(name="policeman"))
  242. # The TrafficLight
  243. self.trafficLight = self.addSubModel(TrafficLight(name="trafficLight"))
  244. # Only connect ...
  245. self.connectPorts(self.policeman.OUT, self.trafficLight.INTERRUPT)