dsi.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. from data_models.mqtt_message import MqttMessage
  2. from pypdevs.DEVS import AtomicDEVS
  3. from pypdevs.infinity import INFINITY
  4. from dataclasses import dataclass
  5. from loguru import logger
  6. from data_models.workpiece import Workpiece, WorkpieceColor
  7. from utils.get_timestamp import get_timestamp
  8. from utils.timed_phase_enum import TimedPhaseEnum
  9. class DsiPhase(TimedPhaseEnum):
  10. """ Steps in order, along with their timings """
  11. IDLE = ('IDLE', INFINITY)
  12. WP_RECEIVED = ('WP_RECEIVED', 0.2) # delay before sending a message
  13. AWAIT_PICKUP = ('AWAIT_PICKUP', INFINITY) # await pickup by vgr
  14. EXPORT_WP = ('EXPORT_WP', 0.0) # immediately handover wp to vgr
  15. AWAIT_VGR_CONFIRMATION = ('AWAIT_VGR_CONFIRMATION', INFINITY) # await for fl/vgr/do confirmation
  16. @dataclass
  17. class InputBayState:
  18. delta_t: float = INFINITY
  19. phase: DsiPhase = DsiPhase.IDLE
  20. workpiece: Workpiece | None = None
  21. status_requested = True # if true, publish status
  22. visual_update_pending: bool = False
  23. class DSI(AtomicDEVS):
  24. """ Input bay which holds a single workpiece (Delivery Station Input) """
  25. def __init__(self, name: str):
  26. # name needs to be unique to refer to it
  27. super(DSI, self).__init__(name)
  28. self.inp = self.addInPort("inp")
  29. self.out = self.addOutPort("out")
  30. self.mqtt_in = self.addInPort("mqtt_in")
  31. self.mqtt_out = self.addOutPort("mqtt_out")
  32. self.vgr_in = self.addInPort("vgr_in") # input for vgr requesting workpiece
  33. self.state = InputBayState()
  34. def change_phase(self, new_phase: DsiPhase):
  35. """ Wrapper for changing the phase and time associated with it, helps with logging """
  36. self.state.phase = new_phase
  37. self.state.delta_t = new_phase.timing
  38. logger.trace(f"{type(self).__name__} '{self.name}' phase changed to {new_phase}")
  39. self.state.visual_update_pending = True
  40. def get_visual_update_data(self) -> MqttMessage:
  41. """ Get visual update data for the animation, contains the action taken and the duration of that action left """
  42. message = MqttMessage()
  43. message.topic = "visualization/dsi"
  44. duration = self.state.delta_t
  45. if duration == INFINITY: duration = None
  46. message.payload = {
  47. "action": self.state.phase.value,
  48. "duration": duration, # should be equal to the timing of the phase
  49. "workpiece": self.state.workpiece.to_dict() if self.state.workpiece else None,
  50. }
  51. return message
  52. def get_status_message(self, use_next_phase=True) -> MqttMessage:
  53. """ Get the status message for the input bay """
  54. phase = self.state.phase.next() if use_next_phase else self.state.phase
  55. active = 0 if (phase == DsiPhase.IDLE or phase == DsiPhase.AWAIT_VGR_CONFIRMATION) else 1
  56. code = 1 if (phase == DsiPhase.IDLE) else 0
  57. message = MqttMessage()
  58. message.topic = "f/i/state/dsi"
  59. message.payload = {
  60. "active": active,
  61. "code": code,
  62. "description": '',
  63. "station": 'dsi',
  64. "ts": get_timestamp(),
  65. }
  66. return message
  67. def extTransition(self, inputs):
  68. self.state.delta_t -= self.elapsed
  69. if self.inp in inputs:
  70. if not self.state.phase == DsiPhase.IDLE:
  71. logger.error(f"{type(self).__name__} '{self.name}' received workpiece while not Idle!")
  72. return self.state
  73. wp = inputs[self.inp][0]
  74. logger.trace(f"{type(self).__name__} '{self.name}' received: {wp}")
  75. self.change_phase(DsiPhase.WP_RECEIVED)
  76. self.state.workpiece = wp
  77. if self.mqtt_in in inputs:
  78. msg = inputs[self.mqtt_in][0] # TODO: check if multiple messages are possible
  79. if msg.topic == "simulation/ctrl/all" and msg.payload.get("action") == "refresh":
  80. self.state.status_requested = True
  81. self.state.visual_update_pending = True
  82. return self.state
  83. elif msg.topic == "simulation/ctrl/dsi" and msg.payload['action'] == "workpiece_arrived":
  84. self.change_phase(DsiPhase.WP_RECEIVED)
  85. self.state.workpiece = Workpiece("?", WorkpieceColor.NONE)
  86. self.state.status_requested = True
  87. elif msg.topic == "fl/vgr/do" and msg.payload['code'] == 1:
  88. if self.state.phase == DsiPhase.AWAIT_VGR_CONFIRMATION:
  89. self.change_phase(DsiPhase.IDLE)
  90. self.state.status_requested = True # publish idle update
  91. else:
  92. logger.trace(f"{type(self).__name__} '{self.name}' received VGR confirmation whilst not expecting it")
  93. if self.vgr_in in inputs:
  94. if not self.state.phase == DsiPhase.AWAIT_PICKUP:
  95. logger.trace(f"{type(self).__name__} '{self.name}' received VGR pickup request whilst not expecting it")
  96. else:
  97. self.change_phase(DsiPhase.EXPORT_WP)
  98. return self.state # important, return state
  99. def timeAdvance(self):
  100. if self.state.visual_update_pending or self.state.status_requested:
  101. return 0.0
  102. return self.state.delta_t
  103. def outputFnc(self):
  104. if self.state.visual_update_pending:
  105. return {self.mqtt_out: [self.get_visual_update_data()]}
  106. if self.state.status_requested or self.state.phase == DsiPhase.WP_RECEIVED:
  107. return {self.mqtt_out: [self.get_status_message(use_next_phase=False)]}
  108. if self.state.phase == DsiPhase.EXPORT_WP:
  109. return {self.out: [self.state.workpiece]}
  110. return {}
  111. def intTransition(self):
  112. if self.state.visual_update_pending:
  113. self.state.visual_update_pending = False
  114. return self.state
  115. if self.state.status_requested:
  116. self.state.status_requested = False # if requested, we've handled, so unset
  117. return self.state
  118. if self.state.phase == DsiPhase.EXPORT_WP:
  119. self.state.workpiece = None # We have just output the workpiece, so mark it as empty again
  120. # Transition to next state
  121. self.change_phase(self.state.phase.next())
  122. return self.state