controller.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. import queue
  2. import dataclasses
  3. from typing import Dict, List, Optional
  4. from sccd.controller.event_queue import *
  5. from sccd.execution.event import *
  6. from sccd.controller.object_manager import *
  7. from sccd.util.debug import print_debug
  8. from sccd.model.model import *
  9. @dataclasses.dataclass
  10. class InputEvent:
  11. name: str
  12. port: str
  13. parameters: List[Any]
  14. time_offset: Duration
  15. # The Controller class is a primitive that can be used to build backends of any kind:
  16. # Threads, integration with existing event loop, game loop, test framework, ...
  17. # The Controller class itself is NOT thread-safe.
  18. class Controller:
  19. @dataclasses.dataclass(eq=False, frozen=True)
  20. class EventQueueEntry:
  21. event: Event
  22. targets: List[Instance]
  23. def __init__(self, model: AbstractModel):
  24. self.model = model
  25. self.object_manager = ObjectManager(model)
  26. self.queue: EventQueue[EventQueueEntry] = EventQueue()
  27. self.simulated_time = 0 # integer
  28. self.initialized = False
  29. self.model.globals.assert_ready()
  30. # print_debug("model delta is %s" % str(self.model.globals.delta))
  31. def _duration_to_time_offset(self, d: Duration) -> int:
  32. if self.model.globals.delta == duration(0):
  33. return 0
  34. return d // self.model.globals.delta
  35. def add_input(self, input: InputEvent):
  36. if input.name == "":
  37. raise Exception("Input event can't have an empty name.")
  38. # try:
  39. # self.model.globals.inports.get_id(input.port)
  40. # except KeyError as e:
  41. # raise Exception("No such port: '%s'" % input.port) from e
  42. try:
  43. event_id = self.model.globals.events.get_id(input.name)
  44. except KeyError as e:
  45. raise Exception("No such event: '%s'" % input.name) from e
  46. offset = self._duration_to_time_offset(input.time_offset)
  47. e = Event(
  48. id=event_id,
  49. name=input.name,
  50. port=input.port,
  51. parameters=input.parameters)
  52. # For now, add events received on input ports to all instances.
  53. # In the future, we can optimize this by keeping a mapping from port name to a list of instances
  54. # potentially responding to the event
  55. self.queue.add(self.simulated_time+offset,
  56. Controller.EventQueueEntry(e, self.object_manager.instances))
  57. # Get timestamp of next entry in event queue
  58. def next_wakeup(self) -> Optional[Timestamp]:
  59. return self.queue.earliest_timestamp()
  60. # Returns duration since start
  61. def get_simulated_duration(self) -> Duration:
  62. return (self.model.globals.delta * self.simulated_time)
  63. # Run until the event queue has no more due events wrt given timestamp and until all instances are stable.
  64. # If no timestamp is given (now = None), run until event queue is empty.
  65. def run_until(self, now: Optional[Timestamp], pipe: queue.Queue):
  66. # unstable: List[Instance] = []
  67. # Helper. Put big step output events in the event queue or add them to the right output listeners.
  68. def process_big_step_output(events: List[OutputEvent]):
  69. pipe_events = []
  70. for e in events:
  71. if isinstance(e.target, InstancesTarget):
  72. offset = self._duration_to_time_offset(e.time_offset)
  73. self.queue.add(self.simulated_time + offset, Controller.EventQueueEntry(e.event, e.target.instances))
  74. elif isinstance(e.target, OutputPortTarget):
  75. assert (e.time_offset == 0) # cannot combine 'after' with 'output port'
  76. pipe_events.append(e.event)
  77. else:
  78. raise Exception("Unexpected type:", e.target)
  79. if pipe_events:
  80. pipe.put(pipe_events, block=True, timeout=None)
  81. if not self.initialized:
  82. self.initialized = True
  83. # first run...
  84. # initialize the object manager, in turn initializing our default class
  85. # and adding the generated events to the queue
  86. for i in self.object_manager.instances:
  87. stable, events = i.initialize(self.simulated_time)
  88. process_big_step_output(events)
  89. # if not stable:
  90. # unstable.append(i)
  91. print_debug("initialized. time is now %s" % str(self.get_simulated_duration()))
  92. # Actual "event loop"
  93. # TODO: What is are the right semantics for this loop?
  94. # Should we stabilize every object after it has made a big step?
  95. # Should we only stabilize when there are no more events?
  96. # Should we never stabilize?
  97. # Should this be a semantic option?
  98. # while unstable or self.queue.is_due(now):
  99. # 1. Handle events
  100. for timestamp, entry in self.queue.due(now):
  101. # check if there's a time leap
  102. if timestamp is not self.simulated_time:
  103. # before every "time leap", continue to run instances until they are stable.
  104. # if not do_stabilize():
  105. # return
  106. # make time leap
  107. self.simulated_time = timestamp
  108. print_debug("\ntime is now %s" % str(self.get_simulated_duration()))
  109. # run all instances for whom there are events
  110. for instance in entry.targets:
  111. stable, output = instance.big_step(timestamp, [entry.event])
  112. # print_debug("completed big step (time = %s)" % str(self.model.globals.delta * self.simulated_time))
  113. process_big_step_output(output)
  114. # if not stable:
  115. # unstable.append(instance)
  116. # 2. No more due events -> stabilize
  117. # if not do_stabilize():
  118. # return
  119. self.simulated_time = now