controller.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. import queue
  2. import dataclasses
  3. from typing import Dict, List, Optional
  4. from sccd.controller.event_queue import *
  5. from sccd.statechart.dynamic.event import *
  6. from sccd.controller.object_manager import *
  7. from sccd.util.debug import print_debug
  8. from sccd.model.model import *
  9. @dataclasses.dataclass
  10. class InputEvent:
  11. name: str
  12. port: str
  13. params: List[Any]
  14. time_offset: Duration
  15. Timestamp = int
  16. # The Controller class is a primitive that can be used to build backends of any kind:
  17. # Threads, integration with existing event loop, game loop, test framework, ...
  18. # The Controller class itself is NOT thread-safe.
  19. class Controller:
  20. @dataclasses.dataclass(eq=False, frozen=True)
  21. class EventQueueEntry:
  22. event: Event
  23. targets: List[Instance]
  24. def __init__(self, model: AbstractModel):
  25. self.model = model
  26. self.object_manager = ObjectManager(model)
  27. self.queue: EventQueue[Timestamp, EventQueueEntry] = EventQueue()
  28. self.simulated_time = 0 # integer
  29. self.initialized = False
  30. self.model.globals.assert_ready()
  31. # print_debug("model delta is %s" % str(self.model.globals.delta))
  32. # First call to 'run_until' method initializes
  33. self.run_until = self._initialize
  34. def _duration_to_time_offset(self, d: Duration) -> int:
  35. if self.model.globals.delta == duration(0):
  36. return 0
  37. return d // self.model.globals.delta
  38. def add_input(self, input: InputEvent):
  39. if input.name == "":
  40. raise Exception("Input event can't have an empty name.")
  41. # try:
  42. # self.model.globals.inports.get_id(input.port)
  43. # except KeyError as e:
  44. # raise Exception("No such port: '%s'" % input.port) from e
  45. try:
  46. event_id = self.model.globals.events.get_id(input.name)
  47. except KeyError as e:
  48. raise Exception("No such event: '%s'" % input.name) from e
  49. offset = self._duration_to_time_offset(input.time_offset)
  50. e = Event(
  51. id=event_id,
  52. name=input.name,
  53. port=input.port,
  54. params=input.params)
  55. # For now, add events received on input ports to all instances.
  56. # In the future, we can optimize this by keeping a mapping from port name to a list of instances
  57. # potentially responding to the event
  58. self.queue.add(self.simulated_time+offset,
  59. Controller.EventQueueEntry(e, self.object_manager.instances))
  60. # Get timestamp of next entry in event queue
  61. def next_wakeup(self) -> Optional[Timestamp]:
  62. return self.queue.earliest_timestamp()
  63. # Returns duration since start
  64. def get_simulated_duration(self) -> Duration:
  65. return (self.model.globals.delta * self.simulated_time)
  66. # Helper. Put big step output events in the event queue or add them to the right output listeners.
  67. def _process_big_step_output(self, events: List[OutputEvent], pipe: queue.Queue):
  68. pipe_events = []
  69. for e in events:
  70. if isinstance(e.target, InstancesTarget):
  71. offset = self._duration_to_time_offset(e.time_offset)
  72. self.queue.add(self.simulated_time + offset, Controller.EventQueueEntry(e.event, e.target.instances))
  73. elif isinstance(e.target, OutputPortTarget):
  74. assert (e.time_offset == duration(0)) # cannot combine 'after' with 'output port'
  75. pipe_events.append(e.event)
  76. else:
  77. raise Exception("Unexpected type:", e.target)
  78. if pipe_events:
  79. pipe.put(pipe_events, block=True, timeout=None)
  80. def _initialize(self, now: Optional[Timestamp], pipe: queue.Queue):
  81. # first run...
  82. # initialize the object manager, in turn initializing our default class
  83. # and adding the generated events to the queue
  84. for i in self.object_manager.instances:
  85. events = i.initialize()
  86. self._process_big_step_output(events, pipe)
  87. print_debug("initialized. time is now %s" % str(self.get_simulated_duration()))
  88. # Next call to 'run_until' will call '_run_until'
  89. self.run_until = self._run_until
  90. # Let's try it out :)
  91. self.run_until(now, pipe)
  92. # Run until the event queue has no more due events wrt given timestamp and until all instances are stable.
  93. # If no timestamp is given (now = None), run until event queue is empty.
  94. def _run_until(self, now: Optional[Timestamp], pipe: queue.Queue):
  95. # Actual "event loop"
  96. for timestamp, entry in self.queue.due(now):
  97. if timestamp != self.simulated_time:
  98. # make time leap
  99. self.simulated_time = timestamp
  100. print_debug("\ntime is now %s" % str(self.get_simulated_duration()))
  101. # run all instances for whom there are events
  102. for instance in entry.targets:
  103. output = instance.big_step([entry.event])
  104. # print_debug("completed big step (time = %s)" % str(self.model.globals.delta * self.simulated_time))
  105. self._process_big_step_output(output, pipe)
  106. self.simulated_time = now