reader_station.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. from data_models.mqtt_message import MqttMessage
  2. from pypdevs.DEVS import AtomicDEVS
  3. from pypdevs.infinity import INFINITY
  4. from dataclasses import dataclass, field
  5. from loguru import logger
  6. from data_models.workpiece import Workpiece, WorkpieceColor
  7. from utils.get_timestamp import get_timestamp
  8. from utils.timed_phase_enum import TimedPhaseEnum
  9. class InputRoutine(TimedPhaseEnum):
  10. """
  11. Routine when a new workpiece enters the factory
  12. (Steps along with their timings)
  13. """
  14. IDLE = ('IDLE', INFINITY)
  15. NFC_READING = ('NFC_READING', 1.367)
  16. IDLE_WAIT = ('IDLE_WAIT', INFINITY) # wait for VGR to move to color sensor
  17. COLOR_READING = ('COLOR_READING', 0.100)
  18. IDLE_WAIT_2 = ('IDLE_WAIT_2', INFINITY) # wait for VGR to move to NFC reader
  19. NFC_WRITING = ('NFC_WRITING', 7.603) # yes, this takes a while
  20. class OutputRoutine(TimedPhaseEnum):
  21. """
  22. Routine before a workpiece gets delivered to the output
  23. (Steps along with their timings)
  24. """
  25. IDLE = ('IDLE', INFINITY)
  26. NFC_WRITING = ('NFC_WRITING', 3.902)
  27. @dataclass
  28. class ReaderStationState:
  29. phase: InputRoutine | OutputRoutine = InputRoutine.IDLE
  30. delta_t: float = INFINITY
  31. workpiece: Workpiece | None = None
  32. color_read: WorkpieceColor = WorkpieceColor.NONE # color read by the color sensor
  33. visual_update_pending: bool = False
  34. class ReaderStation(AtomicDEVS):
  35. """ Station which holds both the NFC reader/writer and the color sensor to detect workpiece color """
  36. def __init__(self, name: str):
  37. # name needs to be unique to refer to it
  38. super(ReaderStation, self).__init__(name)
  39. self.nfc_in = self.addInPort("nfc_in")
  40. self.nfc_out = self.addOutPort("nfc_out")
  41. self.color_in = self.addInPort("color_in")
  42. self.color_out = self.addOutPort("color_out")
  43. self.mqtt_in = self.addInPort("mqtt_in")
  44. self.mqtt_out = self.addOutPort("mqtt_out")
  45. self.state = ReaderStationState()
  46. def change_phase(self, new_phase: InputRoutine | OutputRoutine):
  47. """ Wrapper for changing the phase and time associated with it, helps with logging """
  48. self.state.phase = new_phase
  49. self.state.delta_t = new_phase.timing
  50. logger.trace(f"{type(self).__name__} '{self.name}' phase changed to {new_phase}")
  51. self.state.visual_update_pending = True
  52. def get_visual_update_data(self) -> MqttMessage:
  53. """ Get visual update data for the animation, contains the action taken and the duration of that action left """
  54. message = MqttMessage()
  55. message.topic = "visualization/nfc"
  56. duration = self.state.delta_t
  57. if duration == INFINITY: duration = None
  58. message.payload = {
  59. "action": self.state.phase.value,
  60. "duration": duration, # should be equal to the timing of the phase
  61. "workpiece": self.state.workpiece.to_dict() if self.state.workpiece else None,
  62. }
  63. return message
  64. def get_nfc_message(self) -> MqttMessage:
  65. """ Get the nfc message for the workpiece """
  66. message = MqttMessage()
  67. message.topic = "f/i/nfc/ds"
  68. message.payload = {
  69. "history": [], # TODO: history of NFC reading
  70. "ts": get_timestamp(),
  71. "workpiece": {
  72. "id": self.state.workpiece.id,
  73. "state": self.state.workpiece.state,
  74. "type": self.state.color_read.value,
  75. }
  76. }
  77. return message
  78. def extTransition(self, inputs):
  79. self.state.delta_t -= self.elapsed
  80. if self.mqtt_in in inputs:
  81. msg = inputs[self.mqtt_in][0]
  82. if msg.topic == "simulation/ctrl/nfc":
  83. print(msg)
  84. if self.state.workpiece and msg.payload["action"] == "id_update":
  85. self.state.workpiece.id = msg.payload["workpiece"]["id"]
  86. elif self.state.workpiece and msg.payload["action"] == "color_update":
  87. self.state.workpiece.color = WorkpieceColor[msg.payload["workpiece"]["type"]]
  88. if msg.topic == "simulation/ctrl/all" and msg.payload.get("action") == "refresh":
  89. self.state.visual_update_pending = True
  90. return self.state
  91. elif not self.state.phase in [InputRoutine.IDLE, InputRoutine.IDLE_WAIT, InputRoutine.IDLE_WAIT_2, OutputRoutine.IDLE]:
  92. logger.error(f"{type(self).__name__} '{self.name}' received workpiece while not Idle!")
  93. if self.nfc_in in inputs:
  94. wp: Workpiece = inputs[self.nfc_in][0]
  95. logger.trace(f"{type(self).__name__} '{self.name}' NFC received: {wp}")
  96. if wp.state == "BAKED":
  97. self.change_phase(OutputRoutine.NFC_WRITING)
  98. self.state.color_read = wp.color
  99. wp.state = "PROCESSED" # TODO: check if this logic is correct
  100. elif self.state.phase in [InputRoutine.IDLE, OutputRoutine.IDLE]:
  101. self.change_phase(InputRoutine.NFC_READING) # new, read
  102. elif self.state.phase == InputRoutine.IDLE_WAIT_2:
  103. self.change_phase(InputRoutine.NFC_WRITING) # seen, write nfc
  104. self.state.workpiece = wp
  105. if self.color_in in inputs:
  106. wp: Workpiece = inputs[self.color_in][0]
  107. logger.trace(f"{type(self).__name__} '{self.name}' COLOR_SENSOR received: {wp}")
  108. self.change_phase(InputRoutine.COLOR_READING)
  109. self.state.workpiece = wp
  110. return self.state # important, return state
  111. def timeAdvance(self):
  112. if self.state.visual_update_pending:
  113. return 0.0
  114. return self.state.delta_t
  115. def outputFnc(self):
  116. if self.state.visual_update_pending:
  117. return {self.mqtt_out: [self.get_visual_update_data()]}
  118. if self.state.phase in [InputRoutine.NFC_READING, InputRoutine.NFC_WRITING, OutputRoutine.NFC_WRITING]:
  119. return {self.nfc_out: [self.state.workpiece], self.mqtt_out: [self.get_nfc_message()]}
  120. elif self.state.phase == InputRoutine.COLOR_READING:
  121. return {self.color_out: [self.state.workpiece]}
  122. return {}
  123. def intTransition(self):
  124. if self.state.visual_update_pending:
  125. self.state.visual_update_pending = False
  126. return self.state
  127. if self.state.phase == InputRoutine.COLOR_READING:
  128. self.state.color_read = self.state.workpiece.color
  129. if self.state.phase in [InputRoutine.NFC_WRITING, OutputRoutine.NFC_WRITING]:
  130. self.state.color_read = WorkpieceColor.NONE # reset for next input
  131. # Transition to next phase (idle phase)
  132. self.state.workpiece = None # We have just output the workpiece, so mark it as empty again
  133. self.change_phase(self.state.phase.next())
  134. return self.state