"""DEVS performance model of a Modelverse deployment.

The coupled model ``System`` wires together:

* two ``ModelverseInterface`` (MvI) instances that feed operations to the kernel,
* one ``ModelverseKernel`` (MvK) that turns task input into state requests,
* one ``ModelverseState`` (MvS) that executes those requests,
* four ``Network`` links that delay messages between the components.

Run as a script, ``simulate`` is called with ``name value`` pairs taken from
the command line; they override the default/calibrated parameters.
"""
import sys
import time
import uuid

sys.path.append("kernel/")
sys.path.append("state/")
sys.path.append("interface/HUTN")
from modelverse_state.main import ModelverseState as MvS
from modelverse_kernel.main import ModelverseKernel as MvK
from modelverse_kernel.primitives import SleepKernel
import modelverse_jit.jit as jit
from hutn_compiler.compiler import main as do_compile

from pypdevs.minimal import AtomicDEVS, CoupledDEVS, Simulator

# When True, wall-clock timings of MvS operations and kernel steps are printed.
PROFILE = False

import json
import random

COMPILER_PATH = "interface/HUTN"

sys.path.append(COMPILER_PATH)
from hutn_compiler.compiler import main as do_compile
import os


def clean_code(code):
    """Strip blank lines and common leading tabs from a code fragment.

    :param code: source text; the empty string is returned unchanged.
    :returns: the cleaned text (terminated by a newline when non-empty),
        encoded as ASCII with unencodable characters replaced.
    """
    if code == "":
        return code

    # Drop blank lines and trailing whitespace.
    fragments = [line.rstrip() for line in code.split("\n") if line.strip() != ""]
    # NOTE(review): every single space becomes a tab, including spaces in the
    # middle of a line, not only leading indentation -- confirm this literal
    # is what is intended.
    fragments = [line.replace(" ", "\t") for line in fragments]
    # Guard against input consisting solely of blank lines: min() of an empty
    # sequence would raise ValueError.
    if fragments:
        initial_tabs = min(len(line) - len(line.lstrip("\t")) for line in fragments)
    else:
        initial_tabs = 0
    fragments = [line[initial_tabs:] for line in fragments]
    fragments.append("")
    return "\n".join(fragments).encode('ascii', 'replace')


def compile_model(temp_file):
    """Compile a model file with the HUTN modelling grammar.

    :param temp_file: path of the model file handed to the compiler.
    :returns: the compiler output prefixed with the ``"__LOCAL__"`` marker.
    """
    compiled = do_compile(temp_file, COMPILER_PATH + "/grammars/modelling.g", "M")
    return ["__LOCAL__"] + compiled


# Short request codes (as sent by the kernel) -> name of the method invoked on
# the state object; the same names are the keys of the timing table.
trans_map = {
    "CN": "create_node",
    "CE": "create_edge",
    "CNV": "create_nodevalue",
    "CD": "create_dict",

    "RV": "read_value",
    "RO": "read_outgoing",
    "RI": "read_incoming",
    "RE": "read_edge",
    "RD": "read_dict",
    "RDN": "read_dict_node",
    "RDNE": "read_dict_node_edge",
    "RDE": "read_dict_edge",
    "RRD": "read_reverse_dict",
    "RR": "read_root",
    "RDK": "read_dict_keys",

    "DE": "delete_edge",
    "DN": "delete_node",
    "GC": "purge",
}


def translate(operation):
    """Return the state method name for a short request code (KeyError if unknown)."""
    return trans_map[operation]


class MvSState(object):
    """State of the MvS atomic model."""

    def __init__(self):
        self.queue = []       # pending request batches; the head is being processed
        self.output = None    # results of the head batch, ready to be sent
        self.mvs = MvS("bootstrap/bootstrap.m.gz")
        self.timer = float("inf")   # simulated time until the head batch completes


class ModelverseState(AtomicDEVS):
    """Atomic model of the Modelverse State.

    Each constructor argument is the simulated duration charged for one call
    of the operation with the same name.
    """

    def __init__(self,
                 read_root,
                 create_node,
                 create_edge,
                 create_nodevalue,
                 create_dict,
                 read_value,
                 read_outgoing,
                 read_incoming,
                 read_edge,
                 read_dict,
                 read_dict_keys,
                 read_dict_edge,
                 read_dict_node,
                 read_dict_node_edge,
                 read_reverse_dict,
                 delete_node,
                 delete_edge,
                 purge):
        AtomicDEVS.__init__(self, "MvS")
        self.timings = {
            "read_root": read_root,
            "create_node": create_node,
            "create_edge": create_edge,
            "create_nodevalue": create_nodevalue,
            "create_dict": create_dict,
            "read_value": read_value,
            "read_outgoing": read_outgoing,
            "read_incoming": read_incoming,
            "read_edge": read_edge,
            "read_dict": read_dict,
            "read_dict_keys": read_dict_keys,
            "read_dict_edge": read_dict_edge,
            "read_dict_node": read_dict_node,
            "read_dict_node_edge": read_dict_node_edge,
            "read_reverse_dict": read_reverse_dict,
            "delete_node": delete_node,
            "delete_edge": delete_edge,
            "purge": purge,
        }

        self.state = MvSState()

        self.from_mvk = self.addInPort("from_MvK")
        self.to_mvk = self.addOutPort("to_MvK")

    def _execute_front_request(self):
        """Run the batch at the head of the queue.

        The results are stored in ``state.output`` right away, and
        ``state.timer`` is set to the summed simulated cost of the batch so
        the output is emitted only after that much simulated time.
        """
        self.state.output = []
        self.state.timer = 0.0
        for request in self.state.queue[0]:
            operation = translate(request[0])
            handler = getattr(self.state.mvs, operation)
            start = time.time()
            self.state.output.append(handler(*request[1]))
            if PROFILE:
                print("%s: %.17f" % (operation, time.time() - start))
            self.state.timer += self.timings[operation]

    def extTransition(self, inputs):
        """Queue incoming batches; start processing if the model was idle."""
        self.state.timer -= self.elapsed
        was_idle = len(self.state.queue) == 0
        self.state.queue.extend(inputs[self.from_mvk])

        if was_idle:
            # First message: compute the result now and start the timer.
            self._execute_front_request()
        # Otherwise the message just waits in the queue.

        return self.state

    def outputFnc(self):
        """Send the results of the head batch to the kernel."""
        return {self.to_mvk: [self.state.output]}

    def intTransition(self):
        """Drop the finished batch and start the next one, if any."""
        self.state.queue.pop(0)
        if self.state.queue:
            self._execute_front_request()
        else:
            self.state.output = []
            self.state.timer = float("inf")
        return self.state

    def timeAdvance(self):
        return self.state.timer


class MvKState(object):
    """State of the MvK atomic model."""

    def __init__(self, rule_generation):
        self.mvk = None               # kernel instance, created once the root is known
        self.waiting = False          # True while a reply from the MvS is outstanding
        self.inputs = {}              # taskname -> queued input values
        self.outputs = {}
        self.tasks = []               # tasks still to be served in this round
        self.reply = None             # last reply received from the MvS
        self.phase = None             # "init_task" / "input" / "computation" / "output"
        self.commands = None          # commands about to be sent to the MvS
        self.root = None
        self.current_task = None
        self.loaded_primitives = False
        self.execution_counter = 0
        self.rule_generation = rule_generation   # simulated delay before sending commands
        self.current_time = 0.0
        self.start_task_time = 0.0
        self.execution_rounds = 0     # task read-outs since the last "GC" request

    def __str__(self):
        fields = [
            ("MvK", self.mvk),
            ("waiting", self.waiting),
            ("inputs", self.inputs),
            ("outputs", self.outputs),
            ("tasks", self.tasks),
            ("reply", self.reply),
            ("phase", self.phase),
            ("commands", self.commands),
            ("root", self.root),
            ("current task", self.current_task),
            # This value was previously never substituted into the format string.
            ("execution counter", self.execution_counter),
        ]
        # Formatting (label, value) pairs keeps tuple-valued fields from being
        # misread as multiple format arguments.
        return "\n" + "".join("%s: %s\n" % (label, value) for label, value in fields)


class ModelverseKernel(AtomicDEVS):
    """Atomic model of the Modelverse Kernel.

    :param time_per_phase: simulated time a task may stay in the
        "computation" phase before it is moved to the "output" phase.
    :param rule_generation: simulated delay applied before each batch of
        commands is sent to the MvS.
    """

    def __init__(self, time_per_phase, rule_generation):
        AtomicDEVS.__init__(self, "MvK")
        self.state = MvKState(rule_generation)

        self.from_mvi = self.addInPort("from_MvI")
        self.from_mvs = self.addInPort("from_MvS")
        self.to_mvi = self.addOutPort("to_MvI")
        self.to_mvs = self.addOutPort("to_MvS")

        self.time_per_phase = time_per_phase

    def extTransition(self, inputs):
        """Queue interface input and/or accept the reply of the MvS."""
        self.state.current_time += self.elapsed

        if self.from_mvi in inputs:
            # Got input from MvI, so we queue it
            for inp in inputs[self.from_mvi]:
                taskname = inp[0]
                data = inp[1]
                if data is not None:
                    self.state.inputs.setdefault(taskname, []).extend(data)
                else:
                    self.state.outputs.setdefault(taskname, []).append(None)

        if self.from_mvs in inputs:
            # Got input from MvS, so we can continue processing
            for mvs_input in inputs[self.from_mvs]:
                if self.state.mvk is None:
                    # No MvK yet: the reply carries the root, so create the kernel
                    self.state.root = mvs_input[0]
                    self.state.mvk = MvK(self.state.root)
                    self.state.mvk.jit.set_function_body_compiler(jit.compile_function_body_fast)
                else:
                    self.state.reply = mvs_input
            self.state.waiting = False

        return self.state

    def _run_kernel(self, operation, params):
        """Call ``execute_yields`` for the current task, printing timing if PROFILE."""
        start = time.time()
        commands = self.state.mvk.execute_yields(self.state.current_task, operation, params, self.state.reply)
        if PROFILE:
            print("rule_generation: %.17f" % ((time.time() - start)))
        return commands

    def intTransition(self):
        """Perform one kernel step.

        A step either produces commands for the MvS (the model then waits for
        the reply) or completes, in which case the phase advances.
        """
        self.state.current_time += self.timeAdvance()
        was_empty = len(self.state.tasks) == 0

        if self.state.commands is not None:
            # The commands were just sent by outputFnc; now wait for the reply.
            self.state.commands = None
            return self.state

        if self.state.mvk is not None:
            self.state.mvk.returnvalue = None

        if self.state.mvk is None:
            # Initializing: the root request was just sent
            self.state.waiting = True
        elif not self.state.loaded_primitives:
            commands = self.state.mvk.execute_yields("", "load_primitives", [], self.state.reply)
            if commands is None:
                self.state.loaded_primitives = True
                self.state.reply = None
            else:
                self.state.waiting = True
            self.state.commands = commands
        else:
            # Are initialized and have work to do
            if len(self.state.tasks) == 0:
                # Read out new set of tasks first
                if self.state.reply is None:
                    self.state.execution_rounds += 1
                    commands = [("RDK", [self.state.root])]
                    if self.state.execution_rounds > 1000:
                        # Piggyback a purge request every 1000 rounds
                        commands.append(("GC", []))
                        self.state.execution_rounds = 0
                else:
                    self.state.tasks = self.state.reply[0]
                    commands = None
            elif self.state.phase == "init_task":
                if self.state.reply is None:
                    commands = [("RV", [self.state.tasks[0]])]
                else:
                    self.state.current_task = self.state.reply[0]
                    self.state.start_task_time = self.state.current_time
                    if self.state.current_task.startswith("__"):
                        # Don't process this task and force termination of task
                        self.state.phase = "output"
                    commands = None
            elif self.state.phase == "input":
                # Process inputs
                if self.state.inputs.get(self.state.current_task, None):
                    value = self.state.inputs[self.state.current_task][0]
                    commands = self._run_kernel("set_input", [value])
                    self.state.mvk.returnvalue = None
                    if commands is None:
                        # Input fully consumed by the kernel
                        self.state.inputs[self.state.current_task].pop(0)
                else:
                    commands = None
            elif self.state.phase == "computation":
                try:
                    commands = self._run_kernel("execute_rule", [])
                except SleepKernel:
                    commands = None
                    self.state.mvk.success = False
                else:
                    self.state.mvk.success = True
            elif self.state.phase == "output":
                commands = self._run_kernel("get_output", [])
            else:
                raise Exception("Phase: " + str(self.state.phase))

            # Advance phase
            if commands is None:
                if was_empty:
                    self.state.phase = "init_task"
                elif self.state.phase == "init_task":
                    self.state.phase = "input"
                elif self.state.phase == "input":
                    self.state.phase = "computation"
                elif self.state.phase == "computation":
                    # Stay in computation until the kernel sleeps or the
                    # task has used up its time slice.
                    if (not self.state.mvk.success) or (self.state.current_time - self.state.start_task_time > self.time_per_phase):
                        self.state.phase = "output"
                elif self.state.phase == "output":
                    self.state.tasks.pop(0)
                    self.state.phase = "init_task"
                self.state.waiting = False
                self.state.reply = None
            else:
                self.state.waiting = True
                # Send the commands to the MvS
                self.state.commands = commands

        return self.state

    def outputFnc(self):
        """Emit MvS commands and/or the task's return value for the MvI."""
        outputs = {}

        if self.state.mvk is None:
            # Ask the root first
            outputs[self.to_mvs] = [[("RR", [])]]
        elif self.state.waiting:
            outputs[self.to_mvs] = [self.state.commands]

        if self.state.mvk and self.state.mvk.returnvalue is not None:
            outputs[self.to_mvi] = [(self.state.current_task, self.state.mvk.returnvalue)]

        return outputs

    def timeAdvance(self):
        if self.state.commands is not None:
            return self.state.rule_generation
        elif self.state.waiting:
            return float("inf")
        elif self.state.mvk is None:
            return 0
        else:
            return 0


class MvIState():
    """State of an MvI atomic model."""

    def __init__(self):
        self.operations = []             # operations still to send on the own task
        self.additional_operations = []  # per-edit extra operations, consumed from the front
        self.keyed_operations = {}       # manual operation name -> extra operations
        self.task_to_spawner = {}        # task -> task whose prompt created it (or None)
        self.task_to_operation = {}      # task -> manual operation name it asked for
        self.output = {}                 # task -> outputs received so far
        self.blocked = True
        self.finished = False
        self.send_operations = {}        # helper task -> operations still to send on it


class ModelverseInterface(AtomicDEVS):
    """Atomic model of a client that drives one task of the Modelverse.

    :param taskname: task this interface sends its operations on; the
        instance named ``"task_manager"`` starts unblocked.
    :param operations: list of operations to send; a ``None`` entry makes
        the interface hold until a helper task has finished.
    :param finish_on: output value on the own task that marks completion.
    :param additional_operations: list of operation lists; the first entry
        is spliced into the next model edit requested on the own task.
        Mutated in place during simulation.
    :param keyed_operations: mapping from manual operation name to the
        operations spliced into the edit it requests. Mutated in place.
    """

    EDIT_PROMPT = "Please edit this model before sending next input: "
    SPAWN_PROMPT = "Spawned activity on task: "
    FINISHED_PROMPT = "Finished task: "
    MANUAL_PROMPT = "Please perform manual operation "

    def __init__(self, taskname, operations, finish_on, additional_operations=None, keyed_operations=None):
        AtomicDEVS.__init__(self, "MvI_%s" % taskname)
        self.state = MvIState()
        if taskname == "task_manager":
            self.state.blocked = False
        self.state.operations = operations
        # Fresh containers per instance: both are mutated later, so a shared
        # default object would leak state between instances.
        self.state.additional_operations = [] if additional_operations is None else additional_operations
        self.state.keyed_operations = {} if keyed_operations is None else keyed_operations
        self.state.create_additional_task = []
        self.taskname = taskname
        self.finish_on = finish_on

        self.to_mvk = self.addOutPort("to_MvK")
        self.from_mvk = self.addInPort("from_MvK")

    def intTransition(self):
        """Mark what was just sent as consumed and block until the next input."""
        self.state.create_additional_task = []
        if not self.state.send_operations:
            # Same emptiness guard as outputFnc: nothing was sent if there
            # is no operation left.
            if self.state.operations and self.state.operations[0] is not None:
                self.state.operations.pop(0)
        else:
            # Mark the head of every helper task as sent; it is removed when
            # the corresponding output comes back.
            for pending in self.state.send_operations.values():
                if pending:
                    pending[0] = None
        self.state.blocked = True
        return self.state

    def _edit_operations(self, model_name, extra_operations):
        """Build the operation list a helper task sends to modify ``model_name``."""
        return [[], ["admin"], ["admin"], ["quiet"], ["model_modify", model_name, ""]] + extra_operations + [["exit"], ["exit"]]

    def _handle_own_prompt(self, inp):
        """React to output on the own task while the next operation is ``None``."""
        prev_output = self.state.output[inp[0]][-1]
        if prev_output.startswith(self.EDIT_PROMPT):
            # We have to block for now, and modify a model first...
            _, model_name = prev_output.split(self.EDIT_PROMPT, 1)
            new_taskname = str(uuid.uuid4())
            self.state.send_operations[new_taskname] = self._edit_operations(model_name, self.state.additional_operations[0])
            self.state.create_additional_task.append(new_taskname)
            self.state.task_to_operation[new_taskname] = None
            self.state.task_to_spawner[new_taskname] = None
        elif prev_output.startswith(self.SPAWN_PROMPT):
            _, task_name = prev_output.split(self.SPAWN_PROMPT, 1)
            self.state.blocked = True
            self.state.task_to_spawner[task_name] = None
        elif prev_output.startswith(self.FINISHED_PROMPT):
            self.state.blocked = True
        elif " : " in prev_output:
            task_name, _ = prev_output.split(" : ", 1)
            self.state.blocked = True
            self.state.task_to_spawner[task_name] = None
            self.state.operations.insert(0, None)
        elif prev_output == "Success":
            self.state.operations = [["echo", "FINISHED"]]
            self.state.blocked = False

    def _handle_other_task_output(self, inp):
        """React to output received on a task other than the own one."""
        task, message = inp[0], inp[1]

        # If the task is not registered yet, it is likely not important to
        # communicate with it, so nothing is removed for it.
        if task in self.state.send_operations:
            self.state.send_operations[task].pop(0)

        if message.startswith(self.EDIT_PROMPT):
            _, model_name = message.split(self.EDIT_PROMPT, 1)
            new_taskname = str(uuid.uuid4())
            extra = self.state.keyed_operations.get(self.state.task_to_operation[task], [])
            self.state.send_operations[new_taskname] = self._edit_operations(model_name, extra)
            self.state.create_additional_task.append(new_taskname)
            self.state.task_to_spawner[new_taskname] = task
        elif message.startswith(self.MANUAL_PROMPT):
            _, op_name = message.split(self.MANUAL_PROMPT, 1)
            # Strip the first and last character surrounding the name
            self.state.task_to_operation[task] = op_name[1:-1]

        if task in self.state.send_operations and len(self.state.send_operations[task]) == 0:
            del self.state.send_operations[task]
            # At the end of these operations, so finish up!
            if task in self.state.task_to_spawner:
                spawner = self.state.task_to_spawner[task]
                if spawner is not None:
                    self.state.keyed_operations.pop(self.state.task_to_operation[spawner], None)
                    self.state.additional_operations.insert(0, [])
                    # NOTE(review): the pending list holds a plain string here,
                    # whereas other entries are lists -- confirm the kernel
                    # side expects that.
                    self.state.send_operations[spawner] = ["__continue__"]
                else:
                    self.state.additional_operations.pop(0)
                    self.state.operations.pop(0)

    def extTransition(self, inputs):
        """Record every received output and update the sending plan."""
        for inp in inputs[self.from_mvk]:
            print(inp)
            self.state.blocked = False
            self.state.output.setdefault(inp[0], []).append(inp[1])

            if inp[0] == self.taskname and inp[1] == self.finish_on:
                self.state.finished = True
            elif inp[0] == self.taskname and self.state.operations[0] is None:
                self._handle_own_prompt(inp)
            elif inp[0] != self.taskname:
                # Got some output on another task
                self._handle_other_task_output(inp)

        return self.state

    def outputFnc(self):
        """Send helper-task operations if any exist, else the next own operation."""
        if self.state.send_operations:
            outp = []
            if self.state.create_additional_task:
                outp.append(('task_manager', self.state.create_additional_task))
            for k, v in self.state.send_operations.items():
                if v and v[0] is not None:
                    outp.append((k, v[0]))
            if outp:
                print("SEND " + str(outp))
                return {self.to_mvk: outp}
            else:
                return {}
        elif self.state.operations and self.state.operations[0] is not None:
            message = [(self.taskname, self.state.operations[0])]
            print("SEND " + str(message))
            return {self.to_mvk: message}
        else:
            return {}

    def timeAdvance(self):
        if self.state.blocked:
            return float("inf")
        else:
            return 0.0


class NetworkState(object):
    """State of a Network atomic model."""

    def __init__(self):
        self.processing = []          # messages in transit; the head is delivered next
        self.timer = float("inf")     # simulated time until the head is delivered


class Network(AtomicDEVS):
    """FIFO link that delays each message by its transfer time plus latency.

    :param name: model name.
    :param latency: fixed delay added to every message.
    :param bandwidth: link capacity in bits per time unit.
    """

    def __init__(self, name, latency, bandwidth):
        AtomicDEVS.__init__(self, name)
        self.state = NetworkState()

        self.input_port = self.addInPort("input_port")
        self.output_port = self.addOutPort("output_port")

        self.latency = latency
        self.bandwidth = bandwidth

    def _transfer_time(self, message):
        """Return the delay for ``message``, sized by its JSON serialisation."""
        # NOTE data is in bytes, while bandwidth is in bits, so multiply by 8
        return (len(json.dumps(message)) * 8 / float(self.bandwidth) + self.latency)

    def intTransition(self):
        """Remove the delivered message and start on the next one, if any."""
        self.state.processing.pop(0)
        if self.state.processing:
            self.state.timer = self._transfer_time(self.state.processing[0])
        else:
            self.state.timer = float("inf")
        return self.state

    def extTransition(self, inputs):
        """Append the arriving messages and (re)start the timer of the head."""
        self.state.timer -= self.elapsed
        if self.state.timer == float("inf"):
            self.state.timer = 0

        for v in inputs[self.input_port]:
            self.state.processing.append(v)

        # NOTE(review): the timer is recomputed for the head message on every
        # arrival, which discards the elapsed-time bookkeeping above when a
        # message is already in transit -- confirm this restart is intended.
        if len(self.state.processing) > 0:
            self.state.timer = self._transfer_time(self.state.processing[0])

        return self.state

    def outputFnc(self):
        return {self.output_port: [self.state.processing[0]]}

    def timeAdvance(self):
        return self.state.timer


class System(CoupledDEVS):
    """Coupled model: task manager MvI, task MvI, MvK, MvS and four links.

    The task manager's output goes to the kernel directly; all other
    traffic passes through a ``Network`` model in each direction.
    """

    def __init__(self,
                 taskname,
                 operations,
                 mvi_additional,
                 mvi_keyed,
                 finish_on,
                 rule_generation,
                 time_per_phase,
                 mvi2mvk_latency,
                 mvi2mvk_bandwidth,
                 mvk2mvs_latency,
                 mvk2mvs_bandwidth,
                 mvs2mvk_latency,
                 mvs2mvk_bandwidth,
                 mvk2mvi_latency,
                 mvk2mvi_bandwidth,
                 read_root,
                 create_node,
                 create_edge,
                 create_nodevalue,
                 create_dict,
                 read_value,
                 read_outgoing,
                 read_incoming,
                 read_edge,
                 read_dict,
                 read_dict_keys,
                 read_dict_edge,
                 read_dict_node,
                 read_dict_node_edge,
                 read_reverse_dict,
                 delete_node,
                 delete_edge,
                 purge):
        CoupledDEVS.__init__(self, "System")

        # The task manager only announces the task name to the kernel.
        self.mvi_manager = self.addSubModel(ModelverseInterface(
            taskname="task_manager",
            operations=[[taskname]],
            finish_on=None,
        ))
        self.mvi = self.addSubModel(ModelverseInterface(
            taskname=taskname,
            operations=operations,
            finish_on=finish_on,
            additional_operations=mvi_additional,
            keyed_operations=mvi_keyed,
        ))
        self.mvk = self.addSubModel(ModelverseKernel(
            time_per_phase=time_per_phase,
            rule_generation=rule_generation,
        ))
        self.mvs = self.addSubModel(ModelverseState(
            read_root=read_root,
            create_node=create_node,
            create_edge=create_edge,
            create_nodevalue=create_nodevalue,
            create_dict=create_dict,
            read_value=read_value,
            read_outgoing=read_outgoing,
            read_incoming=read_incoming,
            read_edge=read_edge,
            read_dict=read_dict,
            read_dict_keys=read_dict_keys,
            read_dict_edge=read_dict_edge,
            read_dict_node=read_dict_node,
            read_dict_node_edge=read_dict_node_edge,
            read_reverse_dict=read_reverse_dict,
            delete_node=delete_node,
            delete_edge=delete_edge,
            purge=purge,
        ))
        self.mvi2mvk = self.addSubModel(Network(
            name="mvi2mvk",
            latency=mvi2mvk_latency,
            bandwidth=mvi2mvk_bandwidth,
        ))
        self.mvk2mvs = self.addSubModel(Network(
            name="mvk2mvs",
            latency=mvk2mvs_latency,
            bandwidth=mvk2mvs_bandwidth,
        ))
        self.mvs2mvk = self.addSubModel(Network(
            name="mvs2mvk",
            latency=mvs2mvk_latency,
            bandwidth=mvs2mvk_bandwidth,
        ))
        self.mvk2mvi = self.addSubModel(Network(
            name="mvk2mvi",
            latency=mvk2mvi_latency,
            bandwidth=mvk2mvi_bandwidth,
        ))

        self.connectPorts(self.mvi_manager.to_mvk, self.mvk.from_mvi)
        self.connectPorts(self.mvi.to_mvk, self.mvi2mvk.input_port)
        self.connectPorts(self.mvi2mvk.output_port, self.mvk.from_mvi)
        self.connectPorts(self.mvk.to_mvs, self.mvk2mvs.input_port)
        self.connectPorts(self.mvk2mvs.output_port, self.mvs.from_mvk)
        self.connectPorts(self.mvs.to_mvk, self.mvs2mvk.input_port)
        self.connectPorts(self.mvs2mvk.output_port, self.mvk.from_mvs)
        self.connectPorts(self.mvk.to_mvi, self.mvk2mvi.input_port)
        self.connectPorts(self.mvk2mvi.output_port, self.mvi.from_mvk)


def simulate(supplied_args):
    """Build the system model, run it until the task finishes, and time it.

    Parameter precedence (lowest to highest): built-in defaults, the values
    read from ``calibration/averages``, then ``supplied_args``.

    :param supplied_args: mapping of ``System`` parameter name to value.
    :returns: ``(tn, wallclock)`` where ``tn`` is the value returned by the
        simulator and ``wallclock`` the seconds the simulation took to run.
    """
    random.seed(1)

    taskname = "test_task"
    # Close the file deterministically instead of leaking the handle.
    with open("model/operations", 'r') as operations_file:
        operations = json.load(operations_file)

    additional_operations = [
        [],  # revise_req
        [],  # revise_environment
        [],  # revise_plant
        [],  # revise_control
        [],  # revise_query
        [],  # revise_architecture
        [],  # make_initial_models
        [["instantiate_edge", "Association", "PLANT2EPN_link", "PW_Plant/State", "Encapsulated_PetriNet/Place"],
         ["instantiate_edge", "Association", "PLANT2EPN_tlink", "PW_Plant/Transition", "Encapsulated_PetriNet/Transition"]],  # plant_to_EPN
        [["instantiate_edge", "Association", "CTRL2EPN_link", "PW_Control/State", "Encapsulated_PetriNet/Place"],
         ["instantiate_edge", "Association", "CTRL2EPN_tlink", "PW_Control/Transition", "Encapsulated_PetriNet/Transition"]],  # control_to_EPN
        [["instantiate_edge", "Association", "ENV2EPN_link", "PW_Environment/Event", "Encapsulated_PetriNet/Place"]],  # environment_to_EPN
        [["instantiate_edge", "Association", "EPN2PN_place_link", "Encapsulated_PetriNet/Place", "PetriNet/Place"],
         ["instantiate_edge", "Association", "EPN2PN_transition_link", "Encapsulated_PetriNet/Transition", "PetriNet/Transition"]],  # EPN_to_PN
        [],  # match
        [],  # reachability
        [],  # bfs
        [],  # merge
    ]

    keyed_operations = {
        "models/revise_req": [["upload"], compile_model("models/requirements_model.mvc")],
        "models/revise_plant": [["upload"], compile_model("models/plant_model.mvc")],
        "models/revise_environment": [["upload"], compile_model("models/environment_model.mvc")],
        "models/revise_control": [["upload"], compile_model("models/control_model.mvc")],
        "models/revise_query": [["upload"], compile_model("models/query_model.mvc")],
        "models/revise_architecture": [["upload"], compile_model("models/architecture_model.mvc")],
    }

    finish_on = "FINISHED"

    args = {
        "taskname": taskname,
        "operations": operations,
        "finish_on": finish_on,
        "mvi_additional": additional_operations,
        "mvi_keyed": keyed_operations,
        "mvi2mvk_latency": 0.0000001,
        "mvi2mvk_bandwidth": 50000000000,
        "mvk2mvs_latency": 0.0000001,
        "mvk2mvs_bandwidth": 50000000000,
        "mvs2mvk_latency": 0.0000001,
        "mvs2mvk_bandwidth": 50000000000,
        "mvk2mvi_latency": 0.0000001,
        "mvk2mvi_bandwidth": 50000000000,
        "time_per_phase": 0.05,
        # Automatically filled in from calibration results, just here to
        # prevent crashes (results for my UA desktop)
        "read_root": 0.00001406669616699,
        "create_node": 0.00000379181167487,
        "create_edge": 0.00000601282282066,
        "create_nodevalue": 0.00000501364247391,
        "create_dict": 0.00001028065706205,
        "read_value": 0.00000388661630500,
        "read_outgoing": 0.00000520600098073,
        "read_incoming": 0.00000645903181994,
        "read_edge": 0.00000449162172644,
        "read_dict": 0.00000460127038355,
        "read_dict_keys": 0.00001678063432883,
        "read_dict_node": 0.00001020808859528,
        "read_dict_edge": 0.00000642558526942,
        "read_dict_node_edge": 0.0,
        "read_reverse_dict": 0.00002557890755790,
        "delete_node": 0.00004755891187096,
        "delete_edge": 0.00000683382081240,
        "rule_generation": 0.00001543215873893,
        "purge": 7.0,
    }

    # Calibrated values, one "name: value" pair per line, override the defaults.
    with open("calibration/averages", 'r') as param_file:
        for line in param_file:
            if not line.strip():
                # Tolerate blank lines instead of failing to unpack them
                continue
            op, t = line.split(": ")
            args[op.strip()] = float(t)

    local_args = dict(args)
    local_args.update(supplied_args)
    model = System(**local_args)
    sim = Simulator(model)
    sim.setTerminationCondition(lambda t, m: m.mvi.state.finished)

    start = time.time()
    tn = sim.simulate()

    return (tn, time.time() - start)


if __name__ == "__main__":
    # Command line: alternating parameter names and float values.
    cli_args = {}
    for position in range(1, len(sys.argv), 2):
        cli_args[sys.argv[position]] = float(sys.argv[position + 1])

    ts, te = simulate(cli_args)
    print("Execution time %s" % ts)
    print("Simulation time %s" % te)