123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131 |
- import queue
- import dataclasses
- from typing import Dict, List, Optional
- from sccd.controller.event_queue import *
- from sccd.statechart.dynamic.event import *
- from sccd.controller.object_manager import *
- from sccd.util.debug import print_debug
- from sccd.model.model import *
- @dataclasses.dataclass
- class InputEvent:
- name: str
- port: str
- params: List[Any]
- time_offset: Duration
- Timestamp = int
- # The Controller class is a primitive that can be used to build backends of any kind:
- # Threads, integration with existing event loop, game loop, test framework, ...
- # The Controller class itself is NOT thread-safe.
- class Controller:
- @dataclasses.dataclass(eq=False, frozen=True)
- class EventQueueEntry:
- event: Event
- targets: List[Instance]
- def __init__(self, model: AbstractModel):
- self.model = model
- self.object_manager = ObjectManager(model)
- self.queue: EventQueue[Timestamp, EventQueueEntry] = EventQueue()
- self.simulated_time = 0 # integer
- self.initialized = False
- self.model.globals.assert_ready()
- # print_debug("model delta is %s" % str(self.model.globals.delta))
- # First call to 'run_until' method initializes
- self.run_until = self._initialize
- def _duration_to_time_offset(self, d: Duration) -> int:
- if self.model.globals.delta == duration(0):
- return 0
- return d // self.model.globals.delta
- def add_input(self, input: InputEvent):
- if input.name == "":
- raise Exception("Input event can't have an empty name.")
-
- # try:
- # self.model.globals.inports.get_id(input.port)
- # except KeyError as e:
- # raise Exception("No such port: '%s'" % input.port) from e
- try:
- event_id = self.model.globals.events.get_id(input.name)
- except KeyError as e:
- raise Exception("No such event: '%s'" % input.name) from e
- offset = self._duration_to_time_offset(input.time_offset)
- e = Event(
- id=event_id,
- name=input.name,
- port=input.port,
- params=input.params)
- # For now, add events received on input ports to all instances.
- # In the future, we can optimize this by keeping a mapping from port name to a list of instances
- # potentially responding to the event
- self.queue.add(self.simulated_time+offset,
- Controller.EventQueueEntry(e, self.object_manager.instances))
- # Get timestamp of next entry in event queue
- def next_wakeup(self) -> Optional[Timestamp]:
- return self.queue.earliest_timestamp()
- # Returns duration since start
- def get_simulated_duration(self) -> Duration:
- return (self.model.globals.delta * self.simulated_time)
- # Helper. Put big step output events in the event queue or add them to the right output listeners.
- def _process_big_step_output(self, events: List[OutputEvent], pipe: queue.Queue):
- pipe_events = []
- for e in events:
- if isinstance(e.target, InstancesTarget):
- offset = self._duration_to_time_offset(e.time_offset)
- self.queue.add(self.simulated_time + offset, Controller.EventQueueEntry(e.event, e.target.instances))
- elif isinstance(e.target, OutputPortTarget):
- assert (e.time_offset == duration(0)) # cannot combine 'after' with 'output port'
- pipe_events.append(e.event)
- else:
- raise Exception("Unexpected type:", e.target)
- if pipe_events:
- pipe.put(pipe_events, block=True, timeout=None)
- def _initialize(self, now: Optional[Timestamp], pipe: queue.Queue):
- # first run...
- # initialize the object manager, in turn initializing our default class
- # and adding the generated events to the queue
- for i in self.object_manager.instances:
- events = i.initialize()
- self._process_big_step_output(events, pipe)
- print_debug("initialized. time is now %s" % str(self.get_simulated_duration()))
- # Next call to 'run_until' will call '_run_until'
- self.run_until = self._run_until
- # Let's try it out :)
- self.run_until(now, pipe)
- # Run until the event queue has no more due events wrt given timestamp and until all instances are stable.
- # If no timestamp is given (now = None), run until event queue is empty.
- def _run_until(self, now: Optional[Timestamp], pipe: queue.Queue):
- # Actual "event loop"
- for timestamp, entry in self.queue.due(now):
- if timestamp != self.simulated_time:
- # make time leap
- self.simulated_time = timestamp
- print_debug("\ntime is now %s" % str(self.get_simulated_duration()))
- # run all instances for whom there are events
- for instance in entry.targets:
- output = instance.big_step([entry.event])
- # print_debug("completed big step (time = %s)" % str(self.model.globals.delta * self.simulated_time))
- self._process_big_step_output(output, pipe)
- self.simulated_time = now
|