dso.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. from data_models.mqtt_message import MqttMessage
  2. from pypdevs.DEVS import AtomicDEVS
  3. from pypdevs.infinity import INFINITY
  4. from dataclasses import dataclass
  5. from loguru import logger
  6. from data_models.workpiece import Workpiece
  7. from utils.get_timestamp import get_timestamp
  8. from utils.timed_phase_enum import TimedPhaseEnum
class DsoPhase(TimedPhaseEnum):
    """ Steps in order, along with their timings

    Each member's value is a ``(name, duration)`` tuple. The declaration order
    matters: ``DSO`` moves from one phase to the following one by calling
    ``next()`` on the current phase, and reads the duration back through the
    ``timing`` attribute (both presumably provided by ``TimedPhaseEnum`` --
    confirm against that class).
    """
    IDLE = ('IDLE', INFINITY)  # waits indefinitely; DSO only accepts a workpiece in this phase
    WP_RECEIVED = ('WP_RECEIVED', 0.2) # delay before sending a message
    AWAIT_PICKUP = ('AWAIT_PICKUP', INFINITY) # await pickup by user
    PICKUP = ('PICKUP', 0.0) # immediately handover wp to collector
  15. @dataclass
  16. class DsoState:
  17. delta_t: float = INFINITY
  18. phase: DsoPhase = DsoPhase.IDLE
  19. workpiece: Workpiece | None = None
  20. status_requested = True # if true, publish status
  21. visual_update_pending: bool = False
  22. class DSO(AtomicDEVS):
  23. """ Output bay which holds a single workpiece """
  24. def __init__(self, name: str):
  25. super(DSO, self).__init__(name)
  26. self.inp = self.addInPort("inp")
  27. self.out = self.addOutPort("out")
  28. self.mqtt_in = self.addInPort("mqtt_in") # to receive 'pickup' simulation message to clear output
  29. self.mqtt_out = self.addOutPort("mqtt_out")
  30. self.state = DsoState()
  31. def change_phase(self, new_phase: DsoPhase):
  32. """ Wrapper for changing the phase and time associated with it, helps with logging """
  33. self.state.phase = new_phase
  34. self.state.delta_t = new_phase.timing
  35. logger.trace(f"{type(self).__name__} '{self.name}' phase changed to {new_phase}")
  36. self.state.visual_update_pending = True
  37. def get_visual_update_data(self) -> MqttMessage:
  38. """ Get visual update data for the animation, contains the action taken and the duration of that action left """
  39. message = MqttMessage()
  40. message.topic = "visualization/dso"
  41. duration = self.state.delta_t
  42. if duration == INFINITY: duration = None
  43. message.payload = {
  44. "action": self.state.phase.value,
  45. "duration": duration, # should be equal to the timing of the phase
  46. "workpiece": self.state.workpiece.to_dict() if self.state.workpiece else None,
  47. }
  48. return message
  49. def get_status_message(self, use_next_phase=True) -> MqttMessage:
  50. """ Get the status message for the input bay """
  51. phase = self.state.phase.next() if use_next_phase else self.state.phase
  52. active = 1 if (self.state.workpiece is not None) else 0
  53. code = 1 if (phase == DsoPhase.IDLE) else 0 # TODO: figure out this logic
  54. message = MqttMessage()
  55. message.topic = "f/i/state/dso"
  56. message.payload = {
  57. "active": active,
  58. "code": code,
  59. "description": '',
  60. "station": 'dso',
  61. "ts": get_timestamp(),
  62. }
  63. return message
  64. def extTransition(self, inputs):
  65. self.state.delta_t -= self.elapsed
  66. if self.inp in inputs:
  67. if not self.state.phase == DsoPhase.IDLE:
  68. logger.error(f"{type(self).__name__} '{self.name}' received workpiece while not Idle!")
  69. return self.state
  70. wp = inputs[self.inp][0]
  71. logger.trace(f"{type(self).__name__} '{self.name}' received: {wp}")
  72. self.change_phase(DsoPhase.WP_RECEIVED)
  73. self.state.workpiece = wp
  74. if self.mqtt_in in inputs:
  75. msg: MqttMessage = inputs[self.mqtt_in][0]
  76. if msg.topic == "simulation/ctrl/dso" and msg.payload["action"] == "clear":
  77. self.change_phase(DsoPhase.PICKUP)
  78. elif msg.topic == "simulation/ctrl/all" and msg.payload.get("action") == "refresh":
  79. self.state.status_requested = True
  80. self.state.visual_update_pending = True
  81. return self.state
  82. return self.state # important, return state
  83. def timeAdvance(self):
  84. if self.state.visual_update_pending or self.state.status_requested:
  85. return 0.0
  86. return self.state.delta_t
  87. def outputFnc(self):
  88. if self.state.visual_update_pending:
  89. return {self.mqtt_out: [self.get_visual_update_data()]}
  90. if self.state.status_requested or self.state.phase == DsoPhase.WP_RECEIVED:
  91. return {self.mqtt_out: [self.get_status_message(use_next_phase=False)]}
  92. if self.state.phase == DsoPhase.PICKUP:
  93. return {self.out: [self.state.workpiece]} # output the workpiece to the collector
  94. return {}
  95. def intTransition(self):
  96. if self.state.visual_update_pending:
  97. self.state.visual_update_pending = False
  98. return self.state
  99. if self.state.status_requested:
  100. self.state.status_requested = False # if requested, we've handled, so unset
  101. return self.state
  102. if self.state.phase == DsoPhase.PICKUP:
  103. self.state.workpiece = None # clear the workpiece after handing it over
  104. # Transition to next state
  105. self.change_phase(self.state.phase.next())
  106. return self.state