Sfoglia il codice sorgente

Add new chatroom example. Added threads-backend and threadsafe eventloop-backend.

Joeri Exelmans 5 anni fa
parent
commit
4bb02bb8e3

+ 0 - 0
examples/chatclient/lib/__init__.py


+ 103 - 0
examples/chatclient/lib/network_client.py

@@ -0,0 +1,103 @@
+import threading
+import socket
+import sys
+import io
+import queue
+from dataclasses import dataclass
+from typing import *
+
+SERVERS = [ "localhost:9000", "localhost:9001" ]
+
+# returns the number of hardcoded servers of the client
+def get_nr_of_servers():
+    return len(SERVERS)
+
+# gets the information of the server with the index provided
+def get_server(i):
+    return SERVERS[i]
+
+from sccd.action_lang.static.types import *
+
+SCCD_EXPORTS = {
+    "get_nr_of_servers": (get_nr_of_servers, SCCDFunction([], SCCDInt)),
+    "get_server": (get_server, SCCDFunction([SCCDInt], SCCDString)),
+}
+
+
+from sccd.controller.controller import *
+from sccd.realtime.eventloop import *
+
+class NetworkClient:
+    def __init__(self, eventloop: ThreadSafeEventLoop):
+        self.eventloop = eventloop
+        self.queue = queue.Queue() # output events from the statechart (for us to handle) are added to this queue
+        self.recv_thread = None
+
+    # Starts the network client in a new thread.
+    # Networking stuff must run in its own thread, because socket IO is blocking.
+    def start(self):
+        def event_handler_thread():
+            while True:
+                name, params = self.queue.get()
+
+                if name == "connect":
+                    address = params[0]
+                    host, port = address.split(':')
+                    port = int(port)
+                    # print("NetworkClient: attempting to connect...")
+                    try:
+                        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+                        sock.connect((host, port))
+
+                        def recv_thread():
+                            file = sock.makefile()
+                            for line in file:
+                                if line.startswith("ACK JOIN"):
+                                    self.eventloop.add_input_now_threadsafe("network", "joined")
+                                elif line.startswith("ACK LEAVE"):
+                                    self.eventloop.add_input_now_threadsafe("network", "left")
+                                elif line.startswith("MSG"):
+                                    msg = line[4:]
+                                    self.eventloop.add_input_now_threadsafe("network", "receive_message", [msg])
+                                elif line.startswith("ALIVE"):
+                                    self.eventloop.add_input_now_threadsafe("network", "alive")
+
+                        self.recv_thread = threading.Thread(target=recv_thread)
+                        self.recv_thread.daemon = True
+                        self.recv_thread.start()
+                        # print("NetworkClient: connected: started recv_thread")
+
+                        self.eventloop.add_input_now_threadsafe("network", "connected")
+                    except ConnectionError:
+                        pass
+
+
+                elif name == "disconnect":
+                    sock.close()
+
+                    # print("NetworkClient: waiting for recv_thread...")
+                    self.recv_thread.join()
+                    self.recv_thread = None
+                    # print("NetworkClient: recv_thread is done.")
+
+                    self.eventloop.add_input_now_threadsafe("network", "disconnected")
+
+                elif name == "join":
+                    sock.send(("JOIN " + str(params[0]) + "\n").encode('utf-8'))
+
+                elif name == "leave":
+                    sock.send("LEAVE\n".encode('utf-8'))
+
+                elif name == "poll":
+                    sock.send("POLL\n".encode('utf-8'))
+
+                elif name == "send_message":
+                    sock.send(("MSG " + str(params[0]) + "\n").encode('utf-8'))
+
+
+        t = threading.Thread(target=event_handler_thread)
+        t.daemon = True
+        t.start()
+
+    def add_input(self, name, params):
+        self.queue.put((name, params))

+ 52 - 0
examples/chatclient/lib/scrollable_frame.py

@@ -0,0 +1,52 @@
+## Source: http://tkinter.unpythonic.net/wiki/VerticalScrolledFrame
+
+from tkinter import *
+
+class VerticalScrolledFrame(Frame):
+    """A pure Tkinter scrollable frame that actually works!
+
+    * Use the 'interior' attribute to place widgets inside the scrollable frame
+    * Construct and pack/place/grid normally
+    * This frame only allows vertical scrolling
+    
+    """
+    def __init__(self, parent, *args, **kw):
+        Frame.__init__(self, parent, *args, **kw)            
+
+        # create a canvas object and a vertical scrollbar for scrolling it
+        vscrollbar = Scrollbar(self, orient=VERTICAL)
+        vscrollbar.pack(fill=Y, side=RIGHT, expand=FALSE)
+        canvas = Canvas(self, bd=0, highlightthickness=0,
+                        yscrollcommand=vscrollbar.set)
+        canvas.pack(side=LEFT, fill=BOTH, expand=TRUE)
+        vscrollbar.config(command=canvas.yview)
+
+        # reset the view
+        canvas.xview_moveto(0)
+        canvas.yview_moveto(0)
+
+        # create a frame inside the canvas which will be scrolled with it
+        self.interior = interior = Frame(canvas)
+        interior_id = canvas.create_window(0, 0, window=interior,
+                                           anchor=NW)
+
+        # track changes to the canvas and frame width and sync them,
+        # also updating the scrollbar
+        def _configure_interior(event):
+            # update the scrollbars to match the size of the inner frame
+            size = (interior.winfo_reqwidth(), interior.winfo_reqheight())
+            canvas.config(scrollregion="0 0 %s %s" % size)
+            if interior.winfo_reqwidth() != canvas.winfo_width():
+                # update the canvas's width to fit the inner frame
+                canvas.config(width=interior.winfo_reqwidth())
+            #NOTE my own addition
+            canvas.yview_moveto(1)
+        interior.bind('<Configure>', _configure_interior)
+
+        def _configure_canvas(event):
+            if interior.winfo_reqwidth() != canvas.winfo_width():
+                # update the inner frame's width to fill the canvas
+                canvas.itemconfigure(interior_id, width=canvas.winfo_width())
+        canvas.bind('<Configure>', _configure_canvas)
+
+        return

+ 96 - 0
examples/chatclient/lib/ui.py

@@ -0,0 +1,96 @@
+import tkinter as tk
+from lib.scrollable_frame import VerticalScrolledFrame
+from sccd.realtime.threads_platform import ThreadsPlatform
+
+class ChatWindowGUI(tk.Tk):
+    def __init__(self):
+        tk.Tk.__init__(self)
+        self.resizable(width=tk.FALSE, height=tk.FALSE)
+        self.width = 230
+        self.height = 100
+        self.labelwidth = 30
+
+        self.frame = tk.Frame(self)
+        self.frame.focus_set()
+        self.chat_field = VerticalScrolledFrame(self.frame, bd='2', height=self.height, width=self.width, relief=tk.RIDGE)
+        tk.Label(self.chat_field.interior, text='SCCD Chat Client -- Tk version', justify=tk.LEFT, anchor=tk.NW, width=self.labelwidth).pack()
+        self.tk_buffer = tk.StringVar()
+        input_frame = tk.Frame(self.frame, bd='2', height=100, width=self.width, relief=tk.RIDGE)
+        self.input_text = tk.Label(input_frame, textvar=self.tk_buffer, anchor=tk.NW, justify=tk.LEFT, wraplength=self.width, width=self.labelwidth, background='grey')
+        self.chat_field.pack(anchor=tk.NW)
+        input_frame.pack(anchor=tk.NW, fill=tk.X)
+        self.input_text.pack(anchor=tk.NW, fill=tk.X)
+        self.frame.pack(anchor=tk.NW)
+    
+    def redraw_buffer(self, text):
+        self.tk_buffer.set(text)
+    
+    def setColor(self, color):
+        self.input_text.configure(background=color)
+        
+    def addMessage(self, msg, color):
+        tk.Label(self.chat_field.interior, text=msg, anchor=tk.NW, justify=tk.LEFT, foreground=color, wraplength=230, width=30).pack(anchor=tk.NW)
+
+colors = {'info': 'black', 'local_message': 'red', 'remote_message': 'blue', 'warning': 'yellow'}
+window = ChatWindowGUI()
+buf = ""
+
+def init(eventloop):
+    global window
+
+    def on_key_press(key):
+        eventloop.add_input_now(port="ui", event_name="input", params=[key.char])
+
+    window.bind('<Key>', on_key_press)
+    # window = ChatWindowGUI(on_key_press)
+
+#     shows the message in the chat window, with the specified type (as a string, either "info", "local_message", or "remote_message"))
+def add_message(msg: str, type: str):
+    window.addMessage(msg, colors[type])
+
+#     adds a string to the input field and visualizes the change
+def append_to_buffer(char: str):
+    global buf
+    buf += char
+    window.redraw_buffer(buf)
+
+#     removes the final character from the input field and visualizes the change
+def remove_last_in_buffer():
+    global buf
+    buf = buf[:-1]
+    window.redraw_buffer(buf)
+
+#     clears the input field and visualizes the change
+def clear_input():
+    global buf
+    buf = ""
+    window.redraw_buffer(buf)
+
+#     color the input field in the 'join' mode
+def input_join():
+    window.setColor('green')
+
+#     color the input field in the 'message' mode
+def input_msg():
+    window.setColor('white')
+
+#     color the input field in the 'command' mode
+def input_command():
+    window.setColor('grey')
+
+#     returns the current content of the input field
+def get_buffer() -> str:
+    return buf
+
+from sccd.action_lang.static.types import *
+
+SCCD_EXPORTS = {
+    "add_message": (add_message, SCCDFunction([SCCDString, SCCDString])),
+    "append_to_buffer": (append_to_buffer, SCCDFunction([SCCDString])),
+    "remove_last_in_buffer": (remove_last_in_buffer, SCCDFunction([])),
+    "clear_input": (clear_input, SCCDFunction([])),
+    "input_join": (input_join, SCCDFunction([])),
+    "input_msg": (input_msg, SCCDFunction([])),
+    "input_command": (input_command, SCCDFunction([])),
+    "get_buffer": (get_buffer, SCCDFunction([], SCCDString)),
+}

+ 33 - 0
examples/chatclient/lib/utils.py

@@ -0,0 +1,33 @@
+
+#     removes the last character from the argument string
+def remove_last_char(inp: str) -> str:
+    return inp[:-1]
+
+#     if the argument string represents an integer, returns its integer value
+def stoi(inp: str) -> int:
+    return int(inp)
+
+#     returns true if the argument string represents an integer, else false
+def is_numerical(inp: str) -> bool:
+    try:
+        int(inp)
+        return True
+    except ValueError:
+        print("could not turn into int:", inp)
+        return False
+
+def is_backspace(char):
+    return char == "\b"
+
+def is_enter(char):
+    return char == "\r" # Tkinter uses this symbol for 'enter' key press
+
+from sccd.action_lang.static.types import *
+
+SCCD_EXPORTS = {
+    "remove_last_char": (remove_last_char, SCCDFunction([SCCDString], SCCDString)),
+    "stoi": (stoi, SCCDFunction([SCCDString], SCCDInt)),
+    "is_numerical": (is_numerical, SCCDFunction([SCCDString], SCCDBool)),
+    "is_backspace": (is_backspace, SCCDFunction([SCCDString], SCCDBool)),
+    "is_enter": (is_enter, SCCDFunction([SCCDString], SCCDBool)),
+}

+ 218 - 0
examples/chatclient/model_chatclient.xml

@@ -0,0 +1,218 @@
+<single_instance_cd>
+  <delta>1 ms</delta>
+
+  <statechart>
+    <datamodel>
+      import sccd.action_lang.lib.utils;
+      
+      import lib.network_client;
+      import lib.ui;
+      import lib.utils;
+
+      # Statechart's variables
+      curr_server = 0;
+      room_number = 0;
+      reconnected = False;
+    </datamodel>
+
+    <inport name="network">
+      <event name="connected"/>
+      <event name="disconnected"/>
+      <event name="joined"/>
+      <event name="left"/>
+      <event name="receive_message"/>
+      <event name="alive"/>
+    </inport>
+
+    <inport name="ui">
+      <event name="input"/>
+    </inport>
+
+    <outport name="network">
+      <event name="connect"/>
+      <event name="disconnect"/>
+      <event name="join"/>
+      <event name="leave"/>
+      <event name="send_message"/>
+      <event name="poll"/>
+    </outport>
+
+    <root>
+      <parallel id="p">
+        <!-- main orthogonal region -->
+        <state id="main" initial="disconnected">
+          <state id="disconnected" initial="initial">
+            <onentry>
+              <code> input_command(); </code>
+            </onentry>
+            <state id="initial">
+              <transition after="100 ms" target="../trying_connect">
+                <code> add_message("trying to connect to server " + get_server(curr_server), "info"); </code>
+                <raise event="connect">
+                  <param expr="get_server(curr_server)"/>
+                </raise>
+              </transition>
+            </state>
+            <state id="trying_connect">
+              <transition event="connected" target="/p/main/connected">
+                <code> add_message("connected to server " + get_server(curr_server), "info"); </code>
+              </transition>
+              <transition after="5 s" target="../initial">
+                <code>
+                  add_message("unable to connect to server " + get_server(curr_server), "info");
+                  curr_server = (curr_server + 1) % get_nr_of_servers();
+                </code>
+              </transition>
+            </state>
+          </state>
+
+          <state id="reconnecting" initial="initial">
+            <onentry>
+              <code> input_command(); </code>
+            </onentry>
+            <state id="initial">
+              <transition after="100 ms" target="../trying_connect">
+                <code> add_message("trying to connect to server " + get_server(curr_server), "info"); </code>
+                <raise event="connect">
+                  <param expr="get_server(curr_server)"/>
+                </raise>
+              </transition>
+            </state>
+            <state id="trying_connect">
+              <transition event="connected" target="/p/main/connected/joining">
+                <code> add_message("connected to server" + get_server(curr_server), "info"); </code>
+              </transition>
+              <transition after="5 s" target="../initial">
+                  <code> add_message("unable to connect to server " + get_server(curr_server), "info"); </code>
+              </transition>
+            </state>
+
+            <!-- ?? -->
+          </state>
+
+          <state id="connected" initial="connected">
+            <transition event="do_disconnect" target="../reconnecting">
+              <code> add_message("disconnected", "info"); </code>
+            </transition>
+
+            <state id="leaving">
+              <transition event="left" target="../connected">
+                <code> add_message("left room", "info"); </code>
+              </transition>
+            </state>
+
+            <state id="connected">
+              <transition event="input(char: str)" cond='char == "j"' target="../getting_roomnumber">
+                <code> input_join(); </code>
+              </transition>
+            </state>
+
+            <state id="getting_roomnumber">
+              <transition event="input(char: str)" cond="is_backspace(char)" target=".">
+                <code> remove_last_in_buffer(); </code>
+              </transition>
+              <transition event="input(char: str)" cond="is_enter(char)" target="../joining">
+                <code>
+                  room_number = stoi(get_buffer());
+                  clear_input();
+                  input_command();
+                </code>
+              </transition>
+              <transition event="input(char: str)" cond="is_numerical(char)" target=".">
+                <code> append_to_buffer(char); </code>
+              </transition>
+              <transition event="input(char: str)" cond="not is_numerical(char)" target=".">
+                <code> add_message("only numerical input allowed!", "warning"); </code>
+              </transition>
+            </state>
+
+            <state id="joining">
+              <onentry>
+                <raise event="join">
+                  <param expr="room_number"/>
+                </raise>
+              </onentry>
+              <onexit>
+              </onexit>
+
+              <transition event="joined" target="../joined/H">
+                <code> add_message("joined room " + int_to_str(room_number), "info"); </code>
+              </transition>
+            </state>
+
+            <state id="joined" initial="initial">
+              <state id="initial">
+                <!--<onentry>
+                  <code> input_command(); </code>
+                </onentry>-->
+
+                <transition event="input(char: str)" cond='char == "m"' target="../entering_message"/>
+                <transition event="input(char: str)" cond='char == "l"' target="../../leaving">
+                  <raise event="leave"/>
+                  <code> input_command(); </code>
+                </transition>
+              </state>
+
+              <state id="entering_message">
+                <onentry>
+                  <code> input_msg(); </code>
+                </onentry>
+                <!-- explicit (document order) priority used here: -->
+                <transition event="input(char: str)" cond="is_backspace(char)" target=".">
+                  <code> remove_last_in_buffer(); </code>
+                </transition>
+                <transition event="input(char: str)" cond="is_enter(char)" target="../initial">
+                  <raise event="send_message">
+                    <param expr="get_buffer()"/>
+                  </raise>
+                  <code>
+                    add_message(get_buffer(), "local_message");
+                    clear_input();
+                    input_command();
+                  </code>
+                </transition>
+                <transition event="input(char: str)" target=".">
+                  <code> append_to_buffer(char); </code>
+                </transition>
+              </state>
+
+              <history id="H" type="shallow"/>
+            </state>
+          </state>
+        </state>
+
+        <!-- polling orthogonal region -->
+        <state id="polling" initial="initial">
+          <state id="initial">
+            <transition event="connected" target="../polling"/>
+          </state>
+          <state id="polling">
+            <transition after="1 s" target="../expecting_answer">
+              <raise event="poll"/>
+              <!-- <code> print("sent POLL"); </code> -->
+            </transition>
+          </state>
+          <state id="expecting_answer">
+            <transition event="alive" target="../polling">
+              <!-- <code> print("got ALIVE"); </code> -->
+            </transition>
+            <transition after="2 s" target="../initial">
+              <raise event="do_disconnect"/>
+              <code> print("polling timeout... disconnect"); </code>
+            </transition>
+          </state>
+        </state>
+
+        <!-- listening orthogonal region -->
+        <state id="listening">
+          <state id="l">
+            <transition event="receive_message(msg:str)" target=".">
+              <code> add_message(msg, "remote_message"); </code>
+            </transition>
+          </state>
+        </state>
+      </parallel>
+    </root>
+
+  </statechart>
+</single_instance_cd>

+ 33 - 0
examples/chatclient/run_client.py

@@ -0,0 +1,33 @@
+from sccd.cd.parser.xml import *
+from sccd.controller.controller import *
+import threading
+import queue
+from sccd.realtime.eventloop import *
+from sccd.realtime.tkinter import *
+
+def main():
+    cd = load_cd("model_chatclient.xml")
+
+    def on_output(event: OutputEvent):
+        if event.port == "network":
+            network.add_input(event.name, event.params)
+
+    from lib import ui, network_client
+
+    controller = Controller(cd, output_callback=on_output)
+    eventloop = ThreadSafeEventLoop(controller, TkInterImplementation(ui.window))
+
+    ui.init(eventloop)
+    network = network_client.NetworkClient(eventloop)
+
+    # This starts the network client in a new thread.
+    network.start()
+
+    # This only sets the 'start time' to the current wall-clock time, initializes the statechart and lets it run for a bit (in this thread) if there are already due events (events with timestamp zero). Then returns.
+    eventloop.start()
+
+    # This takes control of the current thread and runs tk's event loop in it.
+    ui.window.mainloop()
+
+if __name__ == '__main__':
+    main()

+ 82 - 0
examples/chatclient/run_server.py

@@ -0,0 +1,82 @@
+import threading
+import socket
+import sys
+import io
+import collections
+
+BUFFER_SIZE = 1024
+
+if __name__ == "__main__":
+    port = int(sys.argv[1])
+    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+
+    # Thread-safe bidirectional one-to-many mapping between room number and client socket
+    class RoomMapping:
+        def __init__(self):
+            self.client2room = {}
+            self.room2clients = collections.defaultdict(set)
+            self.lock = threading.Lock()
+
+        def join(self, client, room):
+            with self.lock:
+                self.room2clients[room].add(client)
+                self.client2room[client] = room
+
+        def leave(self, client):
+            with self.lock:
+                room = self.client2room[client]
+                self.room2clients[room].remove(client)
+                del self.client2room[client]
+
+        def get_clients_in_same_room(self, client):
+            with self.lock:
+                room = self.client2room[client]
+                return list(self.room2clients[room])
+
+    mapping = RoomMapping()
+
+    def send_line(conn, msg):
+        conn.send((msg+"\n").encode('utf-8'))
+
+    def client_thread(conn, address):
+        with conn:
+            file = conn.makefile();
+            for line in file:
+                if line.startswith("POLL"):
+                    send_line(conn, "ALIVE")
+
+                elif line.startswith("JOIN"):
+                    print("JOIN EVENT SERVER " + line)
+                    room_number = int(line[5:])
+                    mapping.join(conn, room_number)
+                    send_line(conn, "ACK JOIN " + str(room_number))
+                    # print("joined...", mapping.client2room)
+
+                elif line.startswith("LEAVE"):
+                    mapping.leave(conn)
+                    send_line(conn, "ACK LEAVE")
+                    # print("left...", mapping.client2room)
+
+                elif line.startswith("MSG"):
+                    # print("got MSG: ", line)
+                    conns = mapping.get_clients_in_same_room(conn)
+                    for c in conns:
+                        if c is not conn:
+                            send_line(c, line)
+
+                else:
+                    print("Received line does not match protocol: ", line)
+            
+        print("Closed client connection: %s:%s" % address)
+
+    with s:
+        s.bind(("localhost", port))
+        s.listen(5)
+        print("Accepting client connections.")
+
+        while True:
+            conn, address = s.accept()
+            print("Accepted client connection: %s:%s" % address)
+            t = threading.Thread(target=client_thread, args=(conn,address))
+            t.daemon = True
+            t.start()

+ 1 - 3
examples/digitalwatch/run.py

@@ -15,9 +15,7 @@ def main():
     gui = DigitalWatchGUI(topLevel)
 
     def on_gui_event(event: str):
-        controller.add_input(
-            timestamp=eventloop.now(), event_name=event, port="in", params=[])
-        eventloop.interrupt()
+        eventloop.add_input_now(port="in", event_name=event)
 
     gui.controller.send_event = on_gui_event
 

+ 57 - 14
src/sccd/realtime/eventloop.py

@@ -1,6 +1,7 @@
 from abc import *
 from sccd.realtime.time import *
 from sccd.controller.controller import *
+import queue
 
 ScheduledID = Any
 
@@ -19,7 +20,8 @@ class EventLoopImplementation(ABC):
     def cancel(self, id: ScheduledID):
         pass
 
-# Event loop "platform"
+# Event loop "platform".
+# This class is NOT thread-safe.
 class EventLoop:
     def __init__(self, controller: Controller, event_loop: EventLoopImplementation, time_impl: TimeImplementation = DefaultTimeImplementation):
         delta = controller.cd.get_delta()
@@ -38,19 +40,20 @@ class EventLoop:
         self.purposefully_behind = 0
 
     def _wakeup(self):
-        self.controller.run_until(self.timer.now() - self.purposefully_behind)
+        self.controller.run_until(self.timer.now() + self.purposefully_behind)
 
         # back to sleep
-        now = self.timer.now()
         next_wakeup = self.controller.next_wakeup()
+
         if next_wakeup is not None:
-            sleep_duration = self.to_event_loop_unit(next_wakeup - now)
+            sleep_duration = next_wakeup - self.timer.now()
             if sleep_duration < 0:
-                self.purposefully_behind = now - next_wakeup
+                self.purposefully_behind = sleep_duration
                 sleep_duration = 0
             else:
                 self.purposefully_behind = 0
-            self.scheduled = self.event_loop.schedule(sleep_duration, self._wakeup)
+
+            self.scheduled = self.event_loop.schedule(self.to_event_loop_unit(sleep_duration), self._wakeup)
         else:
             self.scheduled = None
 
@@ -59,15 +62,55 @@ class EventLoop:
         self._wakeup()
 
     def now(self):
-        return self.timer.now() - self.purposefully_behind
-
-    # Uncomment if ever needed:
-    # Does not mix well with interrupt().
-    # def pause(self):
-    #     self.timer.pause()
-    #     self.event_loop.cancel()(self.scheduled)
+        return self.timer.now() + self.purposefully_behind
 
-    def interrupt(self):
+    # Add input event with timestamp 'now'
+    def add_input_now(self, port, event_name, params=[]):
+        self.controller.add_input(
+            timestamp=self.now(), port=port, event_name=event_name, params=params)
         if self.scheduled:
             self.event_loop.cancel(self.scheduled)
         self.event_loop.schedule(0, self._wakeup)
+
+
+# Extension to the EventLoop class with a thread-safe method for adding input events.
+# Allows other threads to add input to the Controller, which is useful when doing blocking IO.
+# It is probably cleaner to do async IO and use the regular EventLoop class instead.
+# Input events added in a thread-safe manner are added to a separate (thread-safe) queue. A bit hackish, this queue is regularly checked (polled) for new events from the 3rd party (e.g. Tk) event loop.
+# Perhaps a better alternative to polling is Yentl's tk.event_generate solution.
+class ThreadSafeEventLoop(EventLoop):
+    def __init__(self, controller: Controller, event_loop: EventLoopImplementation, time_impl: TimeImplementation = DefaultTimeImplementation):
+        super().__init__(controller, event_loop, time_impl)
+
+        # thread-safe queue
+        self.queue = queue.Queue()
+
+        # check regularly if queue contains new events
+        self.poll_interval = duration(100, Millisecond) // event_loop.time_unit()
+
+    # override
+    def _wakeup(self):
+        while True:
+            try:
+                timestamp, port, event_name, params = self.queue.get_nowait()
+            except queue.Empty:
+                break
+            self.controller.add_input(timestamp, port, event_name, params)
+
+        self.controller.run_until(self.timer.now() + self.purposefully_behind)
+
+        next_wakeup = self.controller.next_wakeup()
+        if next_wakeup is not None:
+            sleep_duration = next_wakeup - self.timer.now()
+            if sleep_duration < 0:
+                self.purposefully_behind = sleep_duration
+                sleep_duration = 0
+            else:
+                self.purposefully_behind = 0
+            self.scheduled = self.event_loop.schedule(min(self.to_event_loop_unit(sleep_duration), self.poll_interval), self._wakeup)
+        else:
+            self.scheduled = self.event_loop.schedule(self.poll_interval, self._wakeup)
+
+    # Safe to call this method from any thread
+    def add_input_now_threadsafe(self, port, event_name, params=[]):
+        self.queue.put((self.now(), port, event_name, params))

+ 16 - 12
src/sccd/realtime/threads_platform.py

@@ -2,7 +2,6 @@ from sccd.realtime.time import *
 from sccd.controller.controller import *
 from sccd.util.duration import *
 import threading
-import queue
 
 # Thread-safe real-time Controller execution in a thread
 class ThreadsPlatform:
@@ -10,33 +9,33 @@ class ThreadsPlatform:
     def __init__(self, controller: Controller):
         self.controller = controller
         self.timer = Timer(impl=DefaultTimeImplementation, unit=self.controller.cd.get_delta())
-        self.lock = threading.Lock()
+        self.lock = threading.Lock() # A queue would also work, but because of Python's GIL, a queue would not perform better.
         self.condition = threading.Condition()
 
+        # keep simulation responsive even if computer cannot keep up
+        self.purposefully_behind = 0
+
     def create_controller_thread(self) -> threading.Thread:
         def thread():
             # condition object expects fractions of seconds
             to_condition_wait_time = get_conversion_f_float(self.controller.cd.get_delta(), duration(1, Second))
 
-            # keep simulation responsive even if computer cannot keep up
-            purposefully_behind = 0
 
-            # simulation starts "now"
+            # simulation starts "now" (wall-clock time)
             self.timer.start()
 
             while True:
-                # simulate, then sleep:
-
                 with self.lock:
-                    self.controller.run_until(self.timer.now() + purposefully_behind) # may take a while
+                    self.controller.run_until(self.timer.now() + self.purposefully_behind) # may take a while
                     next_wakeup = self.controller.next_wakeup()
 
                 if next_wakeup is not None:
                     sleep_duration = next_wakeup - self.timer.now()
                     if sleep_duration < 0:
-                        purposefully_behind = sleep_duration
+                        self.purposefully_behind = sleep_duration
+                        sleep_duration = 0
                     else:
-                        purposefully_behind = 0
+                        self.purposefully_behind = 0
                     with self.condition:
                         self.condition.wait(to_condition_wait_time(sleep_duration))
                 else:
@@ -45,8 +44,13 @@ class ThreadsPlatform:
 
         return threading.Thread(target=thread)
 
-    def add_input(self, event_name, port, params=[]):
+    def now(self):
+        return self.timer.now() + self.purposefully_behind
+
+    # Add an input event with timestamp "now" (wall-clock time)
+    # Safe to call this method from any thread.
+    def add_input_now(self, port, event_name, params=[]):
         with self.lock:
-            self.controller.add_input(timestamp=self.timer.now(), event_name=event_name, port=port, params=params)
+            self.controller.add_input(timestamp=self.timer.now(), port=port, event_name=event_name, params=params)
         with self.condition:
             self.condition.notify() # wake up controller thread if sleeping

+ 4 - 1
src/sccd/statechart/static/tree.py

@@ -355,11 +355,14 @@ def optimize_tree(root: State) -> StateTree:
             after_children=[
                 set_descendants,
                 calculate_effective_targets,
+            ])
+
+        visit_tree(root, lambda s: s.children,
+            after_children=[
                 deal_with_history,
                 freeze,
             ])
 
-
         for t in transition_list:
             # Arena can be computed statically. First compute Lowest-common ancestor:
             # Intersection between source & target ancestors, last member in depth-first sorted state list.