sld.py 10.0 KB


  1. from dataclasses import dataclass
  2. from loguru import logger
  3. from data_models.mqtt_message import MqttMessage
  4. from data_models.workpiece import Workpiece, WorkpieceColor
  5. from devs_models.sld_holding_bay import HoldingBay
  6. from pypdevs.DEVS import AtomicDEVS, CoupledDEVS
  7. from pypdevs.infinity import INFINITY
  8. from utils.get_timestamp import get_timestamp
  9. from utils.timed_phase_enum import TimedPhaseEnum
  10. class SLDPhase(TimedPhaseEnum):
  11. """ Steps in order, along with their timings """
  12. IDLE = ('IDLE', INFINITY)
  13. # sorting is happening, conveyor with 3 checkpoints (c123) white/red/blue
  14. SORTING_C1_WHITE = ('SORTING_C1_WHITE', 5.97)
  15. SORTING_C2_RED = ('SORTING_C2_RED', 1.467)
  16. SORTING_C3_BLUE = ('SORTING_C3_BLUE', 1.501)
  17. # eject to output bay
  18. EJECT = ('EJECT', 0.300)
  19. # 'delay' until pusher retracted and wp marked as 'sorted'
  20. SORTED = ('SORTED', 0.463)
  21. @dataclass
  22. class SLDState:
  23. delta_t: float = INFINITY
  24. phase: SLDPhase = SLDPhase.IDLE
  25. workpiece: Workpiece | None = None
  26. status_requested: bool = True # whether status is requested -> output status immediately
  27. last_sorted_color: WorkpieceColor | None = None # color of the wp we just sorted
  28. visual_update_pending: bool = False
  29. class SLDConveyor(AtomicDEVS):
  30. """ SLD Sorting Line with color Detection, which sorts the workpiece in the appropriate area """
  31. def __init__(self, name: str):
  32. super(SLDConveyor, self).__init__(name)
  33. self.inp = self.addInPort("inp")
  34. self.mqtt_in = self.addInPort("mqtt_in") # to receive refresh status request
  35. # Out ports to holding bays for all colors
  36. self.out_white = self.addOutPort("out_white")
  37. self.out_red = self.addOutPort("out_red")
  38. self.out_blue = self.addOutPort("out_blue")
  39. self.mqtt_out = self.addOutPort("mqtt_out")
  40. self.state = SLDState()
  41. def change_phase(self, new_phase: SLDPhase):
  42. """ Wrapper for changing the phase and time associated with it, helps with logging """
  43. self.state.phase = new_phase
  44. self.state.delta_t = new_phase.timing
  45. logger.trace(f"{type(self).__name__} '{self.name}' phase changed to {new_phase}")
  46. if new_phase == SLDPhase.EJECT:
  47. self.state.last_sorted_color = self.state.workpiece.color
  48. self.state.visual_update_pending = True
  49. def get_visual_update_data(self) -> MqttMessage:
  50. """ Get visual update data for the animation, contains the action taken and the duration of that action left """
  51. message = MqttMessage()
  52. message.topic = "visualization/sld"
  53. duration = self.state.delta_t
  54. if duration == INFINITY: duration = None
  55. message.payload = {
  56. "action": self.state.phase.value,
  57. "duration": duration, # should be equal to the timing of the phase
  58. "workpiece": self.state.workpiece.to_dict() if self.state.workpiece else None,
  59. "last_sorted_color": self.state.last_sorted_color.value if self.state.last_sorted_color else None,
  60. }
  61. return message
  62. def _is_active(self):
  63. """ Whether the SLDConveyor is in an active phase (!= IDLE) """
  64. return self.state.phase != SLDPhase.IDLE
  65. def get_status_message(self, use_next_phase=True) -> MqttMessage:
  66. phase = self.state.phase.next() if use_next_phase else self.state.phase
  67. active = 0 if phase == SLDPhase.IDLE else 1
  68. code = 2 if active == 1 else 1
  69. message = MqttMessage()
  70. message.topic = "f/i/state/sld"
  71. message.payload = {
  72. "active": active,
  73. "code": code,
  74. "description": '',
  75. "station": 'sld',
  76. "ts": get_timestamp(),
  77. }
  78. return message
  79. def get_sld_sorted_ack(self) -> MqttMessage:
  80. """ Get the MQTT message which denotes that this SLD has sorted a workpiece, and it is ready for pick-up """
  81. message = MqttMessage()
  82. message.topic = "fl/sld/ack"
  83. message.payload = {
  84. "code": 2,
  85. "colorValue": 777, # TODO: figure out how to determine
  86. "ts": get_timestamp(),
  87. "type": self.state.last_sorted_color.value
  88. }
  89. return message
  90. def extTransition(self, inputs):
  91. self.state.delta_t -= self.elapsed
  92. if self.mqtt_in in inputs:
  93. # Handle MQTT input, which is used for visual update requests
  94. mqtt_message = inputs[self.mqtt_in][0]
  95. if mqtt_message.topic == "simulation/ctrl/all" and mqtt_message.payload.get("action") == "refresh":
  96. self.state.status_requested = True
  97. self.state.visual_update_pending = True
  98. return self.state
  99. elif not self.state.phase == SLDPhase.IDLE:
  100. logger.error(f"{type(self).__name__} '{self.name}' received input while not expecting: {inputs}")
  101. return self.state
  102. elif self.inp in inputs:
  103. wp: Workpiece = inputs[self.inp][0]
  104. self.state.workpiece = wp
  105. self.change_phase(SLDPhase.SORTING_C1_WHITE)
  106. self.state.status_requested = True # request status message
  107. return self.state # important, return state
  108. def timeAdvance(self):
  109. if self.state.visual_update_pending or self.state.status_requested:
  110. return 0.0
  111. return self.state.delta_t
  112. def outputFnc(self):
  113. if self.state.visual_update_pending:
  114. return {self.mqtt_out: [self.get_visual_update_data()]}
  115. if self.state.status_requested:
  116. return {self.mqtt_out: [self.get_status_message(use_next_phase=False)]} # output status
  117. if self.state.phase == SLDPhase.EJECT:
  118. # Output depending on color
  119. output_mapping = {
  120. WorkpieceColor.WHITE: self.out_white,
  121. WorkpieceColor.RED: self.out_red,
  122. WorkpieceColor.BLUE: self.out_blue,
  123. }
  124. target = self.state.workpiece.color
  125. if target in output_mapping:
  126. return {output_mapping[target]: [self.state.workpiece]}
  127. else:
  128. logger.error(f"{type(self).__name__} '{self.name}' unknown sorting target: {target}")
  129. if self.state.phase == SLDPhase.SORTED:
  130. return {self.mqtt_out: [self.get_status_message(), self.get_sld_sorted_ack()]}
  131. return {}
  132. def intTransition(self):
  133. if self.state.visual_update_pending:
  134. self.state.visual_update_pending = False
  135. return self.state
  136. if self.state.status_requested:
  137. self.state.status_requested = False # reset
  138. return self.state
  139. # EJECT workpiece when its color matches the checkpoint for the target
  140. if self.state.workpiece:
  141. if self.state.workpiece.color == WorkpieceColor.WHITE and self.state.phase == SLDPhase.SORTING_C1_WHITE:
  142. self.change_phase(SLDPhase.EJECT)
  143. return self.state
  144. elif self.state.workpiece.color == WorkpieceColor.RED and self.state.phase == SLDPhase.SORTING_C2_RED:
  145. self.change_phase(SLDPhase.EJECT)
  146. return self.state
  147. elif self.state.workpiece.color == WorkpieceColor.BLUE and self.state.phase == SLDPhase.SORTING_C3_BLUE:
  148. self.change_phase(SLDPhase.EJECT) # this one is redundant, the next phase is already EJECT
  149. return self.state
  150. elif self.state.phase == SLDPhase.EJECT:
  151. self.state.workpiece = None # remove the workpiece we just output
  152. # Transition to the next phase
  153. self.change_phase(self.state.phase.next())
  154. return self.state
  155. class SLD(CoupledDEVS):
  156. """ SLD: Sorting Line with Color detection
  157. A Coupled model which combines the sorting line conveyor with 3 colored output bays
  158. """
  159. def __init__(self, name: str):
  160. super(SLD, self).__init__(name)
  161. # Components
  162. self.sld_conveyor: SLDConveyor = self.addSubModel(SLDConveyor(f"{name}_conveyor"))
  163. self.white_bay: HoldingBay = self.addSubModel(HoldingBay(f"{name}_white_bay", "WHITE"))
  164. self.red_bay: HoldingBay = self.addSubModel(HoldingBay(f"{name}_red_bay", "RED"))
  165. self.blue_bay: HoldingBay = self.addSubModel(HoldingBay(f"{name}_blue_bay", "BLUE"))
  166. # Own ports
  167. self.inp = self.addInPort("inp") # SLD conveyor input
  168. self.mqtt_in = self.addInPort("mqtt_in") # for refresh requests
  169. self.white_in = self.addInPort("white_in") # vgr requesting pickup
  170. self.red_in = self.addInPort("red_in") # vgr requesting pickup
  171. self.blue_in = self.addInPort("blue_in") # vgr requesting pickup
  172. self.white_out = self.addOutPort("white_out")
  173. self.red_out = self.addOutPort("red_out")
  174. self.blue_out = self.addOutPort("blue_out")
  175. self.mqtt_out = self.addOutPort("mqtt_out")
  176. # Connect internal ports
  177. self.connectPorts(self.inp, self.sld_conveyor.inp)
  178. self.connectPorts(self.sld_conveyor.out_white, self.white_bay.sld_in)
  179. self.connectPorts(self.sld_conveyor.out_red, self.red_bay.sld_in)
  180. self.connectPorts(self.sld_conveyor.out_blue, self.blue_bay.sld_in)
  181. self.connectPorts(self.white_in, self.white_bay.vgr_in)
  182. self.connectPorts(self.red_in, self.red_bay.vgr_in)
  183. self.connectPorts(self.blue_in, self.blue_bay.vgr_in)
  184. self.connectPorts(self.white_bay.vgr_out, self.white_out)
  185. self.connectPorts(self.red_bay.vgr_out, self.red_out)
  186. self.connectPorts(self.blue_bay.vgr_out, self.blue_out)
  187. # Connect MQTT ports
  188. self.connectPorts(self.mqtt_in, self.sld_conveyor.mqtt_in)
  189. self.connectPorts(self.mqtt_in, self.white_bay.mqtt_in)
  190. self.connectPorts(self.mqtt_in, self.red_bay.mqtt_in)
  191. self.connectPorts(self.mqtt_in, self.blue_bay.mqtt_in)
  192. self.connectPorts(self.sld_conveyor.mqtt_out, self.mqtt_out)
  193. self.connectPorts(self.white_bay.mqtt_out, self.mqtt_out)
  194. self.connectPorts(self.red_bay.mqtt_out, self.mqtt_out)
  195. self.connectPorts(self.blue_bay.mqtt_out, self.mqtt_out)