Browse Source

Split each task to its own statechart

Yentl Van Tendeloo 8 years ago
parent
commit
dbc6c24330
3 changed files with 324 additions and 118 deletions
  1. 180 118
      hybrid_server/classes/mvkcontroller.xml
  2. 143 0
      hybrid_server/classes/task.xml
  3. 1 0
      hybrid_server/server.xml

+ 180 - 118
hybrid_server/classes/mvkcontroller.xml

@@ -1,6 +1,7 @@
 <class name="MvKController">
     <relationships>
         <association name="to_mvi" class="Server" min="1" max="1"/>
+        <association name="tasks" class="Task"/>
     </relationships>
     <constructor>
         <parameter name="params"/>
@@ -9,6 +10,7 @@
             self.mvs = ModelverseState("../bootstrap/bootstrap.m.gz")
             # Enable Garbage Collection
             self.mvs.GC = True
+
             self.root = self.mvs.read_root()
 
             # Instantiate the kernel.
@@ -65,20 +67,14 @@
                 else:
                     print("warning: unknown kernel option '%s'." % opt)
 
-            self.all_failed = False
-            self.timeout = False
+            self.port = int(sys.argv[1])
 
-            self.tasks = set()
+            self.sc_map = {}
+            self.forward = None
+            self.HTTP_reply = None
             self.input_queue = defaultdict(list)
             self.output_queue = defaultdict(list)
-            self.source = None
-            self.port = int(sys.argv[1])
-            self.count = 0
             
-            self.debugged_tasks = set()
-            self.debug_info = {}
-            self.done_something = False
-
             self.mvs_operations = {
                     "CN": self.mvs.create_node,
                     "CE": self.mvs.create_edge,
@@ -148,121 +144,135 @@
         </state>
 
         <parallel id="running">
+            <state id="forward_inputs">
+                <state id="forward">
+                    <transition cond="set(self.input_queue) &amp; set(self.sc_map)" target=".">
+                        <script>
+                            self.input_task = (set(self.input_queue) &amp; set(self.sc_map)).pop()
+                            value = self.input_queue[self.input_task].pop(0)
+                            if not self.input_queue[self.input_task]:
+                                del self.input_queue[self.input_task]
+                        </script>
+                        <raise event="input" scope="narrow" target="self.sc_map[self.input_task]">
+                            <parameter expr="value"/>
+                        </raise>
+                    </transition>
+                </state>
+            </state>
+
+            <state id="forward_outputs">
+                <state id="forward">
+                    <transition cond="set(self.output_queue) &amp; set(self.sc_map)" target=".">
+                        <script>
+                            self.output_task = (set(self.output_queue) &amp; set(self.sc_map)).pop()
+                            value = self.output_queue[self.output_task].pop(0)
+                            if not self.output_queue[self.output_task]:
+                                del self.output_queue[self.output_task]
+                        </script>
+                        <raise event="output" scope="narrow" target="self.sc_map[self.output_task]">
+                            <parameter expr="value"/>
+                        </raise>
+                    </transition>
+                </state>
+            </state>
+
             <state id="wait_for_requests">
                 <state id="wait">
                     <transition event="from_mvi" target=".">
                         <parameter name="source"/>
                         <parameter name="data"/>
                         <script>
+                            self.source = source
                             # No JSON encoding necessary, as it is not complex
                             try:
-                                self.done_something = False
                                 if data["op"] == "set_input":
                                     if "value" in data:
                                         value = [json.loads(data["value"])]
                                     else:
                                         value = json.loads(data["data"])
-                                    self.input_queue[data["taskname"]].append((source, value))
+                                    self.forward = (data["taskname"], "input", value)
+                                    self.HTTP_reply = "OK"
                                 elif data["op"] == "get_output":
-                                    self.output_queue[data["taskname"]].append(source)
-                                elif data["op"] == "attach_debugger":
-                                    self.debugged_tasks.add(data["taskname"])
-                                    self.done_something = True
-                                    self.source = source
-                                    self.debug_info[data["taskname"]] = {'state': 'running', 'breakpoints': []}
-                                elif data["op"] == "detach_debugger":
-                                    self.debugged_tasks.discard(data["taskname"])
-                                    self.done_something = True
-                                    self.source = source
-                                    del self.debug_info[data["taskname"]]
+                                    self.forward = (data["taskname"], "output", self.source)
+                                    self.HTTP_reply = None
                                 elif data["op"] == "pause":
-                                    if data["taskname"] in self.debugged_tasks:
-                                        self.debug_info[data["taskname"]]['state'] = 'paused'
-                                    self.done_something = True
-                                    self.source = source
+                                    #TODO
+                                    self.HTTP_reply = "NotImplemented"
                                 elif data["op"] == "resume":
-                                    if data["taskname"] in self.debugged_tasks:
-                                        self.debug_info[data["taskname"]]['state'] = 'running'
-                                    self.done_something = True
-                                    self.source = source
-                                elif data["op"] == "step_over":
-                                    pass
-                                    self.done_something = True
-                                    self.source = source
-                                elif data["op"] == "step_into":
-                                    pass
-                                    self.done_something = True
-                                    self.source = source
+                                    #TODO
+                                    self.HTTP_reply = "NotImplemented"
                                 else:
-                                    raise Exception("DROPPING unknown operation: " + str(data["op"]))
-                            except ValueError:
-                                print("Error when deserializing request: " + str(data))
-                                raise
+                                    self.HTTP_reply = "Unknown command: %s" % data["op"]
+                            except ValueError as e:
+                                self.HTTP_reply = "Error when deserializing request: %s" % traceback.format_exc()
+                            except Exception as e:
+                                self.HTTP_reply = "Unknown Modelverse Error: %s" % traceback.format_exc()
                         </script>
                     </transition>
 
-                    <transition cond="self.done_something" target=".">
+                    <transition cond="not self.forward and self.HTTP_reply is not None" target=".">
                         <raise event="HTTP_input" scope="narrow" target="'to_mvi/%s' % self.source">
-                            <parameter expr="json.dumps(True)"/>
+                            <parameter expr="json.dumps(self.HTTP_reply)"/>
                         </raise>
                         <script>
-                            self.done_something = False
+                            self.HTTP_reply = None
+                            self.forward = None
                         </script>
                     </transition>
-                </state>
-            </state>
 
-            <state id="execution" initial="execution">
-                <state id="execution">
-                    <onentry>
+                    <transition cond="self.forward and self.forward[1] == 'input' and self.forward[0] in self.sc_map" target=".">
+                        <raise event="input" scope="narrow" target="self.sc_map[self.forward[0]]">
+                            <parameter expr="self.forward[2]"/>
+                        </raise>
+                        <raise event="HTTP_input" scope="narrow" target="'to_mvi/%s' % self.source">
+                            <parameter expr="json.dumps(self.HTTP_reply)"/>
+                        </raise>
                         <script>
-                            #start = time.time()
-                            self.outputs = []
-                            self.timeout = False
-                            if self.tasks:
-                                task = self.tasks.pop()
-                                if not task in self.debugged_tasks or self.debug_info[task]['state'] == 'running':
-                                    # Check if there are values to input
-                                    while self.input_queue[task]:
-                                        source, args = self.input_queue[task].pop(0)
-                                        for args_entry in args:
-                                            self.execute_modelverse(task, "set_input", [args_entry])
-
-                                        self.outputs.append((source, "OK"))
-                                        self.all_failed = False
-
-                                    if task in self.debugged_tasks:
-                                        start_time = -float('inf')
-                                    else:
-                                        start_time = time.time()
-                                    # Grant each task some milliseconds of execution
-                                    while (time.time() - start_time &lt; 0.05):
-                                        self.execute_modelverse(task, "execute_rule", [])
+                            print("SEND INPUT")
+                            self.HTTP_reply = None
+                            self.forward = None
+                        </script>
+                    </transition>
 
-                                        if not self.mvk.success:
-                                            # Blocking or broken, so quit already to stop wasting CPU
-                                            break
+                    <transition cond="self.forward and self.forward[1] == 'input' and self.forward[0] not in self.sc_map" target=".">
+                        <raise event="HTTP_input" scope="narrow" target="'to_mvi/%s' % self.source">
+                            <parameter expr="json.dumps(self.HTTP_reply)"/>
+                        </raise>
+                        <script>
+                            print("QUEUE INPUT")
+                            self.input_queue[self.forward[0]].append(self.forward[2])
+                            self.HTTP_reply = None
+                            self.forward = None
+                        </script>
+                    </transition>
 
-                                        # Could at least execute one instruction, so mark it as "not failed"
-                                        self.all_failed = False
+                    <transition cond="self.forward and self.forward[1] == 'output' and self.forward[0] in self.sc_map" target=".">
+                        <raise event="output" scope="narrow" target="self.sc_map[self.forward[0]]">
+                            <parameter expr="self.forward[2]"/>
+                        </raise>
+                        <script>
+                            self.HTTP_reply = None
+                            self.forward = None
+                        </script>
+                    </transition>
 
-                                    # Perform output if there is anything
-                                    while self.output_queue[task]:
-                                        self.execute_modelverse(task, "get_output", [])
-                                        if self.mvk.success:
-                                            self.outputs.append((self.output_queue[task].pop(0), self.mvk.returnvalue))
-                                            self.all_failed = False
-                                        else:
-                                            break
+                    <transition cond="self.forward and self.forward[1] == 'output' and self.forward[0] not in self.sc_map" target=".">
+                        <script>
+                            self.output_queue[self.forward[0]].append(self.forward[2])
+                            self.HTTP_reply = None
+                            self.forward = None
+                        </script>
+                    </transition>
+                </state>
+            </state>
 
-                            else:
-                                if self.count >= 2000:
-                                    self.count = 0
-                                    self.mvs.purge()
-                                else:
-                                    self.count += 1
-                                    self.mvs.garbage_collect()
+            <parallel id="execution">
+                <state id="refresh_tasks" initial="refresh_tasks">
+                    <state id="refresh_tasks">
+                        <onentry>
+                            <script>
                                 out = self.mvs.read_outgoing(self.root)
+                                tasks = set()
                                 for m in out:
                                     src, task = self.mvs.read_edge(m)
                                     outgoing = self.mvs.read_outgoing(m)
@@ -271,38 +281,90 @@
                                     name = self.mvs.read_value(dest)
                                     if name.startswith("__"):
                                         continue
-                                    self.tasks.add(name)
-                                self.timeout = self.all_failed
-                                self.all_failed = True
-                        </script>
-                    </onentry>
+                                    tasks.add(name)
+                                    print("Saw " + str(name))
 
-                    <transition cond="self.outputs" target="../process_data"/>
-                    <transition cond="not self.timeout and not self.outputs" target="../timeout_none"/>
-                    <transition cond="self.timeout and not self.outputs" target="../timeout_long"/>
-                </state>
+                                self.new_tasks = set([i for i in tasks if i not in self.sc_map])
+                                self.old_tasks = set([i for i in self.sc_map if i not in tasks])
+                            </script>
+                        </onentry>
 
-                <state id="timeout_long">
-                    <transition after="self.sccd_yield() + 0.05" target="../execution"/>
-                </state>
+                        <transition cond="self.new_tasks" target="../create_SC"/>
+                        <transition cond="not self.new_tasks and self.old_tasks" target="../remove_SC"/>
+                        <transition after="1" target="."/>
+                    </state>
+
+                    <state id="create_SC" initial="creating">
+                        <state id="creating">
+                            <onentry>
+                                <script>
+                                    self.task = self.new_tasks.pop()
+                                    print("SPAWN TASK: " + str(self.task))
+                                </script>
+                                <raise event="create_instance" scope="cd">
+                                    <parameter expr="'tasks'"/>
+                                    <parameter expr="'Task'"/>
+                                    <parameter expr="self.task"/>
+                                    <parameter expr="self.mvs_operations"/>
+                                    <parameter expr="self.mvk"/>
+                                </raise>
+                            </onentry>
+
+                            <transition event="instance_created" target="../created">
+                                <parameter name="instancename"/>
+                                <raise scope="cd" event="start_instance">
+                                    <parameter expr="instancename" />
+                                </raise>
+                                <script>
+                                    self.sc_map[self.task] = instancename
+                                </script>
+                            </transition>
+                        </state>
+                        <state id="created">
+                            <transition cond="self.new_tasks" target="../creating"/>
+                            <transition cond="not self.new_tasks and self.old_tasks" target="../../remove_SC"/>
+                            <transition cond="not self.new_tasks and not self.old_tasks" target="../../refresh_tasks"/>
+                        </state>
+                    </state>
+
+                    <state id="remove_SC">
+                        <transition cond="self.old_tasks" target=".">
+                            <script>
+                                task = self.old_tasks.pop()
+                            </script>
+                            <raise scope="cd" event="delete_instance">
+                                <parameter expr="self.sc_map[task]"/>
+                            </raise>
+                            <script>
+                                del self.sc_map[task]
+                            </script>
+                        </transition>
 
-                <state id="timeout_none">
-                    <transition after="self.sccd_yield()" target="../execution"/>
+                        <transition cond="not self.old_tasks" target="../refresh_tasks"/>
+                    </state>
                 </state>
 
-                <state id="process_data">
-                    <transition cond="self.outputs" target=".">
-                        <script>
-                            destination, value = self.outputs.pop()
-                        </script>
-                        <raise event="HTTP_input" scope="narrow" target="'to_mvi/%s' % destination">
-                            <parameter expr="json.dumps(value)"/>
-                        </raise>
-                    </transition>
+                <state id="mvs_GC" initial="suspend_tasks">
+                    <state id="suspend_tasks">
+                        <onentry>
+                            <raise scope="broad" event="pause_task"/>
+                        </onentry>
+
+                        <transition target="../mvs_GC"/>
+                    </state>
 
-                    <transition cond="not self.outputs" target="../execution"/>
+                    <state id="mvs_GC">
+                        <onentry>
+                            <script>
+                                print("GC")
+                                self.mvs.garbage_collect()
+                            </script>
+                            <raise scope="broad" event="resume_task"/>
+                        </onentry>
+                        <transition after="self.sccd_yield() + 10" target="."/>
+                    </state>
                 </state>
-            </state>
+            </parallel>
 
             <state id="remove_sockets">
                 <state id="remove_sockets">

+ 143 - 0
hybrid_server/classes/task.xml

@@ -0,0 +1,143 @@
+<class name="Task">
+    <relationships>
+        <association name="parent" class="MvKController" min="1" max="1"/>
+    </relationships>
+    <constructor>
+        <parameter name="taskname"/>
+        <parameter name="mvs_operations"/>
+        <parameter name="mvk"/>
+        <body>
+            <![CDATA[
+            self.taskname = taskname
+            self.mvs_operations = mvs_operations
+            self.mvk = mvk
+
+            self.failed = False
+            self.output_queue = []
+            self.outputs = []
+            ]]>
+        </body>
+
+    </constructor>
+
+    <method name="execute_modelverse">
+        <parameter name="taskname"/>
+        <parameter name="operation"/>
+        <parameter name="params"/>
+        <body>
+            <![CDATA[
+            reply = None
+            commands = []
+            mvk = self.mvk
+            mvs_operations = self.mvs_operations
+            try:
+                while 1:
+                    commands = mvk.execute_yields(taskname, operation, params, reply)
+                    if commands is None:
+                        break
+                    reply = [mvs_operations[command[0]](*(command[1])) for command in commands]
+            except:
+                print("ERROR: " + str(self.mvk.debug_info.get(taskname, "Unknown taskname")))
+                #TODO delete self, as the task has crashed!
+                return False
+            return True
+            ]]>
+        </body>
+    </method>
+
+    <scxml initial="start">
+        <parallel id="start">
+            <state id="execution" initial="running">
+                <state id="running" initial="executing">
+                    <transition event="pause_task" target="../suspended"/>
+
+                    <state id="executing">
+                        <onentry>
+                            <script>
+                                start_time = time.time()
+                                # Grant each task some milliseconds of execution
+                                while (time.time() - start_time &lt; 0.05):
+                                    if not self.execute_modelverse(self.taskname, "execute_rule", []):
+                                        # Failed!
+                                        self.failed = True
+                                        break
+
+                                    if not self.mvk.success:
+                                        # Blocking or broken, so quit already to stop wasting CPU
+                                        break
+
+                                if not self.failed:
+                                    # Perform output if there is anything
+                                    while self.output_queue:
+                                        if self.execute_modelverse(self.taskname, "get_output", []):
+                                            if self.mvk.success:
+                                                self.outputs.append((self.output_queue.pop(0), self.mvk.returnvalue))
+                                            else:
+                                                # No output left in Mv, so break
+                                                break
+                                        else:
+                                            self.failed = True
+                                            break
+                            </script>
+                        </onentry>
+
+                        <transition cond="self.failed" target="../../failed"/>
+                        <transition cond="self.mvk.success" target="."/>
+                        <transition cond="not self.mvk.success" target="../yielded"/>
+                    </state>
+
+                    <state id="yielded">
+                        <!-- TODO increase timeout duration -->
+                        <transition after="self.sccd_yield() + 0.2" target="../executing"/>
+                        <transition event="processed_input" target="../executing"/>
+                    </state>
+                </state>
+
+                <state id="suspended">
+                    <state id="suspended">
+                        <transition event="resume" target="../../running"/>
+                    </state>
+                </state>
+
+                <state id="failed">
+                    <state id="failed">
+                        <!-- TODO delete task -->
+                    </state>
+                </state>
+            </state>
+
+            <state id="process_events">
+                <state id="process_events">
+                    <transition event="input" target=".">
+                        <parameter name="params"/>
+                        <script>
+                            print("GOT INPUT: " + str(params))
+                            for args_entry in params:
+                                if not self.execute_modelverse(self.taskname, "set_input", [args_entry]):
+                                    # Failed!
+                                    self.failed = True
+                                    break
+                        </script>
+                    </transition>
+
+                    <transition event="output" target=".">
+                        <parameter name="params"/>
+                        <script>
+                            self.output_queue.append(params)
+                        </script>
+                    </transition>
+
+                    <transition cond="self.outputs" target=".">
+                        <script>
+                            source, value = self.outputs.pop(0)
+                            print("Source: " + str(source))
+                        </script>
+                        <raise event="HTTP_input" scope="narrow" target="'parent/to_mvi/%s' % source">
+                            <parameter expr="json.dumps(value)"/>
+                        </raise>
+                    </transition>
+                </state>
+            </state>
+        </parallel>
+    </scxml>
+</class>

+ 1 - 0
hybrid_server/server.xml

@@ -28,4 +28,5 @@
     <class src="classes/mvkcontroller.xml" default="true"/>
     <class src="classes/server.xml"/>
     <class src="classes/socket.xml"/>
+    <class src="classes/task.xml"/>
 </diagram>