from abc import ABC, abstractmethod from collections import defaultdict from pystone import pystones from pypdevs.DEVS import AtomicDEVS, CoupledDEVS from pypdevs.infinity import INFINITY from pypdevs.simulator import Simulator from graphviz import Source import time class DelayedAtomic(AtomicDEVS): def __init__(self, name: str, int_delay: float, ext_delay: float, add_out_port: bool = False, prep_time=0.0, mode: str = ""): super().__init__(name) self.int_delay = int_delay self.ext_delay = ext_delay self.prep_time = prep_time self.i_in = self.addInPort("i_in") if add_out_port: # for HI and HO models self.o_out = self.addOutPort("o_out") # Dynamic Structure extras self.mode = mode self.out_ports = [] # used for dynamic structure model LI2HI def intTransition(self): if self.int_delay: pystones(self.int_delay) return "passive" def timeAdvance(self): if self.state == "active": return self.prep_time else: return INFINITY def outputFnc(self): if hasattr(self, "o_out") and self.o_out is not None: return {self.o_out: [0]} # for DSDEVS LI2HI if len(self.out_ports) == 1 and self.out_ports[0].outline != []: return {self.out_ports[0]: [0]} return {} def extTransition(self, inputs): if self.ext_delay: pystones(self.ext_delay) return "active" def modelTransition(self, state): result = False if self.mode == "LI2HI": result = self.LI2HI(state) elif self.mode == "HI2LI": result = self.HI2LI(state) return result def LI2HI(self, state): if len(self.out_ports) == 0 and not hasattr(self, "o_out"): # every automic dev gets the extra out port # even though the last atomic could miss it see fig # except the atomic model in the coupled model at max depth # which has a output port by default state["LI2HI"] = True state["name"] = self.name self.out_ports.append(self.addOutPort("o_HI")) # standard LI model doesn't have by default # print(f"{self.name}: should be first call!!") return True # send to the parent to connect ports return False def HI2LI(self, state): if hasattr(self, "o_out") and self.o_out is not None: state["HI2LI"] = True state["name"] = self.name self.removePort(self.o_out) self.o_out = None return False class DelayedAtomicStats(DelayedAtomic): def __init__(self, name: str, int_delay: float, ext_delay: float, add_out_port: bool = False, prep_time=0, mode:str = ""): super().__init__(name, int_delay, ext_delay, add_out_port, prep_time, mode) self.int_count = 0 self.ext_count = 0 def intTransition(self): self.int_count += 1 return super().intTransition() def extTransition(self, inputs): self.ext_count += 1 return super().extTransition(inputs) class DEVStoneWrapper(CoupledDEVS, ABC): def __init__(self, name: str, depth: int, width: int, int_delay: float, ext_delay: float, add_atomic_out_ports: bool = False, prep_time=0, stats=False, mode: str = "", dynamic: bool = False): super().__init__(name) self.depth = depth self.width = width self.int_delay = int_delay self.ext_delay = ext_delay self.prep_time = prep_time self.stats = stats self.add_atomic_out_ports = add_atomic_out_ports self.i_in = self.addInPort("i_in") self.o_out = self.addOutPort("o_out") self.models = [] # stores all models part of the coupled model # added to deal with DS DEVS which don't keep it all in the component_set if depth < 1: raise ValueError("Invalid depth") if width < 1: raise ValueError("Invalid width") if int_delay < 0: raise ValueError("Invalid int_delay") if ext_delay < 0: raise ValueError("Invalid ext_delay") if depth == 1: if self.stats: atomic = DelayedAtomicStats("Atomic_0_0", int_delay, ext_delay, add_out_port=True, prep_time=1.0, mode=mode) else: atomic = DelayedAtomic("Atomic_0_0", int_delay, ext_delay, add_out_port=True, prep_time=1.0, mode=mode) self.models.append(self.addSubModel(atomic)) self.connectPorts(self.i_in, atomic.i_in) self.connectPorts(atomic.o_out, self.o_out) else: coupled = self.gen_coupled() self.addSubModel(coupled) self.connectPorts(self.i_in, coupled.i_in) self.connectPorts(coupled.o_out, self.o_out) if dynamic: return # width will be dynamically generated # + (idx * 1e-6) to address too many transitions at the same time # => problem for Classic DEVS only it seems for idx in range(width - 1): if self.stats: atomic = DelayedAtomicStats("Atomic_%d_%d" % (depth - 1, idx), int_delay, ext_delay, add_out_port=add_atomic_out_ports, prep_time=1.0 + (idx * 1e-6), mode=mode) else: atomic = DelayedAtomic("Atomic_%d_%d" % (depth - 1, idx), int_delay, ext_delay, add_out_port=add_atomic_out_ports, prep_time=1.0 + (idx * 1e-6), mode=mode) self.models.append(self.addSubModel(atomic)) def dot(self, name) -> None: result = ("\n\ndigraph {\n\tlayout=dot;" "\n\tnodesep=0.25;" "\n\tranksep=0.2;" "\n\trankdir=LR;" "\n\tsplines=ortho;\n") result += f"\tCoupled_{self.depth}[shape=box, label=\"C_{self.depth}\"];\n" coupled = self.component_set[0] result += f"\t{coupled.name}[shape=box, label=\"{'C_' + '_'.join(coupled.name.split('_')[1:3])}\"];\n" for model in self.models: result += f"\t{model.name}[shape=ellipse, label=\"{'A_' + '_'.join(model.name.split('_')[1:3])}\"];\n" for connections in self.i_in.outline: result += f"\tCoupled_{self.depth}:i_in -> {str(connections).split('.')[-2]}:{connections.name};\n" for model in self.models: if not hasattr(model, "o_out"): continue for connections in model.o_out.outline: result += f"\t{model.name}:o_out -> {str(connections).split('.')[2]}:{connections.name};\n" result += "}\n" src = Source(result) src.view(filename=f'{name}_{self.depth}') @abstractmethod def gen_coupled(self): """ :return a coupled method with i_in and o_out ports""" pass class LI(DEVStoneWrapper): def __init__(self, name: str, depth: int, width: int, int_delay: float, ext_delay: float, prep_time=0, stats=False): super().__init__(name, depth, width, int_delay, ext_delay, add_atomic_out_ports=False, prep_time=prep_time, stats=stats) for idx in range(1, len(self.component_set)): assert isinstance(self.component_set[idx], AtomicDEVS) self.connectPorts(self.i_in, self.component_set[idx].i_in) def gen_coupled(self): return LI("Coupled_%d" % (self.depth - 1), self.depth - 1, self.width, self.int_delay, self.ext_delay, prep_time=self.prep_time, stats=self.stats) class HI(DEVStoneWrapper): def __init__(self, name: str, depth: int, width: int, int_delay: float, ext_delay: float, prep_time=0, stats=False): super().__init__(name, depth, width, int_delay, ext_delay, add_atomic_out_ports=True, prep_time=prep_time, stats=stats) if len(self.component_set) > 1: assert isinstance(self.component_set[-1], AtomicDEVS) self.connectPorts(self.i_in, self.component_set[-1].i_in) for idx in range(1, len(self.component_set) - 1): assert isinstance(self.component_set[idx], AtomicDEVS) self.connectPorts(self.component_set[idx].o_out, self.component_set[idx + 1].i_in) self.connectPorts(self.i_in, self.component_set[idx].i_in) def gen_coupled(self): return HI("Coupled_%d" % (self.depth - 1), self.depth - 1, self.width, self.int_delay, self.ext_delay, prep_time=self.prep_time, stats=self.stats) class HO(DEVStoneWrapper): def __init__(self, name: str, depth: int, width: int, int_delay: float, ext_delay: float, prep_time=0, stats=False): super().__init__(name, depth, width, int_delay, ext_delay, add_atomic_out_ports=True, prep_time=prep_time, stats=stats) self.i_in2 = self.addInPort("i_in2") self.o_out2 = self.addOutPort("o_out2") assert len(self.component_set) > 0 if isinstance(self.component_set[0], CoupledDEVS): self.connectPorts(self.i_in, self.component_set[0].i_in2) if len(self.component_set) > 1: assert isinstance(self.component_set[-1], AtomicDEVS) self.connectPorts(self.i_in2, self.component_set[-1].i_in) self.connectPorts(self.component_set[-1].o_out, self.o_out2) for idx in range(1, len(self.component_set) - 1): assert isinstance(self.component_set[idx], AtomicDEVS) self.connectPorts(self.component_set[idx].o_out, self.component_set[idx + 1].i_in) self.connectPorts(self.i_in2, self.component_set[idx].i_in) self.connectPorts(self.component_set[idx].o_out, self.o_out2) def gen_coupled(self): return HO("Coupled_%d" % (self.depth - 1), self.depth - 1, self.width, self.int_delay, self.ext_delay, prep_time=self.prep_time, stats=self.stats) class HOmod(CoupledDEVS): def __init__(self, name: str, depth: int, width: int, int_delay: float, ext_delay: float, prep_time=0, stats=False): super().__init__(name) self.depth = depth self.width = width self.int_delay = int_delay self.ext_delay = ext_delay self.prep_time = prep_time self.stats = stats self.i_in = self.addInPort("i_in") self.i_in2 = self.addInPort("i_in2") self.o_out = self.addOutPort("o_out") if depth < 1: raise ValueError("Invalid depth") if width < 1: raise ValueError("Invalid width") if int_delay < 0: raise ValueError("Invalid int_delay") if ext_delay < 0: raise ValueError("Invalid ext_delay") if depth == 1: if self.stats: atomic = DelayedAtomicStats("Atomic_0_0", int_delay, ext_delay, add_out_port=True, prep_time=prep_time) else: atomic = DelayedAtomic("Atomic_0_0", int_delay, ext_delay, add_out_port=True, prep_time=prep_time) self.addSubModel(atomic) self.connectPorts(self.i_in, atomic.i_in) self.connectPorts(atomic.o_out, self.o_out) else: coupled = HOmod("Coupled_%d" % (self.depth - 1), self.depth - 1, self.width, self.int_delay, self.ext_delay, prep_time=prep_time, stats=stats) self.addSubModel(coupled) self.connectPorts(self.i_in, coupled.i_in) self.connectPorts(coupled.o_out, self.o_out) if width >= 2: atomics = defaultdict(list) # Generate atomic components for i in range(width): min_row_idx = 0 if i < 2 else i - 1 for j in range(min_row_idx, width - 1): if self.stats: atomic = DelayedAtomicStats("Atomic_%d_%d_%d" % (depth - 1, i, j), int_delay, ext_delay, add_out_port=True, prep_time=prep_time) else: atomic = DelayedAtomic("Atomic_%d_%d_%d" % (depth - 1, i, j), int_delay, ext_delay, add_out_port=True, prep_time=prep_time) self.addSubModel(atomic) atomics[i].append(atomic) # Connect EIC for atomic in atomics[0]: self.connectPorts(self.i_in2, atomic.i_in) for i in range(1, width): atomic_set = atomics[i] self.connectPorts(self.i_in2, atomic_set[0].i_in) # Connect IC for atomic in atomics[0]: # First row to coupled component self.connectPorts(atomic.o_out, coupled.i_in2) for i in range(len(atomics[1])): # Second to first rows for j in range(len(atomics[0])): self.connectPorts(atomics[1][i].o_out, atomics[0][j].i_in) for i in range(2, width): # Rest of rows for j in range(len(atomics[i])): self.connectPorts(atomics[i][j].o_out, atomics[i - 1][j + 1].i_in) class LI2HI(DEVStoneWrapper): """ Dynamic DEVStone variant that starts as an LI and (gradually??) transforms into an HI model """ def __init__(self, name: str, depth: int, width: int, int_delay: float, ext_delay: float, prep_time=0, stats=False): super().__init__(name, depth, width, int_delay, ext_delay, add_atomic_out_ports=False, prep_time=prep_time, stats=stats, mode="LI2HI") for idx in range(1, len(self.component_set)): assert isinstance(self.component_set[idx], AtomicDEVS) self.connectPorts(self.i_in, self.component_set[idx].i_in) def gen_coupled(self): return LI2HI("Coupled_%d" % (self.depth - 1), self.depth - 1, self.width, self.int_delay, self.ext_delay, prep_time=self.prep_time, stats=self.stats) def modelTransition(self, state): if state.get("LI2HI", True): for i in range(len(self.component_set) - 1): # -1 because the last model can't be connected # basically checks if it is an atomic model and if it got the extra output port if hasattr(self.component_set[i], "out_ports") and len(self.component_set[i].out_ports) == 1: # print(f"Connect {self.component_set[i].name} to {self.component_set[i+1].name}") self.connectPorts(self.component_set[i].out_ports[0], self.component_set[i + 1].i_in) return False return False class HI2LI(DEVStoneWrapper): """ Dynamic DEVStone variant that starts as an HI and (gradually??) transforms into an LI model """ def __init__(self, name: str, depth: int, width: int, int_delay: float, ext_delay: float, prep_time=0, stats=False): super().__init__(name, depth, width, int_delay, ext_delay, add_atomic_out_ports=True, prep_time=prep_time, stats=stats, mode="HI2LI") if len(self.component_set) > 1: assert isinstance(self.component_set[-1], AtomicDEVS) self.connectPorts(self.i_in, self.component_set[-1].i_in) for idx in range(1, len(self.component_set) - 1): assert isinstance(self.component_set[idx], AtomicDEVS) self.connectPorts(self.component_set[idx].o_out, self.component_set[idx + 1].i_in) self.connectPorts(self.i_in, self.component_set[idx].i_in) def gen_coupled(self): return HI2LI("Coupled_%d" % (self.depth - 1), self.depth - 1, self.width, self.int_delay, self.ext_delay, prep_time=self.prep_time, stats=self.stats) class DynamicGenerator(AtomicDEVS): def __init__(self, name: str, period: float = 1, repeat: int = 1, num: int = 1): super().__init__(name) self.period = period self.repeat = repeat self.number = num # how many models need to be generated per cycle self._counter = 0 def intTransition(self): return "passive" # state is unimportant def timeAdvance(self): if self._counter >= self.repeat: return INFINITY return self.period def outputFnc(self): return {} def modelTransition(self, state): # every time TA elapses, this will be called on the atomic: # set a flag the parent coupled can read # increment counter and request modelTransition (return True) if self._counter >= self.repeat: return False remaining = self.repeat - self._counter num2create = self.number if remaining < self.number: num2create = remaining self._counter += num2create state["generate"] = True state["create"] = num2create state["created"] = 0 return True class DynamicDEVStoneWrapper(DEVStoneWrapper, ABC): def __init__(self, name: str, depth: int, width: int, int_delay: float, ext_delay: float, add_atomic_out_ports: bool = False, /, prep_time=0, stats=False, root=False, mode: str = "", number: int = 1, gen_mode: str="uniform"): if number <= 0: raise ValueError("number of models to be generated must be greater than zero") self.root = root self.mode = mode self.created = 0 # keeps track of how many atomic models have been created self.current_width = 1 self.number = number self.max_gen = width - 1 # dynamically generate atomic components until width is reached excludes the coupled model super().__init__(name, depth, width, int_delay, ext_delay, add_atomic_out_ports, prep_time=prep_time, stats=stats, mode=mode, dynamic=True) if self.depth == 1: self.generator = self.addSubModel(DynamicGenerator(self.name + "_gen", repeat=self.max_gen, num=self.number)) def createAtomic(self): name = f"Atomic_0_{self.current_width}" if self.depth > 1: name = f"Atomic_{self.depth-1}_{self.current_width}" if self.stats: atomic = DelayedAtomicStats(name, self.int_delay, self.ext_delay, add_out_port=self.add_atomic_out_ports, prep_time=self.prep_time, mode=self.mode) else: atomic = DelayedAtomic(name, self.int_delay, self.ext_delay, add_out_port=self.add_atomic_out_ports, prep_time=self.prep_time, mode=self.mode) self.models.append(self.addSubModel(atomic)) return atomic def modelTransition(self, state): if state.get("generate", True) and self.depth == 1: return True if state.get("generate", True) and self.depth > 1: number = state.get("create") for _ in range(number): atom = self.createAtomic() self.connectPorts(self.i_in, atom.i_in) if len(self.models) > 1: prev = self.models[-2] # new model was already added so the previous one isn't the last in the list anymore if isinstance(prev, AtomicDEVS) and hasattr(prev, "o_out"): self.connectPorts(prev.o_out, atom.i_in) self.current_width += 1 if self.current_width > self.width: raise RuntimeError(f"The width has grown beyond what was specified!" f"\n current width: {self.current_width}" f"\n specified width: {self.width}") # enables visualisation of each level (coupled model) # if self.current_width == self.width: # self.dot(f"dLI_{self.depth}") if not self.root: state["created"] += number return True self.created += state["created"] + number return False return False class dLI(DynamicDEVStoneWrapper): """ A LI model which grows in width """ def __init__(self, name: str, depth: int, width: int, int_delay: float, ext_delay: float, /, prep_time=0, stats=False, root=True, number: int = 1, gen_mode: str = "uniform"): super().__init__(name, depth, width, int_delay, ext_delay, False, prep_time, stats, mode="LI-Dynamic", root=root, number=number, gen_mode=gen_mode) for idx in range(1, len(self.models)): assert isinstance(self.component_set[idx], AtomicDEVS) self.connectPorts(self.i_in, self.component_set[idx].i_in) def gen_coupled(self): return dLI("Coupled_%d" % (self.depth - 1), self.depth - 1, self.width, self.int_delay, self.ext_delay, prep_time=self.prep_time, stats=self.stats, root=False, number=self.number) class dHI(DynamicDEVStoneWrapper): """ A HI model which grows in width """ def __init__(self, name: str, depth: int, width: int, int_delay: float, ext_delay: float, /, prep_time=0, stats=False, root=True, number: int = 1, gen_mode: str = "uniform"): super().__init__(name, depth, width, int_delay, ext_delay, True, prep_time, stats, mode="HI-Dynamic", root=root, number=number, gen_mode=gen_mode) if len(self.models) > 1: assert isinstance(self.component_set[-1], AtomicDEVS) self.connectPorts(self.i_in, self.component_set[-1].i_in) for idx in range(1, len(self.models) - 1): assert isinstance(self.component_set[idx], AtomicDEVS) self.connectPorts(self.component_set[idx].o_out, self.component_set[idx + 1].i_in) self.connectPorts(self.i_in, self.component_set[idx].i_in) def gen_coupled(self): return dHI("Coupled_%d" % (self.depth - 1), self.depth - 1, self.width, self.int_delay, self.ext_delay, prep_time=self.prep_time, stats=self.stats, root=False, number=self.number) if __name__ == '__main__': import sys sys.setrecursionlimit(10000) root = HOmod("Root", 4, 3, 0, 0) sim = Simulator(root) sim.setVerbose(None) # sim.setTerminationTime(10.0) sim.setStateSaving("custom") sim.simulate()