crate_transporter.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. from dataclasses import dataclass
  2. from enum import Enum
  3. from loguru import logger
  4. from data_models.mqtt_message import MqttMessage
  5. from pypdevs.DEVS import AtomicDEVS
  6. from pypdevs.infinity import INFINITY
  7. from data_models.crate import Crate
  8. from utils.timed_phase_enum import TimedPhaseEnum
  9. class CrateTransporterPhase(TimedPhaseEnum):
  10. IDLE = ('IDLE', INFINITY)
  11. MOVE_L_TO_R = ('MOVE_L_TO_R', 1.150)
  12. MOVE_R_TO_L = ('MOVE_R_TO_L', 0.965)
  13. @dataclass
  14. class CrateTransporterState:
  15. cargo: Crate | None = None
  16. phase: CrateTransporterPhase = CrateTransporterPhase.IDLE
  17. delta_t: float = INFINITY
  18. visual_update_pending: bool = False
  19. class CrateTransporter(AtomicDEVS):
  20. """
  21. A crate transporter which transports crates from one side to the other (takes exactly 1 second)
  22. [HBW] left_in -----> right_out [VGR]
  23. left_out <----- right_in
  24. """
  25. def __init__(self, name: str):
  26. super(CrateTransporter, self).__init__(name)
  27. # IN PORTS
  28. self.left_in = self.addInPort("left_in")
  29. self.right_in = self.addInPort("right_in")
  30. self.mqtt_in = self.addInPort("mqtt_in")
  31. # OUT PORTS
  32. self.right_out = self.addOutPort("right_out")
  33. self.left_out = self.addOutPort("left_out")
  34. self.mqtt_out = self.addOutPort("mqtt_out")
  35. self.state = CrateTransporterState()
  36. def change_phase(self, new_phase: CrateTransporterPhase):
  37. """ Wrapper for changing the phase and time associated with it, helps with logging """
  38. self.state.phase = new_phase
  39. self.state.delta_t = new_phase.timing
  40. logger.trace(f"{type(self).__name__} '{self.name}' phase changed to {new_phase}")
  41. self.state.visual_update_pending = True
  42. def get_visual_update_data(self) -> MqttMessage:
  43. """ Get visual update data for the animation, contains the action taken and the duration of that action left """
  44. message = MqttMessage()
  45. message.topic = "visualization/ct"
  46. duration = self.state.delta_t
  47. if duration == INFINITY: duration = None
  48. message.payload = {
  49. "action": self.state.phase.value,
  50. "duration": duration, # should be equal to the timing of the phase
  51. "crate": self.state.cargo.to_dict() if self.state.cargo else None, # cargo (crate), if present
  52. }
  53. return message
  54. def _is_input_valid(self, input_item) -> bool:
  55. """ Checks if the received input is valid for the current state"""
  56. if not isinstance(input_item, Crate):
  57. logger.error(f"{type(self).__name__} '{self.name}' received input which is not a crate: {input_item}")
  58. return False
  59. if self.state.cargo is not None:
  60. logger.error(f"{type(self).__name__} '{self.name}' received input crate whilst already holding a crate")
  61. return False
  62. return True
  63. def extTransition(self, inputs):
  64. self.state.delta_t -= self.elapsed
  65. if self.mqtt_in in inputs:
  66. # Handle MQTT input, which is used for visual update requests
  67. mqtt_message = inputs[self.mqtt_in][0]
  68. if mqtt_message.topic == "simulation/ctrl/all" and mqtt_message.payload.get("action") == "refresh":
  69. self.state.visual_update_pending = True
  70. return self.state
  71. # Check if input while moving
  72. elif not self.state.phase == CrateTransporterPhase.IDLE:
  73. logger.error(f"{type(self).__name__} '{self.name}' received input while moving {self.state.phase}")
  74. return self.state
  75. if self.left_in in inputs or self.right_in in inputs:
  76. if self.left_in in inputs:
  77. item = inputs[self.left_in][0]
  78. new_phase = CrateTransporterPhase.MOVE_L_TO_R
  79. else: # right_in
  80. item = inputs[self.right_in][0]
  81. new_phase = CrateTransporterPhase.MOVE_R_TO_L
  82. if self._is_input_valid(item):
  83. # Add the item and proceed to the next state
  84. logger.trace(f"{type(self).__name__} '{self.name}' received: {item}")
  85. self.state.cargo = item
  86. self.change_phase(new_phase)
  87. return self.state # important, return state
  88. def timeAdvance(self):
  89. if self.state.visual_update_pending:
  90. return 0.0
  91. return self.state.delta_t
  92. def outputFnc(self):
  93. if self.state.visual_update_pending:
  94. return {self.mqtt_out: [self.get_visual_update_data()]}
  95. logger.trace(f"{type(self).__name__} '{self.name}' outputs: {self.state.cargo}")
  96. if self.state.phase == CrateTransporterPhase.MOVE_R_TO_L:
  97. return {self.left_out: [self.state.cargo]}
  98. elif self.state.phase == CrateTransporterPhase.MOVE_L_TO_R:
  99. return {self.right_out: [self.state.cargo]}
  100. return {}
  101. def intTransition(self):
  102. if self.state.visual_update_pending:
  103. self.state.visual_update_pending = False
  104. return self.state
  105. # We have just output a crate, so set idle again and remove the cargo
  106. self.change_phase(CrateTransporterPhase.IDLE)
  107. self.state.cargo = None
  108. return self.state