test.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. import unittest
  2. from dataclasses import *
  3. from sccd.model.model import *
  4. from sccd.controller.controller import *
  5. from lib.test_parser import *
  6. import threading
  7. import queue
  class Test(unittest.TestCase):
      """Test case that parses one test source and checks every variant it yields.

      The source is handed to ``TestParser``; each resulting test variant is run
      on its own ``Controller`` in a background thread, and the big steps the
      controller emits through a queue are compared against the variant's
      expected output.

      NOTE(review): ``TestParser``, ``Controller`` and ``print_debug`` are not
      defined in this file; presumably they come from the star imports at the
      top of the module.
      """

      def __init__(self, src: str):
          """Create a test case for ``src``.

          Args:
              src: Value passed to ``TestParser.parse``; also used as the string
                  form of this test case.
          """
          super().__init__()
          self.src = src

      def __str__(self):
          return self.src

      # Deliberately documented with comments instead of a docstring: unittest
      # uses a test method's docstring as its short description in reports.
      #
      # Parses self.src, then runs and checks every test variant the parser
      # collected. Parse errors are printed with print_debug and re-raised.
      def runTest(self):
          parser = TestParser()
          parser.tests.push([])
          try:
              parser.parse(self.src)
          except Exception as e:
              print_debug(e)
              raise

          for test in parser.tests.pop():
              self._check_variant(test)

      @staticmethod
      def _format_output(output):
          """Render a list of big steps as one ``"<index>: <big step>"`` line each."""
          return '\n'.join("%d: %s" % (i, str(big_step)) for i, big_step in enumerate(output))

      @staticmethod
      def _events_match(exp_event, act_event):
          """Return True when both events agree on name, port and parameters.

          Parameter lists of different length never match. The lengths are
          checked before any element is compared, so a shorter actual parameter
          list yields a mismatch rather than an ``IndexError``.
          """
          if exp_event.name != act_event.name:
              return False
          if exp_event.port != act_event.port:
              return False
          if len(exp_event.parameters) != len(act_event.parameters):
              return False
          return not any(
              exp_param != act_param
              for exp_param, act_param in zip(exp_event.parameters, act_event.parameters)
          )

      def _check_variant(self, test):
          """Run one test variant and compare its output with ``test.output``.

          Args:
              test: Test variant; this method reads its ``name``, ``model``,
                  ``input`` and ``output`` attributes.

          Raises:
              AssertionError: Through ``self.fail`` when the controller produces
                  fewer or more big steps than expected, or when a big step
                  differs from the expected one.
              Exception: Any exception caught in the controller thread is
                  re-raised in the calling thread.
          """
          print_debug('\n' + test.name)
          pipe = queue.Queue()

          controller = Controller(test.model)
          for i in test.input:
              controller.add_input(i)

          def controller_thread():
              try:
                  # Run as-fast-as-possible, always advancing time to the next item in event queue, no sleeping.
                  # The call returns when the event queue is empty and therefore the simulation is finished.
                  controller.run_until(None, pipe)
              except Exception as e:
                  print_debug(e)
                  # Hand the exception to the checking thread through the queue.
                  pipe.put(e, block=True, timeout=None)
                  return
              # Signal end of output
              pipe.put(None, block=True, timeout=None)

          thread = threading.Thread(target=controller_thread)
          thread.start()

          expected = test.output
          actual = []

          def fail(msg, kill=False):
              # There is currently no way to interrupt the controller, so even
              # for kill=True we can only wait for the thread to finish.
              thread.join()
              self.fail(
                  '\n' + test.name + '\n' + msg
                  + "\n\nActual:\n" + self._format_output(actual)
                  + ("\n(possibly more output, instance killed)" if kill else "")
                  + "\n\nExpected:\n" + self._format_output(expected)
              )

          def event_key(e):
              return "%s.%s" % (e.port, e.name)

          while True:
              data = pipe.get(block=True, timeout=None)

              if isinstance(data, Exception):
                  # Exception was caught in Controller thread, throw it here instead.
                  # The thread returns right after queueing it, so joining is cheap.
                  thread.join()
                  raise data

              if data is None:
                  # End of output
                  if len(actual) < len(expected):
                      fail("Less output than expected.")
                  break

              big_step_index = len(actual)
              actual.append(data)

              if len(actual) > len(expected):
                  fail("More output than expected.", kill=True)

              actual_bag = actual[big_step_index]
              expected_bag = expected[big_step_index]

              if len(actual_bag) != len(expected_bag):
                  fail("Big step %d: output differs." % big_step_index, kill=True)

              # Sort both expected and actual lists of events before comparing.
              # In theory the set of events at the end of a big step is unordered.
              actual_bag.sort(key=event_key)
              expected_bag.sort(key=event_key)

              for act_event, exp_event in zip(actual_bag, expected_bag):
                  if not self._events_match(exp_event, act_event):
                      fail("Big step %d: output differs." % big_step_index, kill=True)

          # The end-of-output marker is the last thing the thread queues, so this
          # returns promptly; it keeps finished threads from piling up.
          thread.join()
  class FailingTest(Test):
      """Variant of ``Test`` whose ``runTest`` is marked as an expected failure."""

      # The override only exists to carry the decorator. Decorating
      # Test.runTest directly would flag the base class method as well.
      @unittest.expectedFailure
      def runTest(self):
          super(FailingTest, self).runTest()