TraceChecker.py 8.7 KB


  1. import os
  2. import subprocess
  3. import importlib.util
  4. import re
  5. from sccd.runtime.DEVS_loop import DEVSSimulator
  6. import Tester
  7. def import_target_module(module_name, file_path):
  8. spec = importlib.util.spec_from_file_location(module_name, file_path)
  9. module = importlib.util.module_from_spec(spec)
  10. spec.loader.exec_module(module)
  11. return module
  12. def extract_pattern(log_file_path, pattern):
  13. with open(log_file_path, 'r') as log_file:
  14. lines = log_file.readlines()
  15. matched_lines = [line.strip() for line in lines if re.search(pattern, line)]
  16. return matched_lines
  17. class SCCDTraceChecker:
  18. def compile(self, directory):
  19. raise NotImplementedError("Compile method must be implemented by the subclass")
  20. def run(self, directory):
  21. raise NotImplementedError("Run method must be implemented by the subclass")
  22. def check(self, directory):
  23. expected_log = os.path.join(directory, "expected_trace.txt")
  24. if os.path.exists(expected_log):
  25. lines_array = []
  26. with open(expected_log, 'r') as file:
  27. lines_array = file.readlines()
  28. return lines_array
  29. else:
  30. print(f"The file {expected_log} does not exist.")
  31. return None
  32. class PythonSCCDTraceChecker(SCCDTraceChecker):
  33. def __init__(self) -> None:
  34. super().__init__()
  35. def __str__(self):
  36. return "Python"
  37. def compile(self, directory):
  38. """
  39. Convert sccd.xml to target.py for the specified tool.
  40. """
  41. sccd_file = os.path.join(directory, 'sccd.xml')
  42. output_file = os.path.join(directory, 'Python', 'target.py')
  43. os.makedirs(os.path.join(directory, 'Python'), exist_ok=True)
  44. command = [
  45. "python",
  46. os.path.join("sccd", "compiler", "sccdc.py"),
  47. "-o", output_file,
  48. "-p", "threads",
  49. "-l", "python",
  50. sccd_file
  51. ]
  52. env = os.environ.copy()
  53. result = subprocess.run(command, env=env, capture_output=True, text=True)
  54. if result.returncode != 0:
  55. print(f"Error converting {sccd_file} for python: {result.stderr}")
  56. return result.returncode
  57. def run(self, directory):
  58. python_target = os.path.join(directory, "Python", "target.py")
  59. # Dynamically import the target module
  60. target = import_target_module("target", python_target)
  61. controller = target.Controller()
  62. controller.keep_running = False
  63. # Create the full path for the log file
  64. log_file_path = os.path.join(directory, "Python", "log.txt")
  65. # Set verbose to the log file path
  66. controller.setVerbose(log_file_path)
  67. controller.start()
  68. controller.tracers.stopTracers()
  69. def extract_output_events(self, log_file_path):
  70. output_events = []
  71. current_time = None
  72. with open(log_file_path, 'r') as file:
  73. lines = file.readlines()
  74. time_pattern = re.compile(r"__ Current Time: +([\d\.]+) +__________________________________________")
  75. event_pattern = re.compile(r'^\s*\\Event: \(event name:.*\)$')
  76. for line in lines:
  77. time_match = time_pattern.match(line)
  78. if time_match:
  79. current_time = float(time_match.group(1))
  80. event_match = event_pattern.match(line)
  81. if event_match and current_time is not None:
  82. event = line.strip()
  83. # Remove everything before '(' in each string
  84. event = event[event.index('('):]
  85. output_events.append(f"{current_time:.2f} {event}")
  86. return output_events
  87. def check(self, directory):
  88. log = os.path.join(directory, "Python", "log.txt")
  89. expected_log = os.path.join(directory, "expected_trace.txt")
  90. expected_events = []
  91. if os.path.exists(expected_log):
  92. with open(expected_log, 'r') as file:
  93. expected_events = [line.strip() for line in file.readlines()]
  94. actual_events = self.extract_output_events(log)
  95. if len(expected_events) != len(actual_events):
  96. return 0
  97. if len(expected_events) == 0 and len(actual_events) == 0:
  98. return 2
  99. for index, (item1, item2) in enumerate(zip(expected_events, actual_events)):
  100. if item1 != item2:
  101. return 0
  102. return 1
  103. class PydevsSCCDTraceChecker(SCCDTraceChecker):
  104. def __init__(self) -> None:
  105. super().__init__()
  106. def __str__(self):
  107. return "PyDEVS"
  108. def compile(self, directory):
  109. """
  110. Convert sccd.xml to target.py for the specified tool.
  111. """
  112. sccd_file = os.path.join(directory, 'sccd.xml')
  113. output_file = os.path.join(directory, "PyDEVS", 'target.py')
  114. os.makedirs(os.path.join(directory, "PyDEVS"), exist_ok=True)
  115. command = [
  116. "python",
  117. os.path.join("sccd", "compiler", "sccdc.py"),
  118. "-o", output_file,
  119. "-p", "pypDEVS",
  120. sccd_file
  121. ]
  122. env = os.environ.copy()
  123. result = subprocess.run(command, env=env, capture_output=True, text=True)
  124. if result.returncode != 0:
  125. print(f"Error converting {sccd_file} for PyDEVS: {result.stderr}")
  126. return result.returncode
  127. def run(self, directory):
  128. '''
  129. pydevs_target = os.path.join(directory, "PyDEVS", "target.py")
  130. # Dynamically import the target module
  131. target = import_target_module("target", pydevs_target)
  132. model = target.Controller(name="controller")
  133. refs = {"ui": model.in_ui}
  134. sim = DEVSSimulator(model, refs)
  135. sim.setRealTime(False)
  136. # Create the full path for the log file
  137. log_file_path = os.path.join(directory, "PyDEVS", "log.txt")
  138. # Set verbose to the log file path
  139. sim.setVerbose(log_file_path)
  140. sim.simulate()
  141. '''
  142. # Dynamically import the target module
  143. pydevs_target = os.path.join(directory, "PyDEVS", "target.py")
  144. target = import_target_module("target", pydevs_target)
  145. # Check if there is an input file
  146. input_file = os.path.join(directory, "input.txt")
  147. if not os.path.exists(input_file):
  148. input_file = None
  149. test_model = target.Controller("Controller")
  150. tester = Tester.Tester(test_model, input_file)
  151. sim = DEVSSimulator(tester)
  152. sim.setRealTime(False)
  153. # Create the full path for the log file
  154. log_file_path = os.path.join(directory, "PyDEVS", "log.txt")
  155. # Set verbose to the log file path
  156. sim.setVerbose(log_file_path)
  157. #sim.setClassicDEVS()
  158. sim.simulate()
  159. def extract_output_events(self, log_file_path):
  160. output_events = []
  161. current_time = None
  162. with open(log_file_path, 'r') as file:
  163. lines = file.readlines()
  164. time_pattern = re.compile(r"__ Current Time: +([\d\.]+) +__________________________________________")
  165. event_pattern = re.compile(r'^\s*\(event name:.*\)$')
  166. for line in lines:
  167. time_match = time_pattern.match(line)
  168. if time_match:
  169. current_time = float(time_match.group(1))
  170. event_match = event_pattern.match(line)
  171. if event_match and current_time is not None:
  172. event = line.strip()
  173. # Remove everything before '(' in each string
  174. event = event[event.index('('):]
  175. output_events.append(f"{current_time:.2f} {event}")
  176. return output_events
  177. def check(self, directory):
  178. log = os.path.join(directory, "PyDEVS", "log.txt")
  179. expected_log = os.path.join(directory, "expected_trace.txt")
  180. expected_events = []
  181. if os.path.exists(expected_log):
  182. with open(expected_log, 'r') as file:
  183. expected_events = [line.strip() for line in file.readlines()]
  184. actual_events = self.extract_output_events(log)
  185. return_code = 1
  186. if len(expected_events) != len(actual_events):
  187. return_code = 0
  188. if len(expected_events) == 0 and len(actual_events) == 0:
  189. return_code = 2
  190. for index, (item1, item2) in enumerate(zip(expected_events, actual_events)):
  191. if item1 != item2:
  192. return_code = 0
  193. if return_code == 0:
  194. # Write actual events to a file
  195. with open(os.path.join(directory, "PyDEVS", "faulty_log.txt"), 'w') as file:
  196. file.writelines([event + '\n' for event in actual_events])
  197. return return_code