Bläddra i källkod

add testing framework

Joeri Exelmans 8 månader sedan
förälder
incheckning
f6e38b017d
5 ändrade filer med 267 tillägg och 21 borttagningar
  1. 4 4
      microwave/lib/gui.py
  2. 15 9
      microwave/lib/simulator.py
  3. 160 0
      microwave/lib/test.py
  4. 29 8
      microwave/run.py
  5. 59 0
      microwave/test.py

+ 4 - 4
microwave/lib/gui.py

@@ -146,15 +146,15 @@ class GUI(tkinter.Frame):
             audio.stop_running()
 
     def handle_event(self, event, param):
-        if event == "turnMagnetronOn":
+        if event == "turn_magnetron_on":
             self.running = True
             self.refresh_background()
-        elif event == "turnMagnetronOff":
+        elif event == "turn_magnetron_off":
             self.running = False
             self.refresh_background()
-        elif event == "setDisplayedTime":
+        elif event == "set_displayed_time":
             self.setTime(param)
-        elif event == "ringBell":
+        elif event == "ring_bell":
             audio.play_bell()
                              
     def setTime(self, time: int):

+ 15 - 9
microwave/lib/simulator.py

@@ -21,7 +21,7 @@ class Controller:
 
     def start(self):
         now = time.perf_counter_ns()
-        self.simulated_time = now
+        # self.simulated_time = 0
         self.start_time = now
 
     def add_input(self, timestamp, sc, event, value=None):
@@ -32,6 +32,10 @@ class Controller:
             callback = raise_method
         self.add_input_lowlevel(timestamp, callback, event)
 
+    def add_input_relative(self, sc, event, value=None, time_offset=0):
+        timestamp = self.simulated_time + time_offset
+        self.add_input(timestamp, sc, event, value)
+
     def add_input_lowlevel(self, timestamp, callback, debug):
         e = QueueEntry(timestamp, callback, debug)
         self.event_queue.append(e)
@@ -53,7 +57,7 @@ class Controller:
         return self.event_queue[-1].timestamp
 
     def get_simtime_seconds(self):
-        return (self.simulated_time - self.start_time) / 1000000000
+        return (self.simulated_time) / 1000000000
 
 # Our own timer service, used by the statechart.
 # Much better than YAKINDU's pathetic timer service.
@@ -99,8 +103,10 @@ class RealTimeSimulation:
 
     def add_input(self, sc, event, value=None):
         now = time.perf_counter_ns() + self.purposefully_behind
-        self.controller.add_input(now, sc, event, value)
+        timestamp = now - self.controller.start_time
+        self.controller.add_input(timestamp, sc, event, value)
         self.interrupt()
+        return timestamp
 
     def interrupt(self):
         if self.scheduled_id is not None:
@@ -109,15 +115,15 @@ class RealTimeSimulation:
         if self.controller.have_event():
             now = time.perf_counter_ns()
             earliest = self.controller.get_earliest()
-            sleep_time = earliest - now
+            sleep_time = earliest - (now - self.controller.start_time)
 
-            if sleep_time < 0:
-                self.purposefully_behind = sleep_time
-            else:
-                self.purposefully_behind = 0
+            # if sleep_time < 0:
+            #     self.purposefully_behind = sleep_time
+            # else:
+            #     self.purposefully_behind = 0
 
             def callback():
-                self.controller.run_until(now + self.purposefully_behind)
+                self.controller.run_until((now - self.controller.start_time) + self.purposefully_behind)
                 self.update_callback()
                 self.interrupt()
 

+ 160 - 0
microwave/lib/test.py

@@ -0,0 +1,160 @@
+from lib.simulator import Controller, TimerService
+from difflib import ndiff
+
+# Can we ignore event in 'trace' at position 'idx' with respect to idempotency?
+def can_ignore(trace, idx, IDEMPOTENT):
+    (timestamp, event_name, value) = trace[idx]
+    if event_name in IDEMPOTENT:
+        # If the same event occurred earlier, with the same parameter value, then this event can be ignored:
+        for (earlier_timestamp, earlier_event_name, earlier_value) in reversed(trace[0:idx]):
+            if (earlier_event_name, earlier_value) == (event_name, value):
+                # same event name and same parameter value (timestamps allowed to differ)
+                return True
+            elif event_name == earlier_event_name:
+                # same event name, but different parameter value:
+                # stop looking into the past:
+                break
+        # If the same event occurs later event, but with the same timestamp, this event is overwritten and can be ignored:
+        for (later_timestamp, later_event_name, later_value) in trace[idx+1:]:
+            if (later_timestamp, later_event_name) == (timestamp, event_name):
+                # if a later event with same name and timestamp occurs, ours will be overwritten:
+                return True
+            if later_timestamp != timestamp:
+                # no need to look further into the future:
+                break
+    return False
+
+def preprocess_trace(trace, INITIAL, IDEMPOTENT):
+    # Prepend trace with events that set assumed initial state:
+    result = [(0, event_name, value) for (event_name, value) in INITIAL] + trace
+    # Remove events that have no effect:
+    while True:
+        filtered = [tup for (idx, tup) in enumerate(result) if not can_ignore(result, idx, IDEMPOTENT)]
+        # Keep on filtering until no more events could be removed:
+        if len(filtered) == len(result):
+            return filtered
+        result = filtered
+
+def compare_traces(expected, actual):
+    i = 0
+    while i < len(expected) and i < len(actual):
+        # Compare tuples:
+        if expected[i] != actual[i]:
+            print("Traces differ!")
+            # print("expected: (%i, \"%s\", %s)" % expected[i])
+            # print("actual: (%i, \"%s\", %s)" % actual[i])
+            return False
+        i += 1
+    if len(expected) != len(actual):
+        print("Traces have different length:")
+        print("expected length: %i" % len(expected))
+        print("actual length: %i" % len(actual))
+        return False
+    print("Traces match.")
+    return True
+
+def print_trace(trace, indent=0):
+    print("[")
+    for (timestamp, event_name, value) in trace:
+        print((" "*indent)+"    (%i, \"%s\", %s)," % (timestamp, event_name, value))
+    print((" "*indent)+"]", end='')
+
+def run_test(input_trace, expected_output_trace, statechart_class, INITIAL, IDEMPOTENT, verbose=False):
+    # simulation will be run until timestamp of last event in expected output trace:
+    if len(expected_output_trace) > 0:
+        last_output_event_timestamp = expected_output_trace[-1][0]
+    else:
+        last_output_event_timestamp = 0
+    actual_output_trace = []
+
+    controller = Controller()
+    sc = statechart_class()
+    sc.timer_service = TimerService(controller)
+
+    class LoggingObserver():
+        def __init__(self, event_name):
+            self.event_name = event_name
+
+        def next(self, value=None):
+            tup = (controller.simulated_time, self.event_name, value)
+            actual_output_trace.append(tup)
+
+    for attr in sc.__dict__:
+        if attr.endswith("_observable"):
+            sc.__dict__[attr].subscribe(LoggingObserver(attr[:-11])) # strip '_observable' suffix from attr
+
+    # Put all input events into event queue:
+    for tup in input_trace:
+        (timestamp, event_name, value) = tup
+        controller.add_input(timestamp, sc, event_name)
+
+    controller.start() # this only sets the 'start_time' attribute
+    sc.enter() # enter default state(s)
+    controller.run_until(last_output_event_timestamp) # this actually runs the simulation
+
+    # clean_expected = preprocess_trace(expected_output_trace, INITIAL, IDEMPOTENT)
+    # clean_actual   = preprocess_trace(actual_output_trace, INITIAL, IDEMPOTENT)
+
+    clean_expected = expected_output_trace
+    clean_actual = actual_output_trace
+    
+    def print_diff():
+        # The diff printed will be a diff of the 'raw' traces, not of the cleaned up traces
+        # A diff of the cleaned up traces would be confusing to the user.
+        have_plus = False
+        have_minus = False
+        have_useless = False
+        for diffline in ndiff(
+                [str(tup)+'\n' for tup in expected_output_trace],
+                [str(tup)+'\n' for tup in actual_output_trace],
+                charjunk=None,
+            ):
+            symbol = diffline[0]
+            if symbol == '+':
+                have_plus = True
+            if symbol == '-':
+                have_minus = True
+            if symbol == '?':
+                continue
+            rest = diffline[2:-1] # drop last character (=newline)
+            useless_line = (
+                   symbol == '-' and rest not in [str(tup) for tup in clean_expected]
+                or symbol == '+' and rest not in [str(tup) for tup in clean_actual]
+                # or symbol == ' ' and rest not in [str(tup) for tup in clean_actual]
+            )
+            if useless_line:
+                print(" (%s) %s" % (symbol, rest))
+                have_useless = True
+            else:
+                print("  %s  %s" % (symbol, rest))
+
+        if have_minus or have_plus or have_useless:
+            print("Legend:")
+        if have_minus:
+            print("  -: expected, but did not happen")
+        if have_plus:
+            print("  +: happened, but was not expected")
+        if have_useless:
+            print("  (-) or (+): indicates a \"useless event\" (because it has no effect), either in expected output (-) or in actual output (+).")
+            print("\n\"Useless events\" are ignored by the comparison algorithm, and will never cause your test to fail. In this assignment, your solution is allowed to contain useless events.")
+
+    if not compare_traces(clean_expected, clean_actual):
+        print("Raw diff between expected and actual output event trace:")
+        print_diff()
+        return False
+    elif verbose:
+        print_diff()
+    return True
+
+# verbose: even print a trace if the test succeeded.
+def run_scenarios(statechart_class, SCENARIOS, INITIAL, IDEMPOTENT, verbose=False):
+    passed = True
+    for scenario in SCENARIOS:
+        print("\nRunning scenario:", scenario['name'])
+        if not run_test(scenario['input_trace'], scenario['output_trace'], statechart_class, INITIAL, IDEMPOTENT, verbose):
+            passed = False
+
+    if passed:
+        print("All scenarios passed.")
+    else:
+        print("Some scenarios failed.")

+ 29 - 8
microwave/run.py

@@ -1,9 +1,11 @@
 import sys
 import importlib.util
+import atexit
 
 import tkinter
 from tkinter.constants import BOTH, NO
 from lib import gui, simulator
+from lib.test import print_trace
 
 def my_import(name, path):
     spec = importlib.util.spec_from_file_location(name, path)
@@ -13,6 +15,9 @@ def my_import(name, path):
     return module
 
 def run_demo(model_module, time_scale):
+    input_trace = []
+    output_trace = []
+
     sc = model_module.Statechart()
     controller = simulator.Controller()
     sc.timer_service = simulator.TimerService(controller, time_scale)
@@ -30,7 +35,8 @@ def run_demo(model_module, time_scale):
     sim = simulator.RealTimeSimulation(controller, toplevel, on_update)
 
     def send_event(event: str):
-        sim.add_input(sc, event)
+        timestamp = sim.add_input(sc, event)
+        input_trace.append((timestamp, event, None))
 
     g.send_event = send_event
 
@@ -41,25 +47,40 @@ def run_demo(model_module, time_scale):
         def next(self, value=None):
             print("Time = %03dms - Output event: %s" % (controller.get_simtime_seconds()*1000, self.event))
             g.handle_event(self.event, value)
-
-    sc.turn_magnetron_on_observable.subscribe(MyObserver("turnMagnetronOn"))
-    sc.turn_magnetron_off_observable.subscribe(MyObserver("turnMagnetronOff"))
-    sc.set_displayed_time_observable.subscribe(MyObserver("setDisplayedTime"))
-    sc.ring_bell_observable.subscribe(MyObserver("ringBell"))
+            output_trace.append((controller.simulated_time, self.event, value))
+
+    sc.turn_magnetron_on_observable.subscribe(MyObserver("turn_magnetron_on"))
+    sc.turn_magnetron_off_observable.subscribe(MyObserver("turn_magnetron_off"))
+    sc.set_displayed_time_observable.subscribe(MyObserver("set_displayed_time"))
+    sc.ring_bell_observable.subscribe(MyObserver("ring_bell"))
+
+    def print_exit_trace():
+        print("Exiting...")
+        print("Full trace (you can add this to the SCENARIOS in test.py)...")
+        print("{")
+        print("    \"name\": \"interactive\",")
+        print("    \"input_trace\": ", end='')
+        print_trace(input_trace, 4)
+        print(",")
+        print("    \"output_trace\": ", end='')
+        print_trace(output_trace, 4)
+        print(",")
+        print("}")
+
+    atexit.register(print_exit_trace)
 
     controller.start()
     sc.enter()
     sim.interrupt() # schedule first wakeup
     tk.mainloop()
 
-
 if __name__ == "__main__":
     if len(sys.argv) < 2:
         print("Usage:")
         print("  python run.py MODELNAME [TIME_SCALE]")
         print("")
         print("Example:")
-        print("  python run.py 50_ChildLock 2.0")
+        print("  python run.py 50_History 2.0")
         sys.exit(1)
 
     model_name = sys.argv[1]

+ 59 - 0
microwave/test.py

@@ -0,0 +1,59 @@
+import sys
+import importlib.util
+
+import tkinter
+from tkinter.constants import BOTH, NO
+from lib import gui, simulator
+from lib import test as testing_framework
+from run import my_import
+
+SCENARIOS = [
+{
+    "name": "magnetron shuts off when door opened",
+    "input_trace": [
+        (1739451957, "start_pressed", None),
+        (4262358465, "door_opened", None),
+    ],
+    "output_trace": [
+        (1739451957, "set_displayed_time", 10),
+        (1739451957, "turn_magnetron_on", None),
+        (2739451957, "set_displayed_time", 9),
+        (3739451957, "set_displayed_time", 8),
+        (4262358465, "turn_magnetron_off", None),
+    ],
+}
+]
+
+IDEMPOTENT = [
+    "turn_magnetron_on",
+    "turn_magnetron_off",
+    "set_display_time",
+]
+
+INITIAL = [
+    ("turn_magnetron_off", None),
+    ("set_display_time", 0),
+]
+
+if __name__ == "__main__":
+    if len(sys.argv) != 2:
+        print("Usage:")
+        print("  python test.py MODELNAME")
+        print("")
+        print("Example:")
+        print("  python test.py 50_History")
+        sys.exit(1)
+
+    model_name = sys.argv[1]
+
+    model_module = my_import("sc", "YAKINDU_WORKSPACE/" + model_name + "/srcgen/statechart.py")
+    # run_test(model_module)
+
+    for scenario in SCENARIOS:
+        print("Running scenario:", scenario["name"])
+        testing_framework.run_test(scenario["input_trace"],
+            scenario["output_trace"],
+            model_module.Statechart,
+            INITIAL,
+            IDEMPOTENT,
+            verbose=False)