"""Process-enactment tests for the PowerWindow case study.

The tests register the metamodels, the process model and all
transformations used by ``models/pm_powerwindow``, then execute the process
with callbacks standing in for the manual "revise_*" activities.  Each
callback uploads a prepared model and drops a marker file into a scratch
directory; the number of marker files left at the end is how the tests
check how many manual activities were executed.
"""
import unittest
# NOTE: star imports are kept as-is; the names used below (start_mvc, kill,
# slow, model_add, ...) come from one of these two modules.
from utils import *
import sys
import os
import shutil
sys.path.append("wrappers")  # must run before importing the modelverse wrapper
from modelverse import *
import threading
import uuid
import glob

# Scratch directory holding one empty marker file per executed callback.
_TEST_DIR = ".TEST_POWER_WINDOW"

# Upper bounds on the number of marker files for the two test variants.
_MAX_CALLS_FAST = 6
_MAX_CALLS_DEBUG = 11


def _read_file(path):
    """Return the full text of *path*, closing the file handle afterwards."""
    with open(path, "r") as handle:
        return handle.read()


def _formalisms(*names):
    """Map every name in *names* to its ``formalisms/<name>`` location."""
    return dict((name, "formalisms/" + name) for name in names)


def _record_call(prefix, limit):
    """Create a uniquely named marker file and enforce the call limit.

    Raises:
        AssertionError: if more than *limit* marker files exist afterwards.
    """
    open("%s/%s_%s" % (_TEST_DIR, prefix, uuid.uuid4()), 'w').close()
    count = len(glob.glob("%s/*" % _TEST_DIR))
    # Explicit raise instead of ``assert`` so the guard survives ``python -O``.
    if count > limit:
        raise AssertionError(
            "too many manual activities executed: %d > %d" % (count, limit))


def _upload(model, source_path, prefix, limit):
    """Overwrite *model* with the contents of *source_path* and record it."""
    model_overwrite(model, _read_file(source_path))
    _record_call(prefix, limit)


def _reset_test_dir():
    """Start from an empty scratch directory, removing any stale one."""
    shutil.rmtree(_TEST_DIR, ignore_errors=True)
    os.makedirs(_TEST_DIR)


def _check_call_count(expected):
    """Remove the scratch directory and verify the marker-file count.

    Raises:
        Exception: if the number of marker files differs from *expected*.
    """
    called = len(os.listdir(_TEST_DIR))
    shutil.rmtree(_TEST_DIR)
    if called != expected:
        print(called)
        raise Exception("Not executed sufficiently:" + str(called))


def traceability_CTRL2EPN(model):
    """Add the control-to-EPN traceability associations to *model*."""
    instantiate(model, "Association", ("PW_Control/State", "Encapsulated_PetriNet/Place"), ID="CTRL2EPN_link")
    instantiate(model, "Association", ("PW_Control/Transition", "Encapsulated_PetriNet/Transition"), ID="CTRL2EPN_tlink")


def traceability_PLANT2EPN(model):
    """Add the plant-to-EPN traceability associations to *model*."""
    instantiate(model, "Association", ("PW_Plant/State", "Encapsulated_PetriNet/Place"), ID="PLANT2EPN_link")
    instantiate(model, "Association", ("PW_Plant/Transition", "Encapsulated_PetriNet/Transition"), ID="PLANT2EPN_tlink")


def traceability_ENV2EPN(model):
    """Add the environment-to-EPN traceability association to *model*."""
    instantiate(model, "Association", ("PW_Environment/Event", "Encapsulated_PetriNet/Place"), ID="ENV2EPN_link")


def traceability_EPN2PN(model):
    """Add the EPN-to-PetriNet traceability associations to *model*."""
    # NOTE(review): the IDs look swapped relative to the types they link
    # (Place <-> "transition_link", Transition <-> "place_link").  They are
    # left unchanged because the transformation models may refer to them by
    # name -- confirm against combine.mvc before renaming.
    instantiate(model, "Association", ("Encapsulated_PetriNet/Place", "PetriNet/Place"), ID="EPN2PN_transition_link")
    instantiate(model, "Association", ("Encapsulated_PetriNet/Transition", "PetriNet/Transition"), ID="EPN2PN_place_link")


def upload_req_debug(model):
    """Callback for ``revise_req`` in the debug test."""
    _upload(model, "models/PowerWindow/models/requirements.mvc", "req", _MAX_CALLS_DEBUG)


def upload_plant_debug(model):
    """Callback for ``revise_plant`` in the debug test."""
    _upload(model, "models/PowerWindow/models/plant.mvc", "plant", _MAX_CALLS_DEBUG)


def upload_env_debug(model):
    """Callback for ``revise_environment`` in the debug test."""
    _upload(model, "models/PowerWindow/models/environment.mvc", "env", _MAX_CALLS_DEBUG)


def upload_ctrl_debug(model):
    """Callback for ``revise_control`` in the debug test.

    The first invocation (no ``ctrl_*`` marker present yet) uploads
    ``control_wrong.mvc``; every later invocation uploads ``control.mvc``.
    """
    if glob.glob("%s/ctrl_*" % _TEST_DIR):
        source_path = "models/PowerWindow/models/control.mvc"
    else:
        source_path = "models/PowerWindow/models/control_wrong.mvc"
    _upload(model, source_path, "ctrl", _MAX_CALLS_DEBUG)


def upload_query_debug(model):
    """Callback for ``revise_query`` in the debug test."""
    _upload(model, "models/SafetyQuery/models/powerwindow_safe.mvc", "query", _MAX_CALLS_DEBUG)


def upload_arch_debug(model):
    """Callback for ``revise_architecture`` in the debug test."""
    _upload(model, "models/PowerWindow/models/architecture.mvc", "arch", _MAX_CALLS_DEBUG)


def upload_req_fast(model):
    """Callback for ``revise_req`` in the fast test."""
    _upload(model, "models/PowerWindow/models/requirements.mvc", "req", _MAX_CALLS_FAST)


def upload_plant_fast(model):
    """Callback for ``revise_plant`` in the fast test."""
    _upload(model, "models/PowerWindow/models/plant.mvc", "plant", _MAX_CALLS_FAST)


def upload_env_fast(model):
    """Callback for ``revise_environment`` in the fast test."""
    _upload(model, "models/PowerWindow/models/environment.mvc", "env", _MAX_CALLS_FAST)


def upload_ctrl_fast(model):
    """Callback for ``revise_control`` in the fast test."""
    _upload(model, "models/PowerWindow/models/control.mvc", "ctrl", _MAX_CALLS_FAST)


def upload_query_fast(model):
    """Callback for ``revise_query`` in the fast test."""
    _upload(model, "models/SafetyQuery/models/powerwindow_safe.mvc", "query", _MAX_CALLS_FAST)


def upload_arch_fast(model):
    """Callback for ``revise_architecture`` in the fast test."""
    _upload(model, "models/PowerWindow/models/architecture.mvc", "arch", _MAX_CALLS_FAST)


def _register_powerwindow_models():
    """Register metamodels, the process model and all transformations.

    The calls are issued in the same order for both tests: metamodels and
    process model first, then the manual, model-transformation and
    action-language activities.
    """
    scd = "formalisms/SimpleClassDiagrams"
    model_add("formalisms/ReachabilityGraph", scd, _read_file("models/ReachabilityGraph/metamodels/reachability_graph.mvc"))
    model_add("formalisms/PetriNet", scd, _read_file("models/PetriNets/metamodels/PetriNets.mvc"))
    model_add("formalisms/Encapsulated_PetriNet", scd, _read_file("models/EncapsulatedPetriNets/metamodels/epn.mvc"))
    model_add("formalisms/PW_Plant", scd, _read_file("models/PowerWindow/metamodels/plant.mvc"))
    model_add("formalisms/PW_Environment", scd, _read_file("models/PowerWindow/metamodels/environment.mvc"))
    model_add("formalisms/PW_Control", scd, _read_file("models/PowerWindow/metamodels/control.mvc"))
    model_add("formalisms/Requirements", scd, _read_file("models/PowerWindow/metamodels/requirements.mvc"))
    model_add("formalisms/Query", scd, _read_file("models/SafetyQuery/metamodels/query.mvc"))
    model_add("formalisms/Architecture", scd, _read_file("models/PowerWindow/metamodels/architecture.mvc"))

    model_add("models/pm_powerwindow", "formalisms/ProcessModel", _read_file("models/PowerWindow/PM_analyze.mvc"))

    # Manual activities: each one reads the requirements plus the model it
    # revises (the requirements activity only touches the requirements).
    transformation_add_MANUAL(_formalisms("Requirements"), _formalisms("Requirements"), "models/revise_req")
    transformation_add_MANUAL(_formalisms("Requirements", "PW_Environment"), _formalisms("PW_Environment"), "models/revise_environment")
    transformation_add_MANUAL(_formalisms("Requirements", "PW_Plant"), _formalisms("PW_Plant"), "models/revise_plant")
    transformation_add_MANUAL(_formalisms("Requirements", "PW_Control"), _formalisms("PW_Control"), "models/revise_control")
    transformation_add_MANUAL(_formalisms("Requirements", "Query"), _formalisms("Query"), "models/revise_query")
    transformation_add_MANUAL(_formalisms("Requirements", "Architecture"), _formalisms("Architecture"), "models/revise_architecture")

    # Model transformations; the optional last argument is the callback that
    # adds traceability associations.
    transformation_add_MT({}, _formalisms("PW_Plant", "PW_Environment", "PW_Control", "Query", "Architecture", "Requirements"), "models/make_initial_models", _read_file("models/PowerWindow/transformations/initialize.mvc"))
    transformation_add_MT(_formalisms("PW_Plant"), _formalisms("Encapsulated_PetriNet"), "models/plant_to_EPN", _read_file("models/PowerWindow/transformations/plant_to_EPN.mvc"), traceability_PLANT2EPN)
    transformation_add_MT(_formalisms("PW_Control"), _formalisms("Encapsulated_PetriNet"), "models/control_to_EPN", _read_file("models/PowerWindow/transformations/control_to_EPN.mvc"), traceability_CTRL2EPN)
    transformation_add_MT(_formalisms("PW_Environment"), _formalisms("Encapsulated_PetriNet"), "models/environment_to_EPN", _read_file("models/PowerWindow/transformations/environment_to_EPN.mvc"), traceability_ENV2EPN)
    transformation_add_MT(_formalisms("Encapsulated_PetriNet", "Architecture"), _formalisms("PetriNet"), "models/combine_EPN", _read_file("models/EncapsulatedPetriNets/transformations/combine.mvc"), traceability_EPN2PN)
    transformation_add_MT(_formalisms("ReachabilityGraph", "Query"), _formalisms("ReachabilityGraph"), "models/match", _read_file("models/ReachabilityGraph/transformations/match_query.mvc"))

    # Action-language transformations.
    transformation_add_AL(_formalisms("PetriNet"), _formalisms("ReachabilityGraph"), "models/reachability", _read_file("models/PetriNets/transformations/reachability.alc"))
    transformation_add_AL(_formalisms("ReachabilityGraph"), {}, "models/bfs", _read_file("models/ReachabilityGraph/transformations/bfs.alc"))
    transformation_add_AL({"EPN_Plant": "formalisms/Encapsulated_PetriNet", "EPN_Control": "formalisms/Encapsulated_PetriNet", "EPN_Environment": "formalisms/Encapsulated_PetriNet"}, _formalisms("Encapsulated_PetriNet"), "models/merge_EPN", _read_file("models/EncapsulatedPetriNets/transformations/merge.alc"))


class TestPowerWindow(unittest.TestCase):
    """Execute the PowerWindow process model against a fresh server."""

    def setUp(self):
        """Start a server process and log in as the admin user."""
        self.proc, self.address = start_mvc()
        init(self.address)
        login("admin", "admin")

    def tearDown(self):
        """Stop the server process started in ``setUp``."""
        kill(self.proc)

    def test_process_powerwindow_fast(self):
        """Run the process with correct models; expect six manual activities."""
        _register_powerwindow_models()
        _reset_test_dir()

        callbacks = {
            "models/revise_req": upload_req_fast,
            "models/revise_plant": upload_plant_fast,
            "models/revise_environment": upload_env_fast,
            "models/revise_control": upload_ctrl_fast,
            "models/revise_query": upload_query_fast,
            "models/revise_architecture": upload_arch_fast,
        }

        # Failures during execution are reported but not fatal on their own:
        # the marker-file count below decides whether the test passes.
        try:
            process_execute("models/pm_powerwindow", {}, callbacks)
        except Exception:
            import traceback
            print(traceback.format_exc())

        _check_call_count(_MAX_CALLS_FAST)

    @slow
    def test_process_powerwindow_debug(self):
        """Run the process starting from a wrong controller model.

        ``upload_ctrl_debug`` supplies ``control_wrong.mvc`` on its first
        call, and the test expects eleven manual activities in total plus
        ten entries in the error path collected by the log controller.
        """
        _register_powerwindow_models()
        _reset_test_dir()

        # Imported here, as in the original test, rather than at module level.
        import log_output
        error_path = []
        ctrl = log_output.Controller(error_path, keep_running=False)
        thrd = threading.Thread(target=ctrl.start)
        thrd.daemon = True
        thrd.start()

        callbacks = {
            "models/revise_req": upload_req_debug,
            "models/revise_plant": upload_plant_debug,
            "models/revise_environment": upload_env_debug,
            "models/revise_control": upload_ctrl_debug,
            "models/revise_query": upload_query_debug,
            "models/revise_architecture": upload_arch_debug,
            # The bfs activity is bound to the controller through its
            # "inp"/"outp" ports instead of a plain function.
            "models/bfs": (ctrl, "inp", "outp"),
        }

        process_execute("models/pm_powerwindow", {}, callbacks)
        thrd.join()

        _check_call_count(_MAX_CALLS_DEBUG)
        self.assertEqual(len(error_path), 10)