test.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. import unittest
  2. import functools
  3. from dataclasses import *
  4. from sccd.model.model import *
  5. from sccd.controller.controller import *
  6. from lib.test_parser import *
  7. import threading
  8. import queue
  9. class Test(unittest.TestCase):
  10. def __init__(self, src: str):
  11. super().__init__()
  12. self.src = src
  13. def __str__(self):
  14. return self.src
  15. def runTest(self):
  16. statechart_parser = functools.partial(create_statechart_parser, src_file=self.src)
  17. test_parser = create_test_parser(statechart_parser)
  18. try:
  19. test_variants = parse_f(self.src, test_parser)
  20. except Exception as e:
  21. print_debug(e)
  22. raise e
  23. for test in test_variants:
  24. print_debug('\n'+test.name)
  25. pipe = queue.Queue()
  26. # interrupt = queue.Queue()
  27. controller = Controller(test.model)
  28. for i in test.input:
  29. controller.add_input(i)
  30. def controller_thread():
  31. try:
  32. # Run as-fast-as-possible, always advancing time to the next item in event queue, no sleeping.
  33. # The call returns when the event queue is empty and therefore the simulation is finished.
  34. controller.run_until(None, pipe)
  35. except Exception as e:
  36. print_debug(e)
  37. pipe.put(e, block=True, timeout=None)
  38. return
  39. # Signal end of output
  40. pipe.put(None, block=True, timeout=None)
  41. # start the controller
  42. thread = threading.Thread(target=controller_thread)
  43. thread.start()
  44. # check output
  45. expected = test.output
  46. actual = []
  47. def fail(msg):
  48. thread.join()
  49. def repr(output):
  50. return '\n'.join("%d: %s" % (i, str(big_step)) for i, big_step in enumerate(output))
  51. self.fail('\n'+test.name + '\n'+msg + "\n\nActual:\n" + repr(actual) + "\n\nExpected:\n" + repr(expected))
  52. while True:
  53. data = pipe.get(block=True, timeout=None)
  54. if isinstance(data, Exception):
  55. raise data # Exception was caught in Controller thread, throw it here instead.
  56. elif data is None:
  57. # End of output
  58. if len(actual) < len(expected):
  59. fail("Less output than expected.")
  60. else:
  61. break
  62. else:
  63. big_step = data
  64. big_step_index = len(actual)
  65. actual.append(big_step)
  66. if len(actual) > len(expected):
  67. fail("More output than expected.")
  68. actual_bag = actual[big_step_index]
  69. expected_bag = expected[big_step_index]
  70. if len(actual_bag) != len(expected_bag):
  71. fail("Big step %d: output differs." % big_step_index)
  72. # Sort both expected and actual lists of events before comparing.
  73. # In theory the set of events at the end of a big step is unordered.
  74. # key_f = lambda e: "%s.%s"%(e.port, e.name)
  75. # actual_bag.sort(key=key_f)
  76. # expected_bag.sort(key=key_f)
  77. for (act_event, exp_event) in zip(actual_bag, expected_bag):
  78. if act_event != exp_event:
  79. fail("Big step %d: output differs." % big_step_index)
  80. class FailingTest(Test):
  81. @unittest.expectedFailure
  82. def runTest(self):
  83. super().runTest()