import unittest from utils import run_file, get_constructor, get_model_constructor set_inheritance = [ "Which link in the metamodel is the inheritance link?", "Set inheritance link!", ] join = ["Model name?", "New metamodel?", ] unify = ["Which language do you want to unify?", "Name of this language in the unification", "Second language to unify?", "Name of this language in the unification", ] ramify = ["Name of the RAMified metamodel to create", "Source language?"] transform = ["Which model do you want to transform?", "Which schedule do you want to execute?", ] transform_result_true = ["Transformation result: True"] transform_result_false = ["Transformation result: False"] split = ["Name of the model to split up?", "Name of new metamodel?", "Typename to split?", ] do_instantiate_simple = [ "new", "PetriNets", "abc", "instantiate", "Transition", "t1", "instantiate", "Place", "p1", "attr_add", "p1", "tokens", 5, "instantiate", "Place", "p2", "attr_add", "p2", "tokens", 0, "instantiate", "P2T", "p2t", "p1", "t1", "attr_add", "p2t", "weight", 2, "instantiate", "T2P", "t2p", "t1", "p2", "attr_add", "t2p", "weight", 1] instantiate_node = ["Type to instantiate?", "Name of new element?", "Instantiation successful!"] instantiate_edge = ["Type to instantiate?", "Name of new element?", "Source name?", "Destination name?", "Instantiation successful!"] all_files = [ "pn_interface.alc", "primitives.alc", "object_operations.alc", "conformance_scd.alc", "library.alc", "transform.alc", "model_management.alc", "ramify.alc", "metamodels.alc", "random.alc", "constructors.alc", "modelling.alc", "compilation_manager.alc", ] greeting = ["Welcome to the Model Management Interface, running live on the Modelverse!", "Use 'help' command for a list of possible commands"] new = ["Metamodel to instantiate?", "Name of model?"] prompt = ["Please give your command."] loaded = ["Model loaded, ready for commands!", "Use 'help' command for a list of possible commands"] + prompt load = ["Model to load?"] instantiate = ["Type to instantiate?"] instantiate_name = ["Name of new element?"] instantiate_ok = ["Instantiation successful!"] instantiate_source= ["Source name?"] instantiate_destination = ["Destination name?"] def list_menu(defined): defined.append(("PetriNets", "SimpleClassDiagrams")) defined.append(("SimpleClassDiagrams", "SimpleClassDiagrams")) defined.append(("LTM_bottom", "LTM_bottom")) return ["Found models:", set([" %s : %s" % (m, mm) for m, mm in defined])] def list_model(defined): return ["List of all elements:", set([" %s : %s" % (m, mm) for m, mm in defined])] def read_node(name, t, defs, attrs): return ["Element to read?", "Name: %s" % name, "Type: %s" % t, "Defines attributes:"] + \ ([set([" %s : %s" % (m, mm) for m, mm in defs])] if defs else []) + \ ["Attributes:"] + \ ([set([' "%s" : "%s" = %s' % (m, mm, v) for m, mm, v in attrs])] if attrs else []) def read_edge(name, t, src, dst, defs, attrs): return ["Element to read?", "Name: %s" % name, "Type: %s" % t, "Source: %s" % src, "Destination: %s" % dst, "Defines attributes:"] + \ ([set([" %s : %s" % (m, mm) for m, mm in defs])] if defs else []) + \ ["Attributes:"] + \ ([set([' "%s" : "%s" = %s' % (m, mm, v) for m, mm, v in attrs])] if attrs else []) delete = ["Model to delete?", "Deleted!"] rename = ["Old name?", "New name?", "Rename complete!"] attr_add = ["Which model do you want to assign an attribute to?", "Which attribute do you wish to assign?", "Value of attribute?", "Added attribute!"] attr_del = ["Which model do you want to remove an attribute of?", "Which attribute do you want to delete?", "Attribute deleted!", ] help_root = ["Currently no model is loaded, so your operations are limited to:", " new -- Create a new model and save it for future use" " load -- Load a previously made model", " rename -- Rename a previously made model", " delete -- Delete a previously made model", " list -- Show a list of all stored models", " help -- Show a list of possible commands"] verify_fail_weight= ["Natural number not larger than or equal to zero"] verify_fail_tokens= ["Natural number not larger than or equal to zero"] verify_fail_structure = ["Source of model edge not typed by source of type: p2t"] init = greeting + prompt did_instantiate_simple = init + \ new + \ loaded +\ instantiate_node + \ prompt + \ instantiate_node + \ prompt + \ attr_add + \ prompt + \ instantiate_node + \ prompt + \ attr_add + \ prompt + \ instantiate_edge + \ prompt + \ attr_add + \ prompt + \ instantiate_edge + \ prompt + \ attr_add + \ prompt def list_types(t): return ["List of types:"] + \ [set([" %s : %s" % (m, mm) for m, mm in t])] modify = ["Element to modify?", "Attribute to modify?", "New value?", "Modified!",] class TestPetrinetInterface(unittest.TestCase): def test_po_pn_interface_manage(self): self.pn_interface_manage("PO") def test_co_pn_interface_manage(self): self.pn_interface_manage("CO") def pn_interface_manage(self, mode): self.assertTrue(run_file(all_files, ["list", "new", "PetriNets", "abc", "exit", "list", "new", "PetriNets", "def", "exit", "list", "delete", "def", "list", "rename", "abc", "a", "list", "delete", "a", "list", ], init + \ list_menu([]) + prompt + \ new + loaded + prompt + list_menu([("abc", "PetriNets")]) + prompt + \ new + loaded + prompt + list_menu([("abc", "PetriNets"), ("def", "PetriNets")]) + prompt + \ delete + prompt + list_menu([("abc", "PetriNets")]) + prompt + \ rename + prompt + list_menu([("a", "PetriNets")]) + prompt + \ delete + prompt + list_menu([]) + prompt, mode)) def test_po_pn_interface_new_reload(self): self.pn_interface_new_reload("PO") def test_co_pn_interface_new_reload(self): self.pn_interface_new_reload("CO") def pn_interface_new_reload(self, mode): self.assertTrue(run_file(all_files, ["new", "PetriNets", "abc", "exit", "load", "abc"], init + new + loaded + prompt + load + loaded, mode)) def test_po_pn_interface_instantiate_place(self): self.pn_interface_instantiate_place("PO") def test_co_pn_interface_instantiate_place(self): self.pn_interface_instantiate_place("CO") def pn_interface_instantiate_place(self, mode): self.assertTrue(run_file(all_files, ["new", "PetriNets", "abc", "instantiate", "Place", "p1", "attr_add", "p1", "tokens", 5, "list", "read", "p1", "instantiate", "Transition", "t1", "list", "read", "t1"], init + new + loaded + \ instantiate_node + prompt + \ attr_add + prompt + \ list_model([("p1", "Place"), ("p1.tokens", "Natural")]) + prompt + \ read_node("p1", "Place", [], [("tokens", "Natural", 5)]) + prompt + \ instantiate_node + prompt + \ list_model([("p1", "Place"), ("t1", "Transition"), ("p1.tokens", "Natural")]) + prompt + \ read_node("t1", "Transition", [], []) + prompt, mode)) def test_po_pn_interface_instantiate_arcs(self): self.pn_interface_instantiate_arcs("PO") def test_co_pn_interface_instantiate_arcs(self): self.pn_interface_instantiate_arcs("CO") def pn_interface_instantiate_arcs(self, mode): self.assertTrue(run_file(all_files, do_instantiate_simple + [ "read", "p1", "read", "p2", "read", "t1", "read", "p2t", "read", "t2p", ], did_instantiate_simple + \ read_node("p1", "Place", [], [("tokens", "Natural", 5)]) + prompt + \ read_node("p2", "Place", [], [("tokens", "Natural", 0)]) + prompt + \ read_node("t1", "Transition", [], []) + prompt + \ read_edge("p2t", "P2T", "p1", "t1", [], [("weight", "Natural", 2)]) + prompt + \ read_edge("t2p", "T2P", "t1", "p2", [], [("weight", "Natural", 1)]) + prompt, mode)) def test_po_pn_interface_verify_OK(self): self.pn_interface_verify_OK("PO") def test_co_pn_interface_verify_OK(self): self.pn_interface_verify_OK("CO") def pn_interface_verify_OK(self, mode): self.assertTrue(run_file(all_files, do_instantiate_simple + ["verify"], did_instantiate_simple + ["OK"], mode)) def test_po_pn_interface_verify_fail_tokens(self): self.pn_interface_verify_fail_tokens("PO") def test_co_pn_interface_verify_fail_tokens(self): self.pn_interface_verify_fail_tokens("CO") def pn_interface_verify_fail_tokens(self, mode): self.assertTrue(run_file(all_files, do_instantiate_simple + ["modify", "p1", "tokens", -5, "verify"], did_instantiate_simple + modify + prompt + verify_fail_tokens + prompt, mode)) def test_po_pn_interface_verify_fail_weight(self): self.pn_interface_verify_fail_weight("PO") def test_co_pn_interface_verify_fail_weight(self): self.pn_interface_verify_fail_weight("CO") def pn_interface_verify_fail_weight(self, mode): self.assertTrue(run_file(all_files, do_instantiate_simple + ["modify", "p2t", "weight", -2, "verify"], did_instantiate_simple + modify + prompt + verify_fail_weight + prompt, mode)) def test_po_pn_interface_verify_fail_structure(self): self.pn_interface_verify_fail_structure("PO") def test_co_pn_interface_verify_fail_structure(self): self.pn_interface_verify_fail_structure("CO") def pn_interface_verify_fail_structure(self, mode): self.assertTrue(run_file(all_files, ["new", "PetriNets", "abc", "instantiate", "Transition", "t1", "instantiate", "Place", "p1", "attr_add", "p1", "tokens", 5, "instantiate", "P2T", "p2t", "t1", "p1", "attr_add", "p2t", "weight", 2, "verify"], init + new + loaded + \ instantiate_node + prompt + \ instantiate_node + prompt + attr_add + prompt + \ instantiate_edge + prompt + attr_add + prompt + \ verify_fail_structure, mode)) def test_po_pn_interface_types(self): self.pn_interface_types("PO") def test_co_pn_interface_types(self): self.pn_interface_types("CO") def pn_interface_types(self, mode): self.assertTrue(run_file(all_files, ["new", "PetriNets", "abc", "types"], init + new + loaded + list_types([("Place", "Class"), ("Transition", "Class"), ("P2T", "Association"), ("T2P", "Association"), ("Natural", "Class"), ("Place_tokens", "Association"), ("Place_tokens.name", "String"), ("Place_tokens.target_lower_cardinality", "Natural"), ("Place_tokens.target_upper_cardinality", "Natural"), ("P2T_weight", "Association"), ("P2T_weight.name", "String"), ("P2T_weight.target_lower_cardinality", "Natural"), ("P2T_weight.target_upper_cardinality", "Natural"), ("T2P_weight", "Association"), ("T2P_weight.name", "String"), ("T2P_weight.target_lower_cardinality", "Natural"), ("T2P_weight.target_upper_cardinality", "Natural"), ]) + prompt, mode)) def test_po_pn_interface_modify_place(self): self.pn_interface_modify_place("PO") def test_co_pn_interface_modify_place(self): self.pn_interface_modify_place("CO") def pn_interface_modify_place(self, mode): self.assertTrue(run_file(all_files, ["new", "PetriNets", "abc", "instantiate", "Place", "p1", "attr_add", "p1", "tokens", 5, "read", "p1", "modify", "p1", "tokens", 1, "read", "p1"], init + new + loaded + \ instantiate_node + prompt + attr_add + prompt + \ read_node("p1", "Place", [], [("tokens", "Natural", 5)]) + prompt + \ modify + prompt + \ read_node("p1", "Place", [], [("tokens", "Natural", 1)]) + prompt, mode)) def test_po_pn_interface_verify_fail_attr_lower_cardinality(self): self.pn_interface_verify_fail_attr_lower_cardinality("PO") def test_co_pn_interface_verify_fail_attr_lower_cardinality(self): self.pn_interface_verify_fail_attr_lower_cardinality("CO") def pn_interface_verify_fail_attr_lower_cardinality(self, mode): self.assertTrue(run_file(all_files, do_instantiate_simple + ["instantiate", "Place", "p999", "verify"], did_instantiate_simple + instantiate_node + prompt + ["Lower cardinality violation for outgoing edge of type Place_tokens at p999"] + prompt, mode)) def test_po_pn_interface_verify_fail_attr_upper_cardinality(self): self.pn_interface_verify_fail_attr_upper_cardinality("PO") def test_co_pn_interface_verify_fail_attr_upper_cardinality(self): self.pn_interface_verify_fail_attr_upper_cardinality("CO") def pn_interface_verify_fail_attr_upper_cardinality(self, mode): self.assertTrue(run_file(all_files, do_instantiate_simple + ["attr_add", "p1", "tokens", 5, "verify"], did_instantiate_simple + attr_add + prompt + ["Upper cardinality violation for outgoing edge of type Place_tokens at p1"] + prompt, mode)) def test_po_pn_interface_verify_natural(self): self.pn_interface_verify_natural("PO") def test_co_pn_interface_verify_natural(self): self.pn_interface_verify_natural("CO") def pn_interface_verify_natural(self, mode): self.assertTrue(run_file(all_files, ["new", "PetriNets", "abc", "instantiate", "Place", "p1", "attr_add", "p1", "tokens", -5, "attr_del", "p1", "tokens", "attr_add", "p1", "tokens", 4, "verify"], init + new + loaded + \ instantiate_node + prompt + \ attr_add + prompt + \ attr_del + prompt + \ attr_add + prompt + \ ["OK"] + prompt, mode)) def test_po_pn_interface_verify_PN_OK(self): self.pn_interface_verify_PN_OK("PO") def test_co_pn_interface_verify_PN_OK(self): self.pn_interface_verify_PN_OK("CO") def pn_interface_verify_PN_OK(self, mode): self.assertTrue(run_file(all_files, ["load", "PetriNets", "verify"], init + load + loaded + ["OK"], mode)) def test_po_rpgame(self): self.rpgame("PO") def test_co_rpgame(self): self.rpgame("CO") def rpgame(self, mode): constraint_code = \ """ include "primitives.alh" include "object_operations.alh" Element function constraint(model : Element, name : String): Element associations Element back_associations Element association String destination associations = allOutgoingAssociationInstances(model, name, "tile_left") while (0 < list_len(associations)): association = set_pop(associations) destination = readAssociationDestination(model, association) back_associations = allOutgoingAssociationInstances(model, destination, "tile_right") if (list_len(back_associations) < 1): return "Left link does not have a right link back"! else: association = set_pop(back_associations) destination = readAssociationDestination(model, association) if (destination != name): return "Right link does not have a left link back to the same tile"! associations = allOutgoingAssociationInstances(model, name, "tile_right") while (0 < list_len(associations)): association = set_pop(associations) destination = readAssociationDestination(model, association) back_associations = allOutgoingAssociationInstances(model, destination, "tile_left") if (list_len(back_associations) < 1): return "Right link does not have a left link back"! else: association = set_pop(back_associations) destination = readAssociationDestination(model, association) if (destination != name): return "Right link does not have a left link back to the same tile"! associations = allOutgoingAssociationInstances(model, name, "tile_top") while (0 < list_len(associations)): association = set_pop(associations) destination = readAssociationDestination(model, association) back_associations = allOutgoingAssociationInstances(model, destination, "tile_bottom") if (list_len(back_associations) < 1): return "Top link does not have a bottom link back"! else: association = set_pop(back_associations) destination = readAssociationDestination(model, association) if (destination != name): return "Top link does not have a bottom link back to the same tile"! associations = allOutgoingAssociationInstances(model, name, "tile_bottom") while (0 < list_len(associations)): association = set_pop(associations) destination = readAssociationDestination(model, association) back_associations = allOutgoingAssociationInstances(model, destination, "tile_top") if (list_len(back_associations) < 1): return "Bottom link does not have a top link back"! else: association = set_pop(back_associations) destination = readAssociationDestination(model, association) if (destination != name): return "Bottom link does not have a top link back to the same tile"! return "OK"! """ constructors = get_constructor(constraint_code) self.assertTrue(run_file(all_files, ["new", "SimpleClassDiagrams", "RPGame", "set_inheritance", "Inheritance", "instantiate", "Class", "Scene", "instantiate", "Class", "Tile", "instantiate", "Class", "Item", "instantiate", "Class", "Goal", "instantiate", "Class", "Character", "instantiate", "Class", "Hero", "instantiate", "Association", "scene_has_tiles", "Scene", "Tile", "instantiate", "Association", "tile_left", "Tile", "Tile", "instantiate", "Association", "tile_right", "Tile", "Tile", "instantiate", "Association", "tile_top", "Tile", "Tile", "instantiate", "Association", "tile_bottom", "Tile", "Tile", "instantiate", "Association", "character_on", "Character", "Tile", "instantiate", "Association", "item_on", "Item", "Tile", "instantiate", "Inheritance", "hero_is_character", "Hero", "Character", "instantiate", "Inheritance", "goal_is_item", "Goal", "Item", "attr_add", "Scene", "lower_cardinality", 1, "attr_add", "Scene", "upper_cardinality", 1, "attr_add", "Goal", "lower_cardinality", 1, "attr_add", "scene_has_tiles", "source_lower_cardinality", 1, "attr_add", "scene_has_tiles", "source_upper_cardinality", 1, "attr_add", "scene_has_tiles", "target_lower_cardinality", 1, "attr_add", "item_on", "target_lower_cardinality", 1, "attr_add", "item_on", "target_upper_cardinality", 1, "attr_add", "item_on", "source_upper_cardinality", 1, "attr_add", "character_on", "target_lower_cardinality", 1, "attr_add", "character_on", "target_upper_cardinality", 1, "attr_add", "character_on", "source_upper_cardinality", 1, "attr_add", "tile_left", "source_upper_cardinality", 1, "attr_add", "tile_left", "target_upper_cardinality", 1, "attr_add", "tile_right", "source_upper_cardinality", 1, "attr_add", "tile_right", "target_upper_cardinality", 1, "attr_add", "tile_top", "source_upper_cardinality", 1, "attr_add", "tile_top", "target_upper_cardinality", 1, "attr_add", "tile_bottom", "source_upper_cardinality", 1, "attr_add", "tile_bottom", "target_upper_cardinality", 1, "attr_add_code", "Tile", "constraint", ] + constructors + ["verify"] + ["exit"] + [ "new", "RPGame", "my_game", "instantiate", "Scene", "scene", "instantiate", "Hero", "Link", "instantiate", "Goal", "goal", "instantiate", "Tile", "tile_00", "instantiate", "Tile", "tile_01", "instantiate", "Tile", "tile_10", "instantiate", "Tile", "tile_11", "instantiate", "scene_has_tiles", "", "scene", "tile_00", "instantiate", "scene_has_tiles", "", "scene", "tile_01", "instantiate", "scene_has_tiles", "", "scene", "tile_10", "instantiate", "scene_has_tiles", "", "scene", "tile_11", "instantiate", "character_on", "", "Link", "tile_00", "instantiate", "item_on", "", "goal", "tile_11", "instantiate", "tile_left", "", "tile_01", "tile_00", "instantiate", "tile_left", "", "tile_11", "tile_10", "instantiate", "tile_right", "", "tile_00", "tile_01", "instantiate", "tile_right", "", "tile_10", "tile_11", "instantiate", "tile_top", "", "tile_10", "tile_00", "instantiate", "tile_top", "", "tile_11", "tile_01", "instantiate", "tile_bottom", "", "tile_00", "tile_10", "instantiate", "tile_bottom", "", "tile_01", "tile_11", "verify", ], init + new + loaded + \ set_inheritance + prompt + \ (instantiate_node + prompt) * 6 + \ (instantiate_edge + prompt) * 9 + \ (attr_add + prompt) * 20 + \ ["Which model do you want to assign a coded attribute to?", "Which attribute do you wish to assign?", "Constructors for code?", "Added code!"] + \ prompt + \ ["OK"] + \ prompt + prompt + new + loaded + \ (instantiate_node + prompt) * 7 + \ (instantiate_edge + prompt) * 14 + \ ["OK"], mode)) def test_po_pn_interface_model_transform_pn(self): PN_runtime = \ """ import models/SimpleClassDiagrams as SimpleClassDiagrams SimpleClassDiagrams PetriNets_Runtime{ Class Natural {} Class Boolean {} Class Place { tokens : Natural } Class Transition { executing : Boolean } Association P2T (Place, Transition) { weight : Natural } Association T2P (Transition, Place) { weight : Natural } } export PetriNets_Runtime to models/PetriNets_Runtime """ PN_model = \ """ import models/PetriNets_Runtime as PetriNets_Runtime PetriNets_Runtime pn { Place p1 { tokens = 1 } Place p2 { tokens = 2 } Place p3 { tokens = 3 } Transition t1 { executing = False } P2T (p1, t1) { weight = 1 } P2T (p2, t1) { weight = 1 } T2P (t1, p3) { weight = 2 } } export pn to models/pn """ schedule_model = \ """ import models/PetriNets_Runtime_SCHEDULE as PN_Transform PN_Transform s { Composite schedule { {Contains} Failure failure {} {Contains} Success success {} {Contains} Atomic mark { NAC { Pre_Transition mark_nac_t { label = "1" } Pre_Place mark_nac_p{ label = "10" } Pre_P2T (mark_nac_p, mark_nac_t){ label = "11" } constraint = $ include "primitives.alh" include "modelling.alh" Boolean function constraint(host_model : Element, mapping : Element): Integer tokens Integer weight tokens = read_attribute(host_model, mapping["10"], "tokens") weight = read_attribute(host_model, mapping["11"], "weight") log("NAC == tokens " + cast_v2s(tokens)) log("NAC == weight " + cast_v2s(weight)) if (tokens < weight): log("TRUE") return True! else: log("FALSE") return False! $ } LHS { Pre_Transition { label = "1" } } RHS { Post_Transition { label = "1" action = $ include "primitives.alh" include "modelling.alh" Void function action(host_model : Element, name : String, mapping : Element): unset_attribute(host_model, name, "executing") instantiate_attribute(host_model, name, "executing", True) return! $ } } } {Contains} ForAll consume { LHS { Pre_Transition lhs_consume_t{ label = "0" constraint = $ include "primitives.alh" include "modelling.alh" Boolean function constraint(host_model : Element, name : String): // Check if this node is executing currently return value_eq(read_attribute(host_model, name, "executing"), True)! $ } Pre_Place lhs_consume_p{ label = "1" } Pre_P2T lhs_consume_p2t(lhs_consume_p, lhs_consume_t){ label = "2" } } RHS { Post_Transition rhs_consume_t { label = "0" } Post_Place rhs_consume_p { label = "1" action = $ include "primitives.alh" include "modelling.alh" Void function action(host_model : Element, name : String, mapping : Element): Integer tokens Integer weight tokens = read_attribute(host_model, name, "tokens") weight = read_attribute(host_model, mapping["2"], "weight") unset_attribute(host_model, name, "tokens") instantiate_attribute(host_model, name, "tokens", tokens - weight) return! $ } Post_P2T (rhs_consume_p, rhs_consume_t){ label = "2" } } } {Contains} ForAll produce { LHS { Pre_Transition lhs_produce_t{ label = "0" constraint = $ include "primitives.alh" include "modelling.alh" Boolean function constraint(host_model : Element, name : String): // Check if this node is executing currently return value_eq(read_attribute(host_model, name, "executing"), True)! $ } Pre_Place lhs_produce_p{ label = "1" } Pre_T2P (lhs_produce_t, lhs_produce_p){ label = "2" } } RHS { Post_Transition rhs_produce_t{ label = "0" } Post_Place rhs_produce_p{ label = "1" action = $ include "primitives.alh" include "modelling.alh" Void function action(host_model : Element, name : String, mapping : Element): Integer tokens Integer weight tokens = read_attribute(host_model, name, "tokens") weight = read_attribute(host_model, mapping["2"], "weight") unset_attribute(host_model, name, "tokens") instantiate_attribute(host_model, name, "tokens", tokens + weight) return! $ } Post_T2P (rhs_produce_t, rhs_produce_p){ label = "2" } } } {Contains} Atomic unmark_transition { LHS { Pre_Transition { label = "0" constraint = $ include "primitives.alh" include "modelling.alh" Boolean function constraint(host_model : Element, name : String): // Check if this node is executing currently return value_eq(read_attribute(host_model, name, "executing"), True)! $ } } RHS { Post_Transition { label = "0" action = $ include "primitives.alh" include "modelling.alh" Void function action(host_model : Element, name : String, mapping : Element): unset_attribute(host_model, name, "executing") instantiate_attribute(host_model, name, "executing", False) return! $ } } } } OnSuccess (mark, consume) {} OnFailure (mark, failure) {} OnSuccess (consume, produce) {} OnFailure (consume, produce) {} OnSuccess (produce, unmark_transition) {} OnFailure (produce, unmark_transition) {} OnSuccess (unmark_transition, success) {} OnFailure (unmark_transition, failure) {} Initial (schedule, mark) {} } export s to models/pn_simulate """ self.assertTrue(run_file(all_files, get_model_constructor(PN_runtime) + [ ] + get_model_constructor(PN_model) + [ "load", "pn", "read", "t1", "read", "p1", "read", "p2", "read", "p3", "exit", "ramify", "PetriNets_Runtime_SCHEDULE", "PetriNets_Runtime", ] + get_model_constructor(schedule_model) + [ "transform", "pn", "pn_simulate", "transform", "pn", "pn_simulate", "load", "pn", "verify", "read", "t1", "read", "p1", "read", "p2", "read", "p3", "exit", ], greeting + prompt * 3 + load + loaded + read_node("t1", "Transition", [], [("executing", "Boolean", False)]) + prompt + read_node("p1", "Place", [], [("tokens", "Natural", 1)]) + prompt + read_node("p2", "Place", [], [("tokens", "Natural", 2)]) + prompt + read_node("p3", "Place", [], [("tokens", "Natural", 3)]) + prompt + prompt + ramify + prompt + prompt + transform + transform_result_true + prompt + transform + transform_result_false + prompt + load + loaded + ["OK"] + prompt + read_node("t1", "Transition", [], [("executing", "Boolean", False)]) + prompt + read_node("p1", "Place", [], [("tokens", "Natural", 0)]) + prompt + read_node("p2", "Place", [], [("tokens", "Natural", 1)]) + prompt + read_node("p3", "Place", [], [("tokens", "Natural", 5)]) + prompt, "PO")) def test_po_pn_interface_transform_pn_to_runtime(self): PN_runtime = \ """ import models/SimpleClassDiagrams as SimpleClassDiagrams SimpleClassDiagrams PetriNets_Design{ Class Natural {} Class Place { tokens : Natural } Class Transition {} Association P2T (Place, Transition) { weight : Natural } Association T2P (Transition, Place) { weight : Natural } } SimpleClassDiagrams PetriNets_Runtime{ Class Natural {} Class Boolean {} Class String {} Class Place { tokens : Natural name : String } Class Transition { executing : Boolean } Association P2T (Place, Transition) { weight : Natural } Association T2P (Transition, Place) { weight : Natural } } export PetriNets_Design to models/PetriNets_Design export PetriNets_Runtime to models/PetriNets_Runtime """ PN_model = \ """ import models/PetriNets_Design as PetriNets PetriNets pn { Place p1 { tokens = 1 } Place p2 { tokens = 2 } Place p3 { tokens = 3 } Transition t1 {} P2T (p1, t1) { weight = 1 } P2T (p2, t1) { weight = 1 } T2P (t1, p3) { weight = 2 } } export pn to models/pn """ schedule_model_annotate = \ """ import models/RAM_PetriNets_Design_Runtime as RAM_PN_DR RAM_PN_DR annotate { Composite schedule { {Contains} Failure failure {} {Contains} Success success {} {Contains} ForAll copy_transitions { LHS { Pre_SOURCE_Transition { label = "0" } } RHS { Post_SOURCE_Transition ct1 { label = "0" } Post_TARGET_Transition ct2 { label = "1" action = $ include "primitives.alh" include "modelling.alh" Void function action(host_model : Element, name : String, mapping : Element): instantiate_attribute(host_model, name, "executing", False) return! $ } Post_TransitionLink (ct1, ct2){ label = "2" } } } {Contains} ForAll copy_places { LHS { Pre_SOURCE_Place { label = "0" } } RHS { Post_SOURCE_Place cp1 { label = "0" } Post_TARGET_Place cp2 { label = "1" action = $ include "primitives.alh" include "modelling.alh" Void function action(host_model : Element, name : String, mapping : Element): instantiate_attribute(host_model, name, "tokens", read_attribute(host_model, mapping["0"], "tokens")) instantiate_attribute(host_model, name, "name", mapping["0"]) return! $ } Post_PlaceLink (cp1, cp2){ label = "2" } } } {Contains} ForAll copy_P2T { LHS { Pre_SOURCE_Place cp2t_p{ label = "0" } Pre_SOURCE_Transition cp2t_t{ label = "1" } Pre_SOURCE_P2T (cp2t_p, cp2t_t){ label = "2" } Pre_TARGET_Place cp2t_p2{ label = "3" } Pre_TARGET_Transition cp2t_t2{ label = "4" } Pre_PlaceLink (cp2t_p, cp2t_p2){ label = "5" } Pre_TransitionLink (cp2t_t, cp2t_t2){ label = "6" } } RHS { Post_SOURCE_Place rhs_cp2t_p{ label = "0" } Post_SOURCE_Transition rhs_cp2t_t{ label = "1" } Post_SOURCE_P2T rhs_cp2t_p2t (rhs_cp2t_p, rhs_cp2t_t){ label = "2" } Post_TARGET_Place rhs_cp2t_p2 { label = "3" } Post_TARGET_Transition rhs_cp2t_t2 { label = "4" } Post_PlaceLink (rhs_cp2t_p, rhs_cp2t_p2){ label = "5" } Post_TransitionLink (rhs_cp2t_t, rhs_cp2t_t2){ label = "6" } Post_TARGET_P2T rhs_cp2t_p2t2(rhs_cp2t_p2, rhs_cp2t_t2) { label = "7" action = $ include "primitives.alh" include "modelling.alh" Void function action(host_model : Element, name : String, mapping : Element): instantiate_attribute(host_model, name, "weight", read_attribute(host_model, mapping["2"], "weight")) return! $ } } } {Contains} ForAll copy_T2P { LHS { Pre_SOURCE_Place ct2p_p{ label = "0" } Pre_SOURCE_Transition ct2p_t{ label = "1" } Pre_SOURCE_T2P (ct2p_t, ct2p_p){ label = "2" } Pre_TARGET_Place ct2p_p2{ label = "3" } Pre_TARGET_Transition ct2p_t2{ label = "4" } Pre_PlaceLink (ct2p_p, ct2p_p2){ label = "5" } Pre_TransitionLink (ct2p_t, ct2p_t2){ label = "6" } } RHS { Post_SOURCE_Place rhs_ct2p_p{ label = "0" } Post_SOURCE_Transition rhs_ct2p_t{ label = "1" } Post_SOURCE_T2P (rhs_ct2p_t, rhs_ct2p_p){ label = "2" } Post_TARGET_Place rhs_ct2p_p2 { label = "3" } Post_TARGET_Transition rhs_ct2p_t2 { label = "4" } Post_PlaceLink (rhs_ct2p_p, rhs_ct2p_p2){ label = "5" } Post_TransitionLink (rhs_ct2p_t, rhs_ct2p_t2){ label = "6" } Post_TARGET_T2P (rhs_ct2p_t2, rhs_ct2p_p2) { label = "7" action = $ include "primitives.alh" include "modelling.alh" Void function action(host_model : Element, name : String, mapping : Element): instantiate_attribute(host_model, name, "weight", read_attribute(host_model, mapping["2"], "weight")) return! $ } } } } OnSuccess (copy_places, copy_transitions) {} OnSuccess (copy_transitions, copy_P2T) {} OnSuccess (copy_P2T, copy_T2P) {} OnSuccess (copy_T2P, success) {} OnFailure (copy_places, copy_transitions) {} OnFailure (copy_transitions, copy_P2T) {} OnFailure (copy_P2T, copy_T2P) {} OnFailure (copy_T2P, success) {} Initial (schedule, copy_places) {} } export annotate to models/pn_annotate """ schedule_model_print = \ """ import models/RAM_PetriNets_Runtime as RAM_PN_R RAM_PN_R print { Composite schedule { {Contains} Success success {} {Contains} ForAll print_tokens { LHS { Pre_Place { label = "0" } } RHS { Post_Place { label = "0" action = $ include "primitives.alh" include "modelling.alh" Void function action(host_model : Element, name : String, mapping : Element): output((cast_v2s(read_attribute(host_model, name, "name")) + " --> ") + cast_v2s(read_attribute(host_model, name, "tokens"))) return! $ } } } } OnSuccess (print_tokens, success) {} OnFailure (print_tokens, success) {} Initial (schedule, print_tokens) {} } export print to models/pn_print """ schedule_model_simulate = \ """ import models/RAM_PetriNets_Runtime as RAM_PN_R RAM_PN_R s { Composite schedule { {Contains} Failure failure {} {Contains} Success success {} {Contains} Atomic mark { LHS { Pre_Transition { label = "1" constraint = $ include "primitives.alh" include "modelling.alh" include "object_operations.alh" Boolean function constraint(host_model : Element, name : String): Element links String link String place links = allIncomingAssociationInstances(host_model, name, "P2T") while (read_nr_out(links) > 0): link = set_pop(links) place = readAssociationSource(host_model, link) if (integer_lt(read_attribute(host_model, place, "tokens"), read_attribute(host_model, link, "weight"))): return False! return True! $ } } RHS { Post_Transition { label = "1" action = $ include "primitives.alh" include "modelling.alh" Void function action(host_model : Element, name : String, mapping : Element): unset_attribute(host_model, name, "executing") instantiate_attribute(host_model, name, "executing", True) return! $ } } } {Contains} ForAll consume { LHS { Pre_Transition lhs_consume_t{ label = "0" constraint = $ include "primitives.alh" include "modelling.alh" Boolean function constraint(host_model : Element, name : String): // Check if this node is executing currently return value_eq(read_attribute(host_model, name, "executing"), True)! $ } Pre_Place lhs_consume_p{ label = "1" } Pre_P2T lhs_consume_p2t(lhs_consume_p, lhs_consume_t){ label = "2" } } RHS { Post_Transition rhs_consume_t { label = "0" } Post_Place rhs_consume_p { label = "1" action = $ include "primitives.alh" include "modelling.alh" Void function action(host_model : Element, name : String, mapping : Element): Integer tokens Integer weight tokens = read_attribute(host_model, name, "tokens") weight = read_attribute(host_model, mapping["2"], "weight") unset_attribute(host_model, name, "tokens") instantiate_attribute(host_model, name, "tokens", tokens - weight) log("Consume for " + cast_v2s(read_attribute(host_model, name, "name"))) log("Previous: " + cast_v2s(tokens)) log("Now: " + cast_v2s(tokens - weight)) return! $ } Post_P2T (rhs_consume_p, rhs_consume_t){ label = "2" } } } {Contains} ForAll produce { LHS { Pre_Transition lhs_produce_t{ label = "0" constraint = $ include "primitives.alh" include "modelling.alh" Boolean function constraint(host_model : Element, name : String): // Check if this node is executing currently return value_eq(read_attribute(host_model, name, "executing"), True)! $ } Pre_Place lhs_produce_p{ label = "1" } Pre_T2P (lhs_produce_t, lhs_produce_p){ label = "2" } } RHS { Post_Transition rhs_produce_t{ label = "0" } Post_Place rhs_produce_p{ label = "1" action = $ include "primitives.alh" include "modelling.alh" Void function action(host_model : Element, name : String, mapping : Element): Integer tokens Integer weight tokens = read_attribute(host_model, name, "tokens") weight = read_attribute(host_model, mapping["2"], "weight") unset_attribute(host_model, name, "tokens") instantiate_attribute(host_model, name, "tokens", tokens + weight) log("Produce for " + cast_v2s(read_attribute(host_model, name, "name"))) log("Previous: " + cast_v2s(tokens)) log("Now: " + cast_v2s(tokens + weight)) return! $ } Post_T2P (rhs_produce_t, rhs_produce_p){ label = "2" } } } {Contains} Atomic unmark_transition { LHS { Pre_Transition { label = "0" constraint = $ include "primitives.alh" include "modelling.alh" Boolean function constraint(host_model : Element, name : String): // Check if this node is executing currently return value_eq(read_attribute(host_model, name, "executing"), True)! $ } } RHS { Post_Transition { label = "0" action = $ include "primitives.alh" include "modelling.alh" Void function action(host_model : Element, name : String, mapping : Element): unset_attribute(host_model, name, "executing") instantiate_attribute(host_model, name, "executing", False) return! $ } } } } OnSuccess (mark, consume) {} OnFailure (mark, failure) {} OnSuccess (consume, produce) {} OnFailure (consume, produce) {} OnSuccess (produce, unmark_transition) {} OnFailure (produce, unmark_transition) {} OnSuccess (unmark_transition, success) {} OnFailure (unmark_transition, failure) {} Initial (schedule, mark) {} } export s to models/pn_simulate """ self.assertTrue(run_file(all_files, get_model_constructor(PN_runtime) + \ get_model_constructor(PN_model) + [ "unify", "PetriNets_Design_to_Runtime", "PetriNets_Design", "SOURCE_", "PetriNets_Runtime", "TARGET_", "join", "pn", "PetriNets_Design_to_Runtime", "SOURCE_", "load", "PetriNets_Design_to_Runtime", "instantiate", "Association", "PlaceLink", "SOURCE_Place", "TARGET_Place", "instantiate", "Association", "TransitionLink", "SOURCE_Transition", "TARGET_Transition", "exit", "ramify", "RAM_PetriNets_Design_Runtime", "PetriNets_Design_to_Runtime", "ramify", "RAM_PetriNets_Runtime", "PetriNets_Runtime", ] + get_model_constructor(schedule_model_annotate) + [ ] + get_model_constructor(schedule_model_print) + [ ] + get_model_constructor(schedule_model_simulate) + [ "transform", "pn", "pn_annotate", "split", "pn", "PetriNets_Runtime", "TARGET_", "transform", "pn", "pn_print", "transform", "pn", "pn_simulate", "transform", "pn", "pn_print", ], greeting + prompt * 3 + unify + prompt + join + prompt + load + loaded + instantiate_edge + prompt + instantiate_edge + prompt + prompt + ramify + prompt + ramify + prompt + prompt + prompt + prompt + transform + transform_result_true + prompt + split + prompt + transform + [set(['"p1" --> 1', '"p2" --> 2', '"p3" --> 3'])] + transform_result_true + prompt + transform + transform_result_true + prompt + transform + [set(['"p1" --> 0', '"p2" --> 1', '"p3" --> 5'])] + transform_result_true + prompt , "PO"))