from abc import ABC, abstractmethod from collections import defaultdict from pystone import pystones from pypdevs.DEVS import AtomicDEVS, CoupledDEVS from pypdevs.infinity import INFINITY from pypdevs.simulator import Simulator from graphviz import Source class DelayedAtomic(AtomicDEVS): def __init__(self, name: str, int_delay: float, ext_delay: float, add_out_port: bool = False, prep_time=0.0, mode:str = ""): super().__init__(name) self.int_delay = int_delay self.ext_delay = ext_delay self.prep_time = prep_time self.i_in = self.addInPort("i_in") if add_out_port: # for HI and HO models self.o_out = self.addOutPort("o_out") # Dynamic Structure extras self.mode = mode self.out_ports = [] # used for dynamic structure model LI2HI def intTransition(self): if self.int_delay: pystones(self.int_delay) return "passive" def timeAdvance(self): if self.state == "active": return self.prep_time else: return INFINITY def outputFnc(self): if hasattr(self, "o_out") and self.o_out is not None: return {self.o_out: [0]} # for DSDEVS LI2HI if len(self.out_ports) == 1 and self.out_ports[0].outline != []: # print(f"{self.name}: should be last call!!") # print("Send to ", self.out_ports[0].outline) return {self.out_ports[0]: [0]} return {} def extTransition(self, inputs): if self.ext_delay: pystones(self.ext_delay) return "active" def modelTransition(self, state): if self.mode == "LI2HI": return self.LI2HI(state) elif self.mode == "HI2LI": return self.HI2LI(state) return False def LI2HI(self, state): if len(self.out_ports) == 0 and not hasattr(self, "o_out"): # every automic dev gets the extra out port # even though the last atomic could miss it see fig # except the atomic model in the coupled model at max depth # which has a output port by default state["LI2HI"] = True state["name"] = self.name self.out_ports.append(self.addOutPort("o_HI")) # standard LI model doesn't have by default # print(f"{self.name}: should be first call!!") return True # send to the parent to connect ports return False def HI2LI(self, state): if hasattr(self, "o_out") and self.o_out is not None: state["HI2LI"] = True state["name"] = self.name self.removePort(self.o_out) self.o_out = None return False class DelayedAtomicStats(DelayedAtomic): def __init__(self, name: str, int_delay: float, ext_delay: float, add_out_port: bool = False, prep_time=0, mode:str = ""): super().__init__(name, int_delay, ext_delay, add_out_port, prep_time, mode) self.int_count = 0 self.ext_count = 0 def intTransition(self): self.int_count += 1 return super().intTransition() def extTransition(self, inputs): self.ext_count += 1 return super().extTransition(inputs) class DEVStoneWrapper(CoupledDEVS, ABC): def __init__(self, name: str, depth: int, width: int, int_delay: float, ext_delay: float, add_atomic_out_ports: bool = False, prep_time=0, stats=False, mode: str = "", dynamic: bool = False): super().__init__(name) self.depth = depth self.width = width self.int_delay = int_delay self.ext_delay = ext_delay self.prep_time = prep_time self.stats = stats self.add_atomic_out_ports = add_atomic_out_ports self.i_in = self.addInPort("i_in") self.o_out = self.addOutPort("o_out") self.models = [] # stores all models part of the coupled model # added to deal with DS DEVS which don't keep it all in the component_set if depth < 1: raise ValueError("Invalid depth") if width < 1: raise ValueError("Invalid width") if int_delay < 0: raise ValueError("Invalid int_delay") if ext_delay < 0: raise ValueError("Invalid ext_delay") if depth == 1: if self.stats: atomic = DelayedAtomicStats("Atomic_0_0", int_delay, ext_delay, add_out_port=True, prep_time=1.0, mode=mode) else: atomic = DelayedAtomic("Atomic_0_0", int_delay, ext_delay, add_out_port=True, prep_time=1.0, mode=mode) self.models.append(self.addSubModel(atomic)) self.connectPorts(self.i_in, atomic.i_in) self.connectPorts(atomic.o_out, self.o_out) else: coupled = self.gen_coupled() self.addSubModel(coupled) self.connectPorts(self.i_in, coupled.i_in) self.connectPorts(coupled.o_out, self.o_out) if dynamic: return # width will be dynamically generated # + (idx * 1e-6) to address too many transitions at the same time # => problem for Classic DEVS only it seems for idx in range(width - 1): if self.stats: atomic = DelayedAtomicStats("Atomic_%d_%d" % (depth - 1, idx), int_delay, ext_delay, add_out_port=add_atomic_out_ports, prep_time=1.0 + (idx * 1e-6), mode=mode) else: atomic = DelayedAtomic("Atomic_%d_%d" % (depth - 1, idx), int_delay, ext_delay, add_out_port=add_atomic_out_ports, prep_time=1.0 + (idx * 1e-6), mode=mode) self.models.append(self.addSubModel(atomic)) def extend(self, string: str, width: int): length = len(string) return string + " "*(width - length) def __str__(self): result = [self.extend(self.name, 16) + "\n", "-" * 20 + "\n"] first = self.component_set[0] coupled = "" if isinstance(first, CoupledDEVS): result.append("| " + self.extend(first.name, 16) + " |\n") coupled = "\n\n" + first.__str__() for model in self.models: result.append("| " + self.extend(model.name, 16) + " |\n") result.append("-"*20 + "\n") result.append(coupled) return "".join(result) def dot(self, name) -> None: result = ("\n\ndigraph {\n\tlayout=dot;" "\n\tnodesep=0.25;" "\n\tranksep=0.2;" "\n\trankdir=LR;" "\n\tsplines=ortho;\n") result += f"\tCoupled_{self.depth}[shape=box, label=\"C_{self.depth}\"];\n" coupled = self.component_set[0] result += f"\t{coupled.name}[shape=box, label=\"{'C_' + '_'.join(coupled.name.split('_')[1:3])}\"];\n" for model in self.models: result += f"\t{model.name}[shape=ellipse, label=\"{'A_' + '_'.join(model.name.split('_')[1:3])}\"];\n" for connections in self.i_in.outline: result += f"\tCoupled_{self.depth}:i_in -> {str(connections).split('.')[2]}:{connections.name};\n" for model in self.models: if not hasattr(model, "o_out"): continue for connections in model.o_out.outline: result += f"\t{model.name}:o_out -> {str(connections).split('.')[2]}:{connections.name};\n" result += "}\n" src = Source(result) src.view(filename=f'{name}_{self.depth}') @abstractmethod def gen_coupled(self): """ :return a coupled method with i_in and o_out ports""" pass class LI(DEVStoneWrapper): def __init__(self, name: str, depth: int, width: int, int_delay: float, ext_delay: float, prep_time=0, stats=False): super().__init__(name, depth, width, int_delay, ext_delay, add_atomic_out_ports=False, prep_time=prep_time, stats=stats) for idx in range(1, len(self.component_set)): assert isinstance(self.component_set[idx], AtomicDEVS) self.connectPorts(self.i_in, self.component_set[idx].i_in) def gen_coupled(self): return LI("Coupled_%d" % (self.depth - 1), self.depth - 1, self.width, self.int_delay, self.ext_delay, prep_time=self.prep_time, stats=self.stats) class HI(DEVStoneWrapper): def __init__(self, name: str, depth: int, width: int, int_delay: float, ext_delay: float, prep_time=0, stats=False): super().__init__(name, depth, width, int_delay, ext_delay, add_atomic_out_ports=True, prep_time=prep_time, stats=stats) if len(self.component_set) > 1: assert isinstance(self.component_set[-1], AtomicDEVS) self.connectPorts(self.i_in, self.component_set[-1].i_in) for idx in range(1, len(self.component_set) - 1): assert isinstance(self.component_set[idx], AtomicDEVS) self.connectPorts(self.component_set[idx].o_out, self.component_set[idx + 1].i_in) self.connectPorts(self.i_in, self.component_set[idx].i_in) def gen_coupled(self): return HI("Coupled_%d" % (self.depth - 1), self.depth - 1, self.width, self.int_delay, self.ext_delay, prep_time=self.prep_time, stats=self.stats) class HO(DEVStoneWrapper): def __init__(self, name: str, depth: int, width: int, int_delay: float, ext_delay: float, prep_time=0, stats=False): super().__init__(name, depth, width, int_delay, ext_delay, add_atomic_out_ports=True, prep_time=prep_time, stats=stats) self.i_in2 = self.addInPort("i_in2") self.o_out2 = self.addOutPort("o_out2") assert len(self.component_set) > 0 if isinstance(self.component_set[0], CoupledDEVS): self.connectPorts(self.i_in, self.component_set[0].i_in2) if len(self.component_set) > 1: assert isinstance(self.component_set[-1], AtomicDEVS) self.connectPorts(self.i_in2, self.component_set[-1].i_in) self.connectPorts(self.component_set[-1].o_out, self.o_out2) for idx in range(1, len(self.component_set) - 1): assert isinstance(self.component_set[idx], AtomicDEVS) self.connectPorts(self.component_set[idx].o_out, self.component_set[idx + 1].i_in) self.connectPorts(self.i_in2, self.component_set[idx].i_in) self.connectPorts(self.component_set[idx].o_out, self.o_out2) def gen_coupled(self): return HO("Coupled_%d" % (self.depth - 1), self.depth - 1, self.width, self.int_delay, self.ext_delay, prep_time=self.prep_time, stats=self.stats) class HOmod(CoupledDEVS): def __init__(self, name: str, depth: int, width: int, int_delay: float, ext_delay: float, prep_time=0, stats=False): super().__init__(name) self.depth = depth self.width = width self.int_delay = int_delay self.ext_delay = ext_delay self.prep_time = prep_time self.stats = stats self.i_in = self.addInPort("i_in") self.i_in2 = self.addInPort("i_in2") self.o_out = self.addOutPort("o_out") if depth < 1: raise ValueError("Invalid depth") if width < 1: raise ValueError("Invalid width") if int_delay < 0: raise ValueError("Invalid int_delay") if ext_delay < 0: raise ValueError("Invalid ext_delay") if depth == 1: if self.stats: atomic = DelayedAtomicStats("Atomic_0_0", int_delay, ext_delay, add_out_port=True, prep_time=prep_time) else: atomic = DelayedAtomic("Atomic_0_0", int_delay, ext_delay, add_out_port=True, prep_time=prep_time) self.addSubModel(atomic) self.connectPorts(self.i_in, atomic.i_in) self.connectPorts(atomic.o_out, self.o_out) else: coupled = HOmod("Coupled_%d" % (self.depth - 1), self.depth - 1, self.width, self.int_delay, self.ext_delay, prep_time=prep_time, stats=stats) self.addSubModel(coupled) self.connectPorts(self.i_in, coupled.i_in) self.connectPorts(coupled.o_out, self.o_out) if width >= 2: atomics = defaultdict(list) # Generate atomic components for i in range(width): min_row_idx = 0 if i < 2 else i - 1 for j in range(min_row_idx, width - 1): if self.stats: atomic = DelayedAtomicStats("Atomic_%d_%d_%d" % (depth - 1, i, j), int_delay, ext_delay, add_out_port=True, prep_time=prep_time) else: atomic = DelayedAtomic("Atomic_%d_%d_%d" % (depth - 1, i, j), int_delay, ext_delay, add_out_port=True, prep_time=prep_time) self.addSubModel(atomic) atomics[i].append(atomic) # Connect EIC for atomic in atomics[0]: self.connectPorts(self.i_in2, atomic.i_in) for i in range(1, width): atomic_set = atomics[i] self.connectPorts(self.i_in2, atomic_set[0].i_in) # Connect IC for atomic in atomics[0]: # First row to coupled component self.connectPorts(atomic.o_out, coupled.i_in2) for i in range(len(atomics[1])): # Second to first rows for j in range(len(atomics[0])): self.connectPorts(atomics[1][i].o_out, atomics[0][j].i_in) for i in range(2, width): # Rest of rows for j in range(len(atomics[i])): self.connectPorts(atomics[i][j].o_out, atomics[i - 1][j + 1].i_in) class LI2HI(DEVStoneWrapper): """ Dynamic DEVStone variant that starts as an LI and (gradually??) transforms into an HI model """ def __init__(self, name: str, depth: int, width: int, int_delay: float, ext_delay: float, prep_time=0, stats=False): super().__init__(name, depth, width, int_delay, ext_delay, add_atomic_out_ports=False, prep_time=prep_time, stats=stats, mode="LI2HI") for idx in range(1, len(self.component_set)): assert isinstance(self.component_set[idx], AtomicDEVS) self.connectPorts(self.i_in, self.component_set[idx].i_in) def gen_coupled(self): return LI2HI("Coupled_%d" % (self.depth - 1), self.depth - 1, self.width, self.int_delay, self.ext_delay, prep_time=self.prep_time, stats=self.stats) def modelTransition(self, state): if state.get("LI2HI", True): for i in range(len(self.component_set) - 1): # -1 because the last model can't be connected # basically checks if it is an atomic model and if it got the extra output port if hasattr(self.component_set[i], "out_ports") and len(self.component_set[i].out_ports) == 1: # print(f"Connect {self.component_set[i].name} to {self.component_set[i+1].name}") self.connectPorts(self.component_set[i].out_ports[0], self.component_set[i + 1].i_in) return False return False class HI2LI(DEVStoneWrapper): """ Dynamic DEVStone variant that starts as an HI and (gradually??) transforms into an LI model """ def __init__(self, name: str, depth: int, width: int, int_delay: float, ext_delay: float, prep_time=0, stats=False): super().__init__(name, depth, width, int_delay, ext_delay, add_atomic_out_ports=True, prep_time=prep_time, stats=stats, mode="HI2LI") if len(self.component_set) > 1: assert isinstance(self.component_set[-1], AtomicDEVS) self.connectPorts(self.i_in, self.component_set[-1].i_in) for idx in range(1, len(self.component_set) - 1): assert isinstance(self.component_set[idx], AtomicDEVS) self.connectPorts(self.component_set[idx].o_out, self.component_set[idx + 1].i_in) self.connectPorts(self.i_in, self.component_set[idx].i_in) def gen_coupled(self): return HI2LI("Coupled_%d" % (self.depth - 1), self.depth - 1, self.width, self.int_delay, self.ext_delay, prep_time=self.prep_time, stats=self.stats) class DynamicGenerator(AtomicDEVS): def __init__(self, name: str, period: float = 1.0, repeat: int = 1): super().__init__(name) self.period = period self.repeat = repeat self._counter = 0 self._active = True def intTransition(self): return "passive" # state is unimportant def timeAdvance(self): # stop when counter >= repeat if self._counter >= self.repeat: return INFINITY return self.period def outputFnc(self): return {} def modelTransition(self, state): # every time TA elapses, this will be called on the atomic: # set a flag the parent coupled can read # increment counter and request modelTransition (return True) if self._counter >= self.repeat: return False self._counter += 1 state["generate"] = True return True class dLI(DEVStoneWrapper): """ A LI model which grows in width """ def __init__(self, name: str, depth: int, width: int, int_delay: float, ext_delay: float, prep_time=0, stats=False): super().__init__(name, depth, width, int_delay, ext_delay, add_atomic_out_ports=False, prep_time=prep_time, stats=stats, mode="LI-Dynamic", dynamic=True) for idx in range(1, len(self.component_set)): assert isinstance(self.component_set[idx], AtomicDEVS) self.connectPorts(self.i_in, self.component_set[idx].i_in) self.current_width = 1 self.max_gen = width - 1 # dynamically generate atomic components until width is reached excludes the coupled model self.generator = self.addSubModel(DynamicGenerator(self.name + "_gen", repeat=self.max_gen)) def gen_coupled(self): return dLI("Coupled_%d" % (self.depth - 1), self.depth - 1, self.width, self.int_delay, self.ext_delay, prep_time=self.prep_time, stats=self.stats) def modelTransition(self, state): if state.get("generate", True) and self.depth > 1: name = "Atomic_%d_%d" % (self.depth - 1, self.current_width) if self.depth > 1 else "Atomic_0_%d" % self.current_width if self.stats: new_atom = DelayedAtomicStats(name, self.int_delay, self.ext_delay, add_out_port=self.add_atomic_out_ports, prep_time=self.prep_time, mode="LI-Dynamic") else: new_atom = DelayedAtomic(name, self.int_delay, self.ext_delay, add_out_port=self.add_atomic_out_ports, prep_time=self.prep_time, mode="LI-Dynamic") self.models.append(self.addSubModel(new_atom)) self.connectPorts(self.i_in, new_atom.i_in) self.current_width += 1 # print(f"{self.name}: added {new_atom.name} (width is now {self.width})") return False return False class dHI(DEVStoneWrapper): """ A HI model which grows in width """ def __init__(self, name: str, depth: int, width: int, int_delay: float, ext_delay: float, prep_time=0, stats=False): super().__init__(name, depth, width, int_delay, ext_delay, add_atomic_out_ports=True, prep_time=prep_time, stats=stats, mode="HI-Dynamic", dynamic=True) if len(self.component_set) > 1: assert isinstance(self.component_set[-1], AtomicDEVS) self.connectPorts(self.i_in, self.component_set[-1].i_in) for idx in range(1, len(self.component_set) - 1): assert isinstance(self.component_set[idx], AtomicDEVS) self.connectPorts(self.component_set[idx].o_out, self.component_set[idx + 1].i_in) self.connectPorts(self.i_in, self.component_set[idx].i_in) self.current_width = 1 self.max_gen = width - 1 # dynamically generate atomic components until width is reached excludes the coupled model self.generator = self.addSubModel(DynamicGenerator(self.name + "_gen", repeat=self.max_gen)) def gen_coupled(self): return dHI("Coupled_%d" % (self.depth - 1), self.depth - 1, self.width, self.int_delay, self.ext_delay, prep_time=self.prep_time, stats=self.stats) def modelTransition(self, state): if state.get("generate", True) and self.depth > 1: name = "Atomic_%d_%d" % (self.depth - 1, self.current_width) if self.depth > 1 else "Atomic_0_%d" % self.current_width if self.stats: new_atom = DelayedAtomicStats(name, self.int_delay, self.ext_delay, add_out_port=self.add_atomic_out_ports, prep_time=self.prep_time, mode="LI-Dynamic") else: new_atom = DelayedAtomic(name, self.int_delay, self.ext_delay, add_out_port=self.add_atomic_out_ports, prep_time=self.prep_time, mode="LI-Dynamic") self.models.append(self.addSubModel(new_atom)) self.connectPorts(self.i_in, new_atom.i_in) if len(self.models) > 1: prev = self.models[-2] print("last: ", prev.name) if isinstance(prev, AtomicDEVS) and hasattr(prev, "o_out"): self.connectPorts(prev.o_out, new_atom.i_in) self.current_width += 1 return False return False if __name__ == '__main__': import sys sys.setrecursionlimit(10000) root = HOmod("Root", 4, 3, 0, 0) sim = Simulator(root) sim.setVerbose(None) # sim.setTerminationTime(10.0) sim.setStateSaving("custom") sim.simulate()