generator.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. from dataclasses import dataclass
  2. from loguru import logger
  3. from data_models.mqtt_message import MqttMessage
  4. from data_models.workpiece import Workpiece, WorkpieceColor
  5. from pypdevs.DEVS import AtomicDEVS
  6. from pypdevs.infinity import INFINITY
  7. @dataclass
  8. class GeneratorState:
  9. generation_delay: float # the time after which the next workpiece will be generated
  10. automatic: bool = True # whether to automatically generate or wait for mqtt instructions
  11. curr_time: int = 0 # the current time (in simulation, in minutes)
  12. id_counter: int = 0 # id counter for workpieces
  13. next_color: WorkpieceColor | None = None
  14. next_id: str | None = None # next workpiece id
  15. delta_t: float = INFINITY
  16. class Generator(AtomicDEVS):
  17. def __init__(self, name: str, automatic: bool, generation_delay: float = 60.0):
  18. # name needs to be unique to refer to it
  19. super(Generator, self).__init__(name)
  20. self.out = self.addOutPort("out")
  21. self.mqtt_in = self.addInPort("mqtt_in")
  22. self.state = GeneratorState(generation_delay=generation_delay, automatic=automatic)
  23. if automatic:
  24. self.state.next_color = [WorkpieceColor.BLUE, WorkpieceColor.RED, WorkpieceColor.WHITE][self.state.id_counter % 3]
  25. self.state.delta_t = 0.0 # start immediately
  26. def get_id(self) -> str:
  27. identifier = self.state.id_counter
  28. self.state.id_counter += 1
  29. return str(identifier)
  30. def extTransition(self, inputs):
  31. self.state.delta_t -= self.elapsed
  32. if self.mqtt_in in inputs:
  33. message: MqttMessage = inputs[self.mqtt_in][0]
  34. if message.topic == "simulation/spawn":
  35. workpiece = message.payload['workpiece']
  36. self.state.next_color = WorkpieceColor(workpiece['type'])
  37. self.state.next_id = self.get_id()
  38. return self.state # important, return state
  39. def timeAdvance(self):
  40. if self.state.automatic:
  41. return self.state.delta_t # Next workpiece generation time
  42. elif self.state.next_color:
  43. return 0.0 # immediately generate the next workpiece
  44. else:
  45. return INFINITY # idle until mqtt input
  46. def outputFnc(self):
  47. next_workpiece = Workpiece(id=self.state.next_id, color=self.state.next_color)
  48. logger.trace(f"Generator '{self.name}' outputs: {next_workpiece}")
  49. return {self.out: [next_workpiece]}
  50. def intTransition(self):
  51. self.state.curr_time += self.timeAdvance()
  52. self.state.next_color = None # we just output a workpiece
  53. if self.state.automatic:
  54. self.state.next_color = [WorkpieceColor.BLUE, WorkpieceColor.RED, WorkpieceColor.WHITE][self.state.id_counter % 3]
  55. self.state.next_id = self.get_id()
  56. # Set the delay between generating the next workpiece
  57. self.state.delta_t = self.state.generation_delay
  58. return self.state # important