sld_holding_bay.py 3.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. from dataclasses import dataclass
  2. from loguru import logger
  3. from data_models.mqtt_message import MqttMessage
  4. from data_models.workpiece import Workpiece
  5. from pypdevs.DEVS import AtomicDEVS
  6. from pypdevs.infinity import INFINITY
  7. @dataclass
  8. class HoldingBayState:
  9. color: str # 'WHITE' | 'RED' | 'BLUE'
  10. workpiece: Workpiece | None = None
  11. output_requested = False # if true, output workpiece
  12. visual_update_pending: bool = False
  13. class HoldingBay(AtomicDEVS):
  14. """ Holding bay which holds a single (color) workpiece, part of the SLD. Handles pickup interaction with VGR """
  15. def __init__(self, name: str, color: str):
  16. # name needs to be unique to refer to it
  17. super(HoldingBay, self).__init__(name)
  18. self.sld_in = self.addInPort("sld_in") # input of workpiece via SLD
  19. self.vgr_in = self.addInPort("vgr_in") # input for VGR requesting workpiece
  20. self.mqtt_in = self.addInPort("mqtt_in") # input for MQTT messages (e.g. visual refresh request)
  21. self.vgr_out = self.addOutPort("vgr_out") # output for workpiece to the VGR
  22. self.mqtt_out = self.addOutPort("mqtt_out") # output for visuals
  23. self.state = HoldingBayState(color)
  24. def get_visual_update_data(self) -> MqttMessage:
  25. """ Get visual update data for the animation, contains the action taken and the duration of that action left """
  26. message = MqttMessage()
  27. message.topic = f"visualization/sld_bay/{self.state.color}"
  28. message.payload = {
  29. "workpiece": self.state.workpiece.to_dict() if self.state.workpiece else None,
  30. }
  31. return message
  32. def extTransition(self, inputs):
  33. if self.mqtt_in in inputs:
  34. msg = inputs[self.mqtt_in][0]
  35. if msg.topic == "simulation/ctrl/all" and msg.payload.get("action") == "refresh":
  36. self.state.visual_update_pending = True
  37. return self.state
  38. if self.sld_in in inputs:
  39. self.state.visual_update_pending = True # every transition requires a new visual update
  40. if self.state.workpiece is not None:
  41. logger.error(f"{type(self).__name__} '{self.name}' received workpiece while already holding one!")
  42. return self.state
  43. wp = inputs[self.sld_in][0]
  44. self.state.workpiece = wp
  45. logger.trace(f"{type(self).__name__} '{self.name}' received: {wp}")
  46. if self.vgr_in in inputs:
  47. self.state.visual_update_pending = True # every transition requires a new visual update
  48. if not self.state.workpiece:
  49. logger.trace(
  50. f"{type(self).__name__} '{self.name}' received VGR pickup request whilst not having workpiece!")
  51. else:
  52. self.state.output_requested = True
  53. return self.state # important, return state
  54. def timeAdvance(self):
  55. if self.state.output_requested or self.state.visual_update_pending:
  56. return 0.0
  57. return INFINITY
  58. def outputFnc(self):
  59. if self.state.output_requested:
  60. return {self.vgr_out: [self.state.workpiece]}
  61. if self.state.visual_update_pending:
  62. return {self.mqtt_out: [self.get_visual_update_data()]}
  63. return {}
  64. def intTransition(self):
  65. # ORDER IS IMPORTANT HERE
  66. if self.state.output_requested:
  67. self.state.output_requested = False
  68. self.state.workpiece = None # We just output a workpiece, so remove it from our state
  69. elif self.state.visual_update_pending:
  70. self.state.visual_update_pending = False
  71. return self.state
  72. return self.state