# test_devstone.py
  1. from abc import abstractmethod
  2. from unittest import TestCase
  3. import random
  4. from pypdevs.DEVS import AtomicDEVS, CoupledDEVS
  5. from pypdevs.simulator import Simulator
  6. from devstone import LI, DelayedAtomic, DelayedAtomicStats, HI, HO, HOmod
  7. from generator import Generator
  8. from main import DEVStoneEnvironment
  9. class Utils:
  10. @staticmethod
  11. def count_atomics(coupled):
  12. """
  13. :return: Number of atomic components in a coupled model
  14. """
  15. atomic_count = 0
  16. for comp in coupled.component_set:
  17. if isinstance(comp, AtomicDEVS):
  18. atomic_count += 1
  19. elif isinstance(comp, CoupledDEVS):
  20. atomic_count += Utils.count_atomics(comp)
  21. else:
  22. raise RuntimeError("Unrecognized type of component")
  23. return atomic_count
  24. @staticmethod
  25. def count_ic(coupled):
  26. """
  27. :return: Number of ic couplings in a coupled model
  28. """
  29. ic_count = 0
  30. for comp in coupled.component_set:
  31. for src_port in comp.OPorts:
  32. for dst_port in src_port.outline:
  33. if dst_port.host_DEVS != coupled:
  34. ic_count += 1
  35. for comp in coupled.component_set:
  36. if isinstance(comp, CoupledDEVS):
  37. ic_count += Utils.count_ic(comp)
  38. elif not isinstance(comp, AtomicDEVS):
  39. raise RuntimeError("Unrecognized type of component")
  40. return ic_count
  41. @staticmethod
  42. def count_eic(coupled):
  43. """
  44. :return: Number of eic couplings in a coupled model
  45. """
  46. eic_count = sum([len(port.outline) for port in coupled.IPorts])
  47. for comp in coupled.component_set:
  48. if isinstance(comp, CoupledDEVS):
  49. eic_count += Utils.count_eic(comp)
  50. elif not isinstance(comp, AtomicDEVS):
  51. raise RuntimeError("Unrecognized type of component")
  52. return eic_count
  53. @staticmethod
  54. def count_eoc(coupled):
  55. """
  56. :return: Number of eoc couplings in a coupled model
  57. """
  58. eoc_count = sum([len(port.inline) for port in coupled.OPorts])
  59. for comp in coupled.component_set:
  60. if isinstance(comp, CoupledDEVS):
  61. eoc_count += Utils.count_eoc(comp)
  62. elif not isinstance(comp, AtomicDEVS):
  63. raise RuntimeError("Unrecognized type of component")
  64. return eoc_count
  65. @staticmethod
  66. def count_transitions(coupled):
  67. """
  68. :return: Number of atomic components in a coupled model
  69. """
  70. int_count = 0
  71. ext_count = 0
  72. for comp in coupled.component_set:
  73. if isinstance(comp, DelayedAtomicStats):
  74. int_count += comp.int_count
  75. ext_count += comp.ext_count
  76. elif isinstance(comp, CoupledDEVS):
  77. pic, pec = Utils.count_transitions(comp)
  78. int_count += pic
  79. ext_count += pec
  80. elif not isinstance(comp, Generator):
  81. raise RuntimeError("Unexpected type of component: %s" % comp.__class__.__name__)
  82. return int_count, ext_count
  83. class DevstoneUtilsTestCase(TestCase):
  84. def __init__(self, name, num_valid_params_sets: int = 10):
  85. super().__init__(name)
  86. self.valid_high_params = []
  87. self.valid_low_params = []
  88. for _ in range(int(num_valid_params_sets)):
  89. self.valid_high_params.append([random.randint(1, 100), random.randint(1, 200),
  90. random.randint(1, 1000), random.randint(1, 1000)])
  91. for _ in range(int(num_valid_params_sets)):
  92. self.valid_low_params.append([random.randint(1, 20), random.randint(1, 30),
  93. random.randint(0, 10), random.randint(0, 10)])
  94. def check_invalid_inputs(self, base_class):
  95. self.assertRaises(ValueError, base_class, "root", 0, 1, 1, 1)
  96. self.assertRaises(ValueError, base_class, "root", 1, 0, 1, 1)
  97. self.assertRaises(ValueError, base_class, "root", 1, 1, -1, 0)
  98. self.assertRaises(ValueError, base_class, "root", 1, 1, 0, -1)
  99. self.assertRaises(ValueError, base_class, "root", 0, 1, -1, -1)
  100. self.assertRaises(ValueError, base_class, "root", 0, 0, -1, -1)
  101. def test_behavior_sequential(self):
  102. self._test_behavior(Simulator)
  103. #def test_behavior_parallel(self):
  104. # self._test_behavior(ParallelThreadCoordinator)
  105. @abstractmethod
  106. def _test_behavior(self, Coordinator):
  107. pass
  108. class TestLI(DevstoneUtilsTestCase):
  109. def test_structure(self):
  110. """
  111. Check structure params: atomic modules, ic's, eic's and eoc's.
  112. """
  113. for params_tuple in self.valid_high_params:
  114. params = dict(zip(("depth", "width", "int_delay", "ext_delay"), params_tuple))
  115. with self.subTest(**params):
  116. self._check_structure(**params)
  117. def test_structure_corner_cases(self):
  118. params = {"depth": 10, "width": 1, "int_delay": 1, "ext_delay": 1}
  119. self._check_structure(**params)
  120. params["depth"] = 1
  121. self._check_structure(**params)
  122. def _check_structure(self, **params):
  123. li_root = LI("LI_root", **params)
  124. self.assertEqual(Utils.count_atomics(li_root), (params["width"] - 1) * (params["depth"] - 1) + 1)
  125. self.assertEqual(Utils.count_eic(li_root), params["width"] * (params["depth"] - 1) + 1)
  126. self.assertEqual(Utils.count_eoc(li_root), params["depth"])
  127. self.assertEqual(Utils.count_ic(li_root), 0)
  128. def _test_behavior(self, coord_type):
  129. """
  130. Check behaviour params: number of int and ext transitions.
  131. """
  132. for params_tuple in self.valid_low_params:
  133. params = dict(zip(("depth", "width", "int_delay", "ext_delay"), params_tuple))
  134. with self.subTest(**params):
  135. li_root = LI("LI_root", stats=True, **params)
  136. li_env = DEVStoneEnvironment("LI_env", li_root)
  137. sim = Simulator(li_env)
  138. sim.setVerbose(None)
  139. # sim.setTerminationTime(10.0)
  140. # sim.setStateSaving("custom")
  141. sim.simulate()
  142. int_count, ext_count = Utils.count_transitions(li_root)
  143. self.assertEqual(int_count, (params["width"] - 1) * (params["depth"] - 1) + 1)
  144. self.assertEqual(ext_count, (params["width"] - 1) * (params["depth"] - 1) + 1)
  145. def test_invalid_inputs(self):
  146. super().check_invalid_inputs(LI)
  147. class TestHI(DevstoneUtilsTestCase):
  148. def test_structure(self):
  149. """
  150. Check structure params: atomic modules, ic's, eic's and eoc's.
  151. """
  152. for params_tuple in self.valid_high_params:
  153. params = dict(zip(("depth", "width", "int_delay", "ext_delay"), params_tuple))
  154. with self.subTest(**params):
  155. self._check_structure(**params)
  156. def test_structure_corner_cases(self):
  157. params = {"depth": 10, "width": 1, "int_delay": 1, "ext_delay": 1}
  158. self._check_structure(**params)
  159. params["depth"] = 1
  160. self._check_structure(**params)
  161. def _check_structure(self, **params):
  162. hi_root = HI("HI_root", **params)
  163. self.assertEqual(Utils.count_atomics(hi_root), (params["width"] - 1) * (params["depth"] - 1) + 1)
  164. self.assertEqual(Utils.count_eic(hi_root), params["width"] * (params["depth"] - 1) + 1)
  165. self.assertEqual(Utils.count_eoc(hi_root), params["depth"])
  166. self.assertEqual(Utils.count_ic(hi_root),
  167. (params["width"] - 2) * (params["depth"] - 1) if params["width"] > 2 else 0)
  168. def _test_behavior(self, coord_type):
  169. """
  170. Check behaviour params: number of int and ext transitions.
  171. """
  172. for params_tuple in self.valid_low_params:
  173. params = dict(zip(("depth", "width", "int_delay", "ext_delay"), params_tuple))
  174. with self.subTest(**params):
  175. hi_root = HI("HI_root", stats=True, **params)
  176. hi_env = DEVStoneEnvironment("HI_env", hi_root)
  177. sim = Simulator(hi_env)
  178. sim.setVerbose(None)
  179. # sim.setTerminationTime(10.0)
  180. # sim.setStateSaving("custom")
  181. sim.simulate()
  182. int_count, ext_count = Utils.count_transitions(hi_root)
  183. self.assertEqual(int_count, (((params["width"] - 1) * params["width"]) / 2) * (params["depth"] - 1) + 1)
  184. self.assertEqual(ext_count, (((params["width"] - 1) * params["width"]) / 2) * (params["depth"] - 1) + 1)
  185. def test_invalid_inputs(self):
  186. super().check_invalid_inputs(HI)
  187. class TestHO(DevstoneUtilsTestCase):
  188. def test_structure(self):
  189. """
  190. Check structure params: atomic modules, ic's, eic's and eoc's.
  191. """
  192. for params_tuple in self.valid_high_params:
  193. params = dict(zip(("depth", "width", "int_delay", "ext_delay"), params_tuple))
  194. with self.subTest(**params):
  195. self._check_structure(**params)
  196. def test_structure_corner_cases(self):
  197. params = {"depth": 10, "width": 1, "int_delay": 1, "ext_delay": 1}
  198. self._check_structure(**params)
  199. params["depth"] = 1
  200. self._check_structure(**params)
  201. def _check_structure(self, **params):
  202. ho_root = HO("HO_root", **params)
  203. self.assertEqual(Utils.count_atomics(ho_root), (params["width"] - 1) * (params["depth"] - 1) + 1)
  204. self.assertEqual(Utils.count_eic(ho_root), (params["width"] + 1) * (params["depth"] - 1) + 1)
  205. self.assertEqual(Utils.count_eoc(ho_root), params["width"] * (params["depth"] - 1) + 1)
  206. self.assertEqual(Utils.count_ic(ho_root),
  207. (params["width"] - 2) * (params["depth"] - 1) if params["width"] > 2 else 0)
  208. def _test_behavior(self, coord_type):
  209. """
  210. Check behaviour params: number of int and ext transitions.
  211. """
  212. for params_tuple in self.valid_low_params[0]:
  213. params = dict(zip(("depth", "width", "int_delay", "ext_delay"), params_tuple))
  214. with self.subTest(**params):
  215. ho_root = HO("HO_root", stats=True, **params)
  216. ho_env = DEVStoneEnvironment("HO_env", ho_root)
  217. sim = Simulator(ho_env)
  218. sim.setVerbose(None)
  219. # sim.setTerminationTime(10.0)
  220. # sim.setStateSaving("custom")
  221. sim.simulate()
  222. int_count, ext_count = Utils.count_transitions(ho_env)
  223. self.assertEqual(int_count, (((params["width"] - 1) * params["width"]) / 2) * (params["depth"] - 1) + 1)
  224. self.assertEqual(ext_count, (((params["width"] - 1) * params["width"]) / 2) * (params["depth"] - 1) + 1)
  225. def test_invalid_inputs(self):
  226. super().check_invalid_inputs(HO)
  227. class TestHOmod(DevstoneUtilsTestCase):
  228. def test_structure(self):
  229. """
  230. Check structure params: atomic modules, ic's, eic's and eoc's.
  231. """
  232. for params_tuple in self.valid_high_params:
  233. params = dict(zip(("depth", "width", "int_delay", "ext_delay"), params_tuple))
  234. with self.subTest(**params):
  235. self._check_structure(**params)
  236. def test_structure_corner_cases(self):
  237. params = {"depth": 10, "width": 1, "int_delay": 1, "ext_delay": 1}
  238. self._check_structure(**params)
  239. params["depth"] = 1
  240. self._check_structure(**params)
  241. def _check_structure(self, **params):
  242. homod_root = HOmod("HOmod_root", **params)
  243. self.assertEqual(Utils.count_atomics(homod_root),
  244. ((params["width"] - 1) + ((params["width"] - 1) * params["width"]) / 2) * (
  245. params["depth"] - 1) + 1)
  246. self.assertEqual(Utils.count_eic(homod_root), (2 * (params["width"] - 1) + 1) * (params["depth"] - 1) + 1)
  247. self.assertEqual(Utils.count_eoc(homod_root), params["depth"])
  248. # ICs relative to the "triangular" section
  249. exp_ic = (((params["width"] - 2) * (params["width"] - 1)) / 2) if params["width"] > 1 else 0
  250. # Plus the ones relative to the connection from the 2nd to 1st row...
  251. exp_ic += (params["width"] - 1) ** 2
  252. # ...and from the 1st to the couple component
  253. exp_ic += params["width"] - 1
  254. # Multiplied by the number of layers (except the deepest one, that doesn't have ICs)
  255. exp_ic *= (params["depth"] - 1)
  256. self.assertEqual(Utils.count_ic(homod_root), exp_ic)
  257. def _test_behavior(self, coord_type):
  258. """
  259. Check behaviour params: number of int and ext transitions.
  260. """
  261. for params_tuple in self.valid_low_params:
  262. params = dict(zip(("depth", "width", "int_delay", "ext_delay"), params_tuple))
  263. with self.subTest(**params):
  264. homod_root = HOmod("HOmod_root", stats=True, **params)
  265. homod_env = DEVStoneEnvironment("HOmod_env", homod_root)
  266. sim = Simulator(homod_env)
  267. sim.setVerbose(None)
  268. # sim.setTerminationTime(10.0)
  269. # sim.setStateSaving("custom")
  270. sim.simulate()
  271. calc_in = lambda x, w: 1 + (x - 1) * (w - 1)
  272. exp_trans = 1
  273. tr_atomics = sum(range(1, params["width"]))
  274. for i in range(1, params["depth"]):
  275. num_inputs = calc_in(i, params["width"])
  276. trans_first_row = (params["width"] - 1) * (num_inputs + params["width"] - 1)
  277. exp_trans += num_inputs * tr_atomics + trans_first_row
  278. int_count, ext_count = Utils.count_transitions(homod_env)
  279. self.assertEqual(int_count, exp_trans)
  280. self.assertEqual(ext_count, exp_trans)
  281. def test_invalid_inputs(self):
  282. super().check_invalid_inputs(HOmod)