فهرست منبع

HOmod test for pypdevs

khvilaboa 5 سال پیش
والد
کامیت
ef9647114f
2فایلهای تغییر یافته به همراه86 افزوده شده و 9 حذف شده
  1. 16 6
      devstone/pythonpdevs/devstone.py
  2. 70 3
      devstone/pythonpdevs/test_devstone.py

+ 16 - 6
devstone/pythonpdevs/devstone.py

@@ -183,13 +183,15 @@ class HO(DEVStoneWrapper):
 
 
 class HOmod(CoupledDEVS):
 class HOmod(CoupledDEVS):
 
 
-    def __init__(self, name: str, depth: int, width: int, int_delay: float, ext_delay: float):
+    def __init__(self, name: str, depth: int, width: int, int_delay: float, ext_delay: float, prep_time=0, stats=False):
         super().__init__(name)
         super().__init__(name)
 
 
         self.depth = depth
         self.depth = depth
         self.width = width
         self.width = width
         self.int_delay = int_delay
         self.int_delay = int_delay
         self.ext_delay = ext_delay
         self.ext_delay = ext_delay
+        self.prep_time = prep_time
+        self.stats = stats
 
 
         self.i_in = self.addInPort("i_in")
         self.i_in = self.addInPort("i_in")
         self.i_in2 = self.addInPort("i_in2")
         self.i_in2 = self.addInPort("i_in2")
@@ -205,13 +207,16 @@ class HOmod(CoupledDEVS):
             raise ValueError("Invalid ext_delay")
             raise ValueError("Invalid ext_delay")
 
 
         if depth == 1:
         if depth == 1:
-            atomic = DelayedAtomic("Atomic_0_0", int_delay, ext_delay, add_out_port=True)
+            if self.stats:
+                atomic = DelayedAtomicStats("Atomic_0_0", int_delay, ext_delay, add_out_port=True, prep_time=prep_time)
+            else:
+                atomic = DelayedAtomic("Atomic_0_0", int_delay, ext_delay, add_out_port=True, prep_time=prep_time)
             self.addSubModel(atomic)
             self.addSubModel(atomic)
 
 
             self.connectPorts(self.i_in, atomic.i_in)
             self.connectPorts(self.i_in, atomic.i_in)
             self.connectPorts(atomic.o_out, self.o_out)
             self.connectPorts(atomic.o_out, self.o_out)
         else:
         else:
-            coupled = HOmod("Coupled_%d" % (self.depth - 1), self.depth - 1, self.width, self.int_delay, self.ext_delay)
+            coupled = HOmod("Coupled_%d" % (self.depth - 1), self.depth - 1, self.width, self.int_delay, self.ext_delay, prep_time=prep_time, stats=stats)
             self.addSubModel(coupled)
             self.addSubModel(coupled)
             self.connectPorts(self.i_in, coupled.i_in)
             self.connectPorts(self.i_in, coupled.i_in)
             self.connectPorts(coupled.o_out, self.o_out)
             self.connectPorts(coupled.o_out, self.o_out)
@@ -223,8 +228,12 @@ class HOmod(CoupledDEVS):
                 for i in range(width):
                 for i in range(width):
                     min_row_idx = 0 if i < 2 else i - 1
                     min_row_idx = 0 if i < 2 else i - 1
                     for j in range(min_row_idx, width - 1):
                     for j in range(min_row_idx, width - 1):
-                        atomic = DelayedAtomic("Atomic_%d_%d_%d" % (depth - 1, i, j), int_delay, ext_delay,
-                                               add_out_port=True)
+                        if self.stats:
+                            atomic = DelayedAtomicStats("Atomic_%d_%d_%d" % (depth - 1, i, j), int_delay, ext_delay,
+                                                   add_out_port=True, prep_time=prep_time)
+                        else:
+                            atomic = DelayedAtomic("Atomic_%d_%d_%d" % (depth - 1, i, j), int_delay, ext_delay,
+                                                   add_out_port=True, prep_time=prep_time)
                         self.addSubModel(atomic)
                         self.addSubModel(atomic)
                         atomics[i].append(atomic)
                         atomics[i].append(atomic)
 
 
@@ -239,7 +248,8 @@ class HOmod(CoupledDEVS):
                 for atomic in atomics[0]:  # First row to coupled component
                 for atomic in atomics[0]:  # First row to coupled component
                     self.connectPorts(atomic.o_out, coupled.i_in2)
                     self.connectPorts(atomic.o_out, coupled.i_in2)
                 for i in range(len(atomics[1])):  # Second to first rows
                 for i in range(len(atomics[1])):  # Second to first rows
-                    self.connectPorts(atomics[1][i].o_out, atomics[0][i].i_in)
+                    for j in range(len(atomics[0])):
+                        self.connectPorts(atomics[1][i].o_out, atomics[0][j].i_in)
                 for i in range(2, width):  # Rest of rows
                 for i in range(2, width):  # Rest of rows
                     for j in range(len(atomics[i])):
                     for j in range(len(atomics[i])):
                         self.connectPorts(atomics[i][j].o_out, atomics[i - 1][j + 1].i_in)
                         self.connectPorts(atomics[i][j].o_out, atomics[i - 1][j + 1].i_in)

+ 70 - 3
devstone/pythonpdevs/test_devstone.py

@@ -5,7 +5,7 @@ import random
 from pypdevs.DEVS import AtomicDEVS, CoupledDEVS
 from pypdevs.DEVS import AtomicDEVS, CoupledDEVS
 from pypdevs.simulator import Simulator
 from pypdevs.simulator import Simulator
 
 
-from devstone import LI, DelayedAtomic, DelayedAtomicStats, HI, HO
+from devstone import LI, DelayedAtomic, DelayedAtomicStats, HI, HO, HOmod
 from generator import Generator
 from generator import Generator
 from main import DEVStoneEnvironment
 from main import DEVStoneEnvironment
 
 
@@ -260,7 +260,7 @@ class TestHO(DevstoneUtilsTestCase):
         """
         """
         Check behaviour params: number of int and ext transitions.
         Check behaviour params: number of int and ext transitions.
         """
         """
-        for params_tuple in self.valid_low_params:
+        for params_tuple in self.valid_low_params[0]:
             params = dict(zip(("depth", "width", "int_delay", "ext_delay"), params_tuple))
             params = dict(zip(("depth", "width", "int_delay", "ext_delay"), params_tuple))
 
 
             with self.subTest(**params):
             with self.subTest(**params):
@@ -277,4 +277,71 @@ class TestHO(DevstoneUtilsTestCase):
                 self.assertEqual(ext_count, (((params["width"] - 1) * params["width"]) / 2) * (params["depth"] - 1) + 1)
                 self.assertEqual(ext_count, (((params["width"] - 1) * params["width"]) / 2) * (params["depth"] - 1) + 1)
 
 
     def test_invalid_inputs(self):
     def test_invalid_inputs(self):
-        super().check_invalid_inputs(HO)
+        super().check_invalid_inputs(HO)
+
+
+class TestHOmod(DevstoneUtilsTestCase):
+
+    def test_structure(self):
+        """
+        Check structure params: atomic modules, ic's, eic's and eoc's.
+        """
+        for params_tuple in self.valid_high_params:
+            params = dict(zip(("depth", "width", "int_delay", "ext_delay"), params_tuple))
+
+            with self.subTest(**params):
+                self._check_structure(**params)
+
+    def test_structure_corner_cases(self):
+        params = {"depth": 10, "width": 1, "int_delay": 1, "ext_delay": 1}
+        self._check_structure(**params)
+        params["depth"] = 1
+        self._check_structure(**params)
+
+    def _check_structure(self, **params):
+        homod_root = HOmod("HOmod_root", **params)
+        self.assertEqual(Utils.count_atomics(homod_root),
+                         ((params["width"] - 1) + ((params["width"] - 1) * params["width"]) / 2) * (
+                                     params["depth"] - 1) + 1)
+        self.assertEqual(Utils.count_eic(homod_root), (2 * (params["width"] - 1) + 1) * (params["depth"] - 1) + 1)
+        self.assertEqual(Utils.count_eoc(homod_root), params["depth"])
+        # ICs relative to the "triangular" section
+        exp_ic = (((params["width"] - 2) * (params["width"] - 1)) / 2) if params["width"] > 1 else 0
+        # Plus the ones relative to the connection from the 2nd to 1st row...
+        exp_ic += (params["width"] - 1) ** 2
+        # ...and from the 1st to the couple component
+        exp_ic += params["width"] - 1
+        # Multiplied by the number of layers (except the deepest one, that doesn't have ICs)
+        exp_ic *= (params["depth"] - 1)
+        self.assertEqual(Utils.count_ic(homod_root), exp_ic)
+
+    def _test_behavior(self, coord_type):
+        """
+        Check behaviour params: number of int and ext transitions.
+        """
+        for params_tuple in self.valid_low_params:
+            params = dict(zip(("depth", "width", "int_delay", "ext_delay"), params_tuple))
+
+            with self.subTest(**params):
+                homod_root = HOmod("HOmod_root", stats=True, **params)
+                homod_env = DEVStoneEnvironment("HOmod_env", homod_root)
+                sim = Simulator(homod_env)
+                sim.setVerbose(None)
+                # sim.setTerminationTime(10.0)
+                # sim.setStateSaving("custom")
+                sim.simulate()
+
+                calc_in = lambda x, w: 1 + (x - 1) * (w - 1)
+                exp_trans = 1
+                tr_atomics = sum(range(1, params["width"]))
+                for i in range(1, params["depth"]):
+                    num_inputs = calc_in(i, params["width"])
+                    trans_first_row = (params["width"] - 1) * (num_inputs + params["width"] - 1)
+                    exp_trans += num_inputs * tr_atomics + trans_first_row
+
+                int_count, ext_count = Utils.count_transitions(homod_env)
+                self.assertEqual(int_count, exp_trans)
+                self.assertEqual(ext_count, exp_trans)
+
+    def test_invalid_inputs(self):
+        super().check_invalid_inputs(HOmod)