123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175 |
- from typing import *
- from dataclasses import *
- from abc import *
- from sccd.util import timer
- from sccd.util.bitmap import *
- from sccd.statechart.static.tree import *
- from sccd.statechart.dynamic.event import *
- @dataclass
- class CacheCounter:
- __slots__ = ["cache_hits", "cache_misses"]
- cache_hits: int
- cache_misses: int
- ctr = CacheCounter(0, 0)
- if timer.TIMINGS:
- import atexit
- def print_stats():
- print("\ncache hits: %s, cache misses: %s" %(ctr.cache_hits, ctr.cache_misses))
- atexit.register(print_stats)
- class GeneratorStrategy(ABC):
- __slots__ = ["priority_ordered_transitions"]
- def __init__(self, priority_ordered_transitions):
- self.priority_ordered_transitions = priority_ordered_transitions
- @abstractmethod
- def cache_init(self):
- pass
- @abstractmethod
- def key(self, execution, events, forbidden_arenas):
- pass
- @abstractmethod
- def generate(self, execution, events, forbidden_arenas):
- pass
- @abstractmethod
- def filter_f(self, execution, events):
- pass
- # First filter on current state, then filter on current events
- class CurrentConfigStrategy(GeneratorStrategy):
- __slots__ = []
- def cache_init(self):
- return {}
- def key(self, execution, events, forbidden_arenas):
- return (execution.configuration, forbidden_arenas)
- def generate(self, execution, events, forbidden_arenas):
- return [ t for t in self.priority_ordered_transitions
- if (t.source.state_id_bitmap & execution.configuration)
- and (not forbidden_arenas & t.arena_bitmap) ]
- def filter_f(self, execution, events):
- return lambda t: (not t.trigger or t.trigger.check(events)) and execution.check_guard(t, events)
- # # First filter based on current events, then filter on current state
- # class EnabledEventsStrategy(GeneratorStrategy):
- # __slots__ = ["statechart"]
- # def __init__(self, priority_ordered_transitions, statechart):
- # super().__init__(priority_ordered_transitions)
- # self.statechart = statechart
- # def cache_init(self):
- # cache = {}
- # cache[(0, 0)] = self.generate(None, 0, 0)
- # for event_id in bm_items(self.statechart.internal_events):
- # events_bitmap = bit(event_id)
- # cache[(events_bitmap, 0)] = self.generate(None, events_bitmap, 0)
- # return cache
- # def key(self, execution, events, forbidden_arenas):
- # return (events_bitmap, forbidden_arenas)
- # def generate(self, execution, events, forbidden_arenas):
- # return [ t for t in self.priority_ordered_transitions
- # if (not t.trigger or t.trigger.check(events))
- # and (not forbidden_arenas & t.arena_bitmap) ]
- # def filter_f(self, execution, events):
- # return lambda t: (execution.configuration & t.source.state_id_bitmap) and execution.check_guard(t, enabled_events)
- # class CurrentConfigAndEnabledEventsStrategy(GeneratorStrategy):
- # __slots__ = ["statechart"]
- # def __init__(self, priority_ordered_transitions, statechart):
- # super().__init__(priority_ordered_transitions)
- # self.statechart = statechart
- # def cache_init(self):
- # return {}
- # def key(self, execution, events, forbidden_arenas):
- # return (execution.configuration, events_bitmap, forbidden_arenas)
- # def generate(self, execution, events, forbidden_arenas):
- # return [ t for t in self.priority_ordered_transitions
- # if (not t.trigger or t.trigger.check(events))
- # and (t.source.state_id_bitmap & execution.configuration)
- # and (not forbidden_arenas & t.arena_bitmap) ]
- # def filter_f(self, execution, events):
- # return lambda t: execution.check_guard(t, enabled_events)
- class CandidateGenerator:
- __slots__ = ["strategy", "cache"]
- def __init__(self, strategy):
- self.strategy = strategy
- self.cache = strategy.cache_init()
- def generate(self, execution, enabled_events: List[InternalEvent], forbidden_arenas: Bitmap) -> List[Transition]:
- with timer.Context("generate candidates"):
- # events_bitmap = bm_from_list(e.id for e in enabled_events)
- key = self.strategy.key(execution, enabled_events, forbidden_arenas)
- try:
- candidates = self.cache[key]
- ctr.cache_hits += 1
- except KeyError:
- candidates = self.cache[key] = self.strategy.generate(execution, enabled_events, forbidden_arenas)
- ctr.cache_misses += 1
- candidates = filter(self.strategy.filter_f(execution, enabled_events), candidates)
- if DEBUG:
- candidates = list(candidates) # convert generator to list (gotta do this, otherwise the generator will be all used up by our debug printing
- if candidates:
- print()
- if enabled_events:
- print("events: " + str(enabled_events))
- print("candidates: " + ", ".join(str(t) for t in candidates))
- candidates = iter(candidates)
- t = next(candidates, None)
- if t:
- return [t]
- else:
- return []
- class ConcurrentCandidateGenerator:
- __slots__ = ["strategy", "cache", "synchronous"]
- def __init__(self, strategy, synchronous):
- self.strategy = strategy
- self.cache = strategy.cache_init()
- self.synchronous = synchronous
- def generate(self, execution, enabled_events: List[InternalEvent], forbidden_arenas: Bitmap) -> List[Transition]:
- with timer.Context("generate candidates"):
- events = list(enabled_events)
- transitions = []
- while True:
- key = self.strategy.key(execution, events, forbidden_arenas)
- try:
- candidates = self.cache[key]
- ctr.cache_hits += 1
- except KeyError:
- candidates = self.cache[key] = self.strategy.generate(execution, events, forbidden_arenas)
- ctr.cache_misses += 1
- candidates = filter(self.strategy.filter_f(execution, events), candidates)
- t = next(candidates, None)
- if t:
- transitions.append(t)
- else:
- break
- events.extend(InternalEvent(name=event_name, params=[]) for event_name in t.raised_events)
- forbidden_arenas |= t.arena_bitmap
- return transitions
|