Browse Source

Allow for multiple tasks to be connected

Yentl Van Tendeloo 8 years ago
parent
commit
a156d6e0e1
2 changed files with 37 additions and 31 deletions
  1. 0 7
      unit/test_all.py
  2. 37 24
      wrappers/modelverse.py

+ 0 - 7
unit/test_all.py

@@ -214,13 +214,10 @@ class TestModelverse(unittest.TestCase):
         ctrl = log_output.Controller(log, keep_running=False)
         thrd = threading.Thread(target=ctrl.start)
         thrd.daemon = True
-        print("Start log_output")
         thrd.start()
 
         assert transformation_execute_MT("test/print_pn", {"PetriNet": "test/my_pn"}, {}, (ctrl, "inp", "outp")) == None
-        print("MT OK")
         thrd.join()
-        print("Joined 1")
         assert set(log) == set(['"p1" --> 1',
                                 '"p2" --> 2',
                                 '"p3" --> 3'])
@@ -230,17 +227,13 @@ class TestModelverse(unittest.TestCase):
         assert transformation_execute_MT("test/pn_runtime_to_design", {"PetriNet_Runtime": "test/my_pn_RT"}, {"PetriNet": "test/my_pn"}) == True
 
         log = []
-        print("Ready for 2")
         ctrl = log_output.Controller(log, keep_running=False)
         thrd = threading.Thread(target=ctrl.start)
         thrd.daemon = True
-        print("Start second")
         thrd.start()
 
         assert transformation_execute_MT("test/print_pn", {"PetriNet": "test/my_pn"}, {}, (ctrl, "inp", "outp")) == None
-        print("Joining 2")
         thrd.join()
-        print("Joined 2")
         assert set(log) == set(['"p1" --> 0',
                                 '"p2" --> 1',
                                 '"p3" --> 5'])

+ 37 - 24
wrappers/modelverse.py

@@ -67,33 +67,34 @@ mode = MODE_UNCONNECTED
 prev_mode = None
 current_model = None
 registered_metamodels = {}
-outputs = [None]
+outputs = {}
 ctrl_input = None
 ctrl_output = None
 
-def _output_thread(outputs, taskname):
-    req_out = ctrl_output.addOutputListener("request_out")
+def _output_thread(controller, outputs, taskname):
+    req_out = controller.addOutputListener("request_out")
     my_id = str(uuid.uuid4())
 
     try:
         while 1:
-            ctrl_output.addInput(Event("HTTP_input", "request_in", [urllib.urlencode({"op": "get_output", "taskname": taskname}), my_id]))
+            controller.addInput(Event("HTTP_input", "request_in", [urllib.urlencode({"op": "get_output", "taskname": taskname}), my_id]))
 
             event = req_out.fetch(-1)
             if event.parameters[1] == my_id:
-                outputs.append(json.loads(event.parameters[0]))
+                outputs[taskname].append(json.loads(event.parameters[0]))
     except:
         pass
 
 def _exec_on_statechart(statechart):
     def _exec_sc(controller, inport, outport):
+        global taskname
         op = controller.addOutputListener(outport)
 
         while 1:
-            if len(outputs) > 1:
+            if len(outputs[taskname]) > 1:
                 # Is an output message of the Mv, so put it in the SC
-                del outputs[0]
-                output_event = outputs[0]
+                del outputs[taskname][0]
+                output_event = outputs[taskname][0]
 
                 if output_event == "Success" or output_event == "Failure":
                     # Is a stop event!
@@ -218,22 +219,23 @@ def _output(expected=None, port=None):
         port = taskname
 
     try:
-        while len(outputs) < 2:
+        while len(outputs[port]) < 2:
             time.sleep(0.02)
 
-        del outputs[0]
+        del outputs[port][0]
         #print("[OUT] %s" % outputs[0])
     except:
         raise UnknownError()
 
-    if expected is not None and _last_output() != expected:
-        raise InterfaceMismatch(_last_output(), expected)
-    return outputs[0]
+    if expected is not None and _last_output(port) != expected:
+        raise InterfaceMismatch(_last_output(port), expected)
+    return _last_output(port)
 
 def _last_output(port=None):
     if port is None:
         port = taskname
-    return outputs[0]
+
+    return outputs[port][0]
 
 # Raise common exceptions
 def _handle_output(requested=None, split=False):
@@ -339,11 +341,14 @@ def init(address_param="127.0.0.1:8001", timeout=20.0):
     if ctrl_output is not None:
         ctrl_output.stop()
 
-    addr, port = address_param.split(":", 1)
+    global address
+    global port
+
+    address, port = address_param.split(":", 1)
     port = int(port)
     
-    ctrl_input = _start_http_client(addr, port, timeout)
-    ctrl_output = _start_http_client(addr, port, timeout) 
+    ctrl_input = _start_http_client(address, port, timeout)
+    ctrl_output = _start_http_client(address, port, timeout) 
     controllers = [ctrl_input, ctrl_output]
 
     global mode
@@ -357,16 +362,16 @@ def init(address_param="127.0.0.1:8001", timeout=20.0):
     global outputs
     global taskname
     taskname = task
-    outputs = [None]
+    outputs = {}
 
-    _listen_to_output(task)
+    _listen_to_output(ctrl_output, task)
 
-def _listen_to_output(task):
+def _listen_to_output(controller, task):
     global outputs
-    outputs = [None]
+    outputs[task] = [None]
 
     # This re-assign also diconnects the previous get_output connections to the outputs variable
-    thrd = threading.Thread(target=_output_thread, args=[outputs, task])
+    thrd = threading.Thread(target=_output_thread, args=[controller, outputs, task])
     thrd.daemon = True
     thrd.start()
 
@@ -965,9 +970,17 @@ def read_association_destination(model_name, ID):
 def service_register(name, function):
     """Register a function as a service with a specific name."""
 
-    def service_process(port):
+    def service_process(service_task):
+        global address
+        global port
+
+        ctrl = _start_http_client(address, port, 10.0) 
+        _listen_to_output(ctrl, service_task)
         while 1:
-            thrd = threading.Thread(target=function, args=[service_get(port)])
+            client_task = service_get(service_task)
+            ctrl = _start_http_client(address, port, 10.0) 
+            _listen_to_output(ctrl, client_task)
+            thrd = threading.Thread(target=function, args=[client_task])
             thrd.daemon = True
             thrd.start()