# DEVS_loop.py
import re

from pypdevs.simulator import Simulator
  3. def get_port(text):
  4. match = re.search(r'private_\d+_(\w+)', text)
  5. if match:
  6. result = match.group(1)
  7. return result
  8. else:
  9. return text
  10. class DEVSSimulator(Simulator):
  11. def __init__(self, model, inputs={}):
  12. super().__init__(model)
  13. # TODO: Add the input ports here so it works without manually adding them
  14. inputs ={}
  15. # Add global inports
  16. for global_in in model.IPorts:
  17. inputs[global_in.name] = global_in
  18. # Add private ports (can't send to it unless it knows the id)
  19. for aclass in model.atomics:
  20. pass
  21. self.setRealTimePorts(inputs)
  22. def addInput(self, event):
  23. port_name = get_port(event.port)
  24. event_string = f"Event(\"{event.name}\",\"{event.port}\",{event.parameters})"
  25. event_string = event_string.replace(" ", "")
  26. interrupt_string = f"{port_name} {event_string}"
  27. #event_string = f"Event(\"{event.name}\",\"{"ui"}\",{event.parameters})"
  28. #interrupt_string = f"ui {event_string}"
  29. self.realtime_interrupt(interrupt_string)