# dso.py (4.8 KB)

  1. from data_models.mqtt_message import MqttMessage
  2. from pypdevs.DEVS import AtomicDEVS
  3. from pypdevs.infinity import INFINITY
  4. from dataclasses import dataclass
  5. from loguru import logger
  6. from data_models.workpiece import Workpiece
  7. from utils.get_timestamp import get_timestamp
  8. from utils.timed_phase_enum import TimedPhaseEnum
  9. class DsoPhase(TimedPhaseEnum):
  10. """ Steps in order, along with their timings """
  11. IDLE = ('IDLE', INFINITY)
  12. WP_RECEIVED = ('WP_RECEIVED', 0.2) # delay before sending a message
  13. AWAIT_PICKUP = ('AWAIT_PICKUP', INFINITY) # await pickup by user
  14. @dataclass
  15. class DsoState:
  16. delta_t: float = INFINITY
  17. phase: DsoPhase = DsoPhase.IDLE
  18. workpiece: Workpiece | None = None
  19. status_requested = True # if true, publish status
  20. visual_update_pending: bool = False
  21. class DSO(AtomicDEVS):
  22. """ Output bay which holds a single workpiece """
  23. def __init__(self, name: str):
  24. super(DSO, self).__init__(name)
  25. self.inp = self.addInPort("inp")
  26. self.out = self.addOutPort("out")
  27. self.mqtt_in = self.addInPort("mqtt_in") # to receive 'pickup' simulation message to clear output
  28. self.mqtt_out = self.addOutPort("mqtt_out")
  29. self.state = DsoState()
  30. def change_phase(self, new_phase: DsoPhase):
  31. """ Wrapper for changing the phase and time associated with it, helps with logging """
  32. self.state.phase = new_phase
  33. self.state.delta_t = new_phase.timing
  34. logger.trace(f"{type(self).__name__} '{self.name}' phase changed to {new_phase}")
  35. self.state.visual_update_pending = True
  36. def get_visual_update_data(self) -> MqttMessage:
  37. """ Get visual update data for the animation, contains the action taken and the duration of that action left """
  38. message = MqttMessage()
  39. message.topic = "visualization/dso"
  40. duration = self.state.delta_t
  41. if duration == INFINITY: duration = None
  42. message.payload = {
  43. "action": self.state.phase.value,
  44. "duration": duration, # should be equal to the timing of the phase
  45. "workpiece": self.state.workpiece.to_dict() if self.state.workpiece else None,
  46. }
  47. return message
  48. def get_status_message(self, use_next_phase=True) -> MqttMessage:
  49. """ Get the status message for the input bay """
  50. phase = self.state.phase.next() if use_next_phase else self.state.phase
  51. active = 1 if (self.state.workpiece is not None) else 0
  52. code = 1 if (phase == DsoPhase.IDLE) else 0 # TODO: figure out this logic
  53. message = MqttMessage()
  54. message.topic = "f/i/state/dso"
  55. message.payload = {
  56. "active": active,
  57. "code": code,
  58. "description": '',
  59. "station": 'dso',
  60. "ts": get_timestamp(),
  61. }
  62. return message
  63. def extTransition(self, inputs):
  64. self.state.delta_t -= self.elapsed
  65. if self.inp in inputs:
  66. if not self.state.phase == DsoPhase.IDLE:
  67. logger.error(f"{type(self).__name__} '{self.name}' received workpiece while not Idle!")
  68. return self.state
  69. wp = inputs[self.inp][0]
  70. logger.trace(f"{type(self).__name__} '{self.name}' received: {wp}")
  71. self.change_phase(DsoPhase.WP_RECEIVED)
  72. self.state.workpiece = wp
  73. if self.mqtt_in in inputs:
  74. msg: MqttMessage = inputs[self.mqtt_in][0]
  75. if msg.topic == "simulation/ctrl/dso" and msg.payload["action"] == "clear":
  76. self.state.workpiece = None
  77. self.change_phase(DsoPhase.IDLE)
  78. self.state.status_requested = True
  79. # TODO: output to collector ?
  80. elif msg.topic == "simulation/ctrl/all" and msg.payload.get("action") == "refresh":
  81. self.state.status_requested = True
  82. self.state.visual_update_pending = True
  83. return self.state
  84. return self.state # important, return state
  85. def timeAdvance(self):
  86. if self.state.visual_update_pending or self.state.status_requested:
  87. return 0.0
  88. return self.state.delta_t
  89. def outputFnc(self):
  90. if self.state.visual_update_pending:
  91. return {self.mqtt_out: [self.get_visual_update_data()]}
  92. if self.state.status_requested or self.state.phase == DsoPhase.WP_RECEIVED:
  93. return {self.mqtt_out: [self.get_status_message(use_next_phase=False)]}
  94. return {}
  95. def intTransition(self):
  96. if self.state.visual_update_pending:
  97. self.state.visual_update_pending = False
  98. return self.state
  99. if self.state.status_requested:
  100. self.state.status_requested = False # if requested, we've handled, so unset
  101. return self.state
  102. # Transition to next state
  103. self.change_phase(self.state.phase.next())
  104. return self.state