mpo_oven.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. from pypdevs.DEVS import AtomicDEVS
  2. from pypdevs.infinity import INFINITY
  3. from dataclasses import dataclass
  4. from loguru import logger
  5. from utils.timed_phase_enum import TimedPhaseEnum
  6. from data_models.workpiece import Workpiece
  7. from data_models.mqtt_message import MqttMessage
  8. from utils.get_timestamp import get_timestamp
  9. class BakeOvenSteps(TimedPhaseEnum):
  10. """ Group steps together with the time it takes to complete them"""
  11. IDLE = ('IDLE' ,INFINITY)
  12. DETECTION = ('DETECTION', 14.2565) # artificial delay in starting
  13. OPEN_DOOR = ('OPEN_DOOR', 2.584)
  14. SLIDE_ENTER = ('SLIDE_ENTER', 2.758)
  15. CLOSE_DOOR = ('CLOSE_DOOR', 0.994) # close doors & notify gripper to come
  16. BAKE = ('BAKE', 6.513)
  17. OPEN_DOOR_EXIT = ('OPEN_DOOR_EXIT', 0.280)
  18. SLIDE_EXIT = ('SLIDE_EXIT', 2.568)
  19. AWAIT_PICKUP = ('AWAIT_PICKUP', INFINITY)
  20. OUTPUT_WP = ('OUTPUT_WP', 0.0) # output workpiece to the gripper
  21. CLOSE_DOOR_QUIT = ('CLOSE_DOOR_QUIT', 4.0) # this seems to happen after a very long delay? maybe not intentional
  22. @dataclass
  23. class BakeOvenState:
  24. delta_t: float = INFINITY
  25. phase: BakeOvenSteps = BakeOvenSteps.IDLE
  26. workpiece: Workpiece | None = None
  27. status_requested: bool = True # if current status needs to be sent
  28. visual_update_pending: bool = False
  29. def get_mpo_started_msg() -> MqttMessage:
  30. """ Get the 'MPO_STARTED' mqtt message """
  31. message = MqttMessage()
  32. message.topic = "fl/mpo/ack"
  33. message.payload = {
  34. "code": 1,
  35. "ts": get_timestamp(),
  36. }
  37. return message
  38. class MPOOven(AtomicDEVS):
  39. """ A bake oven which bakes a workpiece. Has a slide to pull it in, and a door to open/close """
  40. def __init__(self, name: str):
  41. # name needs to be unique to refer to it
  42. super(MPOOven, self).__init__(name)
  43. self.vgr_in = self.addInPort("vgr_in")
  44. # gripper is transport between oven and table saw
  45. self.gripper_in = self.addInPort("gripper_in")
  46. self.gripper_out = self.addOutPort("gripper_out")
  47. self.mqtt_in = self.addInPort("mqtt_in") # input for MQTT messages (e.g. visual refresh request)
  48. self.mqtt_out = self.addOutPort("mqtt_out")
  49. self.state = BakeOvenState()
  50. def change_phase(self, new_phase: BakeOvenSteps):
  51. """ Wrapper for changing the phase and time associated with it, helps with logging """
  52. self.state.phase = new_phase
  53. self.state.delta_t = new_phase.timing
  54. logger.trace(f"{type(self).__name__} '{self.name}' phase changed to {new_phase}")
  55. self.state.visual_update_pending = True
  56. def get_visual_update_data(self) -> MqttMessage:
  57. """ Get visual update data for the animation, contains the action taken and the duration of that action left """
  58. message = MqttMessage()
  59. message.topic = "visualization/mpo/oven"
  60. duration = self.state.delta_t
  61. if duration == INFINITY: duration = None
  62. message.payload = {
  63. "action": self.state.phase.value,
  64. "duration": duration, # should be equal to the timing of the phase
  65. "workpiece": self.state.workpiece.to_dict() if self.state.workpiece else None,
  66. }
  67. return message
  68. def get_status_message(self, use_next_phase=True) -> MqttMessage:
  69. phase = self.state.phase.next() if use_next_phase else self.state.phase
  70. active = 0 if (phase == BakeOvenSteps.IDLE) else 1
  71. code = 2 if active == 1 else 1
  72. message = MqttMessage()
  73. message.topic = "f/i/state/mpo"
  74. message.payload = {
  75. "active": active,
  76. "code": code,
  77. "description": '',
  78. "station": 'mpo',
  79. "ts": get_timestamp(),
  80. }
  81. return message
  82. def extTransition(self, inputs):
  83. self.state.delta_t -= self.elapsed
  84. if self.mqtt_in in inputs:
  85. msg = inputs[self.mqtt_in][0]
  86. if msg.topic == "simulation/ctrl/all" and msg.payload.get("action") == "refresh":
  87. self.state.status_requested = True
  88. self.state.visual_update_pending = True
  89. return self.state
  90. logger.trace(f"BakeOven '{self.name}' received: {inputs}")
  91. if not (self.state.phase == BakeOvenSteps.IDLE or self.state.phase == BakeOvenSteps.AWAIT_PICKUP):
  92. logger.error(f"{type(self).__name__} '{self.name}' received workpiece whilst not IDLE, workpiece will be dropped")
  93. return self.state
  94. elif self.vgr_in in inputs:
  95. new_workpiece = inputs[self.vgr_in][0]
  96. logger.trace(f"BakeOven '{self.name}' received: {new_workpiece}")
  97. self.state.workpiece = new_workpiece
  98. # update step
  99. self.change_phase(BakeOvenSteps.DETECTION)
  100. elif self.gripper_in in inputs:
  101. # gripper is here to pick up the workpiece
  102. self.change_phase(BakeOvenSteps.OUTPUT_WP)
  103. return self.state # important, return state
  104. def timeAdvance(self):
  105. if self.state.visual_update_pending or self.state.status_requested:
  106. return 0.0
  107. return self.state.delta_t
  108. def outputFnc(self):
  109. if self.state.visual_update_pending:
  110. return {self.mqtt_out: [self.get_visual_update_data()]}
  111. if self.state.status_requested:
  112. return {self.mqtt_out: [self.get_status_message(use_next_phase=False)]}
  113. if self.state.phase == BakeOvenSteps.DETECTION:
  114. return {self.mqtt_out: [get_mpo_started_msg(), self.get_status_message()]} # output that we've started
  115. elif self.state.phase == BakeOvenSteps.CLOSE_DOOR:
  116. return {self.gripper_out: {}, self.mqtt_out: [self.get_status_message()]} # signal gripper to come here
  117. elif self.state.phase == BakeOvenSteps.OUTPUT_WP:
  118. return {self.gripper_out: [self.state.workpiece], self.mqtt_out: [self.get_status_message()]}
  119. return {self.mqtt_out: [self.get_status_message()]}
  120. def intTransition(self):
  121. if self.state.visual_update_pending:
  122. self.state.visual_update_pending = False
  123. return self.state
  124. if self.state.status_requested:
  125. self.state.status_requested = False
  126. return self.state # no transition
  127. if self.state.phase == BakeOvenSteps.OUTPUT_WP:
  128. self.state.workpiece = None # sent to output
  129. elif self.state.phase == BakeOvenSteps.BAKE:
  130. self.state.workpiece.state = "BAKED" # TODO: change
  131. # Go to the next phase
  132. self.change_phase(self.state.phase.next())
  133. return self.state