model.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287
  1. # Copyright 2014 Modelling, Simulation and Design Lab (MSDL) at
  2. # McGill University and the University of Antwerp (http://msdl.cs.mcgill.ca/)
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import sys
  16. # Import code for DEVS model representation:
  17. from DEVS import *
  18. from infinity import INFINITY
  19. class TrafficLightMode:
  20. """
  21. Encapsulates the system's state
  22. """
  23. def __init__(self, current="red"):
  24. """
  25. Constructor (parameterizable).
  26. """
  27. self.set(current)
  28. def set(self, value="red"):
  29. self.__colour=value
  30. def get(self):
  31. return self.__colour
  32. def __str__(self):
  33. return self.get()
  34. class TrafficLight(AtomicDEVS):
  35. """
  36. A traffic light
  37. """
  38. def __init__(self, name=None):
  39. """
  40. Constructor (parameterizable).
  41. """
  42. # Always call parent class' constructor FIRST:
  43. AtomicDEVS.__init__(self, name)
  44. # STATE:
  45. # Define 'state' attribute (initial sate):
  46. self.state = TrafficLightMode("red")
  47. # ELAPSED TIME:
  48. # Initialize 'elapsed time' attribute if required
  49. # (by default, value is 0.0):
  50. self.elapsed = 1.5
  51. # with elapsed time initially 1.5 and initially in
  52. # state "red", which has a time advance of 60,
  53. # there are 60-1.5 = 58.5time-units remaining until the first
  54. # internal transition
  55. # PORTS:
  56. # Declare as many input and output ports as desired
  57. # (usually store returned references in local variables):
  58. self.INTERRUPT = self.addInPort(name="INTERRUPT")
  59. self.OBSERVED = self.addOutPort(name="OBSERVED")
  60. def extTransition(self, inputs):
  61. """
  62. External Transition Function.
  63. """
  64. # Compute the new state 'Snew' based (typically) on current
  65. # State, Elapsed time parameters and calls to 'self.peek(self.IN)'.
  66. input = inputs.get(self.INTERRUPT)[0]
  67. state = self.state.get()
  68. if input == "toManual":
  69. if state == "manual":
  70. # staying in manual mode
  71. return TrafficLightMode("manual")
  72. elif state in ("red", "green", "yellow"):
  73. return TrafficLightMode("manual")
  74. elif input == "toAutonomous":
  75. if state == "manual":
  76. return TrafficLightMode("red")
  77. elif state in ("red", "green", "yellow"):
  78. # If toAutonomous is given while still autonomous, just stay in this state
  79. return self.state
  80. raise DEVSException(\
  81. "unknown state <%s> in TrafficLight external transition function"\
  82. % state)
  83. def intTransition(self):
  84. """
  85. Internal Transition Function.
  86. """
  87. state = self.state.get()
  88. if state == "red":
  89. return TrafficLightMode("green")
  90. elif state == "green":
  91. return TrafficLightMode("yellow")
  92. elif state == "yellow":
  93. return TrafficLightMode("red")
  94. else:
  95. raise DEVSException(\
  96. "unknown state <%s> in TrafficLight internal transition function"\
  97. % state)
  98. def outputFnc(self):
  99. """
  100. Output Funtion.
  101. """
  102. # A colourblind observer sees "grey" instead of "red" or "green".
  103. # BEWARE: ouput is based on the OLD state
  104. # and is produced BEFORE making the transition.
  105. # We'll encode an "observation" of the state the
  106. # system will transition to !
  107. # Send messages (events) to a subset of the atomic-DEVS'
  108. # output ports by means of the 'poke' method, i.e.:
  109. # The content of the messages is based (typically) on current State.
  110. state = self.state.get()
  111. if state == "red":
  112. return {self.OBSERVED: ["grey"]}
  113. elif state == "green":
  114. return {self.OBSERVED: ["yellow"]}
  115. elif state == "yellow":
  116. return {self.OBSERVED: ["grey"]}
  117. else:
  118. raise DEVSException(\
  119. "unknown state <%s> in TrafficLight external transition function"\
  120. % state)
  121. def timeAdvance(self):
  122. """
  123. Time-Advance Function.
  124. """
  125. # Compute 'ta', the time to the next scheduled internal transition,
  126. # based (typically) on current State.
  127. state = self.state.get()
  128. if state == "red":
  129. return 60
  130. elif state == "green":
  131. return 50
  132. elif state == "yellow":
  133. return 10
  134. elif state == "manual":
  135. return INFINITY
  136. else:
  137. raise DEVSException(\
  138. "unknown state <%s> in TrafficLight time advance transition function"\
  139. % state)
  140. class PolicemanMode:
  141. """
  142. Encapsulates the Policeman's state
  143. """
  144. def __init__(self, current="idle"):
  145. """
  146. Constructor (parameterizable).
  147. """
  148. self.set(current)
  149. def set(self, value="idle"):
  150. self.__mode=value
  151. def get(self):
  152. return self.__mode
  153. def __str__(self):
  154. return self.get()
  155. class Policeman(AtomicDEVS):
  156. """
  157. A policeman producing "toManual" and "toAutonomous" events:
  158. "toManual" when going from "idle" to "working" mode
  159. "toAutonomous" when going from "working" to "idle" mode
  160. """
  161. def __init__(self, name=None):
  162. """
  163. Constructor (parameterizable).
  164. """
  165. # Always call parent class' constructor FIRST:
  166. AtomicDEVS.__init__(self, name)
  167. # STATE:
  168. # Define 'state' attribute (initial sate):
  169. self.state = PolicemanMode("idle")
  170. # ELAPSED TIME:
  171. # Initialize 'elapsed time' attribute if required
  172. # (by default, value is 0.0):
  173. self.elapsed = 0
  174. # PORTS:
  175. # Declare as many input and output ports as desired
  176. # (usually store returned references in local variables):
  177. self.OUT = self.addOutPort(name="OUT")
  178. def intTransition(self):
  179. """
  180. Internal Transition Function.
  181. The policeman works forever, so only one mode.
  182. """
  183. state = self.state.get()
  184. if state == "idle":
  185. return PolicemanMode("working")
  186. elif state == "working":
  187. return PolicemanMode("idle")
  188. else:
  189. raise DEVSException(\
  190. "unknown state <%s> in Policeman internal transition function"\
  191. % state)
  192. def outputFnc(self):
  193. """
  194. Output Funtion.
  195. """
  196. # Send messages (events) to a subset of the atomic-DEVS'
  197. # output ports by means of the 'poke' method, i.e.:
  198. # The content of the messages is based (typically) on current State.
  199. state = self.state.get()
  200. if state == "idle":
  201. return {self.OUT: ["toManual"]}
  202. elif state == "working":
  203. return {self.OUT: ["toAutonomous"]}
  204. else:
  205. raise DEVSException(\
  206. "unknown state <%s> in Policeman output function"\
  207. % state)
  208. def timeAdvance(self):
  209. """
  210. Time-Advance Function.
  211. """
  212. # Compute 'ta', the time to the next scheduled internal transition,
  213. # based (typically) on current State.
  214. state = self.state.get()
  215. if state == "idle":
  216. return 200
  217. elif state == "working":
  218. return 100
  219. else:
  220. raise DEVSException(\
  221. "unknown state <%s> in Policeman time advance function"\
  222. % state)
  223. class Root(CoupledDEVS):
  224. def __init__(self, name="Root"):
  225. """
  226. A simple traffic system consisting of a Policeman and a TrafficLight.
  227. """
  228. # Always call parent class' constructor FIRST:
  229. CoupledDEVS.__init__(self, name)
  230. # Declare the coupled model's output ports:
  231. # Autonomous, so no output ports
  232. # Declare the coupled model's sub-models:
  233. # The Policeman generating interrupts
  234. self.policeman = self.addSubModel(Policeman(name="policeman"))
  235. # The TrafficLight
  236. self.trafficLight = self.addSubModel(TrafficLight(name="trafficLight"))
  237. # Only connect ...
  238. self.connectPorts(self.policeman.OUT, self.trafficLight.INTERRUPT)