Tester.py 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. import re
  2. from pypdevs.DEVS import *
  3. from pypdevs.simulator import Simulator
  4. from sccd.runtime.DEVS_statecharts_core import Event
  5. from pypdevs.infinity import INFINITY
  6. def parse_event(line):
  7. # Regular expression to match the desired parts of the line
  8. pattern = re.compile(r'\(event name: (.*?); port: (.*?); parameters: (.*?)\)$')
  9. match = pattern.match(line)
  10. if match:
  11. event_name = match.group(1)
  12. port = match.group(2)
  13. parameters = match.group(3)
  14. return event_name, port, parameters
  15. else:
  16. raise ValueError(f"Line format is incorrect: {line}")
  17. class TesterUnit(AtomicDEVS):
  18. def __init__(self, name, inputfile=None):
  19. AtomicDEVS.__init__(self, name)
  20. self.events = []
  21. if inputfile is not None:
  22. with open(inputfile, 'r') as file:
  23. lines = file.readlines()
  24. self.events = [line.strip() for line in lines]
  25. self.total_time = 0
  26. self.next_event_time = 0
  27. self.to_send = []
  28. self.first = True
  29. def extTransition(self, inputs):
  30. pass
  31. def intTransition(self):
  32. self.total_time += self.next_event_time
  33. self.to_send = []
  34. if self.events:
  35. event = self.events[0]
  36. space_pos = event.find(' ')
  37. if space_pos == -1:
  38. raise ValueError("Line format is incorrect. No space found to split time and event.")
  39. # Extract the time and event parts
  40. self.next_event_time = float(event[:space_pos]) - self.total_time
  41. event_part = event[space_pos + 1:].strip() # Strip to remove any leading/trailing whitespace
  42. name, port, parameters = parse_event(event_part)
  43. actual_event = Event(name, port, parameters)
  44. self.to_send.append(actual_event)
  45. self.events = self.events[1:]
  46. else:
  47. self.next_event_time = INFINITY
  48. def outputFnc(self):
  49. to_output = {}
  50. for event in self.to_send:
  51. for port in self.OPorts:
  52. if f"Test_{event.port}" == port.name:
  53. string_event = f"Event(\"{event.name}\",\"{event.port}\",{event.parameters})"
  54. #to_output[port] = event
  55. to_output[port] = [string_event]
  56. return to_output
  57. def timeAdvance(self):
  58. return self.next_event_time