TraceChecker.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478
  1. import os
  2. import re
  3. import subprocess
  4. import importlib.util
  5. from sccd.runtime.statecharts_core import Event
  6. import TraceVerifier.DEVSTesterUnit as DEVSTesterUnit
  7. from sccd.runtime.DEVSSimulatorWrapper import DEVSSimulator
  8. def import_target_module(module_name, file_path):
  9. # Dynamically import a target module for the runner to execute
  10. spec = importlib.util.spec_from_file_location(module_name, file_path)
  11. module = importlib.util.module_from_spec(spec)
  12. spec.loader.exec_module(module)
  13. return module
  14. class SCCDTraceChecker:
  15. '''
  16. Abstract class, this class defines the methods and parameters a custom trace checker
  17. needs to function in the testing framework
  18. '''
  19. def __init__(self) -> None:
  20. self.config = None
  21. self.directory = None
  22. def compile(self):
  23. raise NotImplementedError("Compile method must be implemented by the subclass")
  24. def run(self):
  25. raise NotImplementedError("Run method must be implemented by the subclass")
  26. def filter(self):
  27. raise NotImplementedError("Check method must be implemented by the subclass")
  28. class PythonSCCDTraceChecker(SCCDTraceChecker):
  29. def __init__(self) -> None:
  30. super().__init__()
  31. self.id_dict = {}
  32. def __str__(self):
  33. return "Python"
  34. def compile(self):
  35. '''
  36. Convert sccd.xml to target.py for the specified tool.
  37. '''
  38. sccd_file = os.path.join(self.directory, self.config['model'])
  39. output_file = os.path.join(self.directory, 'Python', 'target.py')
  40. os.makedirs(os.path.join(self.directory, 'Python'), exist_ok=True)
  41. command = [
  42. "python",
  43. os.path.join("sccd", "compiler", "sccdc.py"),
  44. "-o", output_file,
  45. "-p", "threads",
  46. "-l", "python",
  47. sccd_file
  48. ]
  49. env = os.environ.copy()
  50. result = subprocess.run(command, env=env, capture_output=True, text=True)
  51. if result.returncode != 0:
  52. print(f"Error converting {sccd_file} for python: {result.stderr}")
  53. return result.returncode
  54. def run(self):
  55. '''
  56. Run the model (target.py) with a defined execution function
  57. '''
  58. # Dynamically import the target module
  59. python_target = os.path.join(self.directory, "Python", "target.py")
  60. target = import_target_module("target", python_target)
  61. controller = target.Controller()
  62. controller.keep_running = False
  63. # Check if there is an input file for input events
  64. input_file = os.path.join(self.directory, self.config["input"])
  65. if os.path.exists(input_file):
  66. # add the inputs before the simulation is started with the correct event time
  67. with open(input_file, 'r') as file:
  68. lines = file.readlines()
  69. input_events = [line.strip() for line in lines]
  70. for event in input_events:
  71. space_pos = event.find(' ')
  72. if space_pos == -1:
  73. raise ValueError("Line format is incorrect. No space found to split time and event.")
  74. # Extract the time and event parts
  75. next_event_time = float(event[:space_pos]) * 1000
  76. event_part = event[space_pos + 1:].strip() # Strip to remove any leading/trailing whitespace
  77. name, port, parameters = DEVSTesterUnit.parse_event(event_part)
  78. # list is as string, convert it to actual list
  79. parameters = eval(parameters)
  80. actual_event = Event(name, port, parameters)
  81. controller.addInput(actual_event, next_event_time)
  82. # Create the full path for the log file
  83. log_file_path = os.path.join(self.directory, "Python", "log.txt")
  84. # Set verbose to the log file path
  85. controller.setVerbose(log_file_path)
  86. # Start the execution
  87. controller.start()
  88. controller.tracers.stopTracers()
  89. def extract_globalio(self, line, context):
  90. # Extract global output event from the line if present
  91. event_pattern = re.compile(r'^\s*\\Event: \(event name:.*\)$')
  92. event_match = event_pattern.match(line)
  93. if event_match and context["time"] is not None:
  94. if context["context"] == "global output":
  95. # Remove everything before the event string
  96. event = line.strip()
  97. event = event[event.index('('):]
  98. # Narrow casts are also matched by regex so filter these out
  99. if not "<narrow_cast>" in event:
  100. return f"{context["time"]:.2f} {event}"
  101. return None
  102. def extract_internalio(self, line, context):
  103. # Extract internal events, e.g. events sended to and from the object manager and classes
  104. event_pattern = re.compile(r'^\s*\\Event: \(event name:.*\)$')
  105. event_match = event_pattern.match(line)
  106. if event_match and context["context"] is not None:
  107. if context["context"] == "internal input" or context["context"] == "internal output":
  108. # Remove everything before the event in string
  109. event = line.strip()
  110. event = event[event.index('('):]
  111. # Special for python SCCD, the first parameter is sometimes the source but the source is a memory address, give it a unique index
  112. # So that it will match DEVS and is consistent in traces.
  113. id_pattern = re.compile(
  114. r'.*\(event name:.*; port:.*; parameters: \[\<[\w\.]+ object at (0x[0-9a-fA-F]+)\>.*\]'
  115. )
  116. id_match = id_pattern.search(event)
  117. if id_match:
  118. address = id_match.group(1)
  119. if address not in self.id_dict:
  120. self.id_dict[address] = len(self.id_dict)
  121. # Replace the entire object reference wit a unique id
  122. start_idx = event.find('<')
  123. end_idx = event.find('>', start_idx) + 1
  124. if start_idx != -1 and end_idx != -1:
  125. event = event[:start_idx] + str(self.id_dict[address]) + event[end_idx:]
  126. return f"{context["time"]:.2f} {event}"
  127. return None
  128. def extract_statechart(self, line, context):
  129. # extract statechart statements
  130. if line != "\n":
  131. if "TRANSITION FIRED" in line:
  132. context["extra_info"] = "transition"
  133. elif "EXIT STATE" in line:
  134. context["extra_info"] = "exit"
  135. elif "ENTER STATE" in line:
  136. context["extra_info"] = "enter"
  137. else:
  138. # Remove all unnecessary characters
  139. if context["extra_info"] == "transition":
  140. line = line[line.index('('):line.rfind(')')+1]
  141. else:
  142. line = line[line.index('/'):]
  143. line = line.replace('\n', '')
  144. line = line.replace('\t', '')
  145. return f"{context["time"]:.2f} {context['model']}: {context["extra_info"]} {line}"
  146. return None
  147. def check_state(self, line, context):
  148. time_pattern = re.compile(r"__ Current Time: +([\d\.]+) +__________________________________________")
  149. time_match = time_pattern.match(line)
  150. if time_match:
  151. context = {
  152. "time": float(time_match.group(1)),
  153. "model": None,
  154. "context": None,
  155. "extra_info": None
  156. }
  157. if "INPUT EVENT from <ObjectManager>" in line:
  158. context = {
  159. "time": context["time"],
  160. "context": "internal input",
  161. }
  162. elif "OUTPUT EVENT to <ObjectManager>" in line:
  163. context = {
  164. "time": context["time"],
  165. "context": "internal output",
  166. }
  167. elif "INPUT EVENT" in line:
  168. context = {
  169. "time": context["time"],
  170. "context": "global input",
  171. }
  172. elif "OUTPUT EVENT" in line:
  173. context = {
  174. "time": context["time"],
  175. "context": "global output",
  176. "extra_info": None
  177. }
  178. elif "EXIT STATE" in line:
  179. pattern = r"<(.*?)>"
  180. # Search for the pattern in the string
  181. match = re.search(pattern, line)
  182. context = {
  183. "time": context["time"],
  184. "model": match.group(1),
  185. "context": "state",
  186. "extra_info": "Exit "
  187. }
  188. elif "ENTER STATE" in line:
  189. pattern = r"<(.*?)>"
  190. # Search for the pattern in the string
  191. match = re.search(pattern, line)
  192. context = {
  193. "time": context["time"],
  194. "model": match.group(1),
  195. "context": "state",
  196. "extra_info": "Enter "
  197. }
  198. elif "TRANSITION FIRED" in line:
  199. pattern = r"<(.*?)>"
  200. # Search for the pattern in the string
  201. match = re.search(pattern, line)
  202. context = {
  203. "time": context["time"],
  204. "model": match.group(1),
  205. "context": "state",
  206. "extra_info": "Transition "
  207. }
  208. return context
  209. def filter(self):
  210. log = os.path.join(self.directory, "Python", "log.txt")
  211. output_events = []
  212. context = {
  213. "time": None,
  214. "model": None,
  215. "context": None,
  216. "extra_info": None
  217. }
  218. with open(log, 'r') as file:
  219. lines = file.readlines()
  220. for line in lines:
  221. context = self.check_state(line, context)
  222. if "external" in self.config["trace"]:
  223. io_event = self.extract_globalio(line, context)
  224. if io_event is not None:
  225. output_events.append(io_event)
  226. if "internal" in self.config["trace"]:
  227. internal_event = self.extract_internalio(line, context)
  228. if internal_event is not None:
  229. output_events.append(internal_event)
  230. if "statechart" in self.config["trace"]:
  231. if context['context'] == "state":
  232. statechart_event = self.extract_statechart(line, context)
  233. if statechart_event is not None:
  234. output_events.append(statechart_event)
  235. return output_events
  236. class ClassicDevsSCCDTraceChecker(SCCDTraceChecker):
  237. def __init__(self) -> None:
  238. super().__init__()
  239. def __str__(self):
  240. return "ClassicDEVS"
  241. def compile(self):
  242. """
  243. Convert sccd.xml to target.py.
  244. """
  245. sccd_file = os.path.join(self.directory, self.config["model"])
  246. output_file = os.path.join(self.directory, "ClassicDEVS", 'target.py')
  247. os.makedirs(os.path.join(self.directory, "ClassicDEVS"), exist_ok=True)
  248. command = [
  249. "python",
  250. os.path.join("sccd", "compiler", "sccdc.py"),
  251. "-o", output_file,
  252. "-p", "classicdevs",
  253. sccd_file
  254. ]
  255. env = os.environ.copy()
  256. result = subprocess.run(command, env=env, capture_output=True, text=True)
  257. if result.returncode != 0:
  258. print(f"Error converting {sccd_file} for ClassicDEVS: {result.stderr}")
  259. return result.returncode
  260. def run(self):
  261. # Dynamically import the target module
  262. pydevs_target = os.path.join(self.directory, "ClassicDEVS", "target.py")
  263. target = import_target_module("target", pydevs_target)
  264. # Check if there is an input file
  265. input_file = os.path.join(self.directory, self.config["input"])
  266. if not os.path.exists(input_file):
  267. input_file = None
  268. # Initialize the to be tested model
  269. test_model = target.Controller("Controller")
  270. # Add a new atomicDEVS to it which will be the Tester and link it to the model to sent events
  271. tester = test_model.addSubModel(DEVSTesterUnit.TesterUnit("Tester", input_file))
  272. # Connect the outports of the tester to the global inports of the model
  273. # Now directly connected to the atomicDEVS input because out/in relation can not be of coupled (SEE DEVS.PY)
  274. for in_port in test_model.IPorts:
  275. test_output = tester.addOutPort(f"Test_{in_port.name}")
  276. for atomic in test_model.atomics:
  277. test_model.connectPorts(test_output, atomic.input)
  278. # Connect the private ports (with their general names)
  279. for an_atomic in test_model.atomics:
  280. for private_port in an_atomic.IPorts:
  281. if private_port.name != 'obj_manager_in' and private_port.name != 'input':
  282. test_output = tester.addOutPort(f"Test_{private_port.name}")
  283. test_model.connectPorts(test_output, private_port)
  284. sim = DEVSSimulator(test_model)
  285. sim.setRealTime(False)
  286. # Add the verbose tracer to add traces to .txt file
  287. log_file_path = os.path.join(self.directory, "ClassicDEVS", "log.txt")
  288. sim.setVerbose(log_file_path)
  289. # Start the simulation correctly
  290. sim.setClassicDEVS()
  291. sim.simulate()
  292. def extract_globalio(self, line, context):
  293. event_pattern = re.compile(r'^\s*\(event name:.*\)$')
  294. event_match = event_pattern.match(line)
  295. if event_match and context["time"] is not None:
  296. event = line.strip()
  297. # Remove everything before '(' in each string
  298. event = event[event.index('('):]
  299. return f"{context["time"]:.2f} {event}"
  300. return None
  301. def extract_internalio(self, line, context):
  302. if (context["extra_info"] == "obj_manager_out" and context['transition_type'] == 'internal'):
  303. # Pattern to match the outermost '(event name: ...)' including a single nested '(event name: ...)'
  304. event_pattern = re.compile(r'\(event name:.*?(?:\(event name:.*?\))?.*?\)')
  305. event_match = re.search(event_pattern, line)
  306. if event_match and context["time"] is not None:
  307. event = event_match.group(0)
  308. # quick fix
  309. if "event name: narrow_cast" in event:
  310. event += "])"
  311. return f"{context["time"]:.2f} {event}"
  312. elif (context["extra_info"] == "obj_manager_in" and context['transition_type'] == 'external') and ("<narrow_cast>" in line):
  313. event_pattern = r'\(event name.*?\)'
  314. event_matches = re.findall(event_pattern, line)
  315. if event_matches and context["time"] is not None:
  316. if len(event_matches) > 1:
  317. pass
  318. # Get the last match
  319. event = event_matches[-1]
  320. return f"{context['time']:.2f} {event}"
  321. return None
  322. def extract_statechart(self, line, context):
  323. if line != "\n" and "\t\tNew State:" not in line:
  324. if "TRANSITION FIRED" in line:
  325. context["extra_info"] = "transition"
  326. elif "EXIT STATE" in line:
  327. context["extra_info"] = "exit"
  328. elif "ENTER STATE" in line:
  329. context["extra_info"] = "enter"
  330. else:
  331. if context["extra_info"] == "transition":
  332. line = line[line.index('('):line.rfind(')')+1]
  333. line = line.replace('\n', '')
  334. line = line.replace('\t', '') # Remove all tab character
  335. else:
  336. line = line[line.index('/'):line.rfind(')')]
  337. line = line.replace('\n', '')
  338. line = line.replace('\t', '') # Remove all tab characters
  339. return f"{context["time"]:.2f} {context['model']}: {context["extra_info"]} {line}"
  340. return None
  341. def check_state(self, line, context):
  342. if "EXTERNAL TRANSITION" in line:
  343. # Use regular expressions to find everything between <>
  344. pattern = r"<(.*?)>"
  345. # Search for the pattern in the string
  346. match = re.search(pattern, line)
  347. context = {
  348. "time": context["time"],
  349. "transition_type": "external",
  350. "model": match.group(1),
  351. "context": None,
  352. "extra_info": None
  353. }
  354. elif "INTERNAL TRANSITION" in line:
  355. # Use regular expressions to find everything between <>
  356. pattern = r"<Controller.(.*?)>"
  357. # Search for the pattern in the string
  358. match = re.search(pattern, line)
  359. context = {
  360. "time": context["time"],
  361. "transition_type": "internal",
  362. "model": match.group(1),
  363. "context": None,
  364. "extra_info": None
  365. }
  366. elif "New State:" in line:
  367. context["context"] = "state"
  368. elif "Input Port Configuration:" in line:
  369. context["context"] = "input"
  370. elif "Output Port Configuration:" in line:
  371. context["context"] = "output"
  372. elif "Next scheduled internal transition at time" in line:
  373. context["context"] = "next"
  374. elif context["context"] == "input" and "obj_manager_in" in line:
  375. context["extra_info"] = "obj_manager_in"
  376. elif context["context"] == "output" and "obj_manager_out" in line:
  377. context["extra_info"] = "obj_manager_out"
  378. elif context["context"] == "output" and "port <" in line:
  379. context["extra_info"] = "other_port"
  380. return context
  381. def filter(self):
  382. log = os.path.join(self.directory, "ClassicDEVS", "log.txt")
  383. output_events = []
  384. current_time = None
  385. context = {
  386. "time": None,
  387. "transition_type": None,
  388. "model": None,
  389. "context": None,
  390. "extra_info": None
  391. }
  392. with open(log, 'r') as file:
  393. lines = file.readlines()
  394. time_pattern = re.compile(r"__ Current Time: +([\d\.]+) +__________________________________________")
  395. for line in lines:
  396. time_match = time_pattern.match(line)
  397. if time_match:
  398. context["time"] = float(time_match.group(1))
  399. context = self.check_state(line, context)
  400. if "external" in self.config["trace"]:
  401. io_event = self.extract_globalio(line, context)
  402. if io_event is not None:
  403. output_events.append(io_event)
  404. if "internal" in self.config["trace"]:
  405. internal_event = self.extract_internalio(line, context)
  406. if internal_event is not None:
  407. output_events.append(internal_event)
  408. if "statechart" in self.config["trace"]:
  409. if context['context'] == "state":
  410. statechart_event = self.extract_statechart(line, context)
  411. if statechart_event is not None:
  412. output_events.append(statechart_event)
  413. return output_events