| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560 |
- from abc import ABC, abstractmethod
- from collections import defaultdict
- from pystone import pystones
- from pypdevs.DEVS import AtomicDEVS, CoupledDEVS
- from pypdevs.infinity import INFINITY
- from pypdevs.simulator import Simulator
- from graphviz import Source
- import time
class DelayedAtomic(AtomicDEVS):
    """Atomic DEVStone component with configurable transition workloads.

    The model toggles between the states ``"active"`` (set by an external
    transition) and ``"passive"`` (set by an internal transition).  Before
    each transition it calls ``pystones`` with the matching delay when that
    delay is non-zero (presumably a CPU-burning benchmark loop; confirm
    against the ``pystone`` module).

    :param name: model name handed to ``AtomicDEVS``
    :param int_delay: argument for ``pystones`` in the internal transition
    :param ext_delay: argument for ``pystones`` in the external transition
    :param add_out_port: when true an ``o_out`` output port is created
    :param prep_time: time advance returned while the model is ``"active"``
    :param mode: dynamic-structure behaviour; only ``"LI2HI"`` and ``"HI2LI"``
        are acted upon by :meth:`modelTransition`
    """

    def __init__(self, name: str, int_delay: float, ext_delay: float, add_out_port: bool = False, prep_time=0.0,
                 mode: str = ""):
        super().__init__(name)

        self.int_delay = int_delay
        self.ext_delay = ext_delay
        self.prep_time = prep_time

        self.i_in = self.addInPort("i_in")
        if add_out_port:
            # HI and HO style models need an output on every atomic
            self.o_out = self.addOutPort("o_out")

        # Dynamic-structure bookkeeping
        self.mode = mode
        # Ports added at run time by LI2HI(); stays empty otherwise
        self.out_ports = []

    def intTransition(self):
        """Run the internal workload (if any) and become ``"passive"``."""
        workload = self.int_delay
        if workload:
            pystones(workload)
        return "passive"

    def timeAdvance(self):
        """Return ``prep_time`` while active, otherwise ``INFINITY``."""
        return self.prep_time if self.state == "active" else INFINITY

    def outputFnc(self):
        """Emit ``[0]`` on the static port, else on a connected dynamic port.

        :return: a port-to-bag mapping; empty when no usable port exists
        """
        static_port = getattr(self, "o_out", None)
        if static_port is not None:
            return {static_port: [0]}

        # Dynamic-structure LI2HI case: only send once the extra port has
        # actually been coupled to something.
        if len(self.out_ports) == 1:
            dynamic_port = self.out_ports[0]
            if dynamic_port.outline != []:
                return {dynamic_port: [0]}
        return {}

    def extTransition(self, inputs):
        """Run the external workload (if any) and become ``"active"``."""
        workload = self.ext_delay
        if workload:
            pystones(workload)
        return "active"

    def modelTransition(self, state):
        """Dispatch to the structural change selected by ``self.mode``.

        :param state: dict shared with the parent's ``modelTransition``
        :return: the result of the selected handler, ``False`` for any
            other mode
        """
        if self.mode == "LI2HI":
            return self.LI2HI(state)
        if self.mode == "HI2LI":
            return self.HI2LI(state)
        return False

    def LI2HI(self, state):
        """Add the ``o_HI`` output port once and ask the parent to wire it.

        :return: ``True`` the first time (port created), ``False`` afterwards
            or when the model already owns an ``o_out`` attribute
        """
        if self.out_ports or hasattr(self, "o_out"):
            return False

        # Every atomic without a built-in output gets the extra port, even the
        # last one in the chain, which the parent cannot connect onwards.
        state["LI2HI"] = True
        state["name"] = self.name
        self.out_ports.append(self.addOutPort("o_HI"))
        return True  # lets the parent coupled model connect the new port

    def HI2LI(self, state):
        """Remove the ``o_out`` port, if still present; always returns ``False``."""
        if getattr(self, "o_out", None) is not None:
            state["HI2LI"] = True
            state["name"] = self.name
            self.removePort(self.o_out)
            self.o_out = None
        return False
class DelayedAtomicStats(DelayedAtomic):
    """``DelayedAtomic`` that additionally counts its transitions.

    ``int_count`` and ``ext_count`` hold the number of internal and external
    transitions executed so far.
    """

    def __init__(self, name: str, int_delay: float, ext_delay: float, add_out_port: bool = False, prep_time=0,
                 mode: str = ""):
        super().__init__(name, int_delay, ext_delay, add_out_port, prep_time, mode)
        self.int_count = 0
        self.ext_count = 0

    def intTransition(self):
        """Count the internal transition, then defer to the base class."""
        self.int_count += 1
        next_state = super().intTransition()
        return next_state

    def extTransition(self, inputs):
        """Count the external transition, then defer to the base class."""
        self.ext_count += 1
        next_state = super().extTransition(inputs)
        return next_state
class DEVStoneWrapper(CoupledDEVS, ABC):
    """Shared skeleton for one level (coupled model) of a DEVStone benchmark.

    At ``depth == 1`` the level holds a single atomic ``Atomic_0_0`` wired
    between ``i_in`` and ``o_out``.  At greater depth it holds the nested
    coupled model returned by :meth:`gen_coupled` plus, unless ``dynamic`` is
    set, ``width - 1`` atomic models which subclasses are expected to couple.

    :param name: model name handed to ``CoupledDEVS``
    :param depth: number of nested levels, at least 1
    :param width: number of components per level, at least 1
    :param int_delay: internal-transition workload of the atomics, >= 0
    :param ext_delay: external-transition workload of the atomics, >= 0
    :param add_atomic_out_ports: give the atomics of this level an ``o_out``
    :param prep_time: stored on the instance (subclasses forward it to nested
        levels); the atomics created here use a fixed ``1.0`` based value
    :param stats: create ``DelayedAtomicStats`` instead of ``DelayedAtomic``
    :param mode: dynamic-structure mode forwarded to the atomics
    :param dynamic: skip creating the ``width - 1`` atomics up front
    :raises ValueError: on an out-of-range depth, width or delay
    """

    def __init__(self, name: str, depth: int, width: int, int_delay: float, ext_delay: float,
                 add_atomic_out_ports: bool = False, prep_time=0, stats=False, mode: str = "", dynamic: bool = False):
        super().__init__(name)

        # Fail fast, before any port or sub-model is created.
        if depth < 1:
            raise ValueError("Invalid depth")
        if width < 1:
            raise ValueError("Invalid width")
        if int_delay < 0:
            raise ValueError("Invalid int_delay")
        if ext_delay < 0:
            raise ValueError("Invalid ext_delay")

        self.depth = depth
        self.width = width
        self.int_delay = int_delay
        self.ext_delay = ext_delay
        self.prep_time = prep_time
        self.stats = stats
        self.add_atomic_out_ports = add_atomic_out_ports

        self.i_in = self.addInPort("i_in")
        self.o_out = self.addOutPort("o_out")

        # Atomic models of this level.  Kept separately for the dynamic
        # structure variants, which don't keep it all in component_set.
        self.models = []

        atomic_cls = DelayedAtomicStats if self.stats else DelayedAtomic

        if depth == 1:
            atomic = atomic_cls("Atomic_0_0", int_delay, ext_delay, add_out_port=True, prep_time=1.0, mode=mode)
            self.models.append(self.addSubModel(atomic))
            self.connectPorts(self.i_in, atomic.i_in)
            self.connectPorts(atomic.o_out, self.o_out)
            return

        coupled = self.gen_coupled()
        self.addSubModel(coupled)
        self.connectPorts(self.i_in, coupled.i_in)
        self.connectPorts(coupled.o_out, self.o_out)

        if dynamic:
            return  # the width is generated at run time instead

        # The (idx * 1e-6) offset avoids too many transitions at exactly the
        # same time, which seemed to be a problem for Classic DEVS only.
        for idx in range(width - 1):
            atomic = atomic_cls("Atomic_%d_%d" % (depth - 1, idx), int_delay, ext_delay,
                                add_out_port=add_atomic_out_ports, prep_time=1.0 + (idx * 1e-6), mode=mode)
            self.models.append(self.addSubModel(atomic))

    def dot(self, name) -> None:
        """Render this level as a Graphviz graph and open it in a viewer.

        The output file is named ``<name>_<depth>``.  Atomics without an
        output port are drawn without outgoing edges; this includes atomics
        whose ``o_out`` was reset to ``None`` by ``DelayedAtomic.HI2LI``.

        :param name: prefix of the rendered file name
        """
        parts = [
            "\n\ndigraph {\n\tlayout=dot;"
            "\n\tnodesep=0.25;"
            "\n\tranksep=0.2;"
            "\n\trankdir=LR;"
            "\n\tsplines=ortho;\n",
            f"\tCoupled_{self.depth}[shape=box, label=\"C_{self.depth}\"];\n",
        ]

        # The first component is drawn as a box: the nested coupled model, or
        # the lone atomic at depth 1 (which the loop below declares again).
        coupled = self.component_set[0]
        coupled_label = "C_" + "_".join(coupled.name.split("_")[1:3])
        parts.append(f"\t{coupled.name}[shape=box, label=\"{coupled_label}\"];\n")

        for model in self.models:
            atomic_label = "A_" + "_".join(model.name.split("_")[1:3])
            parts.append(f"\t{model.name}[shape=ellipse, label=\"{atomic_label}\"];\n")

        # NOTE(review): assumes str(port) is a dotted path whose components
        # name the owning models - confirm against pypdevs.  The two loops
        # index it differently ([-2] vs [2]), as in the original code.
        for target in self.i_in.outline:
            parts.append(f"\tCoupled_{self.depth}:i_in -> {str(target).split('.')[-2]}:{target.name};\n")

        for model in self.models:
            out_port = getattr(model, "o_out", None)
            if out_port is None:
                # hasattr() alone is not enough: HI2LI leaves o_out = None
                continue
            for target in out_port.outline:
                parts.append(f"\t{model.name}:o_out -> {str(target).split('.')[2]}:{target.name};\n")

        parts.append("}\n")
        Source("".join(parts)).view(filename=f'{name}_{self.depth}')

    @abstractmethod
    def gen_coupled(self):
        """ :return a coupled model with i_in and o_out ports"""
        pass
class LI(DEVStoneWrapper):
    """DEVStone LI level: every atomic is fed directly from ``i_in``.

    The atomics are created without output ports, so there are no couplings
    between them.
    """

    def __init__(self, name: str, depth: int, width: int, int_delay: float, ext_delay: float, prep_time=0, stats=False):
        super().__init__(name, depth, width, int_delay, ext_delay, add_atomic_out_ports=False, prep_time=prep_time,
                         stats=stats)

        # Index 0 is skipped: it is the first sub-model the wrapper added and
        # is already coupled to i_in there.
        for atomic in self.component_set[1:]:
            assert isinstance(atomic, AtomicDEVS)
            self.connectPorts(self.i_in, atomic.i_in)

    def gen_coupled(self):
        """Build the nested ``LI`` level, one level shallower than this one."""
        child_depth = self.depth - 1
        return LI("Coupled_%d" % child_depth, child_depth, self.width, self.int_delay, self.ext_delay,
                  prep_time=self.prep_time, stats=self.stats)
class HI(DEVStoneWrapper):
    """DEVStone HI level: atomics are fed from ``i_in`` and chained together.

    Each atomic (all but the last) sends its ``o_out`` to the ``i_in`` of the
    atomic that follows it in ``component_set``.
    """

    def __init__(self, name: str, depth: int, width: int, int_delay: float, ext_delay: float, prep_time=0, stats=False):
        super().__init__(name, depth, width, int_delay, ext_delay, add_atomic_out_ports=True, prep_time=prep_time,
                         stats=stats)

        components = self.component_set

        # The last atomic only receives the external input.
        if len(components) > 1:
            last = components[-1]
            assert isinstance(last, AtomicDEVS)
            self.connectPorts(self.i_in, last.i_in)

        # Every other atomic (index 0 is skipped) feeds its successor and also
        # receives the external input.
        for current, successor in zip(components[1:-1], components[2:]):
            assert isinstance(current, AtomicDEVS)
            self.connectPorts(current.o_out, successor.i_in)
            self.connectPorts(self.i_in, current.i_in)

    def gen_coupled(self):
        """Build the nested ``HI`` level, one level shallower than this one."""
        child_depth = self.depth - 1
        return HI("Coupled_%d" % child_depth, child_depth, self.width, self.int_delay, self.ext_delay,
                  prep_time=self.prep_time, stats=self.stats)
class HO(DEVStoneWrapper):
    """DEVStone HO level: HI-style chaining plus a second input/output pair.

    ``i_in`` is forwarded to the nested level (both its ``i_in``, done by the
    wrapper, and its ``i_in2``), while the atomics of this level are fed from
    ``i_in2`` and all report on ``o_out2``.
    """

    def __init__(self, name: str, depth: int, width: int, int_delay: float, ext_delay: float, prep_time=0, stats=False):
        super().__init__(name, depth, width, int_delay, ext_delay, add_atomic_out_ports=True, prep_time=prep_time,
                         stats=stats)

        self.i_in2 = self.addInPort("i_in2")
        self.o_out2 = self.addOutPort("o_out2")

        components = self.component_set
        assert len(components) > 0

        first = components[0]
        if isinstance(first, CoupledDEVS):
            self.connectPorts(self.i_in, first.i_in2)

        # The last atomic has no successor: input and output coupling only.
        if len(components) > 1:
            last = components[-1]
            assert isinstance(last, AtomicDEVS)
            self.connectPorts(self.i_in2, last.i_in)
            self.connectPorts(last.o_out, self.o_out2)

        # Remaining atomics (index 0 is skipped) also feed their successor.
        for current, successor in zip(components[1:-1], components[2:]):
            assert isinstance(current, AtomicDEVS)
            self.connectPorts(current.o_out, successor.i_in)
            self.connectPorts(self.i_in2, current.i_in)
            self.connectPorts(current.o_out, self.o_out2)

    def gen_coupled(self):
        """Build the nested ``HO`` level, one level shallower than this one."""
        child_depth = self.depth - 1
        return HO("Coupled_%d" % child_depth, child_depth, self.width, self.int_delay, self.ext_delay,
                  prep_time=self.prep_time, stats=self.stats)
class HOmod(CoupledDEVS):
    """DEVStone HOmod level: a triangular grid of atomics above a nested level.

    At ``depth == 1`` the level holds one atomic between ``i_in`` and
    ``o_out``.  Otherwise it holds a nested ``HOmod`` and, for ``width >= 2``,
    ``width`` rows of atomics: rows 0 and 1 have ``width - 1`` atomics each,
    every later row ``i`` has ``width - i``.

    :raises ValueError: on an out-of-range depth, width or delay
    """

    def __init__(self, name: str, depth: int, width: int, int_delay: float, ext_delay: float, prep_time=0, stats=False):
        super().__init__(name)

        self.depth = depth
        self.width = width
        self.int_delay = int_delay
        self.ext_delay = ext_delay
        self.prep_time = prep_time
        self.stats = stats

        self.i_in = self.addInPort("i_in")
        self.i_in2 = self.addInPort("i_in2")
        self.o_out = self.addOutPort("o_out")

        if depth < 1:
            raise ValueError("Invalid depth")
        if width < 1:
            raise ValueError("Invalid width")
        if int_delay < 0:
            raise ValueError("Invalid int_delay")
        if ext_delay < 0:
            raise ValueError("Invalid ext_delay")

        atomic_cls = DelayedAtomicStats if self.stats else DelayedAtomic

        if depth == 1:
            atomic = atomic_cls("Atomic_0_0", int_delay, ext_delay, add_out_port=True, prep_time=prep_time)
            self.addSubModel(atomic)
            self.connectPorts(self.i_in, atomic.i_in)
            self.connectPorts(atomic.o_out, self.o_out)
            return

        coupled = HOmod("Coupled_%d" % (self.depth - 1), self.depth - 1, self.width, self.int_delay, self.ext_delay,
                        prep_time=prep_time, stats=stats)
        self.addSubModel(coupled)
        self.connectPorts(self.i_in, coupled.i_in)
        self.connectPorts(coupled.o_out, self.o_out)

        if width < 2:
            return

        # Generate atomic components, grouped per row
        rows = defaultdict(list)
        for row in range(width):
            first_col = 0 if row < 2 else row - 1
            for col in range(first_col, width - 1):
                atomic = atomic_cls("Atomic_%d_%d_%d" % (depth - 1, row, col), int_delay, ext_delay,
                                    add_out_port=True, prep_time=prep_time)
                self.addSubModel(atomic)
                rows[row].append(atomic)

        # Connect EIC: the whole first row, then the head of every other row
        for atomic in rows[0]:
            self.connectPorts(self.i_in2, atomic.i_in)
        for row in range(1, width):
            self.connectPorts(self.i_in2, rows[row][0].i_in)

        # Connect IC
        for atomic in rows[0]:  # first row to the nested coupled component
            self.connectPorts(atomic.o_out, coupled.i_in2)
        for sender in rows[1]:  # second row to every atomic of the first row
            for receiver in rows[0]:
                self.connectPorts(sender.o_out, receiver.i_in)
        for row in range(2, width):  # remaining rows feed the row above them
            for col, sender in enumerate(rows[row]):
                self.connectPorts(sender.o_out, rows[row - 1][col + 1].i_in)
class LI2HI(DEVStoneWrapper):
    """
    Dynamic DEVStone variant that starts as an LI model and is rewired
    towards an HI model at run time.

    The atomics are created in mode ``"LI2HI"``: each adds an ``o_HI`` output
    port during its own model transition, and this coupled model then chains
    that port to the next component's ``i_in``.
    """

    def __init__(self, name: str, depth: int, width: int, int_delay: float, ext_delay: float, prep_time=0, stats=False):
        super().__init__(name, depth, width, int_delay, ext_delay, add_atomic_out_ports=False, prep_time=prep_time,
                         stats=stats, mode="LI2HI")

        # Initial LI wiring; index 0 is skipped because the wrapper already
        # coupled that sub-model to i_in.
        for atomic in self.component_set[1:]:
            assert isinstance(atomic, AtomicDEVS)
            self.connectPorts(self.i_in, atomic.i_in)

    def gen_coupled(self):
        """Build the nested ``LI2HI`` level, one level shallower than this one."""
        child_depth = self.depth - 1
        return LI2HI("Coupled_%d" % child_depth, child_depth, self.width, self.int_delay, self.ext_delay,
                     prep_time=self.prep_time, stats=self.stats)

    def modelTransition(self, state):
        """Chain freshly created ``o_HI`` ports to the following component.

        The method runs again for every atomic that changes its structure, so
        pairs wired on an earlier call are skipped instead of being connected
        a second time.

        :param state: dict filled in by the atomics' ``modelTransition``
        :return: always ``False``
        """
        # NOTE(review): the flag defaults to True, so a missing key also
        # triggers the wiring pass; kept for backward compatibility.
        if state.get("LI2HI", True):
            components = self.component_set
            # The last component has no successor, hence the pairwise zip.
            for source, successor in zip(components, components[1:]):
                ports = getattr(source, "out_ports", None)
                # Only atomics that already received their extra output port
                if ports is None or len(ports) != 1:
                    continue
                extra_port = ports[0]
                if successor.i_in in extra_port.outline:
                    continue  # already wired by a previous model transition
                self.connectPorts(extra_port, successor.i_in)
        return False
class HI2LI(DEVStoneWrapper):
    """
    Dynamic DEVStone variant that starts as an HI model and turns into an LI
    model at run time.

    The level is wired like ``HI``; its atomics are created in mode
    ``"HI2LI"``, in which they remove their own ``o_out`` port during a model
    transition.
    """

    def __init__(self, name: str, depth: int, width: int, int_delay: float, ext_delay: float, prep_time=0, stats=False):
        super().__init__(name, depth, width, int_delay, ext_delay, add_atomic_out_ports=True, prep_time=prep_time,
                         stats=stats, mode="HI2LI")

        components = self.component_set

        # The last atomic only receives the external input.
        if len(components) > 1:
            last = components[-1]
            assert isinstance(last, AtomicDEVS)
            self.connectPorts(self.i_in, last.i_in)

        # Every other atomic (index 0 is skipped) feeds its successor and also
        # receives the external input.
        for current, successor in zip(components[1:-1], components[2:]):
            assert isinstance(current, AtomicDEVS)
            self.connectPorts(current.o_out, successor.i_in)
            self.connectPorts(self.i_in, current.i_in)

    def gen_coupled(self):
        """Build the nested ``HI2LI`` level, one level shallower than this one."""
        child_depth = self.depth - 1
        return HI2LI("Coupled_%d" % child_depth, child_depth, self.width, self.int_delay, self.ext_delay,
                     prep_time=self.prep_time, stats=self.stats)
# Accumulated seconds spent inside the instrumented modelTransition methods.
TRANSITION_TIME = 0


class DynamicGenerator(AtomicDEVS):
    """Atomic model that periodically requests a structural change.

    It produces no output; its only job is to raise the ``"generate"`` flag
    in the shared model-transition state, at most ``repeat`` times.

    :param name: model name handed to ``AtomicDEVS``
    :param period: time advance used while requests are still pending
    :param repeat: maximum number of generation requests
    """

    def __init__(self, name: str, period: float = 1, repeat: int = 1):
        super().__init__(name)
        self.period = period
        self.repeat = repeat
        self._counter = 0  # number of generation requests issued so far

    def intTransition(self):
        """Return a placeholder state; the state itself is unimportant."""
        return "passive"

    def timeAdvance(self):
        """Return ``period`` until ``repeat`` requests were made, then ``INFINITY``."""
        if self._counter >= self.repeat:
            return INFINITY
        return self.period

    def outputFnc(self):
        """This model never sends events."""
        return {}

    def modelTransition(self, state):
        """Flag a generation request for the parent coupled model.

        Sets ``state["generate"] = True`` and resets ``state["created"]`` to
        0 while fewer than ``repeat`` requests were issued.  The time spent
        here is added to the module-level ``TRANSITION_TIME``.

        :param state: dict shared with the parent's ``modelTransition``
        :return: ``True`` when a request was issued, ``False`` once exhausted
        """
        global TRANSITION_TIME
        # perf_counter() is monotonic and high-resolution, unlike time.time()
        start = time.perf_counter()
        try:
            if self._counter >= self.repeat:
                return False
            self._counter += 1
            state["generate"] = True
            state["created"] = 0
            return True
        finally:
            # Account for the elapsed time on every exit path.
            TRANSITION_TIME += time.perf_counter() - start
class dLI(DEVStoneWrapper):
    """
    A LI model which grows in width at run time.

    Each level starts with a single component.  The innermost level
    (``depth == 1``) owns a ``DynamicGenerator``; every generation request
    bubbles up through the enclosing levels, each of which adds one atomic
    coupled to ``i_in``.

    :param generator_mode: accepted for interface compatibility; not used
    :param root: whether this is the outermost level (stops the bubbling)
    """

    def __init__(self, name: str, depth: int, width: int, int_delay: float, ext_delay: float,
                 generator_mode: str = "uniform", root=True, prep_time=0, stats=False):
        super().__init__(name, depth, width, int_delay, ext_delay, add_atomic_out_ports=False, prep_time=prep_time,
                         stats=stats, mode="LI-Dynamic", dynamic=True)
        self.root = root

        # With dynamic=True the wrapper adds a single sub-model, so there is
        # normally nothing to couple here; kept for parity with LI.
        for atomic in self.component_set[1:]:
            assert isinstance(atomic, AtomicDEVS)
            self.connectPorts(self.i_in, atomic.i_in)

        self.created = 0
        self.current_width = 1
        # Atomics are generated until width is reached; the nested coupled
        # model counts as one component, hence the -1.
        self.max_gen = width - 1
        if self.depth == 1:
            self.generator = self.addSubModel(DynamicGenerator(self.name + "_gen", repeat=self.max_gen))

    def gen_coupled(self):
        """Build the nested, non-root ``dLI`` level."""
        child_depth = self.depth - 1
        return dLI("Coupled_%d" % child_depth, child_depth, self.width, self.int_delay, self.ext_delay,
                   prep_time=self.prep_time, stats=self.stats, root=False)

    def modelTransition(self, state):
        """Add one atomic to this level in response to a generation request.

        :param state: dict shared along the model-transition chain; reads
            ``"generate"`` and updates ``"created"``
        :return: ``True`` when the request must continue to the parent level,
            ``False`` at the root or when nothing was requested
        :raises RuntimeError: when the level grows beyond ``width``
        """
        if not state.get("generate", True):
            return False
        if self.depth == 1:
            # The innermost level holds the generator and only forwards.
            return True

        atomic_cls = DelayedAtomicStats if self.stats else DelayedAtomic
        new_atom = atomic_cls("Atomic_%d_%d" % (self.depth - 1, self.current_width), self.int_delay, self.ext_delay,
                              add_out_port=self.add_atomic_out_ports, prep_time=self.prep_time, mode="LI-Dynamic")
        self.models.append(self.addSubModel(new_atom))
        self.connectPorts(self.i_in, new_atom.i_in)

        self.current_width += 1
        if self.current_width > self.width:
            raise RuntimeError(f"The width has grown beyond what was specified!"
                               f"\n current width: {self.current_width}"
                               f"\n specified width: {self.width}")

        # Tolerate a missing counter, consistent with the defaulted flag.
        created_below = state.get("created", 0)
        if not self.root:
            state["created"] = created_below + 1
            return True

        self.created += created_below + 1
        return False
class dHI(DEVStoneWrapper):
    """
    A HI model which grows in width at run time.

    Each level starts with a single component.  The innermost level
    (``depth == 1``) owns a ``DynamicGenerator``; every generation request
    bubbles up through the enclosing levels, each of which adds one atomic
    that is coupled to ``i_in`` and chained behind the previously added one.

    :param generator_mode: accepted for interface compatibility; not used
    :param root: whether this is the outermost level (stops the bubbling)
    """

    def __init__(self, name: str, depth: int, width: int, int_delay: float, ext_delay: float,
                 generator_mode: str = "uniform", root=True, prep_time=0, stats=False):
        super().__init__(name, depth, width, int_delay, ext_delay, add_atomic_out_ports=True, prep_time=prep_time,
                         stats=stats, mode="HI-Dynamic", dynamic=True)
        self.root = root

        # With dynamic=True the wrapper adds a single sub-model, so there is
        # normally nothing to couple here; kept for parity with HI.
        components = self.component_set
        if len(components) > 1:
            last = components[-1]
            assert isinstance(last, AtomicDEVS)
            self.connectPorts(self.i_in, last.i_in)
        for current, successor in zip(components[1:-1], components[2:]):
            assert isinstance(current, AtomicDEVS)
            self.connectPorts(current.o_out, successor.i_in)
            self.connectPorts(self.i_in, current.i_in)

        self.created = 0
        self.current_width = 1
        # Atomics are generated until width is reached; the nested coupled
        # model counts as one component, hence the -1.
        self.max_gen = width - 1
        if self.depth == 1:
            self.generator = self.addSubModel(DynamicGenerator(self.name + "_gen", repeat=self.max_gen))

    def gen_coupled(self):
        """Build the nested, non-root ``dHI`` level."""
        child_depth = self.depth - 1
        return dHI("Coupled_%d" % child_depth, child_depth, self.width, self.int_delay, self.ext_delay,
                   prep_time=self.prep_time, stats=self.stats, root=False)

    def modelTransition(self, state):
        """Add one chained atomic in response to a generation request.

        The time spent here is added to the module-level ``TRANSITION_TIME``
        on every exit path, including the error path.

        :param state: dict shared along the model-transition chain; reads
            ``"generate"`` and updates ``"created"``
        :return: ``True`` when the request must continue to the parent level,
            ``False`` at the root or when nothing was requested
        :raises RuntimeError: when the level grows beyond ``width``
        """
        global TRANSITION_TIME
        # perf_counter() is monotonic and high-resolution, unlike time.time()
        start = time.perf_counter()
        try:
            return self._grow(state)
        finally:
            TRANSITION_TIME += time.perf_counter() - start

    def _grow(self, state):
        """Perform the structural change; see :meth:`modelTransition`."""
        if not state.get("generate", True):
            return False
        if self.depth == 1:
            # The innermost level holds the generator and only forwards.
            return True

        atomic_cls = DelayedAtomicStats if self.stats else DelayedAtomic
        new_atom = atomic_cls("Atomic_%d_%d" % (self.depth - 1, self.current_width), self.int_delay, self.ext_delay,
                              add_out_port=self.add_atomic_out_ports, prep_time=self.prep_time, mode="HI-Dynamic")
        self.models.append(self.addSubModel(new_atom))
        self.connectPorts(self.i_in, new_atom.i_in)

        # HI chaining: the previously added atomic feeds the new one.
        if len(self.models) > 1:
            prev = self.models[-2]
            if isinstance(prev, AtomicDEVS) and hasattr(prev, "o_out"):
                self.connectPorts(prev.o_out, new_atom.i_in)

        self.current_width += 1
        if self.current_width > self.width:
            raise RuntimeError(f"The width has grown beyond what was specified!"
                               f"\n current width: {self.current_width}"
                               f"\n specified width: {self.width}")

        # Tolerate a missing counter, consistent with the defaulted flag.
        created_below = state.get("created", 0)
        if not self.root:
            state["created"] = created_below + 1
            return True

        self.created += created_below + 1
        return False
if __name__ == '__main__':
    import sys

    # Raise the interpreter's recursion limit before building and running
    # the nested model.
    sys.setrecursionlimit(10000)

    # Demo run: an HOmod model of depth 4 and width 3 without workloads.
    root = HOmod("Root", depth=4, width=3, int_delay=0, ext_delay=0)

    sim = Simulator(root)
    sim.setVerbose(None)
    # Optional: sim.setTerminationTime(10.0)
    sim.setStateSaving("custom")
    sim.simulate()
|