trafficLightModel.py 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340
  1. # -*- coding: Latin-1 -*-
  2. ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ##
  3. # trafficLight.py --- simple Traffic Light example
  4. # --------------------------------
  5. # October 2005
  6. # Hans Vangheluwe
  7. # McGill University (Montréal)
  8. # --------------------------------
  9. ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ##
  10. # Add the directory where pydevs lives to Python's import path
  11. import sys
  12. # Import code for DEVS model representation:
  13. from pypdevs.infinity import *
  14. from pypdevs.DEVS import *
  15. # Import for uniform random number generators
  16. from random import uniform
  17. from random import randint
  18. # ====================================================================== #
  19. class TrafficLightMode:
  20. """Encapsulates the system's state
  21. """
  22. ###
  23. def __init__(self, current="red"):
  24. """Constructor (parameterizable).
  25. """
  26. self.set(current)
  27. def set(self, value="red"):
  28. self.__colour=value
  29. def get(self):
  30. return self.__colour
  31. def __str__(self):
  32. return self.get()
  33. class TrafficLight(AtomicDEVS):
  34. """A traffic light
  35. """
  36. ###
  37. def __init__(self, name=None):
  38. """Constructor (parameterizable).
  39. """
  40. # Always call parent class' constructor FIRST:
  41. AtomicDEVS.__init__(self, name)
  42. # STATE:
  43. # Define 'state' attribute (initial sate):
  44. self.state = TrafficLightMode("red")
  45. # PORTS:
  46. # Declare as many input and output ports as desired
  47. # (usually store returned references in local variables):
  48. self.INTERRUPT = self.addInPort(name="INTERRUPT")
  49. self.OBSERVED = self.addOutPort(name="OBSERVED")
  50. ###
  51. def extTransition(self, inputs):
  52. """External Transition Function."""
  53. # Compute the new state 'Snew' based (typically) on current
  54. # State, Elapsed time parameters and calls to 'self.peek(self.IN)'.
  55. #input = self.peek(self.INTERRUPT)
  56. input = inputs[self.INTERRUPT][0]
  57. state = self.state.get()
  58. if input == "toManual":
  59. if state == "manual":
  60. # staying in manual mode
  61. return TrafficLightMode("manual")
  62. if state in ("red", "green", "yellow"):
  63. return TrafficLightMode("manual")
  64. else:
  65. raise DEVSException(\
  66. "unknown state <%s> in TrafficLight external transition function"\
  67. % state)
  68. if input == "toAutonomous":
  69. if state == "manual":
  70. return TrafficLightMode("red")
  71. else:
  72. raise DEVSException(\
  73. "unknown state <%s> in TrafficLight external transition function"\
  74. % state)
  75. raise DEVSException(\
  76. "unknown input <%s> in TrafficLight external transition function"\
  77. % input)
  78. ###
  79. def intTransition(self):
  80. """Internal Transition Function.
  81. """
  82. state = self.state.get()
  83. if state == "red":
  84. return TrafficLightMode("green")
  85. elif state == "green":
  86. return TrafficLightMode("yellow")
  87. elif state == "yellow":
  88. return TrafficLightMode("red")
  89. else:
  90. raise DEVSException(\
  91. "unknown state <%s> in TrafficLight internal transition function"\
  92. % state)
  93. ###
  94. def outputFnc(self):
  95. """Output Funtion.
  96. """
  97. # A colourblind observer sees "grey" instead of "red" or "green".
  98. # BEWARE: ouput is based on the OLD state
  99. # and is produced BEFORE making the transition.
  100. # We'll encode an "observation" of the state the
  101. # system will transition to !
  102. # Send messages (events) to a subset of the atomic-DEVS'
  103. # output ports by means of the 'poke' method, i.e.:
  104. # The content of the messages is based (typically) on current State.
  105. state = self.state.get()
  106. if state == "red":
  107. return {self.OBSERVED: ["grey"]}
  108. #self.poke(self.OBSERVED, "grey")
  109. # NOT self.poke(self.OBSERVED, "grey")
  110. elif state == "green":
  111. return {self.OBSERVED: ["yellow"]}
  112. #self.poke(self.OBSERVED, "yellow")
  113. # NOT self.poke(self.OBSERVED, "grey")
  114. elif state == "yellow":
  115. return {self.OBSERVED: ["grey"]}
  116. #self.poke(self.OBSERVED, "grey")
  117. # NOT self.poke(self.OBSERVED, "yellow")
  118. else:
  119. raise DEVSException(\
  120. "unknown state <%s> in TrafficLight external transition function"\
  121. % state)
  122. ###
  123. def timeAdvance(self):
  124. """Time-Advance Function.
  125. """
  126. # Compute 'ta', the time to the next scheduled internal transition,
  127. # based (typically) on current State.
  128. state = self.state.get()
  129. if state == "red":
  130. return 3
  131. elif state == "green":
  132. return 2
  133. elif state == "yellow":
  134. return 1
  135. elif state == "manual":
  136. return INFINITY
  137. else:
  138. raise DEVSException(\
  139. "unknown state <%s> in TrafficLight time advance transition function"\
  140. % state)
  141. # ====================================================================== #
  142. class PolicemanMode:
  143. """Encapsulates the Policeman's state
  144. """
  145. ###
  146. def __init__(self, current="idle"):
  147. """Constructor (parameterizable).
  148. """
  149. self.set(current)
  150. def set(self, value="idle"):
  151. self.__mode=value
  152. def get(self):
  153. return self.__mode
  154. def __str__(self):
  155. return self.get()
  156. class Policeman(AtomicDEVS):
  157. """A policeman producing "toManual" and "toAutonomous" events:
  158. "toManual" when going from "idle" to "working" mode
  159. "toAutonomous" when going from "working" to "idle" mode
  160. """
  161. ###
  162. def __init__(self, name=None):
  163. """Constructor (parameterizable).
  164. """
  165. # Always call parent class' constructor FIRST:
  166. AtomicDEVS.__init__(self, name)
  167. # STATE:
  168. # Define 'state' attribute (initial sate):
  169. self.state = PolicemanMode("idle")
  170. # ELAPSED TIME:
  171. # Initialize 'elapsed time' attribute if required
  172. # (by default, value is 0.0):
  173. self.elapsed = 0
  174. # PORTS:
  175. # Declare as many input and output ports as desired
  176. # (usually store returned references in local variables):
  177. self.OUT = self.addOutPort(name="OUT")
  178. ###
  179. # Autonomous system (no input ports),
  180. # so no External Transition Function required
  181. #
  182. ###
  183. def intTransition(self):
  184. """Internal Transition Function.
  185. The policeman works forever, so only one mode.
  186. """
  187. state = self.state.get()
  188. if state == "idle":
  189. return PolicemanMode("working")
  190. elif state == "working":
  191. return PolicemanMode("idle")
  192. else:
  193. raise DEVSException(\
  194. "unknown state <%s> in Policeman internal transition function"\
  195. % state)
  196. ###
  197. def outputFnc(self):
  198. """Output Funtion.
  199. """
  200. # Send messages (events) to a subset of the atomic-DEVS'
  201. # output ports by means of the 'poke' method, i.e.:
  202. # The content of the messages is based (typically) on current State.
  203. state = self.state.get()
  204. if state == "idle":
  205. return {self.OUT: ["toManual"]}
  206. #self.poke(self.OUT, "toManual")
  207. elif state == "working":
  208. return {self.OUT: ["toAutonomous"]}
  209. #self.poke(self.OUT, "toAutonomous")
  210. else:
  211. raise DEVSException(\
  212. "unknown state <%s> in Policeman output function"\
  213. % state)
  214. ###
  215. def timeAdvance(self):
  216. """Time-Advance Function.
  217. """
  218. # Compute 'ta', the time to the next scheduled internal transition,
  219. # based (typically) on current State.
  220. state = self.state.get()
  221. if state == "idle":
  222. return 200
  223. elif state == "working":
  224. return 100
  225. else:
  226. raise DEVSException(\
  227. "unknown state <%s> in Policeman time advance function"\
  228. % state)
  229. # ====================================================================== #
  230. class TrafficSystem(CoupledDEVS):
  231. def __init__(self, name=None):
  232. """ A simple traffic system consisting of a Policeman and a TrafficLight.
  233. """
  234. # Always call parent class' constructor FIRST:
  235. CoupledDEVS.__init__(self, name)
  236. # Declare the coupled model's output ports:
  237. # Autonomous, so no output ports
  238. #self.OUT = self.addOutPort(name="OUT")
  239. # Declare the coupled model's sub-models:
  240. # The Policeman generating interrupts
  241. self.policeman = self.addSubModel(Policeman(name="policeman"))
  242. # The TrafficLight
  243. self.trafficLight = self.addSubModel(TrafficLight(name="trafficLight"))
  244. # Only connect ...
  245. self.connectPorts(self.policeman.OUT, self.trafficLight.INTERRUPT)
  246. #self.connectPorts(self.trafficLight.OBSERVED, self.OUT)
  247. def select(self, immList):
  248. """Give the Policeman highest priority.
  249. Note how the technique used below can
  250. be generalized to encode priorities based on model type.
  251. To distinguish between models of the same type,
  252. the name or unique ID should be used.
  253. """
  254. # return the first Policeman instance in the immList
  255. for i in range(len(immList)):
  256. #if immList[i].__class__ == TrafficLight:
  257. if immList[i].__class__ == Policeman:
  258. return immList[i]
  259. # if no Policeman instances found, return the last entry
  260. return immList[-1]
  261. # Alternative: randomly choose among imminent submodels
  262. #return immList[randint(0,len(immList))]
  263. # If the select method is not defined, the immList[0]
  264. # will be chosen by default.
  265. # ====================================================================== #