runner.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. import re
  2. from state.devstate import DevState
  3. from bootstrap.scd import bootstrap_scd
  4. from util import loader
  5. from transformation.rule import RuleMatcherRewriter
  6. from transformation.ramify import ramify
  7. from concrete_syntax.graphviz import renderer as graphviz
  8. from concrete_syntax.graphviz.make_url import make_url
  9. from concrete_syntax.plantuml import renderer as plantuml
  10. from concrete_syntax.plantuml.make_url import make_url as plant_make_url
  11. from api.od import ODAPI
  12. import os
  13. from os import listdir
  14. from os.path import isfile, join
  15. import importlib.util
  16. from util.module_to_dict import module_to_dict
  17. from examples.ftg_pm_pt import help_functions
  18. from examples.ftg_pm_pt.ftg_pm_pt import FtgPmPt
  19. class FtgPmPtRunner:
  20. def __init__(self, model: FtgPmPt, composite_linkage: dict | None = None):
  21. self.model = model
  22. self.ram_mm = ramify(self.model.state, self.model.meta_model)
  23. self.rules = self.load_rules()
  24. self.packages = None
  25. self.composite_linkage = composite_linkage
  26. def load_rules(self):
  27. return loader.load_rules(
  28. self.model.state,
  29. lambda rule_name, kind: os.path.join(
  30. os.path.dirname(__file__),
  31. f"operational_semantics/r_{rule_name}_{kind}.od"
  32. ),
  33. self.ram_mm,
  34. ["connect_process_trace", "trigger_ctrl_flow", "exec_activity", "exec_composite_activity"]
  35. )
  36. def set_packages(self, packages: str | dict, is_path: bool):
  37. if not is_path:
  38. self.packages = packages
  39. return
  40. self.packages = self.parse_packages(packages)
  41. def parse_packages(self, packages_path: str) -> dict:
  42. return self.collect_functions_from_packages(packages_path, packages_path)
  43. def collect_functions_from_packages(self, base_path, current_path):
  44. functions_dict = {}
  45. for entry in listdir(current_path):
  46. entry_path = join(current_path, entry)
  47. if isfile(entry_path) and entry.endswith(".py"):
  48. module_name = self.convert_path_to_module_name(base_path, entry_path)
  49. module = self.load_module_from_file(entry_path)
  50. for func_name, func in module_to_dict(module).items():
  51. functions_dict[f"{module_name}.{func_name}"] = func
  52. elif not isfile(entry_path):
  53. nested_functions = self.collect_functions_from_packages(base_path, entry_path)
  54. functions_dict.update(nested_functions)
  55. return functions_dict
  56. @staticmethod
  57. def convert_path_to_module_name(base_path, file_path):
  58. return file_path.replace(base_path, "").replace(".py", "").replace("/", "")
  59. @staticmethod
  60. def load_module_from_file(file_path):
  61. spec = importlib.util.spec_from_file_location("", file_path)
  62. module = importlib.util.module_from_spec(spec)
  63. spec.loader.exec_module(module)
  64. return module
  65. def create_matcher(self):
  66. packages = module_to_dict(help_functions)
  67. if self.packages:
  68. packages.update({ "packages": self.packages })
  69. if self.composite_linkage:
  70. packages.update({ "composite_linkage": self.composite_linkage })
  71. matcher_rewriter = RuleMatcherRewriter(
  72. self.model.state, self.model.meta_model, self.ram_mm, eval_context=packages
  73. )
  74. return matcher_rewriter
  75. def visualize_model(self):
  76. print(make_url(graphviz.render_object_diagram(self.model.state, self.model.model, self.model.meta_model)))
  77. print(plant_make_url(plantuml.render_object_diagram(self.model.state, self.model.model, self.model.meta_model)))
  78. @staticmethod
  79. def __extract_artefact_info(od, pt_art):
  80. """Extract artefact metadata and data."""
  81. data = od.get_slot_value(pt_art, "data")
  82. pm_art = od.get_name(od.get_target(od.get_outgoing(pt_art, "pt_BelongsTo")[0]))
  83. has_prev_version = bool(od.get_outgoing(pt_art, "pt_PrevVersion"))
  84. is_last_version = not od.get_incoming(pt_art, "pt_PrevVersion")
  85. return {
  86. "Artefact Name": pm_art,
  87. "Data": data,
  88. "Has previous version": has_prev_version,
  89. "Is last version": is_last_version
  90. }
  91. def __extract_inputs(self, od, event_node):
  92. """Extract all consumed artefacts for an event."""
  93. return [
  94. self.__extract_artefact_info(od, od.get_source(consumes))
  95. for consumes in od.get_incoming(event_node, "pt_Consumes")
  96. ]
  97. def __extract_outputs(self, od, event_node):
  98. """Extract all produced artefacts for an event."""
  99. return [
  100. self.__extract_artefact_info(od, od.get_target(produces))
  101. for produces in od.get_outgoing(event_node, "pt_Produces")
  102. ]
  103. @staticmethod
  104. def to_snake_case(experiment_type):
  105. # Finds uppercase letters that are not at the start of the string.
  106. # Example: AtomicExperiment -> atomic_experiment
  107. return re.sub(r'(?<!^)(?=[A-Z])', '_', experiment_type).lower()
    def run(self, debug_flag: bool = False):
        """Repeatedly execute the first applicable rule on the model.

        Rules are tried in the iteration order of ``self.rules``. As soon as
        one rule is executed the scan restarts from the first rule; the loop
        ends once a complete pass finds no rule that can be executed.

        Args:
            debug_flag: When true, print the LHS match and call
                ``visualize_model`` after every executed rule.
        """
        matcher = self.create_matcher()
        rule_performed = True
        # NOTE(review): if ``self.rules`` is empty the ``for`` body never runs,
        # so ``rule_performed`` stays True and this loop never terminates.
        while rule_performed:
            # Loop over all the rules first in order priority
            for i, (rule_name, rule) in enumerate(self.rules.items()):
                # Reset per rule: the flag only stays True when this rule is executed.
                rule_performed = False
                result = matcher.exec_on_first_match(
                    self.model.model, rule, rule_name, in_place=True
                )
                # If the rule cannot be executed go to the next rule
                if not result:
                    continue
                rule_performed = True
                # The first element of the result replaces the current model.
                self.model.model, lhs_match, _ = result
                if debug_flag:
                    print(f"Match: {lhs_match}")
                    self.visualize_model()
                # If a rule is performed, break and start looping over the rules from the beginning
                break