sld.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. from dataclasses import dataclass
  2. from loguru import logger
  3. from data_models.mqtt_message import MqttMessage
  4. from data_models.workpiece import Workpiece, WorkpieceColor
  5. from devs_models.holding_bay import HoldingBay
  6. from pypdevs.DEVS import AtomicDEVS, CoupledDEVS
  7. from pypdevs.infinity import INFINITY
  8. from utils.get_timestamp import get_timestamp
  9. from utils.timed_phase_enum import TimedPhaseEnum
  10. class SLDPhase(TimedPhaseEnum):
  11. """ Steps in order, along with their timings """
  12. IDLE = ('IDLE', INFINITY)
  13. # sorting is happening, conveyor with 3 checkpoints (c123) white/red/blue
  14. SORTING_C1_WHITE = ('SORTING_C1_WHITE', 5.97)
  15. SORTING_C2_RED = ('SORTING_C2_RED', 1.467)
  16. SORTING_C3_BLUE = ('SORTING_C3_BLUE', 1.501)
  17. # eject to output bay
  18. EJECT = ('EJECT', 0.300)
  19. # 'delay' until pusher retracted and wp marked as 'sorted'
  20. SORTED = ('SORTED', 0.463)
  21. @dataclass
  22. class SLDState:
  23. delta_t: float = INFINITY
  24. phase: SLDPhase = SLDPhase.IDLE
  25. workpiece: Workpiece | None = None
  26. status_requested: bool = True # whether status is requested -> output status immediately
  27. last_sorted_color: WorkpieceColor | None = None # color of the wp we just sorted
  28. visual_update_pending: bool = False
  29. class SLDConveyor(AtomicDEVS):
  30. """ SLD Sorting Line with color Detection, which sorts the workpiece in the appropriate area """
  31. def __init__(self, name: str):
  32. super(SLDConveyor, self).__init__(name)
  33. self.inp = self.addInPort("inp")
  34. # Out ports to holding bays for all colors
  35. self.out_white = self.addOutPort("out_white")
  36. self.out_red = self.addOutPort("out_red")
  37. self.out_blue = self.addOutPort("out_blue")
  38. self.mqtt_out = self.addOutPort("mqtt_out")
  39. self.state = SLDState()
  40. def change_phase(self, new_phase: SLDPhase):
  41. """ Wrapper for changing the phase and time associated with it, helps with logging """
  42. self.state.phase = new_phase
  43. self.state.delta_t = new_phase.timing
  44. logger.trace(f"{type(self).__name__} '{self.name}' phase changed to {new_phase}")
  45. if new_phase == SLDPhase.EJECT:
  46. self.state.last_sorted_color = self.state.workpiece.color
  47. self.state.visual_update_pending = True
  48. def get_visual_update_data(self) -> MqttMessage:
  49. """ Get visual update data for the animation, contains the action taken and the duration of that action left """
  50. message = MqttMessage()
  51. message.topic = "visualization/sld"
  52. duration = self.state.delta_t
  53. if duration == INFINITY: duration = None
  54. message.payload = {
  55. "action": self.state.phase.value,
  56. "duration": duration, # should be equal to the timing of the phase
  57. "workpiece": self.state.workpiece.to_dict() if self.state.workpiece else None,
  58. "last_sorted_color": self.state.last_sorted_color.value if self.state.last_sorted_color else None,
  59. }
  60. return message
  61. def _is_active(self):
  62. """ Whether the SLDConveyor is in an active phase (!= IDLE) """
  63. return self.state.phase != SLDPhase.IDLE
  64. def get_status_message(self, use_next_phase=True) -> MqttMessage:
  65. phase = self.state.phase.next() if use_next_phase else self.state.phase
  66. active = 0 if phase == SLDPhase.IDLE else 1
  67. code = 2 if active == 1 else 1
  68. message = MqttMessage()
  69. message.topic = "f/i/state/sld"
  70. message.payload = {
  71. "active": active,
  72. "code": code,
  73. "description": '',
  74. "station": 'sld',
  75. "ts": get_timestamp(),
  76. }
  77. return message
  78. def get_sld_sorted_ack(self) -> MqttMessage:
  79. """ Get the MQTT message which denotes that this SLD has sorted a workpiece, and it is ready for pick-up """
  80. message = MqttMessage()
  81. message.topic = "fl/sld/ack"
  82. message.payload = {
  83. "code": 2,
  84. "colorValue": 777, # TODO: figure out how to determine
  85. "ts": get_timestamp(),
  86. "type": self.state.last_sorted_color.value
  87. }
  88. return message
  89. def extTransition(self, inputs):
  90. self.state.delta_t -= self.elapsed
  91. if not self.state.phase == SLDPhase.IDLE:
  92. logger.error(f"{type(self).__name__} '{self.name}' received input while not expecting: {inputs}")
  93. return self.state
  94. elif self.inp in inputs:
  95. wp: Workpiece = inputs[self.inp][0]
  96. self.state.workpiece = wp
  97. self.change_phase(SLDPhase.SORTING_C1_WHITE)
  98. self.state.status_requested = True # request status message
  99. return self.state # important, return state
  100. def timeAdvance(self):
  101. if self.state.visual_update_pending or self.state.status_requested:
  102. return 0.0
  103. return self.state.delta_t
  104. def outputFnc(self):
  105. if self.state.visual_update_pending:
  106. return {self.mqtt_out: [self.get_visual_update_data()]}
  107. if self.state.status_requested:
  108. return {self.mqtt_out: [self.get_status_message(use_next_phase=False)]} # output status
  109. if self.state.phase == SLDPhase.EJECT:
  110. # Output depending on color
  111. output_mapping = {
  112. WorkpieceColor.WHITE: self.out_white,
  113. WorkpieceColor.RED: self.out_red,
  114. WorkpieceColor.BLUE: self.out_blue,
  115. }
  116. target = self.state.workpiece.color
  117. if target in output_mapping:
  118. return {output_mapping[target]: [self.state.workpiece]}
  119. else:
  120. logger.error(f"{type(self).__name__} '{self.name}' unknown sorting target: {target}")
  121. if self.state.phase == SLDPhase.SORTED:
  122. return {self.mqtt_out: [self.get_status_message(), self.get_sld_sorted_ack()]}
  123. return {}
  124. def intTransition(self):
  125. if self.state.visual_update_pending:
  126. self.state.visual_update_pending = False
  127. return self.state
  128. if self.state.status_requested:
  129. self.state.status_requested = False # reset
  130. return self.state
  131. # EJECT workpiece when its color matches the checkpoint for the target
  132. if self.state.workpiece:
  133. if self.state.workpiece.color == WorkpieceColor.WHITE and self.state.phase == SLDPhase.SORTING_C1_WHITE:
  134. self.change_phase(SLDPhase.EJECT)
  135. return self.state
  136. elif self.state.workpiece.color == WorkpieceColor.RED and self.state.phase == SLDPhase.SORTING_C2_RED:
  137. self.change_phase(SLDPhase.EJECT)
  138. return self.state
  139. elif self.state.workpiece.color == WorkpieceColor.BLUE and self.state.phase == SLDPhase.SORTING_C3_BLUE:
  140. self.change_phase(SLDPhase.EJECT) # this one is redundant, the next phase is already EJECT
  141. return self.state
  142. elif self.state.phase == SLDPhase.EJECT:
  143. self.state.workpiece = None # remove the workpiece we just output
  144. # Transition to the next phase
  145. self.change_phase(self.state.phase.next())
  146. return self.state
  147. class SLD(CoupledDEVS):
  148. """ SLD: Sorting Line with Color detection
  149. A Coupled model which combines the sorting line conveyor with 3 colored output bays
  150. """
  151. def __init__(self, name: str):
  152. super(SLD, self).__init__(name)
  153. # Components
  154. self.sld_conveyor: SLDConveyor = self.addSubModel(SLDConveyor(f"{name}_conveyor"))
  155. self.white_bay: HoldingBay = self.addSubModel(HoldingBay(f"{name}_white_bay", "WHITE"))
  156. self.red_bay: HoldingBay = self.addSubModel(HoldingBay(f"{name}_red_bay", "RED"))
  157. self.blue_bay: HoldingBay = self.addSubModel(HoldingBay(f"{name}_blue_bay", "BLUE"))
  158. # Own ports
  159. self.inp = self.addInPort("inp") # SLD conveyor input
  160. self.white_in = self.addInPort("white_in") # vgr requesting pickup
  161. self.red_in = self.addInPort("red_in") # vgr requesting pickup
  162. self.blue_in = self.addInPort("blue_in") # vgr requesting pickup
  163. self.white_out = self.addOutPort("white_out")
  164. self.red_out = self.addOutPort("red_out")
  165. self.blue_out = self.addOutPort("blue_out")
  166. self.mqtt_out = self.addOutPort("mqtt_out")
  167. # Connect internal ports
  168. self.connectPorts(self.inp, self.sld_conveyor.inp)
  169. self.connectPorts(self.sld_conveyor.out_white, self.white_bay.sld_in)
  170. self.connectPorts(self.sld_conveyor.out_red, self.red_bay.sld_in)
  171. self.connectPorts(self.sld_conveyor.out_blue, self.blue_bay.sld_in)
  172. self.connectPorts(self.white_in, self.white_bay.vgr_in)
  173. self.connectPorts(self.red_in, self.red_bay.vgr_in)
  174. self.connectPorts(self.blue_in, self.blue_bay.vgr_in)
  175. self.connectPorts(self.white_bay.vgr_out, self.white_out)
  176. self.connectPorts(self.red_bay.vgr_out, self.red_out)
  177. self.connectPorts(self.blue_bay.vgr_out, self.blue_out)
  178. # Connect MQTT ports
  179. self.connectPorts(self.sld_conveyor.mqtt_out, self.mqtt_out)
  180. self.connectPorts(self.white_bay.mqtt_out, self.mqtt_out)
  181. self.connectPorts(self.red_bay.mqtt_out, self.mqtt_out)
  182. self.connectPorts(self.blue_bay.mqtt_out, self.mqtt_out)