# reader_station.py
  1. from data_models.mqtt_message import MqttMessage
  2. from pypdevs.DEVS import AtomicDEVS
  3. from pypdevs.infinity import INFINITY
  4. from dataclasses import dataclass, field
  5. from loguru import logger
  6. from data_models.workpiece import Workpiece, WorkpieceColor
  7. from utils.get_timestamp import get_timestamp
  8. from utils.timed_phase_enum import TimedPhaseEnum
  9. class InputRoutine(TimedPhaseEnum):
  10. """
  11. Routine when a new workpiece enters the factory
  12. (Steps along with their timings)
  13. """
  14. IDLE = ('IDLE', INFINITY)
  15. NFC_READING = ('NFC_READING', 1.367)
  16. COLOR_READING = ('COLOR_READING', 0.100)
  17. NFC_WRITING = ('NFC_WRITING', 7.603) # yes, this takes a while
  18. class OutputRoutine(TimedPhaseEnum):
  19. """
  20. Routine before a workpiece gets delivered to the output
  21. (Steps along with their timings)
  22. """
  23. IDLE = ('IDLE', INFINITY)
  24. NFC_WRITING = ('NFC_WRITING', 3.902)
  25. @dataclass
  26. class ReaderStationState:
  27. phase: InputRoutine | OutputRoutine = InputRoutine.IDLE
  28. delta_t: float = INFINITY
  29. workpiece: Workpiece | None = None
  30. color_read: WorkpieceColor = WorkpieceColor.NONE # color read by the color sensor
  31. visual_update_pending: bool = False
  32. class ReaderStation(AtomicDEVS):
  33. """ Station which holds both the NFC reader/writer and the color sensor to detect workpiece color """
  34. def __init__(self, name: str):
  35. # name needs to be unique to refer to it
  36. super(ReaderStation, self).__init__(name)
  37. self.nfc_in = self.addInPort("nfc_in")
  38. self.nfc_out = self.addOutPort("nfc_out")
  39. self.color_in = self.addInPort("color_in")
  40. self.color_out = self.addOutPort("color_out")
  41. self.mqtt_out = self.addOutPort("mqtt_out")
  42. self.state = ReaderStationState()
  43. def change_phase(self, new_phase: InputRoutine | OutputRoutine):
  44. """ Wrapper for changing the phase and time associated with it, helps with logging """
  45. self.state.phase = new_phase
  46. self.state.delta_t = new_phase.timing
  47. logger.trace(f"{type(self).__name__} '{self.name}' phase changed to {new_phase}")
  48. self.state.visual_update_pending = True
  49. def get_visual_update_data(self) -> MqttMessage:
  50. """ Get visual update data for the animation, contains the action taken and the duration of that action left """
  51. message = MqttMessage()
  52. message.topic = "visualization/nfc"
  53. duration = self.state.delta_t
  54. if duration == INFINITY: duration = None
  55. message.payload = {
  56. "action": self.state.phase.value,
  57. "duration": duration, # should be equal to the timing of the phase
  58. "workpiece": self.state.workpiece.to_dict() if self.state.workpiece else None,
  59. }
  60. return message
  61. def get_nfc_message(self) -> MqttMessage:
  62. """ Get the nfc message for the workpiece """
  63. message = MqttMessage()
  64. message.topic = "f/i/nfc/ds"
  65. message.payload = {
  66. "history": [], # TODO: history of NFC reading
  67. "ts": get_timestamp(),
  68. "workpiece": {
  69. "id": self.state.workpiece.id,
  70. "state": self.state.workpiece.state,
  71. "type": self.state.color_read.value,
  72. }
  73. }
  74. return message
  75. def extTransition(self, inputs):
  76. self.state.delta_t -= self.elapsed
  77. if not self.state.phase == InputRoutine.IDLE:
  78. logger.error(f"{type(self).__name__} '{self.name}' received workpiece while not Idle!")
  79. if self.nfc_in in inputs:
  80. wp: Workpiece = inputs[self.nfc_in][0]
  81. logger.trace(f"{type(self).__name__} '{self.name}' NFC received: {wp}")
  82. if wp.state == "BAKED":
  83. self.change_phase(OutputRoutine.NFC_WRITING)
  84. self.state.color_read = wp.color
  85. wp.state = "PROCESSED" # TODO: check if this logic is correct
  86. if self.state.color_read == WorkpieceColor.NONE:
  87. self.change_phase(InputRoutine.NFC_READING) # new, read
  88. else:
  89. self.change_phase(InputRoutine.NFC_WRITING) # seen, write nfc
  90. self.state.workpiece = wp
  91. if self.color_in in inputs:
  92. wp: Workpiece = inputs[self.color_in][0]
  93. logger.trace(f"{type(self).__name__} '{self.name}' COLOR_SENSOR received: {wp}")
  94. self.change_phase(InputRoutine.COLOR_READING)
  95. self.state.workpiece = wp
  96. return self.state # important, return state
  97. def timeAdvance(self):
  98. if self.state.visual_update_pending:
  99. return 0.0
  100. return self.state.delta_t
  101. def outputFnc(self):
  102. if self.state.visual_update_pending:
  103. return {self.mqtt_out: [self.get_visual_update_data()]}
  104. if self.state.phase in [InputRoutine.NFC_READING, InputRoutine.NFC_WRITING, OutputRoutine.NFC_WRITING]:
  105. return {self.nfc_out: [self.state.workpiece], self.mqtt_out: [self.get_nfc_message()]}
  106. elif self.state.phase == InputRoutine.COLOR_READING:
  107. return {self.color_out: [self.state.workpiece]}
  108. return {}
  109. def intTransition(self):
  110. if self.state.visual_update_pending:
  111. self.state.visual_update_pending = False
  112. return self.state
  113. if self.state.phase == InputRoutine.COLOR_READING:
  114. self.state.color_read = self.state.workpiece.color
  115. if self.state.phase in [InputRoutine.NFC_WRITING, OutputRoutine.NFC_WRITING]:
  116. self.state.color_read = WorkpieceColor.NONE # reset for next input
  117. # Transition to IDLE
  118. self.state.workpiece = None # We have just output the workpiece, so mark it as empty again
  119. self.change_phase(InputRoutine.IDLE)
  120. return self.state