123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172 |
- from dataclasses import dataclass
- from loguru import logger
- from data_models.mqtt_message import MqttMessage
- from data_models.workpiece import Workpiece, WorkpieceColor
- from pypdevs.DEVS import AtomicDEVS
- from pypdevs.infinity import INFINITY
@dataclass
class GeneratorState:
    """Mutable state of the ``Generator`` atomic DEVS model."""

    # Time after which the next workpiece will be generated.
    generation_delay: float
    # Whether to generate automatically or wait for MQTT instructions.
    automatic: bool = True
    # Current simulation time (in minutes). Declared as a float because the
    # model accumulates time advances into it, and those are floats.
    curr_time: float = 0.0
    # Running counter used to hand out workpiece ids.
    id_counter: int = 0
    # Workpiece requested via MQTT that still has to be emitted, if any.
    next_workpiece: Workpiece | None = None
    # Remaining time until the next automatic generation.
    delta_t: float = INFINITY
class Generator(AtomicDEVS):
    """Atomic DEVS model that emits workpieces on its ``out`` port.

    In automatic mode a red, raw workpiece is emitted every
    ``generation_delay`` time units, starting immediately. Otherwise the
    model idles until a ``simulation/spawn`` message arrives on ``mqtt_in``
    and then emits the requested workpiece without delay.
    """

    def __init__(self, name: str, automatic: bool, generation_delay: float = 30.0):
        """Create the ports and the initial state.

        Args:
            name: Model name; needs to be unique so the model can be referred to by it.
            automatic: Generate workpieces periodically instead of waiting for MQTT.
            generation_delay: Time between two automatically generated workpieces.
        """
        super().__init__(name)

        self.out = self.addOutPort("out")
        self.mqtt_in = self.addInPort("mqtt_in")

        self.state = GeneratorState(generation_delay=generation_delay, automatic=automatic)
        if automatic:
            self.state.delta_t = 0.0  # start immediately

    def get_id(self) -> str:
        """Return the next free workpiece id and advance the id counter."""
        identifier = self.state.id_counter
        self.state.id_counter += 1
        return str(identifier)

    def extTransition(self, inputs):
        """Handle incoming MQTT messages and account for the elapsed time.

        The first ``simulation/spawn`` message found on ``mqtt_in`` is turned
        into the pending workpiece; messages with other topics are ignored.
        """
        elapsed = self.elapsed
        self.state.delta_t -= elapsed
        # Keep the simulation clock in step with the reduced delta_t;
        # intTransition only adds the remaining time advance.
        self.state.curr_time += elapsed

        if self.mqtt_in in inputs:
            # A port delivers a bag of messages; scan all of them so a spawn
            # request is not missed just because it is not the first entry.
            for message in inputs[self.mqtt_in]:
                if message.topic != "simulation/spawn":
                    continue
                workpiece = message.payload['workpiece']
                color = WorkpieceColor(workpiece['type'])
                # NOTE(review): in automatic mode outputFnc emits its own
                # workpiece and intTransition clears next_workpiece, so a
                # request received here is discarded (its id is still used up).
                self.state.next_workpiece = Workpiece(id=self.get_id(), color=color, state=workpiece["state"])
                break  # only one pending workpiece can be stored

        return self.state  # important, return state

    def timeAdvance(self):
        """Return the time until the next internal transition."""
        if self.state.automatic:
            return self.state.delta_t  # next workpiece generation time
        if self.state.next_workpiece is not None:
            return 0.0  # immediately generate piece
        return INFINITY  # idle until mqtt input

    def outputFnc(self):
        """Emit the next workpiece on ``out`` without modifying the state."""
        if self.state.automatic:
            # Only peek at the counter here; intTransition consumes the id so
            # that the output function stays free of state changes.
            next_workpiece = Workpiece(id=str(self.state.id_counter), color=WorkpieceColor.RED, state="raw")
        else:
            next_workpiece = self.state.next_workpiece
        logger.trace(f"Generator '{self.name}' outputs: {next_workpiece}")
        return {self.out: [next_workpiece]}

    def intTransition(self):
        """Advance the clock after an output and schedule the next generation."""
        # Read the time advance before the state it depends on is changed.
        self.state.curr_time += self.timeAdvance()
        if self.state.automatic:
            # Consume the id that outputFnc used for the emitted workpiece.
            self.state.id_counter += 1
        self.state.next_workpiece = None  # we just output a workpiece
        # Determine the delay between generating the next workpiece
        self.state.delta_t = self.state.generation_delay
        return self.state  # important
|