simulator.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. import abc
  2. import random
  3. import math
  4. import functools
  5. import sys
  6. from framework.conformance import Conformance, render_conformance_check_result
  7. from concrete_syntax.common import indent
  8. from concrete_syntax.textual_od.renderer import render_od
  9. from transformation.cloner import clone_od
  10. from api.od import ODAPI
  11. class DecisionMaker:
  12. @abc.abstractmethod
  13. def __call__(self, actions):
  14. pass
  15. class Simulator:
  16. def __init__(self,
  17. action_generator,
  18. decision_maker: DecisionMaker,
  19. termination_condition,
  20. check_conformance=True,
  21. verbose=True,
  22. renderer=lambda od: render_od(od.state, od.m, od.mm),
  23. ):
  24. self.action_generator = action_generator
  25. self.decision_maker = decision_maker
  26. self.termination_condition = termination_condition
  27. self.check_conformance = check_conformance
  28. self.verbose = verbose
  29. self.renderer = renderer
  30. def __print(self, *args):
  31. if self.verbose:
  32. print(*args)
  33. # Run simulation until termination condition satisfied
  34. def run(self, od):
  35. self.__print("Start simulation")
  36. self.__print(f"Decision maker: {self.decision_maker}")
  37. step_counter = 0
  38. while True:
  39. self.__print("--------------")
  40. self.__print(indent(self.renderer(od), 4))
  41. self.__print("--------------")
  42. termination_reason = self.termination_condition(od)
  43. if termination_reason != None:
  44. self.__print(f"Termination condition satisfied.\nReason: {termination_reason}.")
  45. break
  46. actions = self.action_generator(od)
  47. chosen_action = self.decision_maker(actions)
  48. if chosen_action == None:
  49. self.__print(f"No enabled actions.")
  50. break
  51. (od, msgs) = chosen_action()
  52. self.__print(indent('\n'.join(f"▸ {msg}" for msg in msgs), 2))
  53. step_counter += 1
  54. if self.check_conformance:
  55. self.__print()
  56. conf = Conformance(od.state, od.m, od.mm)
  57. self.__print(render_conformance_check_result(conf.check_nominal()))
  58. self.__print(f"Executed {step_counter} steps.")
  59. return od
  60. def make_actions_pure(actions, od):
  61. # Copy model before modifying it
  62. def exec_pure(action, od):
  63. cloned_rt_m = clone_od(od.state, od.m, od.mm)
  64. new_od = ODAPI(od.state, cloned_rt_m, od.mm)
  65. msgs = action(new_od)
  66. return (new_od, msgs)
  67. for descr, action in actions:
  68. yield (descr, functools.partial(exec_pure, action, od))
  69. def filter_valid_actions(pure_actions):
  70. result = {}
  71. def make_tuple(new_od, msgs):
  72. return (new_od, msgs)
  73. for name, callback in pure_actions:
  74. print(f"attempt '{name}' ...", end='\r')
  75. (new_od, msgs) = callback()
  76. conf = Conformance(new_od.state, new_od.m, new_od.mm)
  77. errors = conf.check_nominal()
  78. # erase current line:
  79. print(" ", end='\r')
  80. if len(errors) == 0:
  81. # updated RT-M is conform, we have a valid action:
  82. yield (name, functools.partial(make_tuple, new_od, msgs))
  83. class RandomDecisionMaker(DecisionMaker):
  84. def __init__(self, seed=0, verbose=True):
  85. self.seed = seed
  86. self.r = random.Random(seed)
  87. def __str__(self):
  88. return f"RandomDecisionMaker(seed={self.seed})"
  89. def __call__(self, actions):
  90. arr = [action for descr, action in actions]
  91. i = math.floor(self.r.random()*len(arr))
  92. return arr[i]
  93. class InteractiveDecisionMaker(DecisionMaker):
  94. def __init__(self, msg="Select action:"):
  95. self.msg = msg
  96. def __str__(self):
  97. return f"InteractiveDecisionMaker()"
  98. def __call__(self, actions):
  99. arr = []
  100. for i, (key, result) in enumerate(actions):
  101. print(f" {i}. {key}")
  102. arr.append(result)
  103. if len(arr) == 0:
  104. return
  105. def __choose():
  106. sys.stdout.write(f"{self.msg} ")
  107. try:
  108. raw = input()
  109. choice = int(raw) # may raise ValueError
  110. if choice >= 0 and choice < len(arr):
  111. return arr[choice]
  112. except ValueError:
  113. pass
  114. print("Invalid option")
  115. return __choose()
  116. return __choose()