model.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312
  1. # Copyright 2014 Modelling, Simulation and Design Lab (MSDL) at
  2. # McGill University and the University of Antwerp (http://msdl.cs.mcgill.ca/)
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import sys
  16. # Import code for DEVS model representation:
  17. from pypdevs.DEVS import *
  18. from pypdevs.infinity import INFINITY
  19. class TrafficLightMode:
  20. """
  21. Encapsulates the system's state
  22. """
  23. def __init__(self, current="red"):
  24. """
  25. Constructor (parameterizable).
  26. """
  27. self.set(current)
  28. def set(self, value="red"):
  29. self.__colour=value
  30. def get(self):
  31. return self.__colour
  32. def __str__(self):
  33. return self.get()
  34. class TrafficLight(AtomicDEVS):
  35. """
  36. A traffic light
  37. """
  38. def __init__(self, name=None):
  39. """
  40. Constructor (parameterizable).
  41. """
  42. # Always call parent class' constructor FIRST:
  43. AtomicDEVS.__init__(self, name)
  44. # STATE:
  45. # Define 'state' attribute (initial sate):
  46. self.state = TrafficLightMode("red")
  47. # ELAPSED TIME:
  48. # Initialize 'elapsed time' attribute if required
  49. # (by default, value is 0.0):
  50. self.elapsed = 1.5
  51. # with elapsed time initially 1.5 and initially in
  52. # state "red", which has a time advance of 60,
  53. # there are 60-1.5 = 58.5time-units remaining until the first
  54. # internal transition
  55. # PORTS:
  56. # Declare as many input and output ports as desired
  57. # (usually store returned references in local variables):
  58. self.INTERRUPT = self.addInPort(name="INTERRUPT")
  59. self.OBSERVED = self.addOutPort(name="OBSERVED")
  60. def extTransition(self, inputs):
  61. """
  62. External Transition Function.
  63. """
  64. # Compute the new state 'Snew' based (typically) on current
  65. # State, Elapsed time parameters and calls to 'self.peek(self.IN)'.
  66. input = inputs.get(self.INTERRUPT)[0]
  67. state = self.state.get()
  68. if input == "toManual":
  69. if state == "manual":
  70. # staying in manual mode
  71. return TrafficLightMode("manual")
  72. elif state in ("red", "green", "yellow"):
  73. return TrafficLightMode("manual")
  74. elif input == "toAutonomous":
  75. if state == "manual":
  76. return TrafficLightMode("red")
  77. elif state in ("red", "green", "yellow"):
  78. # If toAutonomous is given while still autonomous, just stay in this state
  79. return self.state
  80. raise DEVSException(\
  81. "unknown state <%s> in TrafficLight external transition function"\
  82. % state)
  83. def intTransition(self):
  84. """
  85. Internal Transition Function.
  86. """
  87. state = self.state.get()
  88. if state == "red":
  89. return TrafficLightMode("green")
  90. elif state == "green":
  91. return TrafficLightMode("yellow")
  92. elif state == "yellow":
  93. return TrafficLightMode("red")
  94. else:
  95. raise DEVSException(\
  96. "unknown state <%s> in TrafficLight internal transition function"\
  97. % state)
  98. def outputFnc(self):
  99. """
  100. Output Funtion.
  101. """
  102. # A colourblind observer sees "grey" instead of "red" or "green".
  103. # BEWARE: ouput is based on the OLD state
  104. # and is produced BEFORE making the transition.
  105. # We'll encode an "observation" of the state the
  106. # system will transition to !
  107. # Send messages (events) to a subset of the atomic-DEVS'
  108. # output ports by means of the 'poke' method, i.e.:
  109. # The content of the messages is based (typically) on current State.
  110. state = self.state.get()
  111. if state == "red":
  112. return {self.OBSERVED: ["grey"]}
  113. elif state == "green":
  114. return {self.OBSERVED: ["yellow"]}
  115. elif state == "yellow":
  116. return {self.OBSERVED: ["grey"]}
  117. else:
  118. raise DEVSException(\
  119. "unknown state <%s> in TrafficLight external transition function"\
  120. % state)
  121. def timeAdvance(self):
  122. """
  123. Time-Advance Function.
  124. """
  125. # Compute 'ta', the time to the next scheduled internal transition,
  126. # based (typically) on current State.
  127. state = self.state.get()
  128. if state == "red":
  129. return 60
  130. elif state == "green":
  131. return 50
  132. elif state == "yellow":
  133. return 10
  134. elif state == "manual":
  135. return INFINITY
  136. else:
  137. raise DEVSException(\
  138. "unknown state <%s> in TrafficLight time advance transition function"\
  139. % state)
  140. class Policeman(AtomicDEVS):
  141. """
  142. A policeman producing "toManual" and "toAutonomous" events:
  143. "toManual" when going from "idle" to "working" mode
  144. "toAutonomous" when going from "working" to "idle" mode
  145. """
  146. def __init__(self, name=None):
  147. """
  148. Constructor (parameterizable).
  149. """
  150. # Always call parent class' constructor FIRST:
  151. AtomicDEVS.__init__(self, name)
  152. # STATE:
  153. # Define 'state' attribute (initial sate):
  154. self.state = "idle_at_1"
  155. # ELAPSED TIME:
  156. # Initialize 'elapsed time' attribute if required
  157. # (by default, value is 0.0):
  158. self.elapsed = 0
  159. # PORTS:
  160. # Declare as many input and output ports as desired
  161. # (usually store returned references in local variables):
  162. self.OUT = self.addOutPort(name="OUT")
  163. def intTransition(self):
  164. """
  165. Internal Transition Function.
  166. The policeman works forever, so only one mode.
  167. """
  168. state = self.state
  169. if state == "idle_at_1":
  170. return "working_at_1"
  171. elif state == "working_at_1":
  172. return "moving_from_1_to_2"
  173. elif state == "moving_from_1_to_2":
  174. return "idle_at_2"
  175. elif state == "idle_at_2":
  176. return "working_at_2"
  177. elif state == "working_at_2":
  178. return "moving_from_2_to_1"
  179. elif state == "moving_from_2_to_1":
  180. return "idle_at_1"
  181. else:
  182. raise DEVSException(\
  183. "unknown state <%s> in Policeman internal transition function"\
  184. % state)
  185. def outputFnc(self):
  186. """
  187. Output Funtion.
  188. """
  189. # Send messages (events) to a subset of the atomic-DEVS'
  190. # output ports by means of the 'poke' method, i.e.:
  191. # The content of the messages is based (typically) on current State.
  192. state = self.state
  193. if state == "idle_at_1":
  194. # Will start working
  195. return {self.OUT: ["toManual"]}
  196. elif state == "working_at_1":
  197. # Will have to put it in autonomous mode again
  198. return {self.OUT: ["toAutonomous"]}
  199. elif state == "moving_from_1_to_2":
  200. # Will simply stand idle while waiting
  201. return {}
  202. elif state == "idle_at_2":
  203. return {self.OUT: ["toManual"]}
  204. elif state == "working_at_2":
  205. return {self.OUT: ["toAutonomous"]}
  206. elif state == "moving_from_2_to_1":
  207. return {}
  208. else:
  209. raise DEVSException(\
  210. "unknown state <%s> in Policeman internal transition function"\
  211. % state)
  212. def timeAdvance(self):
  213. """
  214. Time-Advance Function.
  215. """
  216. # Compute 'ta', the time to the next scheduled internal transition,
  217. # based (typically) on current State.
  218. state = self.state
  219. if "idle" in state:
  220. return 50
  221. elif "working" in state:
  222. return 100
  223. elif "moving" in state:
  224. return 150
  225. else:
  226. raise DEVSException(\
  227. "unknown state <%s> in Policeman time advance function"\
  228. % state)
  229. def modelTransition(self, state):
  230. if self.state == "moving_from_1_to_2":
  231. state["destination"] = "2"
  232. return True
  233. elif self.state == "moving_from_2_to_1":
  234. state["destination"] = "1"
  235. return True
  236. else:
  237. return False
  238. class TrafficSystem(CoupledDEVS):
  239. def __init__(self, name=None):
  240. """
  241. A simple traffic system consisting of a Policeman and a TrafficLight.
  242. """
  243. # Always call parent class' constructor FIRST:
  244. CoupledDEVS.__init__(self, name)
  245. # Declare the coupled model's output ports:
  246. # Autonomous, so no output ports
  247. # Declare the coupled model's sub-models:
  248. # The Policeman generating interrupts
  249. self.policeman = self.addSubModel(Policeman(name="policeman"))
  250. # Two TrafficLights
  251. self.trafficLight1 = self.addSubModel(TrafficLight(name="trafficLight1"))
  252. self.trafficLight2 = self.addSubModel(TrafficLight(name="trafficLight2"))
  253. # Only connect to the first traffic light
  254. self.connectPorts(self.policeman.OUT, self.trafficLight1.INTERRUPT)
  255. def modelTransition(self, state):
  256. # Policeman triggered a mode, so switch the connection
  257. if state["destination"] == "1":
  258. self.disconnectPorts(self.policeman.OUT, self.trafficLight2.INTERRUPT)
  259. self.connectPorts(self.policeman.OUT, self.trafficLight1.INTERRUPT)
  260. elif state["destination"] == "2":
  261. self.disconnectPorts(self.policeman.OUT, self.trafficLight1.INTERRUPT)
  262. self.connectPorts(self.policeman.OUT, self.trafficLight2.INTERRUPT)
  263. # Don't propagate
  264. return False