simulator.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. import abc
  2. import random
  3. import math
  4. import functools
  5. import sys
  6. from framework.conformance import Conformance, render_conformance_check_result
  7. from concrete_syntax.common import indent
  8. from concrete_syntax.textual_od.renderer import render_od
  9. class DecisionMaker:
  10. @abc.abstractmethod
  11. def __call__(self, actions):
  12. pass
  13. class Simulator:
  14. def __init__(self,
  15. action_generator,
  16. decision_maker: DecisionMaker,
  17. termination_condition,
  18. check_conformance=True,
  19. verbose=True,
  20. renderer=lambda od: render_od(od.state, od.m, od.mm),
  21. ):
  22. self.action_generator = action_generator
  23. self.decision_maker = decision_maker
  24. self.termination_condition = termination_condition
  25. self.check_conformance = check_conformance
  26. self.verbose = verbose
  27. self.renderer = renderer
  28. def __print(self, *args):
  29. if self.verbose:
  30. print(*args)
  31. # Run simulation until termination condition satisfied
  32. def run(self, od):
  33. self.__print("Start simulation")
  34. step_counter = 0
  35. while True:
  36. self.__print("--------------")
  37. self.__print(indent(self.renderer(od), 4))
  38. self.__print("--------------")
  39. termination_reason = self.termination_condition(od)
  40. if termination_reason != None:
  41. self.__print(f"Termination condition satisfied.\nReason: {termination_reason}.")
  42. break
  43. actions = self.action_generator(od)
  44. chosen_action = self.decision_maker(actions)
  45. if chosen_action == None:
  46. self.__print(f"No enabled actions.")
  47. break
  48. (od, msgs) = chosen_action()
  49. self.__print(indent('\n'.join(f"▸ {msg}" for msg in msgs), 2))
  50. step_counter += 1
  51. if self.check_conformance:
  52. self.__print()
  53. conf = Conformance(od.state, od.m, od.mm)
  54. self.__print(render_conformance_check_result(conf.check_nominal()))
  55. self.__print(f"Executed {step_counter} steps.")
  56. return od
  57. def filter_valid_actions(actions):
  58. result = {}
  59. def make_tuple(new_od, msgs):
  60. return (new_od, msgs)
  61. for name, callback in actions:
  62. print(f"attempt '{name}' ...", end='\r')
  63. (new_od, msgs) = callback()
  64. conf = Conformance(new_od.state, new_od.m, new_od.mm)
  65. errors = conf.check_nominal()
  66. # erase current line:
  67. print(" ", end='\r')
  68. if len(errors) == 0:
  69. # updated RT-M is conform, we have a valid action:
  70. yield (name, functools.partial(make_tuple, new_od, msgs))
  71. class RandomDecisionMaker(DecisionMaker):
  72. def __init__(self, seed=0, verbose=True):
  73. self.r = random.Random(seed)
  74. def __call__(self, actions):
  75. arr = [action for descr, action in actions]
  76. i = math.floor(self.r.random()*len(arr))
  77. return arr[i]
  78. class InteractiveDecisionMaker(DecisionMaker):
  79. def __init__(self, msg="Select action:"):
  80. self.msg = msg
  81. def __call__(self, actions):
  82. arr = []
  83. for i, (key, result) in enumerate(actions):
  84. print(f" {i}. {key}")
  85. arr.append(result)
  86. if len(arr) == 0:
  87. return
  88. def __choose():
  89. sys.stdout.write(f"{self.msg} ")
  90. try:
  91. raw = input()
  92. choice = int(raw) # may raise ValueError
  93. if choice >= 0 and choice < len(arr):
  94. return arr[choice]
  95. except ValueError:
  96. pass
  97. print("Invalid option")
  98. return __choose()
  99. return __choose()