test.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. import os
  2. import importlib
  3. import unittest
  4. import argparse
  5. import threading
  6. from sccd.compiler.sccdc import generate
  7. from sccd.compiler.generic_generator import Platforms
  8. from sccd.compiler.compiler_exceptions import *
  9. from sccd.runtime.statecharts_core import *
# Relative directory that receives the generated Python modules; the same
# path is later turned into a dotted module name to import them from.
BUILD_DIR = "build"
  11. class PyTestCase(unittest.TestCase):
  12. def __init__(self, src_file):
  13. unittest.TestCase.__init__(self)
  14. self.src_file = src_file
  15. self.name = os.path.splitext(self.src_file)[0]
  16. self.target_file = os.path.join(BUILD_DIR, self.name+".py")
  17. def __str__(self):
  18. return self.name
  19. def runTest(self):
  20. # Get src_file and target_file modification times
  21. src_file_mtime = os.path.getmtime(self.src_file)
  22. target_file_mtime = 0
  23. try:
  24. target_file_mtime = os.path.getmtime(self.target_file)
  25. except FileNotFoundError:
  26. pass
  27. if src_file_mtime > target_file_mtime:
  28. os.makedirs(os.path.dirname(self.target_file), exist_ok=True)
  29. try:
  30. generate(self.src_file, self.target_file, "python", Platforms.Threads)
  31. except TargetLanguageException :
  32. self.skipTest("meant for different target language.")
  33. return
  34. module = importlib.import_module(os.path.join(BUILD_DIR, self.name).replace(os.path.sep, "."))
  35. inputs = module.Test.input_events
  36. expected = module.Test.expected_events # list of lists of Event objects
  37. controller = module.Controller(False)
  38. output_ports = set()
  39. expected_result = [] # what happens here is basically a deep-copy of the list-of-lists, why?
  40. for s in expected:
  41. slot = []
  42. for event in s:
  43. slot.append(event)
  44. output_ports.add(event.port)
  45. if slot:
  46. expected_result.append(slot)
  47. output_listener = controller.createOutputListener(list(output_ports))
  48. # generate input
  49. if inputs:
  50. for i in inputs:
  51. controller.addInput(Event(i.name, i.port, i.parameters), int(i.time_offset * 1000))
  52. # start the controller
  53. thread = threading.Thread(target=controller.start())
  54. thread.start()
  55. # check output
  56. for (slot_index, slot) in enumerate(expected_result) :
  57. output_events = output_listener.fetch_blocking()
  58. print("slot:", slot_index, ", events: ", output_events)
  59. # sort both expected and actual lists of events before comparing,
  60. # in theory the set of events at the end of a big step is unordered
  61. sortkey_f = lambda e: "%s.%s"%(e.port, e.name)
  62. slot.sort(key=sortkey_f)
  63. output_events.sort(key=sortkey_f)
  64. self.assertEqual(len(slot), len(output_events), "Expected %d output events, instead got: %d" % (len(slot), len(output_events)))
  65. for (expected, actual) in zip(slot, output_events):
  66. matches = True
  67. if expected.name != actual.name :
  68. matches = False
  69. if expected.port != actual.port :
  70. matches = False
  71. actual_parameters = actual.getParameters()
  72. if len(expected.parameters) != len(actual_parameters) :
  73. matches = False
  74. for index in range(len(expected.parameters)) :
  75. if expected.parameters[index] != actual_parameters[index]:
  76. matches = False
  77. self.assertTrue(matches, self.src_file + ", expected results slot " + str(slot_index) + " mismatch. Expected " + str(expected) + ", but got " + str(actual) + " instead.") # no match found in the options
  78. # wait for controller to finish
  79. thread.join()
  80. # check if there are no extra events
  81. next_event = output_listener.fetch_nonblocking()
  82. self.assertEqual(next_event, None, "More output events than expected on selected ports: " + str(next_event))
  83. if __name__ == '__main__':
  84. suite = unittest.TestSuite()
  85. parser = argparse.ArgumentParser(description="Run SCCD tests.")
  86. parser.add_argument('test', metavar='test_path', type=str, nargs='*', help="Test to run. Can be a XML file or a directory. If a directory, it will be recursively scanned for XML files.")
  87. args = parser.parse_args()
  88. already_have = set()
  89. src_files = []
  90. def add_file(path):
  91. if path not in already_have:
  92. already_have.add(path)
  93. src_files.append(path)
  94. for p in args.test:
  95. if os.path.isdir(p):
  96. # recursively scan directories
  97. for r, dirs, files in os.walk(p):
  98. files.sort()
  99. for f in files:
  100. if f.endswith('.xml'):
  101. add_file(os.path.join(r,f))
  102. elif os.path.isfile(p):
  103. add_file(p)
  104. else:
  105. print("%s: not a file or a directory, skipped." % p)
  106. # src_files should now contain a list of XML files that need to be compiled an ran
  107. for src_file in src_files:
  108. suite.addTest(PyTestCase(src_file))
  109. unittest.TextTestRunner(verbosity=2).run(suite)
  110. if len(src_files) == 0:
  111. print("Note: no test files specified.")
  112. print()
  113. parser.print_usage()