generator.py 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. from dataclasses import dataclass
  2. from loguru import logger
  3. from data_models.mqtt_message import MqttMessage
  4. from data_models.workpiece import Workpiece, WorkpieceColor
  5. from pypdevs.DEVS import AtomicDEVS
  6. from pypdevs.infinity import INFINITY
  7. @dataclass
  8. class GeneratorState:
  9. generation_delay: float # the time after which the next workpiece will be generated
  10. automatic: bool = True # whether to automatically generate or wait for mqtt instructions
  11. curr_time: int = 0 # the current time (in simulation, in minutes)
  12. id_counter: int = 0 # id counter for workpieces
  13. next_workpiece: Workpiece | None = None
  14. delta_t: float = INFINITY
  15. class Generator(AtomicDEVS):
  16. def __init__(self, name: str, automatic: bool, generation_delay: float = 30.0):
  17. # name needs to be unique to refer to it
  18. super(Generator, self).__init__(name)
  19. self.out = self.addOutPort("out")
  20. self.mqtt_in = self.addInPort("mqtt_in")
  21. self.state = GeneratorState(generation_delay=generation_delay, automatic=automatic)
  22. if automatic:
  23. self.state.delta_t = 0.0 # start immediately
  24. def get_id(self) -> str:
  25. identifier = self.state.id_counter
  26. self.state.id_counter += 1
  27. return str(identifier)
  28. def extTransition(self, inputs):
  29. self.state.delta_t -= self.elapsed
  30. if self.mqtt_in in inputs:
  31. message: MqttMessage = inputs[self.mqtt_in][0]
  32. if message.topic == "simulation/spawn":
  33. workpiece = message.payload['workpiece']
  34. color = WorkpieceColor(workpiece['type'])
  35. self.state.next_workpiece = Workpiece(id=self.get_id(), color=color, state=workpiece["state"])
  36. return self.state # important, return state
  37. def timeAdvance(self):
  38. if self.state.automatic:
  39. return self.state.delta_t # Next workpiece generation time
  40. elif self.state.next_workpiece:
  41. return 0.0 # immediately generate piece
  42. else:
  43. return INFINITY # idle until mqtt input
  44. def outputFnc(self):
  45. next_workpiece = self.state.next_workpiece
  46. if self.state.automatic:
  47. next_workpiece = Workpiece(id=self.get_id(), color=WorkpieceColor.RED, state="raw")
  48. logger.trace(f"Generator '{self.name}' outputs: {next_workpiece}")
  49. return {self.out: [next_workpiece]}
  50. def intTransition(self):
  51. self.state.curr_time += self.timeAdvance()
  52. self.state.next_workpiece = None # we just output a workpiece
  53. # Determine the delay between generating the next workpiece
  54. self.state.delta_t = self.state.generation_delay
  55. return self.state # important