| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329 |
- from abc import ABC, abstractmethod
- from collections import defaultdict
- from pystone import pystones
- from pypdevs.DEVS import AtomicDEVS, CoupledDEVS
- from pypdevs.infinity import INFINITY
- from pypdevs.simulator import Simulator
- class DelayedAtomic(AtomicDEVS):
- def __init__(self, name: str, int_delay: float, ext_delay: float, add_out_port: bool = False, prep_time=0):
- super().__init__(name)
- self.int_delay = int_delay
- self.ext_delay = ext_delay
- self.prep_time = prep_time
- self.i_in = self.addInPort("i_in")
- if add_out_port:
- self.o_out = self.addOutPort("o_out")
- # used for dynamic structure models
- self.in_ports = []
- self.out_ports = []
- def intTransition(self):
- if self.int_delay:
- pystones(self.int_delay)
- return "passive"
- def timeAdvance(self):
- if self.state == "active":
- return self.prep_time
- else:
- return INFINITY
- def outputFnc(self):
- if hasattr(self, "o_out"):
- return {self.o_out: [0]}
- return {}
- def extTransition(self, inputs):
- if self.ext_delay:
- pystones(self.ext_delay)
- return "active"
- def modelTransition(self, state):
- if state["LI2HI"]:
- if self.name.split("_")[2] != "1":
- self.in_ports.append(self.addInPort("i_HI"))
- if self.name.split("_")[2] != "2":
- self.out_ports.append(self.addOutPort("o_HI"))
- return True # send to the parent to connect ports
- else:
- return False
- class DelayedAtomicStats(DelayedAtomic):
- def __init__(self, name: str, int_delay: float, ext_delay: float, add_out_port: bool = False, prep_time=0):
- super().__init__(name, int_delay, ext_delay, add_out_port, prep_time)
- self.int_count = 0
- self.ext_count = 0
- def intTransition(self):
- self.int_count += 1
- return super().intTransition()
- def extTransition(self, inputs):
- self.ext_count += 1
- return super().extTransition(inputs)
- class DEVStoneWrapper(CoupledDEVS, ABC):
- def __init__(self, name: str, depth: int, width: int, int_delay: float, ext_delay: float,
- add_atomic_out_ports: bool = False, prep_time=0, stats=False):
- super().__init__(name)
- self.depth = depth
- self.width = width
- self.int_delay = int_delay
- self.ext_delay = ext_delay
- self.prep_time = prep_time
- self.stats = stats
- self.add_atomic_out_ports = add_atomic_out_ports
- self.i_in = self.addInPort("i_in")
- self.o_out = self.addOutPort("o_out")
- if depth < 1:
- raise ValueError("Invalid depth")
- if width < 1:
- raise ValueError("Invalid width")
- if int_delay < 0:
- raise ValueError("Invalid int_delay")
- if ext_delay < 0:
- raise ValueError("Invalid ext_delay")
- if depth == 1:
- if self.stats:
- atomic = DelayedAtomicStats("Atomic_0_0", int_delay, ext_delay, add_out_port=True, prep_time=prep_time)
- else:
- atomic = DelayedAtomic("Atomic_0_0", int_delay, ext_delay, add_out_port=True, prep_time=prep_time)
- self.addSubModel(atomic)
- self.connectPorts(self.i_in, atomic.i_in)
- self.connectPorts(atomic.o_out, self.o_out)
- else:
- coupled = self.gen_coupled()
- self.addSubModel(coupled)
- self.connectPorts(self.i_in, coupled.i_in)
- self.connectPorts(coupled.o_out, self.o_out)
- for idx in range(width - 1):
- if self.stats:
- atomic = DelayedAtomicStats("Atomic_%d_%d" % (depth - 1, idx), int_delay, ext_delay,
- add_out_port=add_atomic_out_ports, prep_time=prep_time)
- else:
- atomic = DelayedAtomic("Atomic_%d_%d" % (depth - 1, idx), int_delay, ext_delay,
- add_out_port=add_atomic_out_ports, prep_time=prep_time)
- self.addSubModel(atomic)
- @abstractmethod
- def gen_coupled(self):
- """ :return a coupled method with i_in and o_out ports"""
- pass
- class LI(DEVStoneWrapper):
- def __init__(self, name: str, depth: int, width: int, int_delay: float, ext_delay: float, prep_time=0, stats=False):
- super().__init__(name, depth, width, int_delay, ext_delay, add_atomic_out_ports=False, prep_time=prep_time,
- stats=stats)
- for idx in range(1, len(self.component_set)):
- assert isinstance(self.component_set[idx], AtomicDEVS)
- self.connectPorts(self.i_in, self.component_set[idx].i_in)
- def gen_coupled(self):
- return LI("Coupled_%d" % (self.depth - 1), self.depth - 1, self.width, self.int_delay, self.ext_delay,
- prep_time=self.prep_time, stats=self.stats)
- class HI(DEVStoneWrapper):
- def __init__(self, name: str, depth: int, width: int, int_delay: float, ext_delay: float, prep_time=0, stats=False):
- super().__init__(name, depth, width, int_delay, ext_delay, add_atomic_out_ports=True, prep_time=prep_time,
- stats=stats)
- if len(self.component_set) > 1:
- assert isinstance(self.component_set[-1], AtomicDEVS)
- self.connectPorts(self.i_in, self.component_set[-1].i_in)
- for idx in range(1, len(self.component_set) - 1):
- assert isinstance(self.component_set[idx], AtomicDEVS)
- self.connectPorts(self.component_set[idx].o_out, self.component_set[idx + 1].i_in)
- self.connectPorts(self.i_in, self.component_set[idx].i_in)
- def gen_coupled(self):
- return HI("Coupled_%d" % (self.depth - 1), self.depth - 1, self.width, self.int_delay, self.ext_delay,
- prep_time=self.prep_time, stats=self.stats)
- class HO(DEVStoneWrapper):
- def __init__(self, name: str, depth: int, width: int, int_delay: float, ext_delay: float, prep_time=0, stats=False):
- super().__init__(name, depth, width, int_delay, ext_delay, add_atomic_out_ports=True, prep_time=prep_time,
- stats=stats)
- self.i_in2 = self.addInPort("i_in2")
- self.o_out2 = self.addOutPort("o_out2")
- assert len(self.component_set) > 0
- if isinstance(self.component_set[0], CoupledDEVS):
- self.connectPorts(self.i_in, self.component_set[0].i_in2)
- if len(self.component_set) > 1:
- assert isinstance(self.component_set[-1], AtomicDEVS)
- self.connectPorts(self.i_in2, self.component_set[-1].i_in)
- self.connectPorts(self.component_set[-1].o_out, self.o_out2)
- for idx in range(1, len(self.component_set) - 1):
- assert isinstance(self.component_set[idx], AtomicDEVS)
- self.connectPorts(self.component_set[idx].o_out, self.component_set[idx + 1].i_in)
- self.connectPorts(self.i_in2, self.component_set[idx].i_in)
- self.connectPorts(self.component_set[idx].o_out, self.o_out2)
- def gen_coupled(self):
- return HO("Coupled_%d" % (self.depth - 1), self.depth - 1, self.width, self.int_delay, self.ext_delay,
- prep_time=self.prep_time, stats=self.stats)
- class HOmod(CoupledDEVS):
- def __init__(self, name: str, depth: int, width: int, int_delay: float, ext_delay: float, prep_time=0, stats=False):
- super().__init__(name)
- self.depth = depth
- self.width = width
- self.int_delay = int_delay
- self.ext_delay = ext_delay
- self.prep_time = prep_time
- self.stats = stats
- self.i_in = self.addInPort("i_in")
- self.i_in2 = self.addInPort("i_in2")
- self.o_out = self.addOutPort("o_out")
- if depth < 1:
- raise ValueError("Invalid depth")
- if width < 1:
- raise ValueError("Invalid width")
- if int_delay < 0:
- raise ValueError("Invalid int_delay")
- if ext_delay < 0:
- raise ValueError("Invalid ext_delay")
- if depth == 1:
- if self.stats:
- atomic = DelayedAtomicStats("Atomic_0_0", int_delay, ext_delay, add_out_port=True, prep_time=prep_time)
- else:
- atomic = DelayedAtomic("Atomic_0_0", int_delay, ext_delay, add_out_port=True, prep_time=prep_time)
- self.addSubModel(atomic)
- self.connectPorts(self.i_in, atomic.i_in)
- self.connectPorts(atomic.o_out, self.o_out)
- else:
- coupled = HOmod("Coupled_%d" % (self.depth - 1), self.depth - 1, self.width, self.int_delay, self.ext_delay, prep_time=prep_time, stats=stats)
- self.addSubModel(coupled)
- self.connectPorts(self.i_in, coupled.i_in)
- self.connectPorts(coupled.o_out, self.o_out)
- if width >= 2:
- atomics = defaultdict(list)
- # Generate atomic components
- for i in range(width):
- min_row_idx = 0 if i < 2 else i - 1
- for j in range(min_row_idx, width - 1):
- if self.stats:
- atomic = DelayedAtomicStats("Atomic_%d_%d_%d" % (depth - 1, i, j), int_delay, ext_delay,
- add_out_port=True, prep_time=prep_time)
- else:
- atomic = DelayedAtomic("Atomic_%d_%d_%d" % (depth - 1, i, j), int_delay, ext_delay,
- add_out_port=True, prep_time=prep_time)
- self.addSubModel(atomic)
- atomics[i].append(atomic)
- # Connect EIC
- for atomic in atomics[0]:
- self.connectPorts(self.i_in2, atomic.i_in)
- for i in range(1, width):
- atomic_set = atomics[i]
- self.connectPorts(self.i_in2, atomic_set[0].i_in)
- # Connect IC
- for atomic in atomics[0]: # First row to coupled component
- self.connectPorts(atomic.o_out, coupled.i_in2)
- for i in range(len(atomics[1])): # Second to first rows
- for j in range(len(atomics[0])):
- self.connectPorts(atomics[1][i].o_out, atomics[0][j].i_in)
- for i in range(2, width): # Rest of rows
- for j in range(len(atomics[i])):
- self.connectPorts(atomics[i][j].o_out, atomics[i - 1][j + 1].i_in)
- class LI2HI(DEVStoneWrapper):
- """
- Dynamic DEVStone variant that starts as an LI (loosely interconnected)
- and gradually transforms into an HI (highly interconnected) model
- by adding connections during internal transitions.
- """
- def __init__(self, name: str, depth: int, width: int, int_delay: float, ext_delay: float,
- prep_time=0, stats=False):
- super().__init__(name, depth, width, int_delay, ext_delay,
- add_atomic_out_ports=True, prep_time=prep_time, stats=stats)
- # transformation state tracking
- self.converted_connections = 0
- self.total_connections = max(0, len(self.component_set) - 2)
- # initially LI-like connections
- for idx in range(1, len(self.component_set)):
- assert isinstance(self.component_set[idx], AtomicDEVS)
- self.connectPorts(self.i_in, self.component_set[idx].i_in)
- # output connection
- if len(self.component_set) > 1:
- self.connectPorts(self.component_set[-1].o_out, self.o_out)
- def gen_coupled(self):
- return LI2HI("Coupled_%d" % (self.depth - 1),
- self.depth - 1, self.width, self.int_delay, self.ext_delay,
- prep_time=self.prep_time, stats=self.stats)
- def modelTransition(self, state):
- """
- Gradually adds HI-style interconnections between atomics.
- Called automatically during internal transitions.
- """
- print("CHANGE")
- # Each step adds one connection
- if self.converted_connections < self.total_connections:
- src_idx = self.converted_connections + 1
- if src_idx < len(self.component_set) - 1:
- src = self.component_set[src_idx]
- dst = self.component_set[src_idx + 1]
- self.connectPorts(src.o_out, dst.i_in)
- self.converted_connections += 1
- return False
- if __name__ == '__main__':
- import sys
- sys.setrecursionlimit(10000)
- root = HOmod("Root", 4, 3, 0, 0)
- sim = Simulator(root)
- sim.setVerbose(None)
- # sim.setTerminationTime(10.0)
- sim.setStateSaving("custom")
- sim.simulate()
|