# eventloop.py (4.5 KB)

from abc import *
from sccd.realtime.time import *
from sccd.controller.controller import *
import queue
from typing import Optional
  5. ScheduledID = Any
  6. # The interface for declaring 3rd party event loop implementations
  7. @dataclass
  8. class EventLoopImplementation(ABC):
  9. @abstractmethod
  10. def time_unit(self) -> Duration:
  11. pass
  12. @abstractmethod
  13. def schedule(self, timeout: int, callback: Callable[[],None]) -> ScheduledID:
  14. pass
  15. @abstractmethod
  16. def cancel(self, id: ScheduledID):
  17. pass
  18. # Event loop "platform".
  19. # This class is NOT thread-safe.
  20. class EventLoop:
  21. def __init__(self, controller: Controller, event_loop: EventLoopImplementation, time_impl: TimeImplementation = DefaultTimeImplementation):
  22. delta = controller.cd.get_delta()
  23. self.timer = Timer(time_impl, unit=delta) # will give us timestamps in model unit
  24. self.controller = controller
  25. self.event_loop = event_loop
  26. # got to convert from model unit to eventloop native unit for scheduling
  27. self.to_event_loop_unit = lambda x: int(get_conversion_f(delta, event_loop.time_unit())(x))
  28. # ID of currently scheduled task.
  29. # The actual type of this attribute depends on the event loop implementation.
  30. self.scheduled = None
  31. # Keeps the model responsive if we cannot keep up with wallclock time.
  32. self.purposefully_behind = 0
  33. def _wakeup(self):
  34. self.controller.run_until(self.timer.now() + self.purposefully_behind)
  35. # back to sleep
  36. next_wakeup = self.controller.next_wakeup()
  37. if next_wakeup is not None:
  38. sleep_duration = next_wakeup - self.timer.now()
  39. if sleep_duration < 0:
  40. self.purposefully_behind = sleep_duration
  41. sleep_duration = 0
  42. else:
  43. self.purposefully_behind = 0
  44. self.scheduled = self.event_loop.schedule(self.to_event_loop_unit(sleep_duration), self._wakeup)
  45. else:
  46. self.scheduled = None
  47. def start(self):
  48. self.timer.start()
  49. self._wakeup()
  50. def now(self):
  51. return self.timer.now() + self.purposefully_behind
  52. # Add input event with timestamp 'now'
  53. def add_input_now(self, port, event_name, params=[]):
  54. self.controller.add_input(
  55. timestamp=self.now(), port=port, event_name=event_name, params=params)
  56. if self.scheduled:
  57. self.event_loop.cancel(self.scheduled)
  58. self.event_loop.schedule(0, self._wakeup)
  59. # Extension to the EventLoop class with a thread-safe method for adding input events.
  60. # Allows other threads to add input to the Controller, which is useful when doing blocking IO.
  61. # It is probably cleaner to do async IO and use the regular EventLoop class instead.
  62. # Input events added in a thread-safe manner are added to a separate (thread-safe) queue. A bit hackish, this queue is regularly checked (polled) for new events from the 3rd party (e.g. Tk) event loop.
  63. # Perhaps a better alternative to polling is Yentl's tk.event_generate solution.
  64. class ThreadSafeEventLoop(EventLoop):
  65. def __init__(self, controller: Controller, event_loop: EventLoopImplementation, time_impl: TimeImplementation = DefaultTimeImplementation):
  66. super().__init__(controller, event_loop, time_impl)
  67. # thread-safe queue
  68. self.queue = queue.Queue()
  69. # check regularly if queue contains new events
  70. self.poll_interval = duration(100, Millisecond) // event_loop.time_unit()
  71. # override
  72. def _wakeup(self):
  73. while True:
  74. try:
  75. timestamp, port, event_name, params = self.queue.get_nowait()
  76. except queue.Empty:
  77. break
  78. self.controller.add_input(timestamp, port, event_name, params)
  79. self.controller.run_until(self.timer.now() + self.purposefully_behind)
  80. next_wakeup = self.controller.next_wakeup()
  81. if next_wakeup is not None:
  82. sleep_duration = next_wakeup - self.timer.now()
  83. if sleep_duration < 0:
  84. self.purposefully_behind = sleep_duration
  85. sleep_duration = 0
  86. else:
  87. self.purposefully_behind = 0
  88. self.scheduled = self.event_loop.schedule(min(self.to_event_loop_unit(sleep_duration), self.poll_interval), self._wakeup)
  89. else:
  90. self.scheduled = self.event_loop.schedule(self.poll_interval, self._wakeup)
  91. # Safe to call this method from any thread
  92. def add_input_now_threadsafe(self, port, event_name, params=[]):
  93. self.queue.put((self.now(), port, event_name, params))