include "primitives.alh"
include "modelling.alh"
include "object_operations.alh"
include "library.alh"
include "conformance_scd.alh"
include "mini_modify.alh"

Void function main():
	// Entry point: repeatedly asks for a model location, checks that it is a
	// conforming CBD design model, and hands it over to execute_cbd.
	// The loop never ends by itself; a failed lookup or check only prints a
	// message and asks again.
	Element model
	String verify_result

	while (True):
		output("Which model do you want to execute with CBD semantics?")
		model = import_node(input())

		if (element_eq(model, read_root())):
			// import_node yielded the root element: treated as "not found"
			output("Could not find model; aborting")
		elif (element_neq(model["metamodel"], import_node("models/CausalBlockDiagrams_Design"))):
			output("Not a CBD design model; aborting")
		else:
			verify_result = conformance_scd(model)
			if (verify_result == "OK"):
				output("Model OK!")
				log("Model conforms!")
				execute_cbd(model)
			else:
				output("Non-conforming model: " + verify_result)

Element function retype_to_runtime(design_model : Element):
	// Creates a fresh model typed by "models/CausalBlockDiagrams_Runtime" and
	// copies every Block, Link and InitialCondition of design_model into it
	// under the same element names. Only the "value" attribute of
	// ConstantBlock instances is copied.
	// Returns the new runtime model.
	Element runtime_model
	Element all_blocks
	Element all_links
	String mm_type_name
	String element_name
	String src
	String dst

	runtime_model = instantiate_model(import_node("models/CausalBlockDiagrams_Runtime"))

	all_blocks = allInstances(design_model, "Block")
	while (list_len(all_blocks) > 0):
		element_name = set_pop(all_blocks)
		// Name of the concrete metamodel type of this block
		mm_type_name = reverseKeyLookup(design_model["metamodel"]["model"], dict_read_node(design_model["type_mapping"], design_model["model"][element_name]))
		element_name = instantiate_node(runtime_model, mm_type_name, element_name)
		if (mm_type_name == "ConstantBlock"):
			instantiate_attribute(runtime_model, element_name, "value", read_attribute(design_model, element_name, "value"))

	// Don't merge this together with the block conversion, as the destination block might not exist yet!
	all_links = allInstances(design_model, "Link")
	while (read_nr_out(all_links) > 0):
		element_name = set_pop(all_links)
		src = reverseKeyLookup(design_model["model"], read_edge_src(design_model["model"][element_name]))
		dst = reverseKeyLookup(design_model["model"], read_edge_dst(design_model["model"][element_name]))
		instantiate_link(runtime_model, "Link", element_name, src, dst)

	all_links = allInstances(design_model, "InitialCondition")
	while (read_nr_out(all_links) > 0):
		element_name = set_pop(all_links)
		src = reverseKeyLookup(design_model["model"], read_edge_src(design_model["model"][element_name]))
		dst = reverseKeyLookup(design_model["model"], read_edge_dst(design_model["model"][element_name]))
		instantiate_link(runtime_model, "InitialCondition", element_name, src, dst)

	return runtime_model!

Element function sanitize(new_runtime_model : Element, old_runtime_model : Element):
	// Completes a freshly retyped runtime model with simulation state.
	// - Blocks that also exist (by name) in old_runtime_model take over their
	//   "signal" (and, for DelayBlock, "memory"); other blocks start at 0.0.
	// - The time is taken over from the "time" element of the old model if it
	//   has one; otherwise it starts at 0 with termination time 10.
	// - Both schedules (schedule_init and schedule_run) are rebuilt.
	// Returns new_runtime_model (modified in place).
	Element all_blocks
	String mm_type_name
	String element_name
	String time
	Integer current_time
	Integer termination_time

	all_blocks = allInstances(new_runtime_model, "Block")
	while (list_len(all_blocks) > 0):
		element_name = set_pop(all_blocks)
		mm_type_name = reverseKeyLookup(new_runtime_model["metamodel"]["model"], dict_read_node(new_runtime_model["type_mapping"], new_runtime_model["model"][element_name]))
		if (dict_in(old_runtime_model["model"], element_name)):
			// Known block: keep the state it had before the modification
			if (mm_type_name == "DelayBlock"):
				instantiate_attribute(new_runtime_model, element_name, "memory", read_attribute(old_runtime_model, element_name, "memory"))
			instantiate_attribute(new_runtime_model, element_name, "signal", read_attribute(old_runtime_model, element_name, "signal"))
		else:
			// New block: start from a zero state
			if (mm_type_name == "DelayBlock"):
				instantiate_attribute(new_runtime_model, element_name, "memory", 0.0)
			instantiate_attribute(new_runtime_model, element_name, "signal", 0.0)

	if (dict_in(old_runtime_model["model"], "time")):
		current_time = read_attribute(old_runtime_model, "time", "current_time")
		termination_time = read_attribute(old_runtime_model, "time", "termination_time")
	else:
		current_time = 0
		termination_time = 10

	// A start_time different from -1 selects the "schedule_init" chain,
	// -1 selects the "schedule_run" chain (see create_schedule)
	create_schedule(new_runtime_model, old_runtime_model, current_time)
	create_schedule(new_runtime_model, old_runtime_model, -1)

	time = instantiate_node(new_runtime_model, "Time", "time")
	instantiate_attribute(new_runtime_model, time, "start_time", current_time)
	instantiate_attribute(new_runtime_model, time, "current_time", current_time)
	instantiate_attribute(new_runtime_model, time, "termination_time", termination_time)

	return new_runtime_model!

Void function create_schedule(model : Element, old_model : Element, start_time : Integer):
	// Builds an execution order for all blocks of model and stores it in the
	// model as a chain of Schedule nodes: every scheduled block hangs off the
	// current Schedule node through a LinkedBlock link, and a NextSchedule
	// link leads to the next (initially empty) Schedule node.
	// The head of the chain is named "schedule_init" when start_time is not
	// -1 and "schedule_run" when it is -1.
	//
	// Dependencies of a block are the sources of its incoming "Link"s, except
	// for a DelayBlock: one that already exists in old_model has no
	// dependencies, a new one depends on its "InitialCondition" sources.
	//
	// NOTE(review): a dependency cycle that does not pass through a delay
	// block never becomes ready, so this loop does not appear to terminate
	// for such models -- confirm that they are rejected before this point.
	Element all_blocks
	Element visited
	Element to_visit
	Element incoming_links
	String schedule
	String element_name
	String link
	String source
	String new_schedule
	Boolean ready

	all_blocks = allInstances(model, "Block")
	visited = create_node()
	to_visit = create_node()
	if (start_time != -1):
		schedule = instantiate_node(model, "Schedule", "schedule_init")
	else:
		schedule = instantiate_node(model, "Schedule", "schedule_run")

	while (read_nr_out(all_blocks) > 0):
		element_name = set_pop(all_blocks)
		if (bool_not(set_in(visited, element_name))):
			list_append(to_visit, element_name)

		// to_visit is used as a stack: the last entry is handled first
		while (list_len(to_visit) > 0):
			element_name = list_read(to_visit, list_len(to_visit) - 1)
			if (set_in(visited, element_name)):
				// The same block can be pushed several times (once for each
				// block that depends on it). It was scheduled through a
				// later push already, so drop this entry instead of
				// scheduling the block a second time.
				list_delete(to_visit, list_len(to_visit) - 1)
			else:
				if (reverseKeyLookup(model["metamodel"]["model"], dict_read_node(model["type_mapping"], model["model"][element_name])) == "DelayBlock"):
					if (dict_in(old_model["model"], element_name)):
						incoming_links = create_node()
					else:
						incoming_links = allIncomingAssociationInstances(model, element_name, "InitialCondition")
				else:
					incoming_links = allIncomingAssociationInstances(model, element_name, "Link")

				ready = True
				while (list_len(incoming_links) > 0):
					link = set_pop(incoming_links)
					source = readAssociationSource(model, link)
					if (bool_not(set_in(visited, source))):
						// Dependency not scheduled yet: handle it first
						list_append(to_visit, source)
						ready = False

				if (ready):
					new_schedule = instantiate_node(model, "Schedule", "")
					instantiate_link(model, "LinkedBlock", "", schedule, element_name)
					instantiate_link(model, "NextSchedule", "", schedule, new_schedule)
					schedule = new_schedule
					list_delete(to_visit, list_len(to_visit) - 1)
					set_add(visited, element_name)

	return !

String function readType(model : Element, name : String):
	// Returns the name of the metamodel type of the element called name.
	return reverseKeyLookup(model["metamodel"]["model"], dict_read_node(model["type_mapping"], model["model"][name]))!

Void function list_CBD(model : Element):
	// Prints the blocks (with their type), the links, the initial conditions
	// and both schedules of the runtime model.
	Element all_elements
	String elem
	String block

	output("Blocks:")
	all_elements = allInstances(model, "Block")
	while (read_nr_out(all_elements) > 0):
		elem = set_pop(all_elements)
		output(((" " + elem) + ": ") + readType(model, elem))

	output("Links:")
	all_elements = allInstances(model, "Link")
	while (read_nr_out(all_elements) > 0):
		elem = set_pop(all_elements)
		output(((" " + reverseKeyLookup(model["model"], read_edge_src(model["model"][elem]))) + " --> ") + reverseKeyLookup(model["model"], read_edge_dst(model["model"][elem])))

	output("Initial conditions:")
	all_elements = allInstances(model, "InitialCondition")
	while (read_nr_out(all_elements) > 0):
		elem = set_pop(all_elements)
		output(((" " + reverseKeyLookup(model["model"], read_edge_src(model["model"][elem]))) + " --> ") + reverseKeyLookup(model["model"], read_edge_dst(model["model"][elem])))

	// Walk each schedule chain until a Schedule node without a block is found
	output(("Schedule (t = " + cast_v2s(read_attribute(model, "time", "start_time"))) + "):")
	elem = "schedule_init"
	while (read_nr_out(allOutgoingAssociationInstances(model, elem, "LinkedBlock")) > 0):
		block = readAssociationDestination(model, set_pop(allOutgoingAssociationInstances(model, elem, "LinkedBlock")))
		output(" " + block)
		elem = readAssociationDestination(model, set_pop(allOutgoingAssociationInstances(model, elem, "NextSchedule")))

	output(("Schedule (t > " + cast_v2s(read_attribute(model, "time", "start_time"))) + "):")
	elem = "schedule_run"
	while (read_nr_out(allOutgoingAssociationInstances(model, elem, "LinkedBlock")) > 0):
		block = readAssociationDestination(model, set_pop(allOutgoingAssociationInstances(model, elem, "LinkedBlock")))
		output(" " + block)
		elem = readAssociationDestination(model, set_pop(allOutgoingAssociationInstances(model, elem, "NextSchedule")))

	return !

Void function step_simulation(model : Element):
	// Executes one simulation step on the runtime model:
	//  1. picks "schedule_init" when current_time equals start_time and
	//     "schedule_run" otherwise,
	//  2. computes and stores the "signal" of every block in schedule order,
	//  3. updates the "memory" of every delay block that was executed,
	//  4. increases current_time by one and prints all computed signals.
	//
	// NOTE(review): the schedule is chosen by comparing current_time with
	// start_time, but a delay block only uses its initial condition when
	// current_time is 0 -- confirm this is intended for delay blocks that
	// are added after time 0.
	String time
	String schedule
	Float signal
	Element incoming
	String selected
	String block
	String blocktype
	String output_string
	Element delay_blocks
	Integer new_time

	time = "time"
	if (read_attribute(model, time, "current_time") == read_attribute(model, time, "start_time")):
		schedule = "schedule_init"
	else:
		schedule = "schedule_run"

	delay_blocks = create_node()
	output_string = "("
	while (read_nr_out(allOutgoingAssociationInstances(model, schedule, "LinkedBlock")) > 0):
		block = readAssociationDestination(model, set_pop(allOutgoingAssociationInstances(model, schedule, "LinkedBlock")))
		schedule = readAssociationDestination(model, set_pop(allOutgoingAssociationInstances(model, schedule, "NextSchedule")))

		// Execute "block"
		blocktype = readType(model, block)
		if (blocktype == "ConstantBlock"):
			signal = read_attribute(model, block, "value")
		elif (blocktype == "AdditionBlock"):
			signal = 0.0
			incoming = allIncomingAssociationInstances(model, block, "Link")
			while (read_nr_out(incoming) > 0):
				selected = readAssociationSource(model, set_pop(incoming))
				signal = signal + cast_s2f(cast_v2s(read_attribute(model, selected, "signal")))
		elif (blocktype == "MultiplyBlock"):
			signal = 1.0
			incoming = allIncomingAssociationInstances(model, block, "Link")
			while (read_nr_out(incoming) > 0):
				selected = readAssociationSource(model, set_pop(incoming))
				signal = signal * cast_s2f(cast_v2s(read_attribute(model, selected, "signal")))
		elif (blocktype == "NegatorBlock"):
			incoming = allIncomingAssociationInstances(model, block, "Link")
			while (read_nr_out(incoming) > 0):
				selected = readAssociationSource(model, set_pop(incoming))
				signal = float_neg(cast_s2f(cast_v2s(read_attribute(model, selected, "signal"))))
		elif (blocktype == "InverseBlock"):
			incoming = allIncomingAssociationInstances(model, block, "Link")
			while (read_nr_out(incoming) > 0):
				selected = readAssociationSource(model, set_pop(incoming))
				signal = float_division(1.0, cast_s2f(cast_v2s(read_attribute(model, selected, "signal"))))
		elif (blocktype == "DelayBlock"):
			if (cast_s2i(cast_v2s(read_attribute(model, time, "current_time"))) == 0):
				// First step: output the initial condition
				incoming = allIncomingAssociationInstances(model, block, "InitialCondition")
				while (read_nr_out(incoming) > 0):
					selected = readAssociationSource(model, set_pop(incoming))
					signal = cast_s2f(cast_v2s(read_attribute(model, selected, "signal")))
			else:
				signal = read_attribute(model, block, "memory")
			// Memory is updated after all blocks have been executed
			set_add(delay_blocks, block)

		unset_attribute(model, block, "signal")
		instantiate_attribute(model, block, "signal", signal)
		output_string = output_string + (((block + " = ") + cast_v2s(signal)) + "; ")
	output_string = output_string + ")"

	while (read_nr_out(delay_blocks) > 0):
		block = set_pop(delay_blocks)
		// Update memory
		incoming = allIncomingAssociationInstances(model, block, "Link")
		while (read_nr_out(incoming) > 0):
			selected = readAssociationSource(model, set_pop(incoming))
			unset_attribute(model, block, "memory")
			instantiate_attribute(model, block, "memory", cast_s2f(cast_v2s(read_attribute(model, selected, "signal"))))

	// Increase simulation time
	new_time = cast_s2i(cast_v2s(read_attribute(model, time, "current_time"))) + 1
	unset_attribute(model, time, "current_time")
	instantiate_attribute(model, time, "current_time", new_time)

	output(output_string)
	return !

Void function set_termination_time(model : Element):
	// Replaces the termination time of the model by the next input value.
	// The value is stored exactly as it is read from input().
	unset_attribute(model, "time", "termination_time")
	instantiate_attribute(model, "time", "termination_time", input())
	return !
Void function execute_cbd(design_model : Element):
	// Interactive simulation loop for one CBD design model.
	// A runtime model is derived from design_model, after which commands are
	// processed until "exit" is given. While the simulation is running, a
	// "step" is executed whenever no input is waiting; the simulation stops
	// running as soon as current_time exceeds termination_time.
	Element live_model
	Element previous_model
	String command
	String status
	Boolean running
	Integer now
	Integer limit

	log("New model")
	previous_model = instantiate_model(import_node("models/CausalBlockDiagrams_Runtime"))
	log("Retype")
	live_model = retype_to_runtime(design_model)
	log("Sanitize")
	live_model = sanitize(live_model, previous_model)
	log("Finished")

	running = False
	while (True):
		// Fetch the next command
		if (bool_not(running)):
			output("Which operation do you want to execute?")
			command = input()
		else:
			if (has_input()):
				command = input()
			else:
				command = "step"

		// Commands that are valid in every mode come first
		if (command == "help"):
			output("Supported operations:")
			if (running):
				output(" pause -- pause simulation")
			else:
				output(" step -- do one simulation step")
				output(" start -- start simulation")
				output(" modify -- live modelling: modify model structure")
				output(" list -- list blocks and connections")
				output(" exit -- select another model")
				output(" verify -- verify the runtime model")
			output(" help -- this information")
			output(" term -- set termination time")
		elif (command == "step"):
			step_simulation(live_model)
		elif (command == "term"):
			set_termination_time(live_model)
		elif (bool_not(running)):
			// Commands that are only accepted while paused
			if (command == "list"):
				list_CBD(live_model)
			elif (command == "start"):
				running = True
			elif (command == "exit"):
				return!
			elif (command == "modify"):
				// Keep modifying until the design model conforms again
				modify(design_model)
				status = conformance_scd(design_model)
				while (status != "OK"):
					output("Error in design model: " + status)
					modify(design_model)
					status = conformance_scd(design_model)
				output("Successfully made modifications to design model!")

				// Rebuild the runtime model, taking over state from the current one
				previous_model = live_model
				live_model = sanitize(retype_to_runtime(design_model), previous_model)
			elif (command == "verify"):
				status = conformance_scd(live_model)
				if (status == "OK"):
					output("Runtime model OK!")
				else:
					output("Error in runtime model: " + status)
			else:
				output("Did not understand command!")
				output("Use 'help' for a list of available options")
		else:
			// Running: only "pause" is left
			if (command == "pause"):
				running = False
			else:
				output("Did not understand command!")
				output("Use 'help' for a list of available options")

		// Stop running once the termination time has been passed
		now = cast_s2i(cast_v2s(read_attribute(live_model, "time", "current_time")))
		limit = cast_s2i(cast_v2s(read_attribute(live_model, "time", "termination_time")))
		if (now > limit):
			running = False