# DEVS performance model: atomic models for the state store (MvS), the
# kernel (MvK) and the interface (MvI).
import sys
import time
import uuid

# The project packages are not installed; make them importable first.
sys.path.append("kernel/")
sys.path.append("state/")
sys.path.append("interface/HUTN")
from modelverse_state.main import ModelverseState as MvS
from modelverse_kernel.main import ModelverseKernel as MvK
from modelverse_kernel.primitives import SleepKernel
from hutn_compiler.compiler import main as do_compile
from pypdevs.minimal import AtomicDEVS, CoupledDEVS, Simulator

# When True, wall-clock timings of individual operations are printed.
PROFILE = False

import json
import random
random.seed(1)

COMPILER_PATH = "interface/HUTN"
sys.path.append(COMPILER_PATH)
from hutn_compiler.compiler import main as do_compile
import os


def clean_code(code):
    """Strip blank lines and common leading tabs from a code fragment.

    Trailing whitespace is removed from every line, blank lines are dropped,
    the common leading-tab prefix is cut off, and a trailing newline is added.

    :param code: the source text to normalise
    :returns: ``code`` unchanged when it is the empty string, otherwise the
        normalised text encoded as ASCII (unencodable characters replaced)
    """
    if code == "":
        return code

    code_fragments = [i.rstrip() for i in code.split("\n") if i.strip() != ""]
    # NOTE(review): this replaces *every* single space by a tab, including
    # spaces inside a line. Presumably the literal was meant to be one indent
    # level of spaces -- confirm against the version-controlled file.
    code_fragments = [i.replace(" ", "\t") for i in code_fragments]
    # Whitespace-only input leaves no fragments; min() of an empty sequence
    # would raise, so fall back to "no common indentation".
    if code_fragments:
        initial_tabs = min(len(i) - len(i.lstrip("\t")) for i in code_fragments)
    else:
        initial_tabs = 0
    code_fragments = [i[initial_tabs:] for i in code_fragments]
    code_fragments.append("")
    code = "\n".join(code_fragments)
    return code.encode('ascii', 'replace')


def compile_model(temp_file):
    """Compile a model file with the modelling grammar.

    :param temp_file: path of the model file handed to the HUTN compiler
    :returns: the compiler output, prefixed with the ``"__LOCAL__"`` marker
    """
    compiled = do_compile(temp_file, COMPILER_PATH + "/grammars/modelling.g", "M")
    return ["__LOCAL__"] + compiled


# Short operation codes used in kernel requests -> method names on the MvS.
_OPERATION_NAMES = {
    "CN": "create_node",
    "CE": "create_edge",
    "CNV": "create_nodevalue",
    "CD": "create_dict",
    "RV": "read_value",
    "RO": "read_outgoing",
    "RI": "read_incoming",
    "RE": "read_edge",
    "RD": "read_dict",
    "RDN": "read_dict_node",
    "RDNE": "read_dict_node_edge",
    "RDE": "read_dict_edge",
    "RRD": "read_reverse_dict",
    "RR": "read_root",
    "RDK": "read_dict_keys",
    "DE": "delete_edge",
    "DN": "delete_node",
}


def translate(operation):
    """Map a short operation code (e.g. ``"RV"``) to its MvS method name.

    :raises KeyError: if ``operation`` is not a known code
    """
    return _OPERATION_NAMES[operation]


class MvSState(object):
    """State of the :class:`ModelverseState` atomic model."""

    def __init__(self):
        # Pending requests; each request is a list of (code, params) pairs.
        self.queue = []
        # Results computed for the request at the head of the queue.
        self.output = None
        self.mvs = MvS("bootstrap/bootstrap.m.gz")
        # Time left until the head request is answered.
        self.timer = float("inf")


class ModelverseState(AtomicDEVS):
    """Atomic model of the state store.

    Requests arriving on ``from_MvK`` are queued. The head request is run
    against the real MvS right away, and its simulated duration is the sum of
    the configured per-operation timings.
    """

    def __init__(self,
                 read_root,
                 create_node,
                 create_edge,
                 create_nodevalue,
                 create_dict,
                 read_value,
                 read_outgoing,
                 read_incoming,
                 read_edge,
                 read_dict,
                 read_dict_keys,
                 read_dict_edge,
                 read_dict_node,
                 read_dict_node_edge,
                 read_reverse_dict,
                 delete_node,
                 delete_edge):
        """Every parameter is the simulated duration of that MvS operation."""
        AtomicDEVS.__init__(self, "MvS")
        self.timings = {
            "read_root": read_root,
            "create_node": create_node,
            "create_edge": create_edge,
            "create_nodevalue": create_nodevalue,
            "create_dict": create_dict,
            "read_value": read_value,
            "read_outgoing": read_outgoing,
            "read_incoming": read_incoming,
            "read_edge": read_edge,
            "read_dict": read_dict,
            "read_dict_keys": read_dict_keys,
            "read_dict_edge": read_dict_edge,
            "read_dict_node": read_dict_node,
            "read_dict_node_edge": read_dict_node_edge,
            "read_reverse_dict": read_reverse_dict,
            "delete_node": delete_node,
            "delete_edge": delete_edge,
        }

        self.state = MvSState()

        self.from_mvk = self.addInPort("from_MvK")
        self.to_mvk = self.addOutPort("to_MvK")

    def _process_head_request(self):
        """Run the head request and set the timer to its simulated cost."""
        self.state.output = []
        self.state.timer = 0.0
        # A request is a list of operations: run each one and add up its
        # configured duration.
        for v in self.state.queue[0]:
            name = translate(v[0])
            f = getattr(self.state.mvs, name)
            start = time.time()
            self.state.output.append(f(*v[1]))
            if PROFILE:
                print("%s: %.17f" % (name, time.time() - start))
            self.state.timer += self.timings[name]

    def extTransition(self, inputs):
        """Queue incoming requests; start processing if the queue was empty."""
        self.state.timer -= self.elapsed
        empty = len(self.state.queue) == 0
        self.state.queue.extend(inputs[self.from_mvk])

        if empty:
            # First message, so set the timer and already compute the result
            # so it is ready to output.
            self._process_head_request()
        # Otherwise the message was just appended and is processed later.

        return self.state

    def outputFnc(self):
        """Send the results of the head request to the kernel."""
        return {self.to_mvk: [self.state.output]}

    def intTransition(self):
        """Drop the answered request and start on the next one, if any."""
        self.state.queue.pop(0)
        self.state.output = []
        if len(self.state.queue) > 0:
            self._process_head_request()
        else:
            self.state.timer = float("inf")
        return self.state

    def timeAdvance(self):
        """Return the time left until the head request is answered."""
        return self.state.timer


class MvKState(object):
    """State of the :class:`ModelverseKernel` atomic model."""

    def __init__(self, rule_generation):
        self.mvk = None
        self.waiting = False
        self.inputs = {}
        self.outputs = {}
        self.tasks = []
        self.reply = None
        self.phase = None
        self.commands = None
        self.root = None
        self.current_task = None
        self.loaded_primitives = False
        self.execution_counter = 0
        # Simulated time taken to generate one batch of MvS commands.
        self.rule_generation = rule_generation
        self.current_time = 0.0
        self.start_task_time = 0.0

    def __str__(self):
        return "\nMvK: %s\n" % self.mvk + \
               "waiting: %s\n" % self.waiting + \
               "inputs: %s\n" % self.inputs + \
               "outputs: %s\n" % self.outputs + \
               "tasks: %s\n" % self.tasks + \
               "reply: %s\n" % self.reply + \
               "phase: %s\n" % self.phase + \
               "commands: %s\n" % self.commands + \
               "root: %s\n" % self.root + \
               "current task: %s\n" % self.current_task + \
               "execution counter: %s\n" % self.execution_counter


class ModelverseKernel(AtomicDEVS):
    """Atomic model of the kernel.

    For each task it cycles through the phases ``init_task`` -> ``input`` ->
    ``computation`` -> ``output``. Whenever the wrapped kernel yields commands
    they are sent to the MvS and the model waits for the reply.
    """

    def __init__(self, time_per_phase, rule_generation):
        """
        :param time_per_phase: simulated time a task may stay in the
            ``computation`` phase before moving on to ``output``
        :param rule_generation: simulated time to generate one command batch
        """
        AtomicDEVS.__init__(self, "MvK")
        self.state = MvKState(rule_generation)

        self.from_mvi = self.addInPort("from_MvI")
        self.from_mvs = self.addInPort("from_MvS")
        self.to_mvi = self.addOutPort("to_MvI")
        self.to_mvs = self.addOutPort("to_MvS")

        self.time_per_phase = time_per_phase

    def extTransition(self, inputs):
        """Queue task input from the MvI and store replies from the MvS."""
        self.state.current_time += self.elapsed

        if self.from_mvi in inputs:
            # Got input from MvI, so we queue it
            for inp in inputs[self.from_mvi]:
                taskname = inp[0]
                data = inp[1]
                if data is not None:
                    self.state.inputs.setdefault(taskname, []).extend(data)
                else:
                    self.state.outputs.setdefault(taskname, []).append(None)

        if self.from_mvs in inputs:
            # Got input from MvS, so we can continue processing
            #print(" --> " + str(inputs[self.from_mvs]))
            for mvs_input in inputs[self.from_mvs]:
                if self.state.mvk is None:
                    # No MvK, so set it with the root we have just received
                    # (or should have received)
                    self.state.root = mvs_input[0]
                    self.state.mvk = MvK(self.state.root)
                else:
                    self.state.reply = mvs_input
            self.state.waiting = False

        return self.state

    def _execute(self, operation, params):
        """Run one kernel operation for the current task.

        :returns: the commands yielded for the MvS, or None when finished
        """
        start = time.time()
        commands = self.state.mvk.execute_yields(self.state.current_task, operation, params, self.state.reply)
        if PROFILE:
            print("rule_generation: %.17f" % ((time.time() - start)))
        #self.state.rule_generation = time.time() - start
        return commands

    def intTransition(self):
        """Advance the kernel by one step of the current phase."""
        self.state.current_time += self.timeAdvance()
        was_empty = len(self.state.tasks) == 0

        if self.state.commands is not None:
            # The commands were just sent out; now wait for the reply.
            self.state.commands = None
            return self.state

        if self.state.mvk is not None:
            self.state.mvk.returnvalue = None

        if self.state.mvk is None:
            # Initializing
            self.state.waiting = True
        elif not self.state.loaded_primitives:
            commands = self.state.mvk.execute_yields("", "load_primitives", [], self.state.reply)
            if commands is None:
                self.state.loaded_primitives = True
                self.state.reply = None
            else:
                self.state.waiting = True
            self.state.commands = commands
        else:
            # Are initialized and have work to do
            if len(self.state.tasks) == 0:
                # Read out new set of tasks first
                if self.state.reply is None:
                    commands = [("RDK", [self.state.root])]
                else:
                    self.state.tasks = self.state.reply[0]
                    commands = None
            elif self.state.phase == "init_task":
                if self.state.reply is None:
                    commands = [("RV", [self.state.tasks[0]])]
                else:
                    self.state.current_task = self.state.reply[0]
                    #print("Processing task %s at time %s" % (self.state.current_task, self.time_last))
                    self.state.start_task_time = self.state.current_time
                    if self.state.current_task.startswith("__"):
                        # Don't process this task and force termination of task
                        self.state.phase = "output"
                    commands = None
            elif self.state.phase == "input":
                # Process inputs
                if self.state.inputs.get(self.state.current_task, None):
                    value = self.state.inputs[self.state.current_task][0]
                    commands = self._execute("set_input", [value])
                    self.state.mvk.returnvalue = None
                    if commands is None:
                        # The value is consumed only once it is fully set.
                        self.state.inputs[self.state.current_task].pop(0)
                else:
                    commands = None
            elif self.state.phase == "computation":
                try:
                    commands = self._execute("execute_rule", [])
                except SleepKernel:
                    commands = None
                    self.state.mvk.success = False
                else:
                    self.state.mvk.success = True
            elif self.state.phase == "output":
                commands = self._execute("get_output", [])
            else:
                raise Exception("Phase: " + str(self.state.phase))

            # Advance phase
            if commands is None:
                if was_empty:
                    self.state.phase = "init_task"
                elif self.state.phase == "init_task":
                    self.state.phase = "input"
                elif self.state.phase == "input":
                    self.state.phase = "computation"
                elif self.state.phase == "computation":
                    # Stay in computation until the kernel sleeps or the
                    # task has used up its time slice.
                    if not self.state.mvk.success or (self.state.current_time - self.state.start_task_time > self.time_per_phase):
                        self.state.phase = "output"
                elif self.state.phase == "output":
                    self.state.tasks.pop(0)
                    self.state.phase = "init_task"
                self.state.waiting = False
                self.state.reply = None
            else:
                self.state.waiting = True

            # Send the commands to the MvS
            self.state.commands = commands
            #print(" <-- " + str(commands))

        return self.state

    def outputFnc(self):
        """Emit pending MvS commands and any task output for the MvI."""
        outputs = {}

        if self.state.mvk is None:
            # Ask the root first
            outputs[self.to_mvs] = [[("RR", [])]]
        elif self.state.waiting:
            outputs[self.to_mvs] = [self.state.commands]

        if self.state.mvk and self.state.mvk.returnvalue is not None:
            outputs[self.to_mvi] = [(self.state.current_task, self.state.mvk.returnvalue)]

        return outputs

    def timeAdvance(self):
        """Return the delay until the next internal transition."""
        if self.state.commands is not None:
            return self.state.rule_generation
        elif self.state.waiting:
            return float("inf")
        else:
            # Both "no kernel yet" and "ready to work" act immediately.
            return 0


class MvIState(object):
    """State of the :class:`ModelverseInterface` atomic model."""

    def __init__(self):
        self.operations = []
        self.additional_operations = []
        self.keyed_operations = {}
        self.task_to_spawner = {}
        self.task_to_operation = {}
        self.output = {}
        self.blocked = True
        self.finished = False
        # Helper task name -> operations still to send on that task.
        self.send_operations = {}
        # Names of helper tasks to request from the task manager.
        self.create_additional_task = []


class ModelverseInterface(AtomicDEVS):
    """Atomic model of a client that replays scripted operations on a task.

    Besides the main operation list it can spawn helper tasks (named by a
    fresh UUID) that modify a model whenever the kernel asks for it.
    """

    _EDIT_REQUEST = "Please edit this model before sending next input: "
    _SPAWNED = "Spawned activity on task: "
    _MANUAL_OPERATION = "Please perform manual operation "

    def __init__(self, taskname, operations, finish_on, additional_operations=None, keyed_operations=None):
        """
        :param taskname: name of the task this interface drives
        :param operations: operations to send; ``None`` entries mark points
            where the interface waits for output
        :param finish_on: output value that marks the end of the run
        :param additional_operations: list of extra operation lists; the head
            is used for edit requests seen on the main task
        :param keyed_operations: extra operations for edit requests on other
            tasks, keyed by manual-operation name
        """
        AtomicDEVS.__init__(self, "MvI_%s" % taskname)
        self.state = MvIState()
        if taskname == "task_manager":
            self.state.blocked = False
        self.state.operations = operations
        # Both containers are mutated in place later on, so never share a
        # default object between instances.
        self.state.additional_operations = [] if additional_operations is None else additional_operations
        self.state.keyed_operations = {} if keyed_operations is None else keyed_operations
        self.state.create_additional_task = []

        self.taskname = taskname
        self.finish_on = finish_on

        self.to_mvk = self.addOutPort("to_MvK")
        self.from_mvk = self.addInPort("from_MvK")

    def intTransition(self):
        """Mark what was just sent as consumed and block until new input."""
        self.state.create_additional_task = []
        if not self.state.send_operations:
            self.state.operations.pop(0)
        else:
            # Blank the head of every helper queue so it is not sent twice;
            # it is popped once that task produces output.
            for k in self.state.send_operations.keys():
                if self.state.send_operations[k]:
                    self.state.send_operations[k][0] = None
        self.state.blocked = True
        return self.state

    def _start_modify_task(self, model_name, extra_operations):
        """Schedule a helper task that modifies ``model_name``.

        :returns: the generated name of the helper task
        """
        new_taskname = str(uuid.uuid4())
        self.state.send_operations[new_taskname] = [[], ["admin"], ["admin"], ["quiet"], ["model_modify", model_name, ""]] + extra_operations + [["exit"]]
        print("Modifying model " + str(model_name))
        print("On taskname " + str(new_taskname))
        self.state.create_additional_task.append(new_taskname)
        return new_taskname

    def _monitor_task(self, task_name):
        """Block the main task while work happens on ``task_name``."""
        print("Adding 3 empty lists")
        self.state.blocked = True
        self.state.task_to_spawner[task_name] = None
        # NOTE We now know that there is something to do on an other task,
        # so we just wait for that event to come in
        for _ in range(3):
            self.state.operations.insert(0, None)

    def extTransition(self, inputs):
        """React to task output arriving from the kernel."""
        for inp in inputs[self.from_mvk]:
            print("Got input: " + str(inp))
            self.state.blocked = False

            self.state.output.setdefault(inp[0], []).append(inp[1])
            if inp[0] == self.taskname and inp[1] == self.finish_on:
                self.state.finished = True
            elif inp[0] == self.taskname and self.state.operations[0] is None:
                # We have to block for now, and modify a model first...
                print("Blocking for now...")
                prev_output = self.state.output[inp[0]][-1]
                if prev_output.startswith(self._EDIT_REQUEST):
                    _, model_name = prev_output.split(self._EDIT_REQUEST, 1)
                    new_taskname = self._start_modify_task(model_name, self.state.additional_operations[0])
                    self.state.task_to_operation[new_taskname] = None
                    self.state.task_to_spawner[new_taskname] = None
                elif prev_output.startswith(self._SPAWNED):
                    print("Spawned activity")
                    _, task_name = prev_output.split(self._SPAWNED, 1)
                    self._monitor_task(task_name)
                elif prev_output.startswith("Finished task: "):
                    self.state.blocked = True
                elif " : " in prev_output:
                    task_name, _ = prev_output.split(" : ", 1)
                    print("Got new task to monitor: " + task_name)
                    self._monitor_task(task_name)
                else:
                    print("UNKNOWN: " + str(prev_output))
            elif inp[0] != self.taskname:
                # Got some output on another task
                # If the task is not registered yet, it is likely not
                # important to communicate with it, so ignore it
                if inp[0] in self.state.send_operations:
                    self.state.send_operations[inp[0]].pop(0)

                if inp[1].startswith(self._EDIT_REQUEST):
                    _, model_name = inp[1].split(self._EDIT_REQUEST, 1)
                    extra = self.state.keyed_operations.get(self.state.task_to_operation[inp[0]], [])
                    new_taskname = self._start_modify_task(model_name, extra)
                    self.state.task_to_spawner[new_taskname] = inp[0]
                elif inp[1].startswith(self._MANUAL_OPERATION):
                    _, op_name = inp[1].split(self._MANUAL_OPERATION, 1)
                    # Drop the first and last character around the name.
                    self.state.task_to_operation[inp[0]] = op_name[1:-1]

                if inp[0] in self.state.send_operations and not self.state.send_operations[inp[0]]:
                    del self.state.send_operations[inp[0]]

                    # At the end of these operations, so finish up!
                    if inp[0] in self.state.task_to_spawner:
                        spawner = self.state.task_to_spawner[inp[0]]
                        if spawner is not None:
                            self.state.keyed_operations.pop(self.state.task_to_operation[spawner], None)
                            self.state.additional_operations.insert(0, [])
                            # NOTE(review): every other queued operation is a
                            # list of values while this one is a bare string
                            # -- confirm that is what the kernel expects.
                            self.state.send_operations[spawner] = ["__continue__"]
                        else:
                            # NOTE(review): nesting reconstructed from the
                            # flattened source; confirm the two pops and the
                            # print belong to this branch only.
                            self.state.additional_operations.pop(0)
                            self.state.operations.pop(0)
                            print("Clear up to the normal task!")
        return self.state

    def outputFnc(self):
        """Send the next helper-task or main-task operation to the kernel."""
        if self.state.send_operations:
            outp = []
            if self.state.create_additional_task:
                outp.append(('task_manager', self.state.create_additional_task))
            for k, v in self.state.send_operations.items():
                if v and v[0] is not None:
                    outp.append((k, v[0]))
            print("REQUEST special: " + str(outp))
            if outp:
                return {self.to_mvk: outp}
            else:
                return {}
        elif self.state.operations and self.state.operations[0] is not None:
            print("REQUEST: " + str(self.state.operations[0]))
            return {self.to_mvk: [(self.taskname, self.state.operations[0])]}
        else:
            return {}

    def timeAdvance(self):
        """Act immediately unless blocked waiting for kernel output."""
        if self.state.blocked:
            return float("inf")
        else:
            return 0.0

class NetworkState(object):
    """State of a :class:`Network` link."""

    def __init__(self):
        # JSON-serialised messages in transit, oldest first.
        self.processing = []
        # Time left until the head message is delivered.
        self.timer = float("inf")


class Network(AtomicDEVS):
    """Atomic model of a one-way link with a latency and a bandwidth.

    Messages are delivered one at a time, in arrival order.
    """

    def __init__(self, name, latency, bandwidth):
        """
        :param name: name of the atomic model
        :param latency: fixed delay added to every message
        :param bandwidth: link capacity in bits per time unit
        """
        AtomicDEVS.__init__(self, name)
        self.state = NetworkState()

        self.input_port = self.addInPort("input_port")
        self.output_port = self.addOutPort("output_port")

        self.latency = latency
        self.bandwidth = bandwidth

    def _transfer_time(self, message):
        """Return the delay for one serialised message."""
        # NOTE data is in bytes, while bandwidth is in bits, so multiply by 8
        return len(message) * 8 / float(self.bandwidth) + self.latency

    def intTransition(self):
        """Drop the delivered message and start on the next one, if any."""
        self.state.processing.pop(0)
        if self.state.processing:
            self.state.timer = self._transfer_time(self.state.processing[0])
        else:
            self.state.timer = float("inf")
        return self.state

    def extTransition(self, inputs):
        """Queue the incoming messages for delivery."""
        self.state.timer -= self.elapsed
        if self.state.timer == float("inf"):
            self.state.timer = 0
        for v in inputs[self.input_port]:
            self.state.processing.append(json.dumps(v))
        # NOTE(review): the timer is recomputed from scratch whenever there
        # is something queued, so the elapsed-time bookkeeping above has no
        # effect and a message in transit has its delay restarted by a new
        # arrival. Kept as in the original; confirm whether intended.
        if len(self.state.processing) > 0:
            self.state.timer = self._transfer_time(self.state.processing[0])
        return self.state

    def outputFnc(self):
        """Deliver the head message, decoded back from JSON."""
        return {self.output_port: [json.loads(self.state.processing[0])]}

    def timeAdvance(self):
        """Return the time left until the head message is delivered."""
        #print("Timer: " + str(self.state.timer))
        return self.state.timer


class System(CoupledDEVS):
    """Coupled model wiring the interfaces, kernel, state and four links."""

    def __init__(self,
                 taskname,
                 operations,
                 mvi_additional,
                 mvi_keyed,
                 finish_on,
                 rule_generation,
                 time_per_phase,
                 mvi2mvk_latency,
                 mvi2mvk_bandwidth,
                 mvk2mvs_latency,
                 mvk2mvs_bandwidth,
                 mvs2mvk_latency,
                 mvs2mvk_bandwidth,
                 mvk2mvi_latency,
                 mvk2mvi_bandwidth,
                 read_root,
                 create_node,
                 create_edge,
                 create_nodevalue,
                 create_dict,
                 read_value,
                 read_outgoing,
                 read_incoming,
                 read_edge,
                 read_dict,
                 read_dict_keys,
                 read_dict_edge,
                 read_dict_node,
                 read_dict_node_edge,
                 read_reverse_dict,
                 delete_node,
                 delete_edge):
        """Build all submodels and connect their ports.

        ``taskname`` to ``finish_on`` configure the interface models,
        ``rule_generation`` and ``time_per_phase`` the kernel, the
        ``*_latency`` / ``*_bandwidth`` pairs the four links, and the
        remaining parameters are the per-operation timings of the MvS.
        """
        CoupledDEVS.__init__(self, "System")

        # The task manager only has to request creation of the main task.
        self.mvi_manager = self.addSubModel(ModelverseInterface(
            taskname="task_manager",
            operations=[[taskname]],
            finish_on=None,
        ))
        self.mvi = self.addSubModel(ModelverseInterface(
            taskname=taskname,
            operations=operations,
            finish_on=finish_on,
            additional_operations=mvi_additional,
            keyed_operations=mvi_keyed,
        ))
        self.mvk = self.addSubModel(ModelverseKernel(
            time_per_phase=time_per_phase,
            rule_generation=rule_generation,
        ))
        self.mvs = self.addSubModel(ModelverseState(
            read_root=read_root,
            create_node=create_node,
            create_edge=create_edge,
            create_nodevalue=create_nodevalue,
            create_dict=create_dict,
            read_value=read_value,
            read_outgoing=read_outgoing,
            read_incoming=read_incoming,
            read_edge=read_edge,
            read_dict=read_dict,
            read_dict_keys=read_dict_keys,
            read_dict_edge=read_dict_edge,
            read_dict_node=read_dict_node,
            read_dict_node_edge=read_dict_node_edge,
            read_reverse_dict=read_reverse_dict,
            delete_node=delete_node,
            delete_edge=delete_edge,
        ))
        self.mvi2mvk = self.addSubModel(Network(
            name="mvi2mvk",
            latency=mvi2mvk_latency,
            bandwidth=mvi2mvk_bandwidth,
        ))
        self.mvk2mvs = self.addSubModel(Network(
            name="mvk2mvs",
            latency=mvk2mvs_latency,
            bandwidth=mvk2mvs_bandwidth,
        ))
        self.mvs2mvk = self.addSubModel(Network(
            name="mvs2mvk",
            latency=mvs2mvk_latency,
            bandwidth=mvs2mvk_bandwidth,
        ))
        self.mvk2mvi = self.addSubModel(Network(
            name="mvk2mvi",
            latency=mvk2mvi_latency,
            bandwidth=mvk2mvi_bandwidth,
        ))

        # The task manager is wired straight to the kernel; all other
        # traffic goes through a Network link.
        self.connectPorts(self.mvi_manager.to_mvk, self.mvk.from_mvi)
        self.connectPorts(self.mvi.to_mvk, self.mvi2mvk.input_port)
        self.connectPorts(self.mvi2mvk.output_port, self.mvk.from_mvi)
        self.connectPorts(self.mvk.to_mvs, self.mvk2mvs.input_port)
        self.connectPorts(self.mvk2mvs.output_port, self.mvs.from_mvk)
        self.connectPorts(self.mvs.to_mvk, self.mvs2mvk.input_port)
        self.connectPorts(self.mvs2mvk.output_port, self.mvk.from_mvs)
        self.connectPorts(self.mvk.to_mvi, self.mvk2mvi.input_port)
        self.connectPorts(self.mvk2mvi.output_port, self.mvi.from_mvk)


taskname = "test_task"

# Close the file as soon as its content has been parsed.
with open("model/operations", 'r') as operations_file:
    operations = json.loads(operations_file.read())
#operations = json.loads(open("model/operations_simple", 'r').read())

# One list of extra operations per activity, in the order given below.
additional_operations = [
    [],  # revise_req
    [],  # revise_environment
    [],  # revise_plant
    [],  # revise_control
    [],  # revise_query
    [],  # revise_architecture
    [],  # make_initial_models
    [["instantiate_edge", "Association", "PLANT2EPN_link", "PW_Plant/State", "Encapsulated_PetriNet/Place"],
     ["instantiate_edge", "Association", "PLANT2EPN_tlink", "PW_Plant/Transition", "Encapsulated_PetriNet/Transition"]],  # plant_to_EPN
    [["instantiate_edge", "Association", "CTRL2EPN_link", "PW_Control/State", "Encapsulated_PetriNet/Place"],
     ["instantiate_edge", "Association", "CTRL2EPN_tlink", "PW_Control/Transition", "Encapsulated_PetriNet/Transition"]],  # control_to_EPN
    [["instantiate_edge", "Association", "ENV2EPN_link", "PW_Environment/Event", "Encapsulated_PetriNet/Place"]],  # environment_to_EPN
    [["instantiate_edge", "Association", "EPN2PN_place_link", "Encapsulated_PetriNet/Place", "PetriNet/Place"],
     ["instantiate_edge", "Association", "EPN2PN_transition_link", "Encapsulated_PetriNet/Transition", "PetriNet/Transition"]],  # EPN_to_PN
    [],  # match
    [],  # reachability
    [],  # bfs
    [],  # merge
]
#additional_operations = [[]]

# Operations to send when a manual operation of the given name is requested.
keyed_operations = {
    "models/revise_req": [["upload"], compile_model("models/requirements_model.mvc")],
    "models/revise_plant": [["upload"], compile_model("models/plant_model.mvc")],
    "models/revise_environment": [["upload"], compile_model("models/environment_model.mvc")],
    "models/revise_control": [["upload"], compile_model("models/control_model.mvc")],
    "models/revise_query": [["upload"], compile_model("models/query_model.mvc")],
    "models/revise_architecture": [["upload"], compile_model("models/architecture_model.mvc")],
}
#keyed_operations = {"models/modify_lang": [["instantiate_node", "PN/Class", ""]]}

finish_on = "FINISHED"

args = {
    "taskname": taskname,
    "operations": operations,
    "finish_on": finish_on,
    "mvi_additional": additional_operations,
    "mvi_keyed": keyed_operations,
    "mvi2mvk_latency": 0.0000001,
    "mvi2mvk_bandwidth": 50000000000,
    "mvk2mvs_latency": 0.0000001,
    "mvk2mvs_bandwidth": 50000000000,
    "mvs2mvk_latency": 0.0000001,
    "mvs2mvk_bandwidth": 50000000000,
    "mvk2mvi_latency": 0.0000001,
    "mvk2mvi_bandwidth": 50000000000,
    "time_per_phase": 0.05,
    # Automatically filled in from calibration results, just here to prevent
    # crashes (results for my UA desktop)
    "read_root": 0.00001406669616699,
    "create_node": 0.00000379181167487,
    "create_edge": 0.00000601282282066,
    "create_nodevalue": 0.00000501364247391,
    "create_dict": 0.00001028065706205,
    "read_value": 0.00000388661630500,
    "read_outgoing": 0.00000520600098073,
    "read_incoming": 0.00000645903181994,
    "read_edge": 0.00000449162172644,
    "read_dict": 0.00000460127038355,
    "read_dict_keys": 0.00001678063432883,
    "read_dict_node": 0.00001020808859528,
    "read_dict_edge": 0.00000642558526942,
    "read_dict_node_edge": 0.0,
    "read_reverse_dict": 0.00002557890755790,
    "delete_node": 0.00004755891187096,
    "delete_edge": 0.00000683382081240,
    "rule_generation": 0.00001543215873893,
}

# Calibration results ("<name>: <seconds>" per line) override the defaults.
with open("calibration/averages", 'r') as param_file:
    for line in param_file:
        if not line.strip():
            # Tolerate blank lines, e.g. a trailing newline at end of file.
            continue
        op, t = line.split(": ")
        op = op.strip()
        args[op] = float(t)

model = System(**args)
sim = Simulator(model)
sim.setTerminationCondition(lambda t, m: m.mvi.state.finished)
#sim.setVerbose()
tn = sim.simulate()
print("Simulation finished at time %s" % tn)