simple_conveyor.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. from data_models.mqtt_message import MqttMessage
  2. from pypdevs.DEVS import AtomicDEVS
  3. from pypdevs.infinity import INFINITY
  4. from dataclasses import dataclass
  5. from loguru import logger
  6. from data_models.workpiece import Workpiece
  7. from utils.timed_phase_enum import TimedPhaseEnum
  8. class ConveyorPhase(TimedPhaseEnum):
  9. """ Steps in order, along with their timings """
  10. IDLE = ('IDLE', INFINITY)
  11. RUNNING = ('RUNNING', 2.368)
  12. @dataclass
  13. class ConveyorState:
  14. delta_t: float = INFINITY
  15. phase: ConveyorPhase = ConveyorPhase.IDLE
  16. workpiece: Workpiece | None = None
  17. visual_update_pending: bool = False
  18. class SimpleConveyor(AtomicDEVS):
  19. """ A simple conveyor belt which moves a workpiece across its belt and detects when it has reached the end """
  20. def __init__(self, name: str):
  21. super(SimpleConveyor, self).__init__(name)
  22. self.inp = self.addInPort("inp")
  23. self.out = self.addOutPort("out")
  24. self.mqtt_in = self.addInPort("mqtt_in") # to receive visual update requests
  25. self.mqtt_out = self.addOutPort("mqtt_out")
  26. self.state = ConveyorState()
  27. def change_phase(self, new_phase: ConveyorPhase):
  28. """ Wrapper for changing the phase and time associated with it, helps with logging """
  29. self.state.phase = new_phase
  30. self.state.delta_t = new_phase.timing
  31. logger.trace(f"{type(self).__name__} '{self.name}' phase changed to {new_phase}")
  32. self.state.visual_update_pending = True
  33. def get_visual_update_data(self) -> MqttMessage:
  34. """ Get visual update data for the animation, contains the action taken and the duration of that action left """
  35. message = MqttMessage()
  36. message.topic = "visualization/conveyor"
  37. duration = self.state.delta_t
  38. if duration == INFINITY: duration = None
  39. message.payload = {
  40. "action": self.state.phase.value,
  41. "duration": duration, # should be equal to the timing of the phase
  42. "workpiece": self.state.workpiece.to_dict() if self.state.workpiece else None,
  43. }
  44. return message
  45. def extTransition(self, inputs):
  46. if self.mqtt_in in inputs:
  47. msg = inputs[self.mqtt_in][0]
  48. if msg.topic == "simulation/ctrl/all" and msg.payload.get("action") == "refresh":
  49. self.state.visual_update_pending = True
  50. return self.state
  51. if self.inp in inputs:
  52. new_workpiece = inputs[self.inp][0]
  53. logger.trace(f"{type(self).__name__} '{self.name}' received: {new_workpiece}")
  54. self.state.workpiece = new_workpiece
  55. self.change_phase(ConveyorPhase.RUNNING)
  56. return self.state # important, return state
  57. def timeAdvance(self):
  58. if self.state.visual_update_pending:
  59. return 0.0
  60. return self.state.delta_t
  61. def outputFnc(self):
  62. if self.state.visual_update_pending:
  63. return {self.mqtt_out: [self.get_visual_update_data()]}
  64. logger.trace(f"{type(self).__name__} '{self.name}' outputs: {self.state.workpiece}")
  65. return {self.out: [self.state.workpiece]}
  66. def intTransition(self):
  67. if self.state.visual_update_pending:
  68. self.state.visual_update_pending = False
  69. return self.state
  70. # We have just output the workpiece, so mark it as empty again
  71. self.state.workpiece = None
  72. self.change_phase(ConveyorPhase.IDLE)
  73. return self.state