eventloop.py 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. from sccd.controller.realtime import *
  2. import tkinter
  3. ScheduledID = Any
  4. @dataclass
  5. class EventLoopImplementation(ABC):
  6. @abstractmethod
  7. def time_unit(self) -> Duration:
  8. pass
  9. @abstractmethod
  10. def schedule(self) -> Callable[[int, Callable[[],None]], ScheduledID]:
  11. pass
  12. @abstractmethod
  13. def cancel(self) -> Callable[[ScheduledID], None]:
  14. pass
  15. @dataclass
  16. class TkInterImplementation(EventLoopImplementation):
  17. tk: tkinter.Tk
  18. def time_unit(self) -> Duration:
  19. return duration(1, Millisecond)
  20. def schedule(self) -> Callable[[int, Callable[[],None]], ScheduledID]:
  21. return self.tk.after
  22. def cancel(self) -> Callable[[ScheduledID], None]:
  23. return self.tk.after_cancel
  24. class EventLoop:
  25. def __init__(self, cd, event_loop, output_callback, time_impl=DefaultTimeImplementation):
  26. self.timer = Timer(time_impl, unit=cd.globals.delta) # will give us timestamps in model unit
  27. self.controller = Controller(cd)
  28. self.event_loop = event_loop
  29. self.output_callback = output_callback
  30. self.event_loop_convert = lambda x: int(get_conversion_f(
  31. cd.globals.delta, event_loop.time_unit())(x)) # got to convert from model unit to eventloop native unit for scheduling
  32. self.scheduled = None
  33. self.queue = queue.Queue()
  34. def _wakeup(self):
  35. # run controller - output will accumulate in queue
  36. self.controller.run_until(self.timer.now(), self.queue)
  37. # process output
  38. try:
  39. while True:
  40. big_step_output = self.queue.get_nowait()
  41. self.output_callback(big_step_output)
  42. except queue.Empty:
  43. pass
  44. # go to sleep
  45. # convert our statechart's timestamp to tkinter's (100 us -> 1 ms)
  46. sleep_duration = self.event_loop_convert(
  47. self.controller.next_wakeup() - self.controller.simulated_time)
  48. self.scheduled = self.event_loop.schedule()(sleep_duration, self._wakeup)
  49. # print("sleeping %d ms" % sleep_duration)
  50. def start(self):
  51. self.timer.start()
  52. self._wakeup()
  53. def pause(self):
  54. self.timer.pause()
  55. self.event_loop.cancel()(self.scheduled)
  56. # Add input. Does not automatically 'wake up' the controller if it is sleeping.
  57. # If you want the controller to respond immediately, call 'interrupt'.
  58. def add_input(self, event):
  59. offset = self.controller.simulated_time - self.timer.now()
  60. event.time_offset += offset * self.timer.unit
  61. self.controller.add_input(event)
  62. # Do NOT call while paused!
  63. def interrupt(self):
  64. self.event_loop.cancel()(self.scheduled)
  65. self._wakeup()