rule.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. from concrete_syntax.textual_od.renderer import render_od
  2. import pprint
  3. from typing import Generator, Callable
  4. from uuid import UUID
  5. import functools
  6. from api.od import ODAPI
  7. from concrete_syntax.common import indent
  8. from transformation.matcher import match_od
  9. from transformation.rewriter import rewrite
  10. from transformation.cloner import clone_od
  11. from util.timer import Timer
  12. class Rule:
  13. def __init__(self, nacs: list[UUID], lhs: UUID, rhs: UUID):
  14. self.nacs = nacs
  15. self.lhs = lhs
  16. self.rhs = rhs
  17. PP = pprint.PrettyPrinter(depth=4)
  18. class _NAC_MATCHED(Exception):
  19. pass
  20. # Helper for executing NAC/LHS/RHS-type rules
  21. class RuleMatcherRewriter:
  22. def __init__(self, state, mm: UUID, mm_ramified: UUID):
  23. self.state = state
  24. self.mm = mm
  25. self.mm_ramified = mm_ramified
  26. # Generates matches.
  27. # Every match is a dictionary with entries LHS_element_name -> model_element_name
  28. def match_rule(self, m: UUID, lhs: UUID, nacs: list[UUID], rule_name: str) -> Generator[dict, None, None]:
  29. lhs_matcher = match_od(self.state,
  30. host_m=m,
  31. host_mm=self.mm,
  32. pattern_m=lhs,
  33. pattern_mm=self.mm_ramified)
  34. try:
  35. # First we iterate over LHS-matches:
  36. # for i, lhs_match in enumerate(lhs_matcher):
  37. x=0
  38. while True:
  39. try:
  40. with Timer(f"MATCH LHS {rule_name}"):
  41. lhs_match = lhs_matcher.__next__()
  42. x += 1
  43. nac_matched = False
  44. with Timer(f"MATCH NACs {rule_name}"):
  45. try:
  46. for i_nac, nac in enumerate(nacs):
  47. # For every LHS-match, we see if there is a NAC-match:
  48. nac_matcher = match_od(self.state,
  49. host_m=m,
  50. host_mm=self.mm,
  51. pattern_m=nac,
  52. pattern_mm=self.mm_ramified,
  53. pivot=lhs_match) # try to "grow" LHS-match with NAC-match
  54. try:
  55. # for nac_match in nac_matcher:
  56. while True:
  57. try:
  58. with Timer(f"MATCH NAC{i_nac} {rule_name}"):
  59. nac_match = nac_matcher.__next__()
  60. # The NAC has at least one match
  61. # (there could be more, but we know enough, so let's not waste CPU/MEM resources and proceed to next LHS match)
  62. raise _NAC_MATCHED()
  63. except StopIteration:
  64. break # no more nac-matches
  65. except Exception as e:
  66. # The exception may originate from eval'ed condition-code in LHS or NAC
  67. # Decorate exception with some context, to help with debugging
  68. e.add_note(f"while matching NAC of '{rule_name}'")
  69. raise
  70. except _NAC_MATCHED:
  71. continue # continue with next LHS-match
  72. # There were no NAC matches -> yield LHS-match!
  73. yield lhs_match
  74. except StopIteration:
  75. break # no more lhs-matches
  76. except Exception as e:
  77. # The exception may originate from eval'ed condition-code in LHS or NAC
  78. # Decorate exception with some context, to help with debugging
  79. e.add_note(f"while matching LHS of '{rule_name}'")
  80. raise
  81. def exec_rule(self, m: UUID, lhs: UUID, rhs: UUID, lhs_match: dict, rule_name: str, in_place=False):
  82. if in_place:
  83. # dangerous
  84. cloned_m = m
  85. else:
  86. cloned_m = clone_od(self.state, m, self.mm)
  87. # print('before clone:')
  88. # print(render_od(self.state, m, self.mm))
  89. # print('after clone:')
  90. # print(render_od(self.state, cloned_m, self.mm))
  91. try:
  92. rhs_match = rewrite(self.state,
  93. lhs_m=lhs,
  94. rhs_m=rhs,
  95. pattern_mm=self.mm_ramified,
  96. lhs_match=lhs_match,
  97. host_m=cloned_m,
  98. host_mm=self.mm)
  99. except Exception as e:
  100. # Make exceptions raised in eval'ed code easier to trace:
  101. e.add_note(f"while executing RHS of '{rule_name}'")
  102. raise
  103. return (cloned_m, rhs_match)
  104. # Generator that yields actions in the format expected by 'Simulator' class
  105. class ActionGenerator:
  106. def __init__(self, matcher_rewriter: RuleMatcherRewriter, rule_dict: dict[str, Rule]):
  107. self.matcher_rewriter = matcher_rewriter
  108. self.rule_dict = rule_dict
  109. def __call__(self, od: ODAPI):
  110. at_least_one_match = False
  111. for rule_name, rule in self.rule_dict.items():
  112. match_iterator = self.matcher_rewriter.match_rule(od.m, rule.lhs, rule.nacs, rule_name)
  113. x = 0
  114. while True:
  115. try:
  116. # if True:
  117. with Timer(f"MATCH RULE {rule_name}"):
  118. lhs_match = match_iterator.__next__()
  119. x += 1
  120. # We got a match!
  121. def do_action(od, rule, lhs_match, rule_name):
  122. with Timer(f"EXEC RHS {rule_name}"):
  123. new_m, rhs_match = self.matcher_rewriter.exec_rule(od.m, rule.lhs, rule.rhs, lhs_match, rule_name)
  124. msgs = [f"executed rule '{rule_name}'\n" + indent(PP.pformat(rhs_match), 6)]
  125. return (ODAPI(od.state, new_m, od.mm), msgs)
  126. yield (
  127. rule_name + '\n' + indent(PP.pformat(lhs_match), 6), # description of action
  128. functools.partial(do_action, od, rule, lhs_match, rule_name) # the action itself (as a callback)
  129. )
  130. at_least_one_match = True
  131. except StopIteration:
  132. break
  133. return at_least_one_match
  134. # Given a list of actions (in high -> low priority), will always yield the highest priority enabled actions.
  135. class PriorityActionGenerator:
  136. def __init__(self, matcher_rewriter: RuleMatcherRewriter, rule_dicts: list[dict[str, Rule]]):
  137. self.generators = [ActionGenerator(matcher_rewriter, rule_dict) for rule_dict in rule_dicts]
  138. def __call__(self, od: ODAPI):
  139. for generator in self.generators:
  140. at_least_one_match = yield from generator(od)
  141. if at_least_one_match:
  142. return True
  143. return False
  144. # class ForAllGenerator:
  145. # def __init__(self, matcher_rewriter: RuleMatcherRewriter, rule_dict: dict[str, Rule]):
  146. # self.matcher_rewriter = matcher_rewriter
  147. # self.rule_dict = rule_dict
  148. # def __call__(self, od: ODAPI):
  149. # matches = []
  150. # for rule_name, rule in self.rule_dict.items():
  151. # for lhs_match in self.matcher_rewriter.match_rule(od.m, rule.lhs, rule.nacs, rule_name):
  152. # matches.append((rule_name, rule, lhs_match))
  153. # def do_action(matches):
  154. # pass
  155. # if len(matches) > 0:
  156. # yield (
  157. # [rule_name for rule_name, _, _ in matches]
  158. # )
  159. # return True
  160. # return False