import unittest from utils import run_file, get_constructor, get_model_constructor set_inheritance = [ "Which link in the metamodel is the inheritance link?", "Set inheritance link!", ] do_instantiate_simple = [ "new", "PetriNets", "abc", "instantiate", "Transition", "t1", "instantiate", "Place", "p1", "attr_add", "p1", "tokens", 5, "instantiate", "Place", "p2", "attr_add", "p2", "tokens", 0, "instantiate", "P2T", "p2t", "p1", "t1", "attr_add", "p2t", "weight", 2, "instantiate", "T2P", "t2p", "t1", "p2", "attr_add", "t2p", "weight", 1] instantiate_node = ["Type to instantiate?", "Name of new element?", "Instantiation successful!"] instantiate_edge = ["Type to instantiate?", "Name of new element?", "Source name?", "Destination name?", "Instantiation successful!"] all_files = [ "pn_interface.alc", "primitives.alc", "object_operations.alc", "conformance_scd.alc", "library.alc", "transform.alc", "ramify.alc", "metamodels.alc", "random.alc", "constructors.alc", "modelling.alc", "compilation_manager.alc", ] greeting = ["Welcome to the Model Management Interface, running live on the Modelverse!", "Use 'help' command for a list of possible commands"] new = ["Metamodel to instantiate?", "Name of model?"] prompt = ["Please give your command."] loaded = ["Model loaded, ready for commands!", "Use 'help' command for a list of possible commands"] + prompt load = ["Model to load?"] instantiate = ["Type to instantiate?"] instantiate_name = ["Name of new element?"] instantiate_ok = ["Instantiation successful!"] instantiate_source= ["Source name?"] instantiate_destination = ["Destination name?"] def list_menu(defined): defined.append(("PetriNets", "SimpleClassDiagrams")) defined.append(("SimpleClassDiagrams", "SimpleClassDiagrams")) defined.append(("LTM_bottom", "LTM_bottom")) return ["Found models:", set([" %s : %s" % (m, mm) for m, mm in defined])] def list_model(defined): return ["List of all elements:", set([" %s : %s" % (m, mm) for m, mm in defined])] def read_node(name, t, defs, attrs): return ["Element to read?", "Name: %s" % name, "Type: %s" % t, "Defines attributes:"] + \ ([set([" %s : %s" % (m, mm) for m, mm in defs])] if defs else []) + \ ["Attributes:"] + \ ([set([' "%s" : "%s" = %s' % (m, mm, v) for m, mm, v in attrs])] if attrs else []) def read_edge(name, t, src, dst, defs, attrs): return ["Element to read?", "Name: %s" % name, "Type: %s" % t, "Source: %s" % src, "Destination: %s" % dst, "Defines attributes:"] + \ ([set([" %s : %s" % (m, mm) for m, mm in defs])] if defs else []) + \ ["Attributes:"] + \ ([set([' "%s" : "%s" = %s' % (m, mm, v) for m, mm, v in attrs])] if attrs else []) delete = ["Model to delete?", "Deleted!"] rename = ["Old name?", "New name?", "Rename complete!"] attr_add = ["Which model do you want to assign an attribute to?", "Which attribute do you wish to assign?", "Value of attribute?", "Added attribute!"] attr_del = ["Which model do you want to remove an attribute of?", "Which attribute do you want to delete?", "Attribute deleted!", ] help_root = ["Currently no model is loaded, so your operations are limited to:", " new -- Create a new model and save it for future use" " load -- Load a previously made model", " rename -- Rename a previously made model", " delete -- Delete a previously made model", " list -- Show a list of all stored models", " help -- Show a list of possible commands"] verify_fail_weight= ["Natural number not larger than or equal to zero"] verify_fail_tokens= ["Natural number not larger than or equal to zero"] verify_fail_structure = ["Source of model edge not typed by source of type: p2t"] init = greeting + prompt did_instantiate_simple = init + \ new + \ loaded +\ instantiate_node + \ prompt + \ instantiate_node + \ prompt + \ attr_add + \ prompt + \ instantiate_node + \ prompt + \ attr_add + \ prompt + \ instantiate_edge + \ prompt + \ attr_add + \ prompt + \ instantiate_edge + \ prompt + \ attr_add + \ prompt def list_types(t): return ["List of types:"] + \ [set([" %s : %s" % (m, mm) for m, mm in t])] modify = ["Element to modify?", "Attribute to modify?", "New value?", "Modified!",] class TestPetrinetInterface(unittest.TestCase): def test_po_pn_interface_manage(self): self.pn_interface_manage("PO") def test_co_pn_interface_manage(self): self.pn_interface_manage("CO") def pn_interface_manage(self, mode): self.assertTrue(run_file(all_files, ["list", "new", "PetriNets", "abc", "exit", "list", "new", "PetriNets", "def", "exit", "list", "delete", "def", "list", "rename", "abc", "a", "list", "delete", "a", "list", ], init + \ list_menu([]) + prompt + \ new + loaded + prompt + list_menu([("abc", "PetriNets")]) + prompt + \ new + loaded + prompt + list_menu([("abc", "PetriNets"), ("def", "PetriNets")]) + prompt + \ delete + prompt + list_menu([("abc", "PetriNets")]) + prompt + \ rename + prompt + list_menu([("a", "PetriNets")]) + prompt + \ delete + prompt + list_menu([]) + prompt, mode)) def test_po_pn_interface_new_reload(self): self.pn_interface_new_reload("PO") def test_co_pn_interface_new_reload(self): self.pn_interface_new_reload("CO") def pn_interface_new_reload(self, mode): self.assertTrue(run_file(all_files, ["new", "PetriNets", "abc", "exit", "load", "abc"], init + new + loaded + prompt + load + loaded, mode)) def test_po_pn_interface_instantiate_place(self): self.pn_interface_instantiate_place("PO") def test_co_pn_interface_instantiate_place(self): self.pn_interface_instantiate_place("CO") def pn_interface_instantiate_place(self, mode): self.assertTrue(run_file(all_files, ["new", "PetriNets", "abc", "instantiate", "Place", "p1", "attr_add", "p1", "tokens", 5, "list", "read", "p1", "instantiate", "Transition", "t1", "list", "read", "t1"], init + new + loaded + \ instantiate_node + prompt + \ attr_add + prompt + \ list_model([("p1", "Place"), ("p1.tokens", "Natural")]) + prompt + \ read_node("p1", "Place", [], [("tokens", "Natural", 5)]) + prompt + \ instantiate_node + prompt + \ list_model([("p1", "Place"), ("t1", "Transition"), ("p1.tokens", "Natural")]) + prompt + \ read_node("t1", "Transition", [], []) + prompt, mode)) def test_po_pn_interface_instantiate_arcs(self): self.pn_interface_instantiate_arcs("PO") def test_co_pn_interface_instantiate_arcs(self): self.pn_interface_instantiate_arcs("CO") def pn_interface_instantiate_arcs(self, mode): self.assertTrue(run_file(all_files, do_instantiate_simple + [ "read", "p1", "read", "p2", "read", "t1", "read", "p2t", "read", "t2p", ], did_instantiate_simple + \ read_node("p1", "Place", [], [("tokens", "Natural", 5)]) + prompt + \ read_node("p2", "Place", [], [("tokens", "Natural", 0)]) + prompt + \ read_node("t1", "Transition", [], []) + prompt + \ read_edge("p2t", "P2T", "p1", "t1", [], [("weight", "Natural", 2)]) + prompt + \ read_edge("t2p", "T2P", "t1", "p2", [], [("weight", "Natural", 1)]) + prompt, mode)) def test_po_pn_interface_verify_OK(self): self.pn_interface_verify_OK("PO") def test_co_pn_interface_verify_OK(self): self.pn_interface_verify_OK("CO") def pn_interface_verify_OK(self, mode): self.assertTrue(run_file(all_files, do_instantiate_simple + ["verify"], did_instantiate_simple + ["OK"], mode)) def test_po_pn_interface_verify_fail_tokens(self): self.pn_interface_verify_fail_tokens("PO") def test_co_pn_interface_verify_fail_tokens(self): self.pn_interface_verify_fail_tokens("CO") def pn_interface_verify_fail_tokens(self, mode): self.assertTrue(run_file(all_files, do_instantiate_simple + ["modify", "p1", "tokens", -5, "verify"], did_instantiate_simple + modify + prompt + verify_fail_tokens + prompt, mode)) def test_po_pn_interface_verify_fail_weight(self): self.pn_interface_verify_fail_weight("PO") def test_co_pn_interface_verify_fail_weight(self): self.pn_interface_verify_fail_weight("CO") def pn_interface_verify_fail_weight(self, mode): self.assertTrue(run_file(all_files, do_instantiate_simple + ["modify", "p2t", "weight", -2, "verify"], did_instantiate_simple + modify + prompt + verify_fail_weight + prompt, mode)) def test_po_pn_interface_verify_fail_structure(self): self.pn_interface_verify_fail_structure("PO") def test_co_pn_interface_verify_fail_structure(self): self.pn_interface_verify_fail_structure("CO") def pn_interface_verify_fail_structure(self, mode): self.assertTrue(run_file(all_files, ["new", "PetriNets", "abc", "instantiate", "Transition", "t1", "instantiate", "Place", "p1", "attr_add", "p1", "tokens", 5, "instantiate", "P2T", "p2t", "t1", "p1", "attr_add", "p2t", "weight", 2, "verify"], init + new + loaded + \ instantiate_node + prompt + \ instantiate_node + prompt + attr_add + prompt + \ instantiate_edge + prompt + attr_add + prompt + \ verify_fail_structure, mode)) def test_po_pn_interface_types(self): self.pn_interface_types("PO") def test_co_pn_interface_types(self): self.pn_interface_types("CO") def pn_interface_types(self, mode): self.assertTrue(run_file(all_files, ["new", "PetriNets", "abc", "types"], init + new + loaded + list_types([("Place", "Class"), ("Transition", "Class"), ("P2T", "Association"), ("T2P", "Association"), ("Natural", "Class"), ("Place_tokens", "Association"), ("Place_tokens.name", "String"), ("Place_tokens.target_lower_cardinality", "Natural"), ("Place_tokens.target_upper_cardinality", "Natural"), ("P2T_weight", "Association"), ("P2T_weight.name", "String"), ("P2T_weight.target_lower_cardinality", "Natural"), ("P2T_weight.target_upper_cardinality", "Natural"), ("T2P_weight", "Association"), ("T2P_weight.name", "String"), ("T2P_weight.target_lower_cardinality", "Natural"), ("T2P_weight.target_upper_cardinality", "Natural"), ]) + prompt, mode)) def test_po_pn_interface_modify_place(self): self.pn_interface_modify_place("PO") def test_co_pn_interface_modify_place(self): self.pn_interface_modify_place("CO") def pn_interface_modify_place(self, mode): self.assertTrue(run_file(all_files, ["new", "PetriNets", "abc", "instantiate", "Place", "p1", "attr_add", "p1", "tokens", 5, "read", "p1", "modify", "p1", "tokens", 1, "read", "p1"], init + new + loaded + \ instantiate_node + prompt + attr_add + prompt + \ read_node("p1", "Place", [], [("tokens", "Natural", 5)]) + prompt + \ modify + prompt + \ read_node("p1", "Place", [], [("tokens", "Natural", 1)]) + prompt, mode)) def test_po_pn_interface_verify_fail_attr_lower_cardinality(self): self.pn_interface_verify_fail_attr_lower_cardinality("PO") def test_co_pn_interface_verify_fail_attr_lower_cardinality(self): self.pn_interface_verify_fail_attr_lower_cardinality("CO") def pn_interface_verify_fail_attr_lower_cardinality(self, mode): self.assertTrue(run_file(all_files, do_instantiate_simple + ["instantiate", "Place", "p999", "verify"], did_instantiate_simple + instantiate_node + prompt + ["Lower cardinality violation for outgoing edge of type Place_tokens at p999"] + prompt, mode)) def test_po_pn_interface_verify_fail_attr_upper_cardinality(self): self.pn_interface_verify_fail_attr_upper_cardinality("PO") def test_co_pn_interface_verify_fail_attr_upper_cardinality(self): self.pn_interface_verify_fail_attr_upper_cardinality("CO") def pn_interface_verify_fail_attr_upper_cardinality(self, mode): self.assertTrue(run_file(all_files, do_instantiate_simple + ["attr_add", "p1", "tokens", 5, "verify"], did_instantiate_simple + attr_add + prompt + ["Upper cardinality violation for outgoing edge of type Place_tokens at p1"] + prompt, mode)) def test_po_pn_interface_verify_natural(self): self.pn_interface_verify_natural("PO") def test_co_pn_interface_verify_natural(self): self.pn_interface_verify_natural("CO") def pn_interface_verify_natural(self, mode): self.assertTrue(run_file(all_files, ["new", "PetriNets", "abc", "instantiate", "Place", "p1", "attr_add", "p1", "tokens", -5, "attr_del", "p1", "tokens", "attr_add", "p1", "tokens", 4, "verify"], init + new + loaded + \ instantiate_node + prompt + \ attr_add + prompt + \ attr_del + prompt + \ attr_add + prompt + \ ["OK"] + prompt, mode)) def test_po_pn_interface_verify_PN_OK(self): self.pn_interface_verify_PN_OK("PO") def test_co_pn_interface_verify_PN_OK(self): self.pn_interface_verify_PN_OK("CO") def pn_interface_verify_PN_OK(self, mode): self.assertTrue(run_file(all_files, ["load", "PetriNets", "verify"], init + load + loaded + ["OK"], mode)) def test_po_rpgame(self): self.rpgame("PO") def test_co_rpgame(self): self.rpgame("CO") def rpgame(self, mode): constraint_code = \ """ include "primitives.alh" include "object_operations.alh" Element function constraint(model : Element, name : String): Element associations Element back_associations Element association String destination associations = allOutgoingAssociationInstances(model, name, "tile_left") while (0 < list_len(associations)): association = set_pop(associations) destination = readAssociationDestination(model, association) back_associations = allOutgoingAssociationInstances(model, destination, "tile_right") if (list_len(back_associations) < 1): return "Left link does not have a right link back"! else: association = set_pop(back_associations) destination = readAssociationDestination(model, association) if (destination != name): return "Right link does not have a left link back to the same tile"! associations = allOutgoingAssociationInstances(model, name, "tile_right") while (0 < list_len(associations)): association = set_pop(associations) destination = readAssociationDestination(model, association) back_associations = allOutgoingAssociationInstances(model, destination, "tile_left") if (list_len(back_associations) < 1): return "Right link does not have a left link back"! else: association = set_pop(back_associations) destination = readAssociationDestination(model, association) if (destination != name): return "Right link does not have a left link back to the same tile"! associations = allOutgoingAssociationInstances(model, name, "tile_top") while (0 < list_len(associations)): association = set_pop(associations) destination = readAssociationDestination(model, association) back_associations = allOutgoingAssociationInstances(model, destination, "tile_bottom") if (list_len(back_associations) < 1): return "Top link does not have a bottom link back"! else: association = set_pop(back_associations) destination = readAssociationDestination(model, association) if (destination != name): return "Top link does not have a bottom link back to the same tile"! associations = allOutgoingAssociationInstances(model, name, "tile_bottom") while (0 < list_len(associations)): association = set_pop(associations) destination = readAssociationDestination(model, association) back_associations = allOutgoingAssociationInstances(model, destination, "tile_top") if (list_len(back_associations) < 1): return "Bottom link does not have a top link back"! else: association = set_pop(back_associations) destination = readAssociationDestination(model, association) if (destination != name): return "Bottom link does not have a top link back to the same tile"! return "OK"! """ constructors = get_constructor(constraint_code) self.assertTrue(run_file(all_files, ["new", "SimpleClassDiagrams", "RPGame", "set_inheritance", "Inheritance", "instantiate", "Class", "Scene", "instantiate", "Class", "Tile", "instantiate", "Class", "Item", "instantiate", "Class", "Goal", "instantiate", "Class", "Character", "instantiate", "Class", "Hero", "instantiate", "Association", "scene_has_tiles", "Scene", "Tile", "instantiate", "Association", "tile_left", "Tile", "Tile", "instantiate", "Association", "tile_right", "Tile", "Tile", "instantiate", "Association", "tile_top", "Tile", "Tile", "instantiate", "Association", "tile_bottom", "Tile", "Tile", "instantiate", "Association", "character_on", "Character", "Tile", "instantiate", "Association", "item_on", "Item", "Tile", "instantiate", "Inheritance", "hero_is_character", "Hero", "Character", "instantiate", "Inheritance", "goal_is_item", "Goal", "Item", "attr_add", "Scene", "lower_cardinality", 1, "attr_add", "Scene", "upper_cardinality", 1, "attr_add", "Goal", "lower_cardinality", 1, "attr_add", "scene_has_tiles", "source_lower_cardinality", 1, "attr_add", "scene_has_tiles", "source_upper_cardinality", 1, "attr_add", "scene_has_tiles", "target_lower_cardinality", 1, "attr_add", "item_on", "target_lower_cardinality", 1, "attr_add", "item_on", "target_upper_cardinality", 1, "attr_add", "item_on", "source_upper_cardinality", 1, "attr_add", "character_on", "target_lower_cardinality", 1, "attr_add", "character_on", "target_upper_cardinality", 1, "attr_add", "character_on", "source_upper_cardinality", 1, "attr_add", "tile_left", "source_upper_cardinality", 1, "attr_add", "tile_left", "target_upper_cardinality", 1, "attr_add", "tile_right", "source_upper_cardinality", 1, "attr_add", "tile_right", "target_upper_cardinality", 1, "attr_add", "tile_top", "source_upper_cardinality", 1, "attr_add", "tile_top", "target_upper_cardinality", 1, "attr_add", "tile_bottom", "source_upper_cardinality", 1, "attr_add", "tile_bottom", "target_upper_cardinality", 1, "attr_add_code", "Tile", "constraint", ] + constructors + ["verify"] + ["exit"] + [ "new", "RPGame", "my_game", "instantiate", "Scene", "scene", "instantiate", "Hero", "Link", "instantiate", "Goal", "goal", "instantiate", "Tile", "tile_00", "instantiate", "Tile", "tile_01", "instantiate", "Tile", "tile_10", "instantiate", "Tile", "tile_11", "instantiate", "scene_has_tiles", "", "scene", "tile_00", "instantiate", "scene_has_tiles", "", "scene", "tile_01", "instantiate", "scene_has_tiles", "", "scene", "tile_10", "instantiate", "scene_has_tiles", "", "scene", "tile_11", "instantiate", "character_on", "", "Link", "tile_00", "instantiate", "item_on", "", "goal", "tile_11", "instantiate", "tile_left", "", "tile_01", "tile_00", "instantiate", "tile_left", "", "tile_11", "tile_10", "instantiate", "tile_right", "", "tile_00", "tile_01", "instantiate", "tile_right", "", "tile_10", "tile_11", "instantiate", "tile_top", "", "tile_10", "tile_00", "instantiate", "tile_top", "", "tile_11", "tile_01", "instantiate", "tile_bottom", "", "tile_00", "tile_10", "instantiate", "tile_bottom", "", "tile_01", "tile_11", "verify", ], init + new + loaded + \ set_inheritance + prompt + \ (instantiate_node + prompt) * 6 + \ (instantiate_edge + prompt) * 9 + \ (attr_add + prompt) * 20 + \ ["Element to constrain (empty for global)?", "Give input to function constructors for LOCAL constraint!", "Added constraint to model!"] + \ prompt + \ ["OK"] + \ prompt + prompt + new + loaded + \ (instantiate_node + prompt) * 7 + \ (instantiate_edge + prompt) * 14 + \ ["OK"], mode)) def test_po_pn_interface_transform_pn(self): code_mark_constraint = \ """ include "primitives.alh" include "modelling.alh" include "object_operations.alh" Boolean function constraint(host_model : Element, name : String): // Make sure that all places have enough tokens Element links String link String place links = allIncomingAssociationInstances(host_model, name, "P2T") while (read_nr_out(links) > 0): link = set_pop(links) place = readAssociationSource(host_model, link) log("Check link " + link) log("Check place " + place) if (integer_lt(read_attribute(host_model, place, "tokens"), read_attribute(host_model, link, "weight"))): // Not enough tokens for this weight return False! return True! """ code_mark_action = \ """ include "primitives.alh" include "modelling.alh" Void function action(host_model : Element, name : String, mapping : Element): unset_attribute(host_model, name, "executing") instantiate_attribute(host_model, name, "executing", True) return! """ code_consume_transition_constraint = \ """ include "primitives.alh" include "modelling.alh" Boolean function constraint(host_model : Element, name : String): // Check if this node is executing currently return value_eq(read_attribute(host_model, name, "executing"), True)! """ code_consume_place_constraint = \ """ include "primitives.alh" include "modelling.alh" Boolean function constraint(host_model : Element, name : String): // Check if this node is executing currently return value_eq(read_attribute(host_model, name, "executed"), False)! """ code_consume_place_action = \ """ include "primitives.alh" include "modelling.alh" Void function action(host_model : Element, name : String, mapping : Element): Integer tokens Integer weight tokens = read_attribute(host_model, name, "tokens") weight = read_attribute(host_model, mapping["2"], "weight") unset_attribute(host_model, name, "tokens") instantiate_attribute(host_model, name, "tokens", tokens - weight) unset_attribute(host_model, name, "executed") instantiate_attribute(host_model, name, "executed", True) return! """ code_produce_place_action = \ """ include "primitives.alh" include "modelling.alh" Void function action(host_model : Element, name : String, mapping : Element): Integer tokens Integer weight tokens = read_attribute(host_model, name, "tokens") weight = read_attribute(host_model, mapping["2"], "weight") unset_attribute(host_model, name, "tokens") instantiate_attribute(host_model, name, "tokens", tokens + weight) unset_attribute(host_model, name, "executed") instantiate_attribute(host_model, name, "executed", True) return! """ code_unmark_transition_constraint = \ """ include "primitives.alh" include "modelling.alh" Boolean function constraint(host_model : Element, name : String): // Check if this node is executing currently return value_eq(read_attribute(host_model, name, "executing"), True)! """ code_unmark_place_constraint = \ """ include "primitives.alh" include "modelling.alh" Boolean function constraint(host_model : Element, name : String): // Check if this node is executing currently return value_eq(read_attribute(host_model, name, "executed"), True)! """ code_unmark_transition_action = \ """ include "primitives.alh" include "modelling.alh" Void function action(host_model : Element, name : String, mapping : Element): unset_attribute(host_model, name, "executing") instantiate_attribute(host_model, name, "executing", False) return! """ code_unmark_place_action = \ """ include "primitives.alh" include "modelling.alh" Void function action(host_model : Element, name : String, mapping : Element): unset_attribute(host_model, name, "executed") instantiate_attribute(host_model, name, "executed", False) return! """ constructor_mark_constraint = get_constructor(code_mark_constraint) constructor_mark_action = get_constructor(code_mark_action) constructor_consume_transition_constraint = get_constructor(code_consume_transition_constraint) constructor_consume_place_constraint = get_constructor(code_consume_place_constraint) constructor_consume_place_action = get_constructor(code_consume_place_action) constructor_produce_transition_constraint = constructor_consume_transition_constraint constructor_produce_place_constraint = constructor_consume_place_constraint constructor_produce_place_action = get_constructor(code_produce_place_action) constructor_unmark_transition_constraint = constructor_consume_transition_constraint constructor_unmark_transition_action = get_constructor(code_unmark_transition_action) constructor_unmark_place_constraint = get_constructor(code_unmark_place_constraint) constructor_unmark_place_action = get_constructor(code_unmark_place_action) self.assertTrue(run_file(all_files, ["new", "SimpleClassDiagrams", "PetriNets_Runtime", "set_inheritance", "Inheritance", "instantiate", "Class", "Natural", "instantiate", "Class", "Boolean", "instantiate", "Class", "Place", "attr_def", "Place", "tokens", "Natural", "attr_def", "Place", "executed", "Boolean", "instantiate", "Class", "Transition", "attr_def", "Transition", "executing", "Boolean", "instantiate", "Association", "P2T", "Place", "Transition", "attr_def", "P2T", "weight", "Natural", "instantiate", "Association", "T2P", "Transition", "Place", "attr_def", "T2P", "weight", "Natural", "exit", "new", "PetriNets_Runtime", "pn", "instantiate", "Place", "p1", "attr_add", "p1", "tokens", 1, "attr_add", "p1", "executed", False, "instantiate", "Place", "p2", "attr_add", "p2", "tokens", 2, "attr_add", "p2", "executed", False, "instantiate", "Transition", "t1", "attr_add", "t1", "executing", False, "instantiate", "P2T", "p2t", "p1", "t1", "attr_add", "p2t", "weight", 1, "instantiate", "P2T", "p2t2", "p2", "t1", "attr_add", "p2t2", "weight", 1, "exit", "ramify", "PetriNets_Runtime", "new", "PetriNets_Runtime_SCHEDULE", "pn_simulate", # Rule 1: mark transition to execute # LHS "instantiate", "LHS", "lhs_mark", "instantiate", "Pre_Transition", "pre_mark_t", "attr_add", "pre_mark_t", "label", "0", "attr_add_code", "pre_mark_t", "constraint", ] + constructor_mark_constraint + [ "instantiate", "LHS_contains", "", "lhs_mark", "pre_mark_t", # RHS "instantiate", "RHS", "rhs_mark", "instantiate", "Post_Transition", "post_mark_t", "attr_add", "post_mark_t", "label", "0", "attr_add_code", "post_mark_t", "action", ] + constructor_mark_action + [ "instantiate", "RHS_contains", "", "rhs_mark", "post_mark_t", # Atomic of rule 1 "mark" "instantiate", "Atomic", "mark", "instantiate", "AtomicLHS", "", "mark", "lhs_mark", "instantiate", "AtomicRHS", "", "mark", "rhs_mark", # Rule 2: consume tokens going into marked transition # LHS "instantiate", "LHS", "lhs_consume", "instantiate", "Pre_Transition", "pre_consume_t", "attr_add", "pre_consume_t", "label", "0", "attr_add_code", "pre_consume_t", "constraint", ] + constructor_consume_transition_constraint + [ "instantiate", "Pre_Place", "pre_consume_p", "attr_add", "pre_consume_p", "label", "1", "attr_add_code", "pre_consume_p", "constraint", ] + constructor_consume_place_constraint + [ "instantiate", "Pre_P2T", "pre_consume_p2t", "pre_consume_p", "pre_consume_t", "attr_add", "pre_consume_p2t", "label", "2", "instantiate", "LHS_contains", "", "lhs_consume", "pre_consume_t", "instantiate", "LHS_contains", "", "lhs_consume", "pre_consume_p", "instantiate", "LHS_contains", "", "lhs_consume", "pre_consume_p2t", # RHS "instantiate", "RHS", "rhs_consume", "instantiate", "Post_Transition", "post_consume_t", "attr_add", "post_consume_t", "label", "0", "instantiate", "Post_Place", "post_consume_p", "attr_add", "post_consume_p", "label", "1", "attr_add_code", "post_consume_p", "action", ] + constructor_consume_place_action + [ "instantiate", "Post_P2T", "post_consume_p2t", "post_consume_p", "post_consume_t", "attr_add", "post_consume_p2t", "label", "2", "instantiate", "RHS_contains", "", "rhs_consume", "post_consume_t", "instantiate", "RHS_contains", "", "rhs_consume", "post_consume_p", "instantiate", "RHS_contains", "", "rhs_consume", "post_consume_p2t", # Atomic of rule 2 "consume" "instantiate", "Atomic", "consume", "instantiate", "AtomicLHS", "", "consume", "lhs_consume", "instantiate", "AtomicRHS", "", "consume", "rhs_consume", # Rule 3: produce tokens going out of marked transition # LHS "instantiate", "LHS", "lhs_produce", "instantiate", "Pre_Transition", "pre_produce_t", "attr_add", "pre_produce_t", "label", "0", "attr_add_code", "pre_produce_t", "constraint", ] + constructor_produce_transition_constraint + [ "instantiate", "Pre_Place", "pre_produce_p", "attr_add", "pre_produce_p", "label", "1", "attr_add_code", "pre_produce_p", "constraint", ] + constructor_produce_place_constraint + [ "instantiate", "Pre_T2P", "pre_produce_t2p", "pre_produce_t", "pre_produce_p", "attr_add", "pre_produce_t2p", "label", "2", "instantiate", "LHS_contains", "", "lhs_produce", "pre_produce_t", "instantiate", "LHS_contains", "", "lhs_produce", "pre_produce_p", "instantiate", "LHS_contains", "", "lhs_produce", "pre_produce_t2p", # RHS "instantiate", "RHS", "rhs_produce", "instantiate", "Post_Transition", "post_produce_t", "attr_add", "post_produce_t", "label", "0", "instantiate", "Post_Place", "post_produce_p", "attr_add", "post_produce_p", "label", "1", "attr_add_code", "post_produce_p", "action", ] + constructor_produce_place_action + [ "instantiate", "Post_T2P", "post_produce_t2p", "post_produce_t", "post_produce_p", "attr_add", "post_produce_t2p", "label", "2", "instantiate", "RHS_contains", "", "rhs_produce", "post_produce_t", "instantiate", "RHS_contains", "", "rhs_produce", "post_produce_p", "instantiate", "RHS_contains", "", "rhs_produce", "post_produce_t2p", # Atomic of rule 3 "produce" "instantiate", "Atomic", "produce", "instantiate", "AtomicLHS", "", "produce", "lhs_produce", "instantiate", "AtomicRHS", "", "produce", "rhs_produce", # Rule 4: unmark marked transition # LHS "instantiate", "LHS", "lhs_unmark_transition", "instantiate", "Pre_Transition", "pre_unmark_t", "attr_add", "pre_unmark_t", "label", "0", "attr_add_code", "pre_unmark_t", "constraint", ] + constructor_unmark_transition_constraint + [ "instantiate", "LHS_contains", "", "lhs_unmark_transition", "pre_unmark_t", # RHS "instantiate", "RHS", "rhs_unmark_transition", "instantiate", "Post_Transition", "post_unmark_t", "attr_add", "post_unmark_t", "label", "0", "attr_add_code", "post_unmark_t", "action", ] + constructor_unmark_transition_action + [ "instantiate", "RHS_contains", "", "rhs_unmark_transition", "post_unmark_t", # Atomic of rule 4 "unmark transition" "instantiate", "Atomic", "unmark_transition", "instantiate", "AtomicLHS", "", "unmark_transition", "lhs_unmark_transition", "instantiate", "AtomicRHS", "", "unmark_transition", "rhs_unmark_transition", # Rule 5: unmark marked places # LHS "instantiate", "LHS", "lhs_unmark_place", "instantiate", "Pre_Place", "pre_unmark_p", "attr_add", "pre_unmark_p", "label", "0", "attr_add_code", "pre_unmark_p", "constraint", ] + constructor_unmark_place_constraint + [ "instantiate", "LHS_contains", "", "lhs_unmark_place", "pre_unmark_p", # RHS "instantiate", "RHS", "rhs_unmark_place", "instantiate", "Post_Place", "post_unmark_p", "attr_add", "post_unmark_p", "label", "0", "attr_add_code", "post_unmark_p", "action", ] + constructor_unmark_place_action + [ "instantiate", "RHS_contains", "", "rhs_unmark_place", "post_unmark_p", # Atomic of rule 5 "unmark place" "instantiate", "Atomic", "unmark_place", "instantiate", "AtomicLHS", "", "unmark_place", "lhs_unmark_place", "instantiate", "AtomicRHS", "", "unmark_place", "rhs_unmark_place", # Compose schedule "instantiate", "Composite", "main", "instantiate", "Success", "success", "instantiate", "Failure", "failure", # Add all rules "instantiate", "Contains", "", "main", "success", "instantiate", "Contains", "", "main", "failure", "instantiate", "Contains", "", "main", "mark", "instantiate", "Contains", "", "main", "consume", "instantiate", "Contains", "", "main", "produce", "instantiate", "Contains", "", "main", "unmark_transition", "instantiate", "Contains", "", "main", "unmark_place", # Scheduling order "instantiate", "OnSuccess", "", "mark", "consume", "instantiate", "OnFailure", "", "mark", "success", "instantiate", "OnSuccess", "", "consume", "consume", "instantiate", "OnFailure", "", "consume", "produce", "instantiate", "OnSuccess", "", "produce", "produce", "instantiate", "OnFailure", "", "produce", "unmark_transition", "instantiate", "OnSuccess", "", "unmark_transition", "unmark_place", "instantiate", "OnFailure", "", "unmark_transition", "failure", "instantiate", "OnSuccess", "", "unmark_place", "unmark_place", "instantiate", "OnFailure", "", "unmark_place", "success", # Composite initial "instantiate", "Initial", "", "main", "mark", "exit", "transform", "pn", "pn_simulate", "load", "pn", "list", "verify", "read", "t1", "read", "p1", "read", "p2", "exit", ], None, "PO")) def test_po_pn_interface_model_transform_pn(self): PN_runtime = \ """ import models/SimpleClassDiagrams as SimpleClassDiagrams SimpleClassDiagrams (Inheritance) PetriNets_Runtime{ Class Natural {} Class Boolean {} Class Place { tokens : Natural executed : Boolean } Class Transition { executing : Boolean } Association P2T (Place, Transition) { weight : Natural } Association T2P (Transition, Place) { weight : Natural } } export PetriNets_Runtime to models/PetriNets_Runtime """ PN_model = \ """ import models/PetriNets_Runtime as PetriNets_Runtime PetriNets_Runtime pn { Place p1 { tokens = 1 executed = False } Place p2 { tokens = 2 executed = False } Place p3 { tokens = 3 executed = False } Transition t1 { executing = False } P2T (p1, t1) { weight = 1 } P2T (p2, t1) { weight = 1 } T2P (t1, p3) { weight = 2 } } export pn to models/pn """ schedule_model = \ """ import models/PetriNets_Runtime_SCHEDULE as PN_Transform PN_Transform s { Composite schedule { {Contains} Failure failure {} {Contains} Success success {} {Contains} Atomic mark { LHS { Pre_Transition { label = "1" constraint = $ include "primitives.alh" include "modelling.alh" include "object_operations.alh" Boolean function constraint(host_model : Element, name : String): Element links String link String place links = allIncomingAssociationInstances(host_model, name, "P2T") while (read_nr_out(links) > 0): link = set_pop(links) place = readAssociationSource(host_model, link) if (integer_lt(read_attribute(host_model, place, "tokens"), read_attribute(host_model, link, "weight"))): return False! return True! $ } } RHS { Post_Transition { label = "1" action = $ include "primitives.alh" include "modelling.alh" Void function action(host_model : Element, name : String, mapping : Element): unset_attribute(host_model, name, "executing") instantiate_attribute(host_model, name, "executing", True) return! $ } } } {Contains} Atomic consume { LHS { Pre_Transition lhs_consume_t{ label = "0" constraint = $ include "primitives.alh" include "modelling.alh" Boolean function constraint(host_model : Element, name : String): // Check if this node is executing currently return value_eq(read_attribute(host_model, name, "executing"), True)! $ } Pre_Place lhs_consume_p{ label = "1" constraint = $ include "primitives.alh" include "modelling.alh" Boolean function constraint(host_model : Element, name : String): // Check if this node is executing currently return value_eq(read_attribute(host_model, name, "executed"), False)! $ } Pre_P2T lhs_consume_p2t(lhs_consume_p, lhs_consume_t){ label = "2" } } RHS { Post_Transition rhs_consume_t { label = "0" } Post_Place rhs_consume_p { label = "1" action = $ include "primitives.alh" include "modelling.alh" Void function action(host_model : Element, name : String, mapping : Element): Integer tokens Integer weight tokens = read_attribute(host_model, name, "tokens") weight = read_attribute(host_model, mapping["2"], "weight") unset_attribute(host_model, name, "tokens") instantiate_attribute(host_model, name, "tokens", tokens - weight) unset_attribute(host_model, name, "executed") instantiate_attribute(host_model, name, "executed", True) return! $ } Post_P2T (rhs_consume_p, rhs_consume_t){ label = "2" } } } {Contains} Atomic produce { LHS { Pre_Transition lhs_produce_t{ label = "0" constraint = $ include "primitives.alh" include "modelling.alh" Boolean function constraint(host_model : Element, name : String): // Check if this node is executing currently return value_eq(read_attribute(host_model, name, "executing"), True)! $ } Pre_Place lhs_produce_p{ label = "1" constraint = $ include "primitives.alh" include "modelling.alh" Boolean function constraint(host_model : Element, name : String): // Check if this node is executing currently return value_eq(read_attribute(host_model, name, "executed"), False)! $ } Pre_T2P (lhs_produce_t, lhs_produce_p){ label = "2" } } RHS { Post_Transition rhs_produce_t{ label = "0" } Post_Place rhs_produce_p{ label = "1" action = $ include "primitives.alh" include "modelling.alh" Void function action(host_model : Element, name : String, mapping : Element): Integer tokens Integer weight tokens = read_attribute(host_model, name, "tokens") weight = read_attribute(host_model, mapping["2"], "weight") unset_attribute(host_model, name, "tokens") instantiate_attribute(host_model, name, "tokens", tokens + weight) unset_attribute(host_model, name, "executed") instantiate_attribute(host_model, name, "executed", True) return! $ } Post_T2P (rhs_produce_t, rhs_produce_p){ label = "2" } } } {Contains} Atomic unmark_transition { LHS { Pre_Transition { label = "0" constraint = $ include "primitives.alh" include "modelling.alh" Boolean function constraint(host_model : Element, name : String): // Check if this node is executing currently return value_eq(read_attribute(host_model, name, "executing"), True)! $ } } RHS { Post_Transition { label = "0" action = $ include "primitives.alh" include "modelling.alh" Void function action(host_model : Element, name : String, mapping : Element): unset_attribute(host_model, name, "executing") instantiate_attribute(host_model, name, "executing", False) return! $ } } } {Contains} Atomic unmark_place { LHS { Pre_Place { label = "0" constraint = $ include "primitives.alh" include "modelling.alh" Boolean function constraint(host_model : Element, name : String): // Check if this node is executing currently return value_eq(read_attribute(host_model, name, "executed"), True)! $ } } RHS { Post_Place { label = "0" action = $ include "primitives.alh" include "modelling.alh" Void function action(host_model : Element, name : String, mapping : Element): unset_attribute(host_model, name, "executed") instantiate_attribute(host_model, name, "executed", False) return! $ } } } } OnSuccess (mark, consume) {} OnFailure (mark, failure) {} OnSuccess (consume, consume) {} OnFailure (consume, produce) {} OnSuccess (produce, produce) {} OnFailure (produce, unmark_transition) {} OnSuccess (unmark_transition, unmark_place) {} OnFailure (unmark_transition, failure) {} OnSuccess (unmark_place, unmark_place) {} OnFailure (unmark_place, success) {} Initial (schedule, mark) {} } export s to models/pn_simulate """ self.assertTrue(run_file(all_files, get_model_constructor(PN_runtime) + \ get_model_constructor(PN_model) + [ "ramify", "PetriNets_Runtime_SCHEDULE", "PetriNets_Runtime", "PetriNets_Runtime", ] + get_model_constructor(schedule_model) + [ "transform", "pn", "pn_simulate", "load", "pn", "list", "verify", "read", "t1", "read", "p1", "read", "p2", "read", "p3", "exit", ], None, "PO"))