TraceChecker.py 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. import os
  2. import subprocess
  3. import importlib.util
  4. import re
  5. from sccd.runtime.DEVS_loop import DEVSSimulator
  6. def import_target_module(module_name, file_path):
  7. spec = importlib.util.spec_from_file_location(module_name, file_path)
  8. module = importlib.util.module_from_spec(spec)
  9. spec.loader.exec_module(module)
  10. return module
  11. def extract_pattern(log_file_path, pattern):
  12. with open(log_file_path, 'r') as log_file:
  13. lines = log_file.readlines()
  14. matched_lines = [line.strip() for line in lines if re.search(pattern, line)]
  15. return matched_lines
  16. class SCCDTraceChecker:
  17. def compile(self, directory):
  18. raise NotImplementedError("Compile method must be implemented by the subclass")
  19. def run(self, directory):
  20. raise NotImplementedError("Run method must be implemented by the subclass")
  21. def check(self, directory):
  22. expected_log = os.path.join(directory, "expected_trace.txt")
  23. if os.path.exists(expected_log):
  24. lines_array = []
  25. with open(expected_log, 'r') as file:
  26. lines_array = file.readlines()
  27. return lines_array
  28. else:
  29. print(f"The file {expected_log} does not exist.")
  30. return None
  31. class PythonSCCDTraceChecker(SCCDTraceChecker):
  32. def __init__(self) -> None:
  33. super().__init__()
  34. def __str__(self):
  35. return "Python"
  36. def compile(self, directory):
  37. """
  38. Convert sccd.xml to target.py for the specified tool.
  39. """
  40. sccd_file = os.path.join(directory, 'sccd.xml')
  41. output_file = os.path.join(directory, 'Python', 'target.py')
  42. os.makedirs(os.path.join(directory, 'Python'), exist_ok=True)
  43. command = [
  44. "python",
  45. os.path.join("sccd", "compiler", "sccdc.py"),
  46. "-o", output_file,
  47. "-p", "threads",
  48. "-l", "python",
  49. sccd_file
  50. ]
  51. env = os.environ.copy()
  52. result = subprocess.run(command, env=env, capture_output=True, text=True)
  53. if result.returncode != 0:
  54. print(f"Error converting {sccd_file} for python: {result.stderr}")
  55. return result.returncode
  56. def run(self, directory):
  57. python_target = os.path.join(directory, "Python", "target.py")
  58. # Dynamically import the target module
  59. target = import_target_module("target", python_target)
  60. controller = target.Controller()
  61. controller.keep_running = False
  62. # Create the full path for the log file
  63. log_file_path = os.path.join(directory, "Python", "log.txt")
  64. # Set verbose to the log file path
  65. controller.setVerbose(log_file_path)
  66. controller.start()
  67. controller.tracers.stopTracers()
  68. def extract_output_events(self, log_file_path):
  69. output_events = []
  70. current_time = None
  71. with open(log_file_path, 'r') as file:
  72. lines = file.readlines()
  73. time_pattern = re.compile(r"__ Current Time: +([\d\.]+) +__________________________________________")
  74. event_pattern = re.compile(r'^\s*\\Event: \(event name:.*\)$')
  75. for line in lines:
  76. time_match = time_pattern.match(line)
  77. if time_match:
  78. current_time = float(time_match.group(1))
  79. event_match = event_pattern.match(line)
  80. if event_match and current_time is not None:
  81. event = line.strip()
  82. # Remove everything before '(' in each string
  83. event = event[event.index('('):]
  84. output_events.append(f"{current_time:.2f} {event}")
  85. return output_events
  86. def check(self, directory):
  87. log = os.path.join(directory, "Python", "log.txt")
  88. expected_log = os.path.join(directory, "expected_trace.txt")
  89. expected_events = []
  90. if os.path.exists(expected_log):
  91. with open(expected_log, 'r') as file:
  92. expected_events = [line.strip() for line in file.readlines()]
  93. actual_events = self.extract_output_events(log)
  94. if len(expected_events) != len(actual_events):
  95. return 0
  96. if len(expected_events) == 0 and len(actual_events) == 0:
  97. return 2
  98. for index, (item1, item2) in enumerate(zip(expected_events, actual_events)):
  99. if item1 != item2:
  100. return 0
  101. return 1
  102. class PydevsSCCDTraceChecker(SCCDTraceChecker):
  103. def __init__(self) -> None:
  104. super().__init__()
  105. def __str__(self):
  106. return "PyDEVS"
  107. def compile(self, directory):
  108. """
  109. Convert sccd.xml to target.py for the specified tool.
  110. """
  111. sccd_file = os.path.join(directory, 'sccd.xml')
  112. output_file = os.path.join(directory, "PyDEVS", 'target.py')
  113. os.makedirs(os.path.join(directory, "PyDEVS"), exist_ok=True)
  114. command = [
  115. "python",
  116. os.path.join("sccd", "compiler", "sccdc.py"),
  117. "-o", output_file,
  118. "-p", "pypDEVS",
  119. sccd_file
  120. ]
  121. env = os.environ.copy()
  122. result = subprocess.run(command, env=env, capture_output=True, text=True)
  123. if result.returncode != 0:
  124. print(f"Error converting {sccd_file} for PyDEVS: {result.stderr}")
  125. return result.returncode
  126. def run(self, directory):
  127. pydevs_target = os.path.join(directory, "PyDEVS", "target.py")
  128. # Dynamically import the target module
  129. target = import_target_module("target", pydevs_target)
  130. model = target.Controller(name="controller")
  131. refs = {"ui": model.in_ui}
  132. sim = DEVSSimulator(model, refs)
  133. sim.setRealTime(False)
  134. # Create the full path for the log file
  135. log_file_path = os.path.join(directory, "PyDEVS", "log.txt")
  136. # Set verbose to the log file path
  137. sim.setVerbose(log_file_path)
  138. sim.simulate()
  139. def extract_output_events(self, log_file_path):
  140. output_events = []
  141. current_time = None
  142. with open(log_file_path, 'r') as file:
  143. lines = file.readlines()
  144. time_pattern = re.compile(r"__ Current Time: +([\d\.]+) +__________________________________________")
  145. event_pattern = re.compile(r'^\s*\(event name:.*\)$')
  146. for line in lines:
  147. time_match = time_pattern.match(line)
  148. if time_match:
  149. current_time = float(time_match.group(1))
  150. event_match = event_pattern.match(line)
  151. if event_match and current_time is not None:
  152. event = line.strip()
  153. # Remove everything before '(' in each string
  154. event = event[event.index('('):]
  155. output_events.append(f"{current_time:.2f} {event}")
  156. return output_events
  157. def check(self, directory):
  158. log = os.path.join(directory, "PyDEVS", "log.txt")
  159. expected_log = os.path.join(directory, "expected_trace.txt")
  160. expected_events = []
  161. if os.path.exists(expected_log):
  162. with open(expected_log, 'r') as file:
  163. expected_events = [line.strip() for line in file.readlines()]
  164. actual_events = self.extract_output_events(log)
  165. return_code = 1
  166. if len(expected_events) != len(actual_events):
  167. return_code = 0
  168. if len(expected_events) == 0 and len(actual_events) == 0:
  169. return_code = 2
  170. for index, (item1, item2) in enumerate(zip(expected_events, actual_events)):
  171. if item1 != item2:
  172. return_code = 0
  173. if return_code == 0:
  174. # Write actual events to a file
  175. with open(os.path.join(directory, "PyDEVS", "faulty_log.txt"), 'w') as file:
  176. file.writelines([event + '\n' for event in actual_events])
  177. return return_code