123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142 |
- import abc
- import random
- import math
- import functools
- import sys
- from framework.conformance import Conformance, render_conformance_check_result
- from concrete_syntax.common import indent
- from concrete_syntax.textual_od.renderer import render_od
- from transformation.cloner import clone_od
- from api.od import ODAPI
- class DecisionMaker:
- @abc.abstractmethod
- def __call__(self, actions):
- pass
- class Simulator:
- def __init__(self,
- action_generator,
- decision_maker: DecisionMaker,
- termination_condition,
- check_conformance=True,
- verbose=True,
- renderer=lambda od: render_od(od.state, od.m, od.mm),
- ):
- self.action_generator = action_generator
- self.decision_maker = decision_maker
- self.termination_condition = termination_condition
- self.check_conformance = check_conformance
- self.verbose = verbose
- self.renderer = renderer
- def __print(self, *args):
- if self.verbose:
- print(*args)
- # Run simulation until termination condition satisfied
- def run(self, od):
- self.__print("Start simulation")
- self.__print(f"Decision maker: {self.decision_maker}")
- step_counter = 0
- while True:
- self.__print("--------------")
- self.__print(indent(self.renderer(od), 4))
- self.__print("--------------")
- termination_reason = self.termination_condition(od)
- if termination_reason != None:
- self.__print(f"Termination condition satisfied.\nReason: {termination_reason}.")
- break
- actions = self.action_generator(od)
- chosen_action = self.decision_maker(actions)
- if chosen_action == None:
- self.__print(f"No enabled actions.")
- break
- (od, msgs) = chosen_action()
- self.__print(indent('\n'.join(f"▸ {msg}" for msg in msgs), 2))
- step_counter += 1
- if self.check_conformance:
- self.__print()
- conf = Conformance(od.state, od.m, od.mm)
- self.__print(render_conformance_check_result(conf.check_nominal()))
- self.__print(f"Executed {step_counter} steps.")
- return od
- def make_actions_pure(actions, od):
- # Copy model before modifying it
- def exec_pure(action, od):
- cloned_rt_m = clone_od(od.state, od.m, od.mm)
- new_od = ODAPI(od.state, cloned_rt_m, od.mm)
- msgs = action(new_od)
- return (new_od, msgs)
- for descr, action in actions:
- yield (descr, functools.partial(exec_pure, action, od))
- def filter_valid_actions(pure_actions):
- result = {}
- def make_tuple(new_od, msgs):
- return (new_od, msgs)
- for name, callback in pure_actions:
- print(f"attempt '{name}' ...", end='\r')
- (new_od, msgs) = callback()
- conf = Conformance(new_od.state, new_od.m, new_od.mm)
- errors = conf.check_nominal()
- # erase current line:
- print(" ", end='\r')
- if len(errors) == 0:
- # updated RT-M is conform, we have a valid action:
- yield (name, functools.partial(make_tuple, new_od, msgs))
- class RandomDecisionMaker(DecisionMaker):
- def __init__(self, seed=0, verbose=True):
- self.seed = seed
- self.r = random.Random(seed)
- def __str__(self):
- return f"RandomDecisionMaker(seed={self.seed})"
- def __call__(self, actions):
- arr = [action for descr, action in actions]
- i = math.floor(self.r.random()*len(arr))
- return arr[i]
- class InteractiveDecisionMaker(DecisionMaker):
- def __init__(self, msg="Select action:"):
- self.msg = msg
- def __str__(self):
- return f"InteractiveDecisionMaker()"
- def __call__(self, actions):
- arr = []
- for i, (key, result) in enumerate(actions):
- print(f" {i}. {key}")
- arr.append(result)
- if len(arr) == 0:
- return
- def __choose():
- sys.stdout.write(f"{self.msg} ")
- try:
- raw = input()
- choice = int(raw) # may raise ValueError
- if choice >= 0 and choice < len(arr):
- return arr[choice]
- except ValueError:
- pass
- print("Invalid option")
- return __choose()
- return __choose()
|