12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788 |
- from devs_models.crate_transporter import CrateTransporter
- from devs_models.generator import Generator
- from devs_models.high_bay_warehouse import HighBayWarehouse, InventoryPublisher
- from devs_models.dsi import DSI
- from devs_models.mpo import MPO
- from devs_models.mqtt_control_unit import MQTTControlUnit
- from devs_models.reader_station import ReaderStation
- from devs_models.simple_conveyor import SimpleConveyor
- from devs_models.sld import SLD
- from devs_models.dso import DSO
- from devs_models.vacuum_gripper import VacuumGripper
- from devs_models.collector import Collector
- from pypdevs.DEVS import CoupledDEVS
class FischertechnikFactory(CoupledDEVS):
    """Coupled DEVS model that assembles the factory from its sub-models.

    The constructor instantiates every station, couples their material-flow
    ports, attaches the stations to the MQTT control unit, and forwards the
    control unit's ``REALTIME_OBSERVED`` output to the port of the same name
    on this model.
    """

    def __init__(self, name: str, automatic: bool = True):
        """Build and wire the factory.

        Args:
            name: Name handed to the ``CoupledDEVS`` base class.
            automatic: Forwarded unchanged to the ``Generator`` sub-model.
        """
        super().__init__(name)

        # Outward-facing port; fed by the MQTT control unit (last coupling below).
        self.REALTIME_OBSERVED = self.addOutPort('REALTIME_OBSERVED')

        # ---- Sub-models (registered in this order) ----
        self.gen: Generator = self.addSubModel(
            Generator("Generator", automatic=automatic)
        )
        self.dsi: DSI = self.addSubModel(DSI("DSI"))
        # NFC and color reading station
        self.reader: ReaderStation = self.addSubModel(
            ReaderStation("ReaderStation")
        )
        self.vgr: VacuumGripper = self.addSubModel(
            VacuumGripper("VacuumGripper")
        )
        self.ct: CrateTransporter = self.addSubModel(
            CrateTransporter("Transporter")
        )
        self.hbw: HighBayWarehouse = self.addSubModel(
            HighBayWarehouse("HighBayWarehouse")
        )
        self.hbw_inv: InventoryPublisher = self.addSubModel(
            InventoryPublisher("InventoryPublisher", 10)
        )
        self.mpo: MPO = self.addSubModel(MPO("MPO"))
        self.conveyor: SimpleConveyor = self.addSubModel(
            SimpleConveyor("Conveyor")
        )
        self.sld: SLD = self.addSubModel(SLD("SLD"))
        self.dso: DSO = self.addSubModel(DSO("DSO"))
        self.mqtt_control: MQTTControlUnit = self.addSubModel(
            MQTTControlUnit("MQTTControlUnit")
        )
        self.collector: Collector = self.addSubModel(Collector("Collector"))

        # Short local handles for the models referenced most often below.
        vgr = self.vgr
        sld = self.sld
        control = self.mqtt_control

        # ---- Station-to-station couplings: (source port, destination port) ----
        # The table is walked top to bottom, so couplings are created in
        # exactly the listed order.
        station_couplings = (
            (self.gen.out, self.dsi.inp),
            (self.dsi.out, vgr.dsi_in),
            (vgr.dsi_out, self.dsi.vgr_in),
            (vgr.mpo_out, self.mpo.vgr_in),
            (vgr.nfc_out, self.reader.nfc_in),
            (self.reader.nfc_out, vgr.nfc_in),
            (vgr.color_sensor_out, self.reader.color_in),
            (self.reader.color_out, vgr.color_sensor_in),
            (vgr.ct_out, self.ct.right_in),
            (self.ct.right_out, vgr.ct_in),
            (self.ct.left_out, self.hbw.inp),
            (self.hbw.out, self.ct.left_in),
            (self.hbw.inventory_out, self.hbw_inv.inp),
            (self.mpo.conveyor_out, self.conveyor.inp),
            (self.conveyor.out, sld.inp),
            (sld.red_out, vgr.sld_in["RED"]),
            (sld.blue_out, vgr.sld_in["BLUE"]),
            (sld.white_out, vgr.sld_in["WHITE"]),
            (vgr.sld_out["RED"], sld.red_in),
            (vgr.sld_out["BLUE"], sld.blue_in),
            (vgr.sld_out["WHITE"], sld.white_in),
            (vgr.dso_out, self.dso.inp),
            (self.dso.out, self.collector.inp),
        )
        for source, destination in station_couplings:
            self.connectPorts(source, destination)

        # ---- MQTT couplings ----
        # Models whose ``mqtt_in`` is driven by the control unit's ``mqtt_out``.
        mqtt_receivers = (
            self.gen,
            self.dsi,
            self.reader,
            vgr,
            self.ct,
            self.hbw,
            self.mpo,
            sld,
            self.dso,
        )
        for model in mqtt_receivers:
            self.connectPorts(control.mqtt_out, model.mqtt_in)

        # Models whose ``mqtt_out`` feeds the control unit's ``mqtt_in``.
        mqtt_senders = (
            self.dsi,
            self.reader,
            vgr,
            self.ct,
            self.hbw,
            self.hbw_inv,
            self.mpo,
            self.conveyor,
            sld,
            self.dso,
        )
        for model in mqtt_senders:
            self.connectPorts(model.mqtt_out, control.mqtt_in)

        # Expose the control unit's real-time output on this coupled model.
        self.connectPorts(control.REALTIME_OBSERVED, self.REALTIME_OBSERVED)

    def get_inventory(self) -> dict:
        """Return the inventory stored in the high-bay warehouse's state.

        Reads ``state.mqtt_inventory.inventory`` of the ``HighBayWarehouse``
        sub-model and returns it as-is.
        """
        warehouse_state = self.hbw.state
        return warehouse_state.mqtt_inventory.inventory
|