|
@@ -7,6 +7,9 @@ from sccd.controller.object_manager import *
|
|
|
from sccd.util.debug import print_debug
|
|
|
from sccd.cd.cd import *
|
|
|
|
|
|
+def _dummy_output_callback(output_event):
|
|
|
+ pass
|
|
|
+
|
|
|
# The Controller class is a primitive that can be used to build backends of any kind:
|
|
|
# Threads, integration with existing event loop, game loop, test framework, ...
|
|
|
# The Controller class itself is NOT thread-safe.
|
|
@@ -16,17 +19,27 @@ class Controller:
|
|
|
@dataclasses.dataclass(eq=False, frozen=True)
|
|
|
class EventQueueEntry:
|
|
|
__slots__ = ["event", "targets"]
|
|
|
- event: Event
|
|
|
+ event: InternalEvent
|
|
|
targets: List[Instance]
|
|
|
|
|
|
- def __init__(self, cd: AbstractCD):
|
|
|
+
|
|
|
+ def __init__(self, cd: AbstractCD, output_callback: Callable[[List[OutputEvent]],None] = _dummy_output_callback):
|
|
|
+ cd.globals.assert_ready()
|
|
|
self.cd = cd
|
|
|
- self.object_manager = ObjectManager(cd)
|
|
|
- self.queue: EventQueue[int, EventQueueEntry] = EventQueue()
|
|
|
|
|
|
self.simulated_time = 0 # integer
|
|
|
|
|
|
- self.cd.globals.assert_ready()
|
|
|
+ def schedule_after(after, event, instances):
|
|
|
+ entry = Controller.EventQueueEntry(event, instances)
|
|
|
+ self.queue.add(self.simulated_time + after, entry)
|
|
|
+ return entry
|
|
|
+
|
|
|
+ def cancel_after(entry):
|
|
|
+ self.queue.remove(entry)
|
|
|
+
|
|
|
+ self.object_manager = ObjectManager(cd, output_callback, schedule_after, cancel_after)
|
|
|
+
|
|
|
+ self.queue: EventQueue[int, EventQueueEntry] = EventQueue()
|
|
|
|
|
|
if DEBUG:
|
|
|
self.cd.print()
|
|
@@ -36,86 +49,66 @@ class Controller:
|
|
|
self.run_until = self._run_until_w_initialize
|
|
|
|
|
|
|
|
|
- def add_input(self, input: Event, time_offset: int):
|
|
|
- if input.name == "":
|
|
|
- raise Exception("Input event can't have an empty name.")
|
|
|
-
|
|
|
- try:
|
|
|
- self.cd.globals.inports.get_id(input.port)
|
|
|
- except KeyError as e:
|
|
|
- raise Exception("No such port: '%s'" % input.port) from e
|
|
|
+ def get_model_delta(self) -> Duration:
|
|
|
+ return self.cd.globals.delta
|
|
|
|
|
|
- try:
|
|
|
- event_id = self.cd.globals.events.get_id(input.name)
|
|
|
- except KeyError as e:
|
|
|
- raise Exception("No such event: '%s'" % input.name) from e
|
|
|
+ def _schedule(self, timestamp: int, event: InternalEvent, instances: List[Instance]):
|
|
|
+ self.queue.add(timestamp, Controller.EventQueueEntry(event, instances))
|
|
|
|
|
|
- input.id = event_id
|
|
|
+ def _inport_to_instances(self, port: str) -> List[Instance]:
|
|
|
+ try:
|
|
|
+ self.cd.globals.inports.get_id(port)
|
|
|
+ except KeyError as e:
|
|
|
+ raise Exception("No such port: '%s'" % port) from e
|
|
|
|
|
|
- # For now, add events received on input ports to all instances.
|
|
|
- # In the future, we can optimize this by keeping a mapping from port name to a list of instances
|
|
|
- # potentially responding to the event
|
|
|
- self.queue.add(self.simulated_time + time_offset,
|
|
|
- Controller.EventQueueEntry(input, self.object_manager.instances))
|
|
|
+ # For now, we just broadcast all input events.
|
|
|
+ # We don't even check if the event is allowed on the input port.
|
|
|
+ # TODO: multicast event only to instances that subscribe to this port.
|
|
|
+ return self.object_manager.instances
|
|
|
+
|
|
|
+ def add_input(self, timestamp: int, port: str, event_name: str, params = []):
|
|
|
+ try:
|
|
|
+ event_id = self.cd.globals.events.get_id(event_name)
|
|
|
+ except KeyError as e:
|
|
|
+ raise Exception("No such event: '%s'" % event_name) from e
|
|
|
+
|
|
|
+ instances = self._inport_to_instances(port)
|
|
|
+ event = InternalEvent(event_id, event_name, params)
|
|
|
+
|
|
|
+ self._schedule(timestamp, event, instances)
|
|
|
|
|
|
# Get timestamp of next entry in event queue
|
|
|
def next_wakeup(self) -> Optional[int]:
|
|
|
return self.queue.earliest_timestamp()
|
|
|
|
|
|
- # Returns duration since start
|
|
|
- def get_simulated_duration(self) -> Duration:
|
|
|
- return (self.cd.globals.delta * self.simulated_time)
|
|
|
-
|
|
|
- def _run_until_w_initialize(self, now: Optional[int], pipe: queue.Queue):
|
|
|
+ def _run_until_w_initialize(self, now: Optional[int]):
|
|
|
# first run...
|
|
|
# initialize the object manager, in turn initializing our default class
|
|
|
# and adding the generated events to the queue
|
|
|
for i in self.object_manager.instances:
|
|
|
- events = i.initialize()
|
|
|
- self._process_big_step_output(events, pipe)
|
|
|
- print_debug("initialized. time is now %s" % str(self.get_simulated_duration()))
|
|
|
+ i.initialize()
|
|
|
+ if DEBUG:
|
|
|
+ print("initialized.")
|
|
|
|
|
|
# Next call to 'run_until' will call '_run_until'
|
|
|
self.run_until = self._run_until
|
|
|
|
|
|
# Let's try it out :)
|
|
|
- self.run_until(now, pipe)
|
|
|
+ self.run_until(now)
|
|
|
|
|
|
# Run until the event queue has no more due events wrt given timestamp and until all instances are stable.
|
|
|
# If no timestamp is given (now = None), run until event queue is empty.
|
|
|
- def _run_until(self, now: Optional[int], pipe: queue.Queue):
|
|
|
+ def _run_until(self, now: Optional[int]):
|
|
|
# Actual "event loop"
|
|
|
for timestamp, entry in self.queue.due(now):
|
|
|
if timestamp != self.simulated_time:
|
|
|
# make time leap
|
|
|
self.simulated_time = timestamp
|
|
|
- print_debug("\ntime is now %s" % str(self.get_simulated_duration()))
|
|
|
+ if DEBUG:
|
|
|
+ print("\ntime is now %s" % str(self.cd.globals.delta * self.simulated_time))
|
|
|
# run all instances for whom there are events
|
|
|
for instance in entry.targets:
|
|
|
- output = instance.big_step([entry.event])
|
|
|
+ instance.big_step([entry.event])
|
|
|
# print_debug("completed big step (time = %s)" % str(self.cd.globals.delta * self.simulated_time))
|
|
|
- self._process_big_step_output(output, pipe)
|
|
|
|
|
|
self.simulated_time = now
|
|
|
-
|
|
|
- # Helper. Put big step output events in the event queue or add them to the right output listeners.
|
|
|
- def _process_big_step_output(self, events: List[OutputEvent], pipe: queue.Queue):
|
|
|
- pipe_events = []
|
|
|
- for e in events:
|
|
|
- if isinstance(e.target, InstancesTarget):
|
|
|
- # offset = self._duration_to_time_offset(e.time_offset)
|
|
|
- offset = e.time_offset
|
|
|
- self.queue.add(self.simulated_time + offset, Controller.EventQueueEntry(e.event, e.target.instances))
|
|
|
- elif isinstance(e.target, OutputPortTarget):
|
|
|
- assert (e.time_offset == duration(0)) # cannot combine 'after' with 'output port'
|
|
|
- pipe_events.append(e.event)
|
|
|
- else:
|
|
|
- raise Exception("Unexpected type:", e.target)
|
|
|
- if pipe_events:
|
|
|
- pipe.put(pipe_events, block=True, timeout=None)
|
|
|
-
|
|
|
- # Helper
|
|
|
- def _duration_to_time_offset(self, d: Duration) -> int:
|
|
|
- if self.cd.globals.delta == duration(0):
|
|
|
- return 0
|
|
|
- return d // self.cd.globals.delta
|