import sys sys.path.append("kernel/") sys.path.append("state/") sys.path.append("interface/HUTN") from modelverse_state.main import ModelverseState as MvS from modelverse_kernel.main import ModelverseKernel as MvK from hutn_compiler.compiler import main as do_compile from hutn_compiler.linker import link from pypdevs.DEVS import AtomicDEVS, CoupledDEVS from pypdevs.simulator import Simulator import json import random random.seed(1) max_used_id = 0 def get_object_constructor(code, filename): with open(".code.alc", "w") as f: f.write(code) f.flush() constructors = do_compile(".code.alc", "interface/HUTN/grammars/actionlanguage.g", "CS") constructors = [3, "upload", filename, str(random.random()), True] + constructors + [False] return constructors def link_code(main_function, taskname, objects): return [3, "link_and_load"] + objects + ["", main_function] def translate(operation): return { "CN": "create_node", "CE": "create_edge", "CNV": "create_nodevalue", "CD": "create_dict", "RV": "read_value", "RO": "read_outgoing", "RI": "read_incoming", "RE": "read_edge", "RD": "read_dict", "RDN": "read_dict_node", "RDNE": "read_dict_node_edge", "RDE": "read_dict_edge", "RRD": "read_reverse_dict", "RR": "read_root", "RDK": "read_dict_keys", "DE": "delete_edge", "DN": "delete_node", }[operation] class MvSState(object): def __init__(self): self.queue = [] self.output = None self.mvs = MvS("bootstrap/bootstrap.m.gz") self.timer = float("inf") class ModelverseState(AtomicDEVS): def __init__(self, read_root, create_node, create_edge, create_nodevalue, create_dict, read_value, read_outgoing, read_incoming, read_edge, read_dict, read_dict_keys, read_dict_edge, read_dict_node, read_dict_node_edge, read_reverse_dict, delete_node, delete_edge): AtomicDEVS.__init__(self, "MvS") self.timings = { "read_root": read_root, "create_node": create_node, "create_edge": create_edge, "create_nodevalue": create_nodevalue, "create_dict": create_dict, "read_value": read_value, "read_outgoing": read_outgoing, "read_incoming": read_incoming, "read_edge": read_edge, "read_dict": read_dict, "read_dict_keys": read_dict_keys, "read_dict_edge": read_dict_edge, "read_dict_node": read_dict_node, "read_dict_node_edge": read_dict_node_edge, "read_reverse_dict": read_reverse_dict, "delete_node": delete_node, "delete_edge": delete_edge, } self.state = MvSState() self.from_mvk = self.addInPort("from_MvK") self.to_mvk = self.addOutPort("to_MvK") def extTransition(self, inputs): self.state.timer -= self.elapsed empty = len(self.state.queue) == 0 self.state.queue.extend(inputs[self.from_mvk]) if empty: # First message, so set the timer # And already compute the result so it is ready to output self.state.output = [] self.state.timer = 0.0 for v in self.state.queue[0]: self.state.output.append(getattr(self.state.mvs, translate(v[0]))(*v[1])) self.state.timer += self.timings[translate(v[0])]() else: # Just append the message to process pass return self.state def outputFnc(self): return {self.to_mvk: [self.state.output]} def intTransition(self): self.state.queue.pop(0) self.state.output = [] if len(self.state.queue) > 0: self.state.timer = 0.0 # Value contains a list of operations to do # So do them and calculate how long it takes for v in self.state.queue[0]: self.state.output.append(getattr(self.state.mvs, translate(v[0]))(*v[1])[0]) self.state.timer += self.timings[translate(v[0])]() else: self.state.timer = float("inf") return self.state def timeAdvance(self): return self.state.timer class MvKState(object): def __init__(self): self.mvk = None self.waiting = False self.inputs = {} self.outputs = {} self.tasks = [] self.reply = None self.phase = None self.commands = None self.root = None self.current_task = None self.loaded_primitives = False self.execution_counter = 0 self.all_failed = True def __str__(self): return "\nMvK: %s\n" % self.mvk + \ "waiting: %s\n" % self.waiting + \ "inputs: %s\n" % self.inputs + \ "outputs: %s\n" % self.outputs + \ "tasks: %s\n" % self.tasks + \ "reply: %s\n" % self.reply + \ "phase: %s\n" % self.phase + \ "commands: %s\n" % self.commands + \ "root: %s\n" % self.root + \ "current task: %s\n" % self.current_task + \ "execution counter: %s\n" class ModelverseKernel(AtomicDEVS): def __init__(self, rules_per_phase): AtomicDEVS.__init__(self, "MvK") self.state = MvKState() self.from_mvi = self.addInPort("from_MvI") self.from_mvs = self.addInPort("from_MvS") self.to_mvi = self.addOutPort("to_MvI") self.to_mvs = self.addOutPort("to_MvS") self.rules_per_phase = rules_per_phase def extTransition(self, inputs): if self.from_mvi in inputs: # Got input from MvI, so we queue it for inp in inputs[self.from_mvi]: taskname = inp[0] data = inp[1] if data is not None: self.state.inputs.setdefault(taskname, []).extend(data) else: self.state.outputs.setdefault(taskname, []).append(None) if self.from_mvs in inputs: # Got input from MvS, so we can continue processing for mvs_input in inputs[self.from_mvs]: mvs_stripped = [i[0] for i in mvs_input] if self.state.mvk is None: # No MvK, so set it with the root we have just received (or should have received) self.state.root = mvs_stripped[0] self.state.mvk = MvK(self.state.root) else: self.state.reply = mvs_stripped self.state.waiting = False return self.state def intTransition(self): was_empty = len(self.state.tasks) == 0 if self.state.commands is not None: self.state.commands = None return self.state if self.state.mvk is None: # Initializing self.state.waiting = True elif not self.state.loaded_primitives: commands = self.state.mvk.execute_yields("", "load_primitives", [], self.state.reply) if commands is None: self.state.loaded_primitives = True self.state.reply = None else: self.state.waiting = True self.state.commands = commands else: # Are initialized and have work to do if len(self.state.tasks) == 0: # Read out new set of tasks first if self.state.reply is None: commands = [("RDK", [self.state.root])] else: self.state.tasks = self.state.reply[0] commands = None elif self.state.phase == "init_task": if self.state.reply is None: commands = [("RV", [self.state.tasks[0]])] else: self.state.current_task = self.state.reply[0] if self.state.current_task.startswith("__"): # Don't process this task and force termination of task self.state.phase = "output" commands = None elif self.state.phase == "input": # Process inputs if self.state.inputs.get(self.state.current_task, None): element_type, value = self.state.inputs[self.state.current_task][0] commands = self.state.mvk.execute_yields(self.state.current_task, "set_input", [element_type, value], self.state.reply) if commands is None: self.state.inputs[self.state.current_task].pop(0) else: commands = None elif self.state.phase == "computation": commands = self.state.mvk.execute_yields(self.state.current_task, "execute_rule", [], self.state.reply) elif self.state.phase == "output": commands = self.state.mvk.execute_yields(self.state.current_task, "get_output", [], self.state.reply) else: raise Exception("Phase: " + str(self.state.phase)) # Advance phase if commands is None: if was_empty: self.state.phase = "init_task" elif self.state.phase == "init_task": self.state.phase = "input" elif self.state.phase == "input": self.state.phase = "computation" elif self.state.phase == "computation": if not self.state.mvk.success or (self.state.execution_counter > self.rules_per_phase): self.state.phase = "output" self.state.execution_counter = 0 else: self.state.execution_counter += 1 elif self.state.phase == "output": self.state.tasks.pop(0) self.state.phase = "init_task" self.state.waiting = False self.state.reply = None else: self.state.waiting = True # Send the commands to the MvS self.state.commands = commands return self.state def outputFnc(self): if self.state.mvk is None: # Ask the root first return {self.to_mvs: [[("RR", [])]]} elif self.state.waiting: return {self.to_mvs: [self.state.commands]} return {} def timeAdvance(self): if self.state.phase == "init_task" and self.state.all_failed: # Make this a parameter return 200 if self.state.commands is not None: return 0 elif self.state.waiting: return float("inf") elif self.state.mvk is None: return 0 else: return 0 class MvIState(): def __init__(self): self.operations = [] self.output = [] self.processing = [] self.memory = {} self.init = True class ModelverseInterface(AtomicDEVS): def __init__(self, taskname, operations): AtomicDEVS.__init__(self, "MvI_%s" % taskname) self.state = MvIState() self.state.operations = operations self.taskname = taskname self.to_mvk = self.addOutPort("to_MvK") self.from_mvk = self.addInPort("from_MvK") def intTransition(self): self.state.init = False while self.state.operations: i = self.state.operations[0] if isinstance(i, int) and i not in self.state.memory: break self.state.operations.pop(0) return self.state def extTransition(self, inputs): for inp in inputs[self.from_mvk]: if inp["value"] == "None" and isinstance(self.state.processing[0], int) and self.state.processing[0] not in self.state.memory: self.state.memory[self.state.processing.pop(0)] = int(inp["id"]) else: self.state.output.append(inp) return self.state def outputFnc(self): return {self.to_mvk: [(self.taskname, self.state.operations)]} def timeAdvance(self): if self.state.init: return 0 elif self.state.processing and (not isinstance(self.state.processing[0], int) or self.state.processing[0] in self.state.memory): return 0 else: return float("inf") class NetworkState(object): def __init__(self): self.processing = [] self.timer = float("inf") class Network(AtomicDEVS): def __init__(self, name, latency, bandwidth): AtomicDEVS.__init__(self, name) self.state = NetworkState() self.input_port = self.addInPort("input_port") self.output_port = self.addOutPort("output_port") self.latency = latency self.bandwidth = bandwidth def intTransition(self): self.state.processing.pop(0) if self.state.processing: self.state.timer = int(len(self.state.processing[0]) / float(self.bandwidth) + self.latency) else: self.state.timer = float("inf") return self.state def extTransition(self, inputs): self.state.timer -= self.elapsed if self.state.timer == float("inf"): self.state.timer = 0 for v in inputs[self.input_port]: self.state.processing.append(json.dumps(v)) if len(self.state.processing) > 0: self.state.timer = int(len(self.state.processing[0]) / float(self.bandwidth) + self.latency) return self.state def outputFnc(self): return {self.output_port: [json.loads(self.state.processing[0])]} def timeAdvance(self): return self.state.timer class System(CoupledDEVS): def __init__(self, taskname, operations, rules_per_phase, mvi2mvk_latency, mvi2mvk_bandwidth, mvk2mvs_latency, mvk2mvs_bandwidth, mvs2mvk_latency, mvs2mvk_bandwidth, mvk2mvi_latency, mvk2mvi_bandwidth, read_root, create_node, create_edge, create_nodevalue, create_dict, read_value, read_outgoing, read_incoming, read_edge, read_dict, read_dict_keys, read_dict_edge, read_dict_node, read_dict_node_edge, read_reverse_dict, delete_node, delete_edge): CoupledDEVS.__init__(self, "System") self.mvi_manager = self.addSubModel(ModelverseInterface(\ taskname = "task_manager", operations = [taskname], )) self.mvi = self.addSubModel(ModelverseInterface(\ taskname = taskname, operations = operations )) self.mvk = self.addSubModel(ModelverseKernel(\ rules_per_phase = rules_per_phase )) self.mvs = self.addSubModel(ModelverseState(\ read_root = read_root, create_node = create_node, create_edge = create_edge, create_nodevalue = create_nodevalue, create_dict = create_dict, read_value = read_value, read_outgoing = read_outgoing, read_incoming = read_incoming, read_edge = read_edge, read_dict = read_dict, read_dict_keys = read_dict_keys, read_dict_edge = read_dict_edge, read_dict_node = read_dict_node, read_dict_node_edge = read_dict_node_edge, read_reverse_dict = read_reverse_dict, delete_node = delete_node, delete_edge = delete_edge )) self.mvi2mvk = self.addSubModel(Network(\ name = "mvi2mvk", latency = mvi2mvk_latency, bandwidth = mvi2mvk_bandwidth )) self.mvk2mvs = self.addSubModel(Network(\ name = "mvk2mvs", latency = mvk2mvs_latency, bandwidth = mvk2mvs_bandwidth )) self.mvs2mvk = self.addSubModel(Network(\ name = "mvs2mvk", latency = mvs2mvk_latency, bandwidth = mvs2mvk_bandwidth )) self.mvk2mvi = self.addSubModel(Network(\ name = "mvk2mvi", latency = mvk2mvi_latency, bandwidth = mvk2mvi_bandwidth )) self.connectPorts(self.mvi_manager.to_mvk, self.mvk.from_mvi) self.connectPorts(self.mvi.to_mvk, self.mvi2mvk.input_port) self.connectPorts(self.mvi2mvk.output_port, self.mvk.from_mvi) self.connectPorts(self.mvk.to_mvs, self.mvk2mvs.input_port) self.connectPorts(self.mvk2mvs.output_port, self.mvs.from_mvk) self.connectPorts(self.mvs.to_mvk, self.mvs2mvk.input_port) self.connectPorts(self.mvs2mvk.output_port, self.mvk.from_mvs) self.connectPorts(self.mvk.to_mvi, self.mvk2mvi.input_port) self.connectPorts(self.mvk2mvi.output_port, self.mvi.from_mvk) files = ["bootstrap/primitives.alc"] code = \ """ include "primitives.alh" Void function main(): \tlog("Executed the code!") \treturn ! """ taskname = "test_task" operations = [get_object_constructor(open(f, 'r').read(), str(f)) for f in files] + \ get_object_constructor(code, "main") + \ link_code("main", taskname, files + ["code.alc"]) print("Generated operations:") print(operations) args = { "taskname": taskname, "operations": operations, "mvi2mvk_latency": 1, "mvi2mvk_bandwidth": 2000, "mvk2mvs_latency": 1, "mvk2mvs_bandwidth": 2000, "mvs2mvk_latency": 1, "mvs2mvk_bandwidth": 2000, "mvk2mvi_latency": 1, "mvk2mvi_bandwidth": 2000, "read_root": lambda: 1, "create_node": lambda: 1, "create_edge": lambda: 1, "create_nodevalue": lambda: 1, "create_dict": lambda: 1, "read_value": lambda: 1, "read_outgoing": lambda: 1, "read_incoming": lambda: 1, "read_edge": lambda: 1, "read_dict": lambda: 1, "read_dict_keys": lambda: 1, "read_dict_node": lambda: 1, "read_dict_edge": lambda: 1, "read_dict_node_edge": lambda: 1, "read_reverse_dict": lambda: 1, "delete_node": lambda: 1, "delete_edge": lambda: 1, "rules_per_phase": 200, } model = System(**args) sim = Simulator(model) sim.setTerminationTime(900000) #sim.setVerbose() sim.simulate()