|
|
@@ -1,5 +1,4 @@
|
|
|
from dataclasses import dataclass
|
|
|
-
|
|
|
from loguru import logger
|
|
|
|
|
|
from data_models.mqtt_message import MqttMessage
|
|
|
@@ -14,12 +13,13 @@ class GeneratorState:
|
|
|
automatic: bool = True # whether to automatically generate or wait for mqtt instructions
|
|
|
curr_time: int = 0 # the current time (in simulation, in minutes)
|
|
|
id_counter: int = 0 # id counter for workpieces
|
|
|
- next_workpiece: Workpiece | None = None
|
|
|
+ next_color: WorkpieceColor | None = None
|
|
|
+ next_id: str | None = None # next workpiece id
|
|
|
delta_t: float = INFINITY
|
|
|
|
|
|
|
|
|
class Generator(AtomicDEVS):
|
|
|
- def __init__(self, name: str, automatic: bool, generation_delay: float = 30.0):
|
|
|
+ def __init__(self, name: str, automatic: bool, generation_delay: float = 60.0):
|
|
|
# name needs to be unique to refer to it
|
|
|
super(Generator, self).__init__(name)
|
|
|
self.out = self.addOutPort("out")
|
|
|
@@ -28,6 +28,7 @@ class Generator(AtomicDEVS):
|
|
|
|
|
|
self.state = GeneratorState(generation_delay=generation_delay, automatic=automatic)
|
|
|
if automatic:
|
|
|
+ self.state.next_color = [WorkpieceColor.BLUE, WorkpieceColor.RED, WorkpieceColor.WHITE][self.state.id_counter % 3]
|
|
|
self.state.delta_t = 0.0 # start immediately
|
|
|
|
|
|
def get_id(self) -> str:
|
|
|
@@ -42,30 +43,32 @@ class Generator(AtomicDEVS):
|
|
|
message: MqttMessage = inputs[self.mqtt_in][0]
|
|
|
if message.topic == "simulation/spawn":
|
|
|
workpiece = message.payload['workpiece']
|
|
|
- color = WorkpieceColor(workpiece['type'])
|
|
|
- self.state.next_workpiece = Workpiece(id=self.get_id(), color=color, state=workpiece["state"])
|
|
|
+ self.state.next_color = WorkpieceColor(workpiece['type'])
|
|
|
+ self.state.next_id = self.get_id()
|
|
|
|
|
|
return self.state # important, return state
|
|
|
|
|
|
def timeAdvance(self):
|
|
|
if self.state.automatic:
|
|
|
return self.state.delta_t # Next workpiece generation time
|
|
|
- elif self.state.next_workpiece:
|
|
|
- return 0.0 # immediately generate piece
|
|
|
+ elif self.state.next_color:
|
|
|
+ return 0.0 # immediately generate the next workpiece
|
|
|
else:
|
|
|
return INFINITY # idle until mqtt input
|
|
|
|
|
|
def outputFnc(self):
|
|
|
- next_workpiece = self.state.next_workpiece
|
|
|
- if self.state.automatic:
|
|
|
- next_workpiece = Workpiece(id=self.get_id(), color=WorkpieceColor.RED, state="raw")
|
|
|
-
|
|
|
+ next_workpiece = Workpiece(id=self.state.next_id, color=self.state.next_color)
|
|
|
logger.trace(f"Generator '{self.name}' outputs: {next_workpiece}")
|
|
|
return {self.out: [next_workpiece]}
|
|
|
|
|
|
def intTransition(self):
|
|
|
self.state.curr_time += self.timeAdvance()
|
|
|
- self.state.next_workpiece = None # we just output a workpiece
|
|
|
- # Determine the delay between generating the next workpiece
|
|
|
+ self.state.next_color = None # we just output a workpiece
|
|
|
+
|
|
|
+ if self.state.automatic:
|
|
|
+ self.state.next_color = [WorkpieceColor.BLUE, WorkpieceColor.RED, WorkpieceColor.WHITE][self.state.id_counter % 3]
|
|
|
+ self.state.next_id = self.get_id()
|
|
|
+
|
|
|
+ # Set the delay between generating the next workpiece
|
|
|
self.state.delta_t = self.state.generation_delay
|
|
|
return self.state # important
|