trafficlight_gui.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. # Author: Joeri Exelmans
  2. import tkinter
  3. from srcgen import statechart
  4. from lib.common import Controller, TimerService, print_trace
  5. import sys, os
  6. sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../src')))
  7. from yakindu.rx import Observer
  8. import time
  9. TIME_SCALE = 1
  10. def scaled_now():
  11. return time.perf_counter_ns() * TIME_SCALE
# Fill color used for a lamp (canvas oval) that is switched off.
OFF_COLOR = '#333333'
  13. # Runs the Controller as close as possible to the wall-clock.
  14. # Depending on how fast your computer is, simulated time will always run a tiny bit behind wall-clock time, but this error will NOT grow over time.
  15. class RealTimeSimulation:
  16. def __init__(self, controller, tk, update_callback, scale=1.0):
  17. self.controller = controller
  18. self.tk = tk
  19. self.update_callback = update_callback
  20. self.scale = scale
  21. self.scheduled_id = None
  22. self.purposefully_behind = 0
  23. def add_input(self, sc, event, value=None):
  24. now = scaled_now() + self.purposefully_behind
  25. self.controller.add_input(now, sc, event, value)
  26. self.interrupt()
  27. return now - self.controller.start_time
  28. def interrupt(self):
  29. if self.scheduled_id is not None:
  30. self.tk.after_cancel(self.scheduled_id)
  31. if self.controller.have_event():
  32. now = scaled_now()
  33. earliest = self.controller.get_earliest()
  34. sleep_time = (earliest - now) // TIME_SCALE
  35. if sleep_time < 0:
  36. self.purposefully_behind = sleep_time
  37. else:
  38. self.purposefully_behind = 0
  39. def callback():
  40. # print("run_until...")
  41. self.controller.run_until(now + self.purposefully_behind)
  42. self.update_callback()
  43. self.interrupt()
  44. # print("sleeping for", int(sleep_time / 1000000), "ms")
  45. self.scheduled_id = self.tk.after(int(sleep_time / 1000000), callback)
  46. else:
  47. print("sleeping until woken up")
  48. if __name__ == "__main__":
  49. # In these arrays, we will keep tuples (timestamp, event_name) of all input and output events, only to print them out at the end.
  50. input_trace = []
  51. output_trace = []
  52. toplevel = tkinter.Tk()
  53. toplevel.resizable(0,0)
  54. string_simtime = tkinter.StringVar(toplevel, '0.000')
  55. # For some reason, we have to initialize all 3 statecharts in code (instead of just the 'system' statechart)
  56. sc = statechart.Statechart()
  57. controller = Controller()
  58. sc.timer_service = TimerService(controller)
  59. # Callback function to display the simulated time:
  60. def update_callback():
  61. string_simtime.set('{:.3f}'.format((controller.simulated_time - controller.start_time) / 1000000000))
  62. sim = RealTimeSimulation(controller, toplevel, update_callback, scale=10.0)
  63. # Callback function for generating input events
  64. def generateInput(event_name):
  65. print("input event:", event_name)
  66. timestamp = sim.add_input(sc, event_name)
  67. tup = (timestamp, event_name, None) # our input events don't have a parameter
  68. input_trace.append(tup)
  69. # Create widgets....
  70. toplevel.title("Traffic Light")
  71. sim_frame = tkinter.Frame(toplevel)
  72. tkinter.Label(sim_frame, text="Simulated time is now:").pack(side=tkinter.LEFT)
  73. tkinter.Entry(sim_frame, state='readonly', width=8, textvariable=string_simtime, justify=tkinter.RIGHT).pack(side=tkinter.LEFT)
  74. tkinter.Label(sim_frame, text="s").pack(side=tkinter.LEFT)
  75. sim_frame.pack(side=tkinter.TOP)
  76. canvas = tkinter.Canvas(toplevel, bd=0, bg='#000000', width=100, height=300)
  77. red = canvas.create_oval(10,10,90,90, fill=OFF_COLOR)
  78. yellow = canvas.create_oval(10,110,90,190, fill=OFF_COLOR)
  79. green = canvas.create_oval(10,210,90,290, fill=OFF_COLOR)
  80. canvas.pack()
  81. # Buttons and LED display
  82. button_frame = tkinter.Frame(toplevel)
  83. button = tkinter.Button(button_frame, text="TOGGLE MODE")
  84. # Send event to our SC when button pressed or released:
  85. button.bind("<ButtonPress>", lambda _: generateInput("button_pressed"))
  86. button.bind("<ButtonRelease>", lambda _: generateInput("button_released"))
  87. button.pack(side=tkinter.LEFT)
  88. button_car = tkinter.Button(button_frame, text="PRETEND CAR DETECTED", command=lambda: generateInput("car_detected"))
  89. button_car.pack(side=tkinter.RIGHT)
  90. led_frame = tkinter.Frame(button_frame)
  91. string_led = tkinter.StringVar()
  92. led_entry = tkinter.Entry(led_frame, state='readonly', width=8, textvariable=string_led, justify=tkinter.RIGHT)
  93. led_entry.grid(row=0, column=1)
  94. led_frame.pack(side=tkinter.RIGHT)
  95. button_frame.pack()
  96. # Handle output events...
  97. # Bunch of callback functions
  98. def setRed(value):
  99. if value:
  100. canvas.itemconfigure(red, fill='#ff2222')
  101. else:
  102. canvas.itemconfigure(red, fill=OFF_COLOR)
  103. def setYellow(value):
  104. if value:
  105. canvas.itemconfigure(yellow, fill='#ffff22')
  106. else:
  107. canvas.itemconfigure(yellow, fill=OFF_COLOR)
  108. def setGreen(value):
  109. if value:
  110. canvas.itemconfigure(green, fill='#22ff22')
  111. else:
  112. canvas.itemconfigure(green, fill=OFF_COLOR)
  113. def setLed(value):
  114. if value:
  115. string_led.set("LED ON")
  116. led_entry.config(fg='red')
  117. else:
  118. string_led.set("LED OFF")
  119. led_entry.config(fg='grey')
  120. setLed(False)
  121. # Little helper class that turns a callback function into an YAKINDU-'Observer'.
  122. class CallbackObserver(Observer):
  123. def __init__(self, callback, event_name):
  124. self.callback = callback
  125. self.event_name = event_name
  126. def next(self, value=None):
  127. print("output event: %s(%s)" % (self.event_name, value))
  128. tup = (controller.simulated_time - controller.start_time, self.event_name, value)
  129. output_trace.append(tup)
  130. if value is None:
  131. self.callback()
  132. else:
  133. self.callback(value)
  134. # Bind output events to callbacks
  135. sc.set_red_observable.subscribe(CallbackObserver(setRed, "set_red"))
  136. sc.set_yellow_observable.subscribe(CallbackObserver(setYellow, "set_yellow"))
  137. sc.set_green_observable.subscribe(CallbackObserver(setGreen, "set_green"))
  138. sc.set_led_observable.subscribe(CallbackObserver(setLed, "set_led"))
  139. # Enter default states, start main loop...
  140. controller.start(scaled_now())
  141. sc.enter()
  142. sim.interrupt() # schedule first wakeup
  143. toplevel.mainloop()
  144. # Exiting...
  145. print("Exiting...")
  146. print("Full trace (you can add this to the SCENARIOS in test.py)...")
  147. print("{")
  148. print(" \"name\": \"interactive\",")
  149. print(" \"input_trace\": ", end='')
  150. print_trace(input_trace, 4)
  151. print(",")
  152. print(" \"output_trace\": ", end='')
  153. print_trace(output_trace, 4)
  154. print(",")
  155. print("}")