test.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. import os
  2. import importlib
  3. import unittest
  4. import argparse
  5. import threading
  6. import queue
  7. from sccd.compiler.sccdc import generate
  8. from sccd.compiler.generic_generator import Platforms
  9. from sccd.runtime.infinity import INFINITY
  10. from sccd.runtime.event import Event
  11. from sccd.compiler.compiler_exceptions import *
  12. from sccd.runtime.controller import Controller
  13. BUILD_DIR = "build"
  14. class PyTestCase(unittest.TestCase):
  15. def __init__(self, src_file):
  16. unittest.TestCase.__init__(self)
  17. self.src_file = src_file
  18. self.name = os.path.splitext(self.src_file)[0]
  19. self.target_file = os.path.join(BUILD_DIR, self.name+".py")
  20. def __str__(self):
  21. return self.name
  22. def runTest(self):
  23. # Get src_file and target_file modification times
  24. src_file_mtime = os.path.getmtime(self.src_file)
  25. target_file_mtime = 0
  26. try:
  27. target_file_mtime = os.path.getmtime(self.target_file)
  28. except FileNotFoundError:
  29. pass
  30. if src_file_mtime > target_file_mtime:
  31. # (Re-)Compile test
  32. os.makedirs(os.path.dirname(self.target_file), exist_ok=True)
  33. try:
  34. generate(self.src_file, self.target_file, "python", Platforms.Threads)
  35. except TargetLanguageException :
  36. self.skipTest("meant for different target language.")
  37. return
  38. # Load compiled test
  39. module = importlib.import_module(os.path.join(BUILD_DIR, self.name).replace(os.path.sep, "."))
  40. inputs = module.Test.input_events
  41. expected = module.Test.expected_events # list of lists of Event objects
  42. model = module.Model()
  43. controller = Controller(model)
  44. # generate input
  45. if inputs:
  46. for i in inputs:
  47. controller.add_input(Event(i.name, i.port, i.parameters), int(i.time_offset))
  48. pipe = queue.Queue()
  49. def model_thread():
  50. try:
  51. # Run as-fast-as-possible, always advancing time to the next item in event queue, no sleeping.
  52. # The call returns when the event queue is empty and therefore the simulation is finished.
  53. controller.run_until(INFINITY, pipe)
  54. except Exception as e:
  55. pipe.put(e, block=True, timeout=None)
  56. return
  57. pipe.put(None, block=True, timeout=None)
  58. # start the controller
  59. thread = threading.Thread(target=model_thread)
  60. thread.start()
  61. # check output
  62. slot_index = 0
  63. while True:
  64. output = pipe.get(block=True, timeout=None)
  65. if isinstance(output, Exception):
  66. thread.join()
  67. raise output # Exception was caught in Controller thread, throw it here instead.
  68. elif output is None:
  69. self.assertEqual(slot_index, len(expected), "Less output was received than expected.")
  70. thread.join()
  71. return
  72. else:
  73. self.assertLess(slot_index, len(expected), "More output was received than expected.")
  74. exp_slot = expected[slot_index]
  75. # print("slot:", slot_index, ", events: ", output)
  76. self.assertEqual(len(exp_slot), len(output), "Slot %d length differs: Expected %s, but got %s instead." % (slot_index, exp_slot, output))
  77. # sort both expected and actual lists of events before comparing,
  78. # in theory the set of events at the end of a big step is unordered
  79. key_f = lambda e: "%s.%s"%(e.port, e.name)
  80. exp_slot.sort(key=key_f)
  81. output.sort(key=key_f)
  82. for (exp_event, event) in zip(exp_slot, output):
  83. matches = True
  84. if exp_event.name != event.name :
  85. matches = False
  86. if exp_event.port != event.port :
  87. matches = False
  88. if len(exp_event.parameters) != len(event.parameters) :
  89. matches = False
  90. for index in range(len(exp_event.parameters)) :
  91. if exp_event.parameters[index] != event.parameters[index]:
  92. matches = False
  93. self.assertTrue(matches, "Slot %d entry differs: Expected %s, but got %s instead." % (slot_index, exp_slot, output))
  94. slot_index += 1
  95. if __name__ == '__main__':
  96. suite = unittest.TestSuite()
  97. parser = argparse.ArgumentParser(description="Run SCCD tests.")
  98. parser.add_argument('test', metavar='test_path', type=str, nargs='*', help="Test to run. Can be a XML file or a directory. If a directory, it will be recursively scanned for XML files.")
  99. args = parser.parse_args()
  100. already_have = set()
  101. src_files = []
  102. def add_file(path):
  103. if path not in already_have:
  104. already_have.add(path)
  105. src_files.append(path)
  106. for p in args.test:
  107. if os.path.isdir(p):
  108. # recursively scan directories
  109. for r, dirs, files in os.walk(p):
  110. files.sort()
  111. for f in files:
  112. if f.endswith('.xml'):
  113. add_file(os.path.join(r,f))
  114. elif os.path.isfile(p):
  115. add_file(p)
  116. else:
  117. print("%s: not a file or a directory, skipped." % p)
  118. # src_files should now contain a list of XML files that need to be compiled an ran
  119. for src_file in src_files:
  120. suite.addTest(PyTestCase(src_file))
  121. unittest.TextTestRunner(verbosity=2).run(suite)
  122. if len(src_files) == 0:
  123. print("Note: no test files specified.")
  124. print()
  125. parser.print_usage()