test.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. import unittest
  2. from dataclasses import *
  3. from sccd.model.model import *
  4. from sccd.controller.controller import *
  5. from lib.test_parser import *
  6. import threading
  7. import queue
  8. class Test(unittest.TestCase):
  9. def __init__(self, src: str):
  10. super().__init__()
  11. self.src = src
  12. def __str__(self):
  13. return self.src
  14. def runTest(self):
  15. parser = create_test_parser(self.src)
  16. try:
  17. test_variants = parse(self.src, parser)
  18. except Exception as e:
  19. print_debug(e)
  20. raise e
  21. for test in test_variants:
  22. print_debug('\n'+test.name)
  23. pipe = queue.Queue()
  24. # interrupt = queue.Queue()
  25. controller = Controller(test.model)
  26. for i in test.input:
  27. controller.add_input(i)
  28. def controller_thread():
  29. try:
  30. # Run as-fast-as-possible, always advancing time to the next item in event queue, no sleeping.
  31. # The call returns when the event queue is empty and therefore the simulation is finished.
  32. controller.run_until(None, pipe)
  33. except Exception as e:
  34. print_debug(e)
  35. pipe.put(e, block=True, timeout=None)
  36. return
  37. # Signal end of output
  38. pipe.put(None, block=True, timeout=None)
  39. # start the controller
  40. thread = threading.Thread(target=controller_thread)
  41. thread.start()
  42. # check output
  43. expected = test.output
  44. actual = []
  45. def fail(msg):
  46. thread.join()
  47. def repr(output):
  48. return '\n'.join("%d: %s" % (i, str(big_step)) for i, big_step in enumerate(output))
  49. self.fail('\n'+test.name + '\n'+msg + "\n\nActual:\n" + repr(actual) + "\n\nExpected:\n" + repr(expected))
  50. while True:
  51. data = pipe.get(block=True, timeout=None)
  52. if isinstance(data, Exception):
  53. raise data # Exception was caught in Controller thread, throw it here instead.
  54. elif data is None:
  55. # End of output
  56. if len(actual) < len(expected):
  57. fail("Less output than expected.")
  58. else:
  59. break
  60. else:
  61. big_step = data
  62. big_step_index = len(actual)
  63. actual.append(big_step)
  64. if len(actual) > len(expected):
  65. fail("More output than expected.")
  66. actual_bag = actual[big_step_index]
  67. expected_bag = expected[big_step_index]
  68. if len(actual_bag) != len(expected_bag):
  69. fail("Big step %d: output differs." % big_step_index)
  70. # Sort both expected and actual lists of events before comparing.
  71. # In theory the set of events at the end of a big step is unordered.
  72. # key_f = lambda e: "%s.%s"%(e.port, e.name)
  73. # actual_bag.sort(key=key_f)
  74. # expected_bag.sort(key=key_f)
  75. for (act_event, exp_event) in zip(actual_bag, expected_bag):
  76. if act_event != exp_event:
  77. fail("Big step %d: output differs." % big_step_index)
  78. class FailingTest(Test):
  79. @unittest.expectedFailure
  80. def runTest(self):
  81. super().runTest()