Browse Source

Got rid of OutputListener class.

Joeri Exelmans 5 years ago
parent
commit
740c37c7eb
2 changed files with 50 additions and 112 deletions
  1. 16 70
      src/sccd/runtime/controller.py
  2. 34 42
      test/test.py

+ 16 - 70
src/sccd/runtime/controller.py

@@ -1,54 +1,14 @@
 import threading
+import queue
 from typing import Dict, List
 from sccd.runtime.event_queue import EventQueue, EventQueueDeque, Timestamp
-from queue import Queue, Empty
 from sccd.runtime.event import *
 from sccd.runtime.object_manager import ObjectManager
 from sccd.runtime.infinity import INFINITY
 
-class OutputListener:
-    def __init__(self):
-        self.queue = Queue() # queue of lists of event objects
-        self.done = False
-
-    """ Called at the end of a big step with a list of events.
-
-        Parameters
-        ----------
-        events: list of Event objects """
-    def signal_output(self, events):
-        assert not self.done
-        self.queue.put_nowait(("output", events))
-
-    def signal_exception(self, exception):
-        assert not self.done
-        self.queue.put_nowait(("exception", exception))
-
-    def signal_done(self):
-        assert not self.done
-        self.done = True
-        self.queue.put_nowait(("done", None))
-
-    """ Fetch next element without blocking.
-        If no element is available, None is returned. """
-    def fetch_nonblocking(self):
-        try:
-            return self.queue.get_nowait()
-        except Empty:
-            return None
-
-    """ Fetch next element from listener, blocking until an element is available.
-        If the given timeout is exceeded, None is returned.
-
-        Parameters
-        ----------
-        timeout: Max time to block (in seconds). None if allowed to block forever. """
-    def fetch_blocking(self, timeout=None):
-        try:
-            return self.queue.get(True, timeout);
-        except Empty:
-            return None
-
+# The Controller class is a primitive that can be used to build backends of any kind:
+# Threads, integration with existing event loop, game loop, test framework, ...
+# The Controller class itself is NOT thread-safe.
 class Controller:
     # Data class
     class EventQueueEntry:
@@ -67,13 +27,12 @@ class Controller:
         self.object_manager = ObjectManager(model)
         self.queue: EventQueue[EventQueueEntry] = EventQueue()
 
-        # keep track of input ports
-        self.output_listeners: Dict[str, List[OutputListener]] = {} # dictionary from port name to list of OutputListener objects
-
         self.simulated_time = 0 # integer
         self.initialized = False
 
-    def addInput(self, event: Event, time_offset = 0):
+    # time_offset: the offset relative to the current simulated time
+    # (the timestamp given in the last call to run_until)
+    def add_input(self, event: Event, time_offset = 0):
             if event.name == "":
                 raise Exception("Input event can't have an empty name.")
         
@@ -85,22 +44,8 @@ class Controller:
             # potentially responding to the event
             self.queue.add(self.simulated_time+time_offset, Controller.EventQueueEntry(event, self.object_manager.instances))
 
-    def createOutputListener(self, ports):
-        listener = OutputListener()
-        self.addOutputListener(ports, listener)
-        return listener
-
-    def addOutputListener(self, ports, listener):
-        if len(ports) == 0:
-            # add to all the ports
-            if self.model.outports:
-                self.addOutputListener(self.model.outports, listener)
-        else:
-            for port in ports:
-                self.output_listeners.setdefault(port, []).append(listener)
-
     # The following 2 methods are the basis of any kind of event loop,
-    # regardless of the platform (Threads, integration with existing event loop, game loop, test framework, ...)
+    # regardless of the platform ()
 
     # Get timestamp of next entry in event queue
     def next_wakeup(self) -> Timestamp:
@@ -108,24 +53,24 @@ class Controller:
 
     # Run until given timestamp.
     # Simulation continues until there are no more due events wrt timestamp and until all instances are stable.
-    def run_until(self, now: Timestamp):
+    # Output generated while running is written to 'pipe'.
+    def run_until(self, now: Timestamp, pipe: queue.Queue):
 
         unstable = []
 
         # Helper. Put big step output events in the event queue or add them to the right output listeners.
         def process_big_step_output(events: List[OutputEvent]):
-            listener_events = {}
+            pipe_events = []
             for e in events:
                 if isinstance(e.target, InstancesTarget):
                     self.queue.add(self.simulated_time+e.time_offset, Controller.EventQueueEntry(e.event, e.target.instances))
                 elif isinstance(e.target, OutputPortTarget):
                     assert (e.time_offset == 0) # cannot combine 'after' with 'output port'
-                    for listener in self.output_listeners[e.target.outport]:
-                        listener_events.setdefault(listener, []).append(e.event)
+                    pipe_events.append(e.event)
                 else:
                     raise Exception("Unexpected type:", e.target)
-            for listener, events in listener_events.items():
-                listener.signal_output(events)
+            if pipe_events:
+                pipe.put(pipe_events, block=True, timeout=None)
 
         # Helper. Let all unstable instances execute big steps until they are stable
         def run_unstable():
@@ -139,7 +84,8 @@ class Controller:
                     if instance.is_stable():
                         del unstable[i]
             if had_unstable:
-                print("all stabilized.")
+                # print("all stabilized.")
+                pass
 
         if now < self.simulated_time:
             raise Exception("Simulated time can only increase!")

+ 34 - 42
test/test.py

@@ -3,6 +3,7 @@ import importlib
 import unittest
 import argparse
 import threading
+import queue
 
 from sccd.compiler.sccdc import generate
 from sccd.compiler.generic_generator import Platforms
@@ -24,7 +25,6 @@ class PyTestCase(unittest.TestCase):
         return self.name
 
     def runTest(self):
-        print()
         # Get src_file and target_file modification times
         src_file_mtime = os.path.getmtime(self.src_file)
         target_file_mtime = 0
@@ -34,6 +34,7 @@ class PyTestCase(unittest.TestCase):
             pass
 
         if src_file_mtime > target_file_mtime:
+            # (Re-)Compile test
             os.makedirs(os.path.dirname(self.target_file), exist_ok=True)
             try:
                 generate(self.src_file, self.target_file, "python", Platforms.Threads)
@@ -41,81 +42,72 @@ class PyTestCase(unittest.TestCase):
                 self.skipTest("meant for different target language.")
                 return
 
+        # Load compiled test
         module = importlib.import_module(os.path.join(BUILD_DIR, self.name).replace(os.path.sep, "."))
         inputs = module.Test.input_events
         expected = module.Test.expected_events # list of lists of Event objects
-
         model = module.Model()
-        controller = Controller(model)
-
-        output_ports = set()
-        expected_result = [] # what happens here is basically a deep-copy of the list-of-lists, why?
-        for s in expected:
-            slot = []
-            for event in s:
-                slot.append(event)
-                output_ports.add(event.port)
-            if slot:
-                expected_result.append(slot)
 
-        output_listener = controller.createOutputListener(list(output_ports))
+        controller = Controller(model)
 
         # generate input
         if inputs:
             for i in inputs:
-                controller.addInput(Event(i.name, i.port, i.parameters), int(i.time_offset* 1000))
+                controller.add_input(Event(i.name, i.port, i.parameters), int(i.time_offset))
+
+        pipe = queue.Queue()
 
-        def run_model():
+        def model_thread():
             try:
-                # run as-fast-as-possible, always advancing time to the next item in event queue, no sleeping
-                # the call returns when the event queue is empty
-                controller.run_until(INFINITY)
+                # Run as-fast-as-possible, always advancing time to the next item in event queue, no sleeping.
+                # The call returns when the event queue is empty and therefore the simulation is finished.
+                controller.run_until(INFINITY, pipe)
             except Exception as e:
-                output_listener.signal_exception(e)
+                pipe.put(e, block=True, timeout=None)
                 return
-            output_listener.signal_done()
+            pipe.put(None, block=True, timeout=None)
 
         # start the controller
-        thread = threading.Thread(target=run_model)
+        thread = threading.Thread(target=model_thread)
         thread.start()
 
         # check output
         slot_index = 0
         while True:
-            what, arg = output_listener.fetch_blocking()
-            if what == "exception":
+            output = pipe.get(block=True, timeout=None)
+            if isinstance(output, Exception):
                 thread.join()
-                raise arg
-            elif what == "done":
-                self.assertEqual(slot_index, len(expected_result), "Less output was received than expected.")
+                raise output # Exception was caught in Controller thread, throw it here instead.
+            elif output is None:
+                self.assertEqual(slot_index, len(expected), "Less output was received than expected.")
                 thread.join()
                 return
-            elif what == "output":
-                self.assertLess(slot_index, len(expected_result), "More output was received than expected.")
-                output_events = arg
-                slot = expected_result[slot_index]
-                print("slot:", slot_index, ", events: ", output_events)
+            else:
+                self.assertLess(slot_index, len(expected), "More output was received than expected.")
+                exp_slot = expected[slot_index]
+                # print("slot:", slot_index, ", events: ", output)
+
+                self.assertEqual(len(exp_slot), len(output), "Slot %d length differs: Expected %s, but got %s instead." % (slot_index, exp_slot, output))
 
                 # sort both expected and actual lists of events before comparing,
                 # in theory the set of events at the end of a big step is unordered
                 key_f = lambda e: "%s.%s"%(e.port, e.name)
-                slot.sort(key=key_f)
-                output_events.sort(key=key_f)
+                exp_slot.sort(key=key_f)
+                output.sort(key=key_f)
 
-                self.assertEqual(len(slot), len(output_events), "Slot %d: Expected output events: %s, instead got: %s" % (slot_index, str(slot), str(output_events)))
-                for (expected, actual) in zip(slot, output_events):
+                for (exp_event, event) in zip(exp_slot, output):
                     matches = True
-                    if expected.name != actual.name :
+                    if exp_event.name != event.name :
                         matches = False
-                    if expected.port != actual.port :
+                    if exp_event.port != event.port :
                         matches = False
-                    if len(expected.parameters) != len(actual.parameters) :
+                    if len(exp_event.parameters) != len(event.parameters) :
                         matches = False
-                    for index in range(len(expected.parameters)) :
-                        if expected.parameters[index] !=  actual.parameters[index]:
+                    for index in range(len(exp_event.parameters)) :
+                        if exp_event.parameters[index] !=  event.parameters[index]:
                             matches = False
 
-                self.assertTrue(matches, self.src_file + ", expected results slot " + str(slot_index) + " mismatch. Expected " + str(expected) + ", but got " + str(actual) +  " instead.") # no match found in the options
+                self.assertTrue(matches, "Slot %d entry differs: Expected %s, but got %s instead." % (slot_index, exp_slot, output))
                 slot_index += 1
 
 if __name__ == '__main__':