rparedis 1 tahun lalu
induk
melakukan
5a7d234355

+ 3 - 1
.idea/PoAB.iml

@@ -1,7 +1,9 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <module type="PYTHON_MODULE" version="4">
   <component name="NewModuleRootManager">
-    <content url="file://$MODULE_DIR$" />
+    <content url="file://$MODULE_DIR$">
+      <sourceFolder url="file://$MODULE_DIR$" isTestSource="false" />
+    </content>
     <orderEntry type="inheritedJdk" />
     <orderEntry type="sourceFolder" forTests="false" />
   </component>

+ 3 - 0
.idea/misc.xml

@@ -1,4 +1,7 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <project version="4">
+  <component name="Black">
+    <option name="sdkName" value="Python 3.8" />
+  </component>
   <component name="ProjectRootManager" version="2" project-jdk-name="Python 3.8" project-jdk-type="Python SDK" />
 </project>

+ 29 - 0
twin/ActualObject.py

@@ -0,0 +1,29 @@
+import pandas as pd
+import time as ptime
+from twin.networking.mqtt import MQTTClient
+
+TIME_FACTOR = 300
+
+ivef = "results-de2/plan-anomaly.csv"
+ivef = pd.read_csv(ivef, usecols=["mmsi", "start", "ETA", "source", "target", "task"], dtype={"mmsi": str})
+ivef.sort_values("start", inplace=True)
+
+initial_start = ivef.iloc[0]["start"]
+ivef["start"] -= initial_start
+ivef["ETA"] -= initial_start
+ivef["start"] /= 1000
+ivef["ETA"] /= 1000
+
+client = MQTTClient("ActualObject")
+client.spin()
+client.publish("initial_time", [initial_start / 1000, TIME_FACTOR])
+
+time = 0
+for ridx, row in ivef.iterrows():
+	if row["start"] == time:
+		client.publish("tasks", row.to_list())
+	else:
+		duration = row["start"] - time
+		ptime.sleep(duration / TIME_FACTOR)
+		client.publish("tasks", row.to_list())
+		time = row["start"]

+ 77 - 0
twin/TwinObject.py

@@ -0,0 +1,77 @@
+import pandas as pd
+
+from networking.mqtt import MQTTClient
+import networking as net
+from twin.elements import *
+from pypdevs.DEVS import CoupledDEVS
+
+class Port(CoupledDEVS):
+	def __init__(self, name, client):
+		super(Port, self).__init__(name)
+
+		self.scheduler = self.addSubModel(net.Subscriber("tasks"))
+		self.scheduler.transform = lambda m: pd.Series(data=m, index=["mmsi", "start", "ETA", "source", "target", "task"])
+		self.planner = self.addSubModel(RoutePlanner("planner"))
+		self.pool = self.addSubModel(Pool("pool"))
+		self.sailer = self.addSubModel(Sailer("sailer"))
+		self.pub = self.addSubModel(net.Publisher("pub", "vessels", client, self.transform_vessel_to_pub))
+		self.error = self.addSubModel(net.Publisher("error", "error", client))
+
+		# self.connectPorts(self.tracer.reqs, self.pool.req_in)
+		self.connectPorts(self.scheduler.out, self.planner.req_in)
+		self.connectPorts(self.planner.req_out, self.pool.req_in)
+		self.connectPorts(self.pool.vessel_out, self.sailer.vessel_in)
+		self.connectPorts(self.sailer.vessel_out, self.pool.vessel_in)
+		# TODO: add a Sailer connection that outputs the full vessel bag upon input
+		self.connectPorts(self.pool.vessel_out, self.pub.inp)
+		self.connectPorts(self.pool.error, self.error.inp)
+
+		# self.clock = self.addSubModel(Clock("clock"))
+		# self.connectPorts(self.clock.outp, self.sailer.update)
+
+	@staticmethod
+	def transform_vessel_to_pub(vessel, time):
+		return [
+			vessel.start,
+			vessel.mmsi,
+			vessel.name,
+			vessel.task,
+			vessel.velocity,
+			vessel.source,
+			vessel.target,
+			vessel.start,
+			vessel.ETA
+		]
+
+	def select(self, imm_children):
+		for child in imm_children:
+			if isinstance(child, Sailer):
+				return child
+		for child in imm_children:
+			if isinstance(child, Pool):
+				return child
+		return imm_children[0]
+
+
+if __name__ == '__main__':
+	from pypdevs.simulator import Simulator
+
+	client = MQTTClient("TwinObject")
+	model = Port("PoA", client)
+	sim = Simulator(model)
+	sim.setRealTime(True)
+	sim.setRealTimePorts({"tasks": model.scheduler.inp})
+	proc = net.SubscriptionProcess(["tasks"], sim, client)
+
+	print("Starting Simulation...")
+	sim.simulate()
+	while True:
+		try:
+			client.spin()
+			proc.step()
+			proc.interrupt()
+		except KeyboardInterrupt:
+			print("Terminating...")
+			break
+	model.pub.cleanup()
+	proc.terminate()

TEMPAT SAMPAH
twin/__pycache__/elements.cpython-38.pyc


+ 196 - 0
twin/dashboard.py

@@ -0,0 +1,196 @@
+from geoplotlib.layers import BaseLayer
+from geoplotlib.core import BatchPainter
+import geoplotlib
+from geoplotlib.colors import create_set_cmap
+from geoplotlib.utils import BoundingBox, epoch_to_str
+import time as ptime
+
+import pandas as pd
+import numpy as np
+
+from de2.routing import get_graph, euler_distance, pathfinder, find_percentage_point_in_path as fpp
+
+from twin.elements import Vessel
+
+
+class PoABLayer(BaseLayer):
+	"""
+	Renders the map.
+	"""
+
+	__colors__ = {
+		"highlight": [150, 0, 250, 200], # yellow
+		"nodes": [150, 0, 250, 200],     # purple
+		"ecological": [0, 255, 0, 200]    # green
+	}
+
+	FPS = 30
+
+	def __init__(self, client):
+		self.client = client
+		self.berths = pd.read_csv("berths.csv")
+
+		self.painter = BatchPainter()
+
+		self.ticks = 0
+		self.time = 0
+		self.initial_time = 0
+		self.last_rt_time = ptime.time()
+		self.time_factor = 1.0
+
+		self.vessels = []
+		self.graph = get_graph()
+
+
+	def update_vessels(self):
+		self.client.spin()
+		if len(self.client.received["initial_time"]) > 0:
+			self.initial_time, self.time_factor = eval(self.client.received["initial_time"].pop(0))
+		while len(self.client.received["vessels"]) > 0:
+			vessel = eval(self.client.received["vessels"].pop(0))
+			# self.time_factor = (vessel[0] - self.time) / (ptime.time() - self.last_rt_time)
+			self.time = vessel[0]
+			self.ticks = 0
+			vdict = {
+				"mmsi": vessel[1],
+				"name": vessel[2],
+				"task": vessel[3],
+				"velocity": vessel[4],
+				"source": vessel[5],
+				"target": vessel[6],
+				"start": vessel[7],
+				"ETA": vessel[8],
+			}
+			self.vessels.append(vdict)
+			self.last_rt_time = ptime.time()
+
+	def draw_shape(self, x, y):
+		"""
+		Draws the shape on the layer, at a specific location.
+
+		Args:
+			x (numeric):    The x-position to draw this shape at.
+			y (numeric):    The y-position to draw this shape at.
+		"""
+		self.painter.points(x, y, 5)
+
+	def draw_path(self, path, proj):
+		"""
+		Draws a WSG84 path on the layer.
+
+		Args:
+			path:   The path to draw as (x, y) tuples.
+			proj:   The projector.
+		"""
+		x = []
+		y = []
+		for ix in range(len(path) - 1):
+			point1 = path[ix]
+			point2 = path[ix + 1]
+			x.append(point1[0])
+			x.append(point2[0])
+			y.append(point1[1])
+			y.append(point2[1])
+		px, py = proj.lonlat_to_screen(np.asarray(x), np.asarray(y))
+		vx1 = [e for ei, e in enumerate(px) if ei % 2 == 0]
+		vx2 = [e for ei, e in enumerate(px) if ei % 2 == 1]
+		vy1 = [e for ei, e in enumerate(py) if ei % 2 == 0]
+		vy2 = [e for ei, e in enumerate(py) if ei % 2 == 1]
+		self.painter.lines(vx1, vy1, vx2, vy2, width=1.5)
+
+	def draw(self, proj, mouse_x, mouse_y, ui_manager):
+		self.ticks += 1
+		self.update_vessels()
+		if len(self.client.received["error"]) > 0:
+			print('\n'.join(self.client.received["error"]))
+			ui_manager.status('\n'.join(self.client.received["error"]))
+			self.client.received["error"].clear()
+
+		self.painter = BatchPainter()
+		tooltips = []
+
+		# Show the simulation time in top right
+		ts = self.time + (self.ticks / self.FPS) * self.time_factor
+		ui_manager.info("TIME: %s (x %.1f)" % (epoch_to_str(self.initial_time + ts), self.time_factor))
+
+		# Get all sailing vessels
+		rem = []
+		for vix, vessel in enumerate(self.vessels):
+			col = self.__colors__["ecological"]
+			eco = Vessel.ecological()
+			dist = 0
+			if vessel["velocity"] < eco[0]:
+				dist = eco[0] - vessel["velocity"]
+			elif vessel["velocity"] > eco[1]:
+				dist = vessel["velocity"] - eco[1]
+			col[0] = min(255, int(dist) * 50)
+			col[1] = 255 - col[0]
+			self.painter.set_color(col)
+
+			# Obtain (and draw) vessel trajectory
+			if "trajectory" not in vessel:
+				vessel["trajectory"] = pathfinder(self.graph, vessel["source"], vessel["target"])
+
+			path, dists = vessel["trajectory"]
+
+			# Compute distance traveled and draw a vessel there
+			# TODO: use distance_left when it should be updated
+			tot_distance = sum(dists)
+			traveled = (ts - vessel["start"]) * vessel["velocity"]
+			distance_left = tot_distance - traveled
+
+			if tot_distance == 0:
+				p1 = p2 = path[0]
+				percentage = 1.
+			else:
+				percentage = traveled / tot_distance
+				p1, p2, percentage = fpp(path, dists, percentage)
+
+			if percentage < 1:
+				self.draw_path(path, proj)
+				sx, sy = proj.lonlat_to_screen(np.asarray([p1[0]]), np.asarray([p1[1]]))
+				tx, ty = proj.lonlat_to_screen(np.asarray([p2[0]]), np.asarray([p2[1]]))
+
+				dx = (tx - sx) * percentage
+				dy = (ty - sy) * percentage
+				if euler_distance(sx + dx, sy + dy, mouse_x, mouse_y) < 10:
+					self.painter.set_color(self.__colors__["highlight"])
+					tooltips.append("%s (MMSI: %s)" % (str(vessel["name"]), str(vessel["mmsi"])))
+					tooltips.append("task: %s | ETA: %s" % (vessel["task"], epoch_to_str(vessel["ETA"])))
+					tooltips.append("velocity: %.3f m/s" % vessel["velocity"])
+					tooltips.append("dist left/total: %.3f/%.3f m" % (distance_left, tot_distance))
+
+					# self.draw_path(path, proj)
+				self.draw_shape(sx + dx, sy + dy)
+			else:
+				rem.append(vix)
+
+		for vix in reversed(rem):
+			self.vessels.pop(vix)
+
+		ui_manager.tooltip("\n".join(tooltips))
+		self.painter.batch_draw()
+
+	def bbox(self):
+		box = [max(self.berths["center_lat"]), max(self.berths["center_lon"]), min(self.berths["center_lat"]), min(self.berths["center_lon"])]
+		return BoundingBox(north=box[0], east=box[1], south=box[2], west=box[3])
+
+
+if __name__ == '__main__':
+	from networking.mqtt import MQTTClient
+	client = MQTTClient("Dashboard")
+	client.subscribe("initial_time")
+	client.subscribe("vessels")
+	client.subscribe("error")
+
+	layer = PoABLayer(client)
+
+	geoplotlib.set_window_size(640, 760)
+	geoplotlib.tiles_provider({
+		'url': lambda zoom, xtile, ytile: 'https://tile.openstreetmap.org/%d/%d/%d.png' % (zoom, xtile, ytile),
+		'tiles_dir': 'mytiles',
+		'attribution': 'Map tiles by OpenStreetMap Carto, under CC BY 3.0. Data @ OpenStreetMap contributors.'
+	})
+
+	geoplotlib.add_layer(layer)
+	geoplotlib.show()

+ 260 - 0
twin/elements.py

@@ -0,0 +1,260 @@
+import numpy as np
+from pypdevs.DEVS import AtomicDEVS, CoupledDEVS
+from pypdevs.infinity import INFINITY
+
+from dataclasses import dataclass, field
+import random
+import pandas as pd
+
+from de2.routing import get_graph, get_closest_vertex
+
+
+TUGS = pd.read_excel("20230405_Tugs.xlsx", dtype={"MMSI": str, "NAME": str, "category": str})
+
+
+@dataclass
+class Vessel:
+	mmsi: str
+	velocity: float = 0.0
+	source: tuple = None
+	target: tuple = None
+	task: str = None
+	distance_left: float = 0.0
+	total_distance: float = 0.0
+	time_until_departure: float = 0.0
+
+	start: float = 0
+	ETA: float = 0
+
+	name: str = ""
+
+	def tuple(self):
+		return self.mmsi, self.name,\
+		       self.source, self.target,\
+		       self.task, self.total_distance, self.distance_left, self.velocity
+
+	@staticmethod
+	def knots_to_mps(knots):
+		return knots * 1852. / 3600.
+
+	@staticmethod
+	def ecological():
+		return Vessel.knots_to_mps(6), Vessel.knots_to_mps(7)
+
+
+class Pool(AtomicDEVS):
+	def __init__(self, name):
+		super(Pool, self).__init__(name)
+
+		self.state = {
+			"waiting": {},
+			"should_exit": [],
+			"delayed": [],
+			"time": 0.0,
+			"errors": []
+		}
+
+		self.req_in = self.addInPort("req_in")
+		self.vessel_in = self.addInPort("vessel_in")
+		self.vessel_out = self.addOutPort("vessel_out")
+		self.error = self.addOutPort("error")
+
+		# prefill with all vessels of the simulation
+		for mmsi in TUGS["MMSI"]:
+			if isinstance(mmsi, str):
+				self.state["waiting"][mmsi] = Vessel(mmsi)
+
+	def request_vessel(self, vessel, request):
+		vessel.velocity = request["velocity"]
+		if vessel.velocity >= 0:  # requests for idling should be ignored!
+			vessel.distance_left = request["distance"]
+			vessel.total_distance = vessel.distance_left
+			# vessel.time_until_departure = vessel.distance_left / vessel.velocity
+			vessel.source = request["source_lon"], request["source_lat"]
+			vessel.target = request["target_lon"], request["target_lat"]
+			vessel.task = request["task"]
+
+			vessel.start = request["start"]
+			vessel.ETA = request["ETA"]
+			vessel.name = request["name"]
+
+			self.state["should_exit"].append(vessel)
+			if vessel.mmsi in self.state["waiting"]:
+				del self.state["waiting"][vessel.mmsi]
+
+	def extTransition(self, inputs):
+		self.state["time"] += self.elapsed
+		if self.req_in in inputs:
+			for request in inputs[self.req_in]:
+				if request["mmsi"] in self.state["waiting"]:
+					vessel = self.state["waiting"][request["mmsi"]]
+					self.request_vessel(vessel, request)
+				else:
+					self.state["errors"].append("%4.4f Vessel %s does not exist in pool - delaying request" % (self.state["time"], str(request["mmsi"])))
+					self.state["delayed"].append(request)
+
+		if self.vessel_in in inputs:
+			for vessel in inputs[self.vessel_in]:
+				if vessel.mmsi in self.state["waiting"]:
+					self.state["errors"].append("[WARN] %4.4f Cannot create duplicate vessels (mmsi = %s; d = %f)" %
+					                 (self.state["time"], str(vessel.mmsi), vessel.total_distance))
+				else:
+					vessel.source = vessel.target
+					vessel.target = None
+
+					self.state["waiting"][vessel.mmsi] = vessel
+
+					for ix, r in enumerate(self.state["delayed"]):
+						if r["mmsi"] == vessel.mmsi:
+							self.request_vessel(vessel, r)
+							self.state["delayed"].pop(ix)
+							print("[INFO] %4.4f Delay for vessel %s cleared" % (self.state["time"], r["mmsi"]))
+							break
+		return self.state
+
+	def timeAdvance(self):
+		if len(self.state["should_exit"]) > 0 or len(self.state["errors"]) > 0:
+			return 0.0
+		return INFINITY
+
+	def outputFnc(self):
+		res = {}
+		if len(self.state["should_exit"]) > 0:
+			res[self.vessel_out] = [self.state["should_exit"][0]]
+		if len(self.state["errors"]) > 0:
+			res[self.error] = self.state["errors"]
+		return res
+
+	def intTransition(self):
+		self.state["time"] += self.timeAdvance()
+		if len(self.state["should_exit"]) > 0:
+			self.state["should_exit"].pop(0)
+		if len(self.state["errors"]) > 0:
+			self.state["errors"].clear()
+		return self.state
+
+class Sailer(AtomicDEVS):
+	def __init__(self, name):
+		super(Sailer, self).__init__(name)
+
+		self.state = {
+			"vessels": [],
+			"time": 0.0
+		}
+
+		self.vessel_in = self.addInPort("vessel_in")
+		self.vessel_out = self.addOutPort("vessel_out")
+
+	def extTransition(self, inputs):
+		self.state["time"] += self.elapsed
+		self.update_vessels(self.elapsed)
+		if self.vessel_in in inputs:
+			for vessel in inputs[self.vessel_in]:
+				if vessel.distance_left == 0.0:
+					vessel.time_until_departure = 0.0
+				else:
+					vessel.time_until_departure = vessel.distance_left / vessel.velocity
+				self.state["vessels"].append(vessel)
+		self.state["vessels"].sort(key=lambda v: v.time_until_departure)
+		return self.state
+
+	def update_vessels(self, elapsed):
+		for vessel in self.state["vessels"]:
+			x = vessel.velocity * elapsed
+			vessel.distance_left = max(0.0, vessel.distance_left - x)
+			vessel.time_until_departure = round(max(0.0, vessel.time_until_departure - elapsed), 6)
+
+	def timeAdvance(self):
+		if len(self.state["vessels"]) > 0:
+			v = self.state["vessels"][0]
+			return v.time_until_departure
+		return INFINITY
+
+	def outputFnc(self):
+		if len(self.state["vessels"]) > 0:
+			return {
+				self.vessel_out: [self.state["vessels"][0]]
+			}
+		return {}
+
+	def intTransition(self):
+		elapsed = self.timeAdvance()
+		self.state["time"] += elapsed
+		self.state["vessels"].pop(0)
+		self.update_vessels(elapsed)
+		if len(self.state["vessels"]) > 0:
+			self.state["vessels"].sort(key=lambda v: v.time_until_departure)
+		return self.state
+
+
+class RoutePlanner(AtomicDEVS):
+	def __init__(self, name):
+		super(RoutePlanner, self).__init__(name)
+
+		self.graph = get_graph()
+
+		self.state = {
+			"request": []
+		}
+
+		self.req_in = self.addInPort("req_in")
+		self.req_out = self.addOutPort("req_out")
+
+	def timeAdvance(self):
+		if len(self.state["request"]) == 0:
+			return INFINITY
+		return 0.0
+
+	def extTransition(self, inputs):
+		if self.req_in in inputs:
+			for request in inputs[self.req_in]:
+				tug = TUGS[TUGS["MMSI"] == str(request["mmsi"])].iloc[0]
+				request["name"] = tug["NAME"]
+
+				request["source_lon"], request["source_lat"] = eval(request["source"])
+				request["target_lon"], request["target_lat"] = eval(request["target"])
+
+				src_vertex, _ = get_closest_vertex(self.graph, request["source_lon"], request["source_lat"])
+				tgt_vertex, _ = get_closest_vertex(self.graph, request["target_lon"], request["target_lat"])
+
+				request["distance"] = self.graph.distances([src_vertex.index], [tgt_vertex.index], weights="distance", mode="all")[0][0]
+
+				# TODO: Use the fastest possible ecological velocity to apply to the trajectory
+				#       Doing the task faster than ETA-start is also allowed!
+				request["velocity"] = request["distance"] / (request["ETA"] - request["start"])
+				request["velocity"] = max(request["velocity"], Vessel.ecological()[1])
+
+				self.state["request"].append(request)
+		return self.state
+
+	def outputFnc(self):
+		if len(self.state["request"]) == 0:
+			return {}
+		return { self.req_out: [self.state["request"][0]] }
+
+	def intTransition(self):
+		if len(self.state["request"]) > 0:
+			self.state["request"].pop(0)
+		return self.state
+
+
+class Clock(AtomicDEVS):
+	def __init__(self, name, interval=1):
+		super(Clock, self).__init__(name)
+		self.interval = interval
+		self.state = {
+			"time": 0.0
+		}
+		self.outp = self.addOutPort("outp")
+
+	def timeAdvance(self):
+		return self.interval
+
+	def outputFnc(self):
+		return {
+			self.outp: [True]
+		}
+
+	def intTransition(self):
+		self.state["time"] += self.timeAdvance()
+		return self.state

+ 101 - 0
twin/networking/__init__.py

@@ -0,0 +1,101 @@
+from pypdevs.DEVS import AtomicDEVS
+from pypdevs.infinity import INFINITY
+
+
+class NetClient:
+    def __init__(self):
+        self.received = {}
+
+    def disconnect(self):
+        raise NotImplementedError()
+
+    def spin(self):
+        raise NotImplementedError()
+
+    def publish(self, topic, msg):
+        raise NotImplementedError()
+
+    def subscribe(self, topic):
+        raise NotImplementedError()
+
+    def poller(self, topic):
+        raise NotImplementedError()
+
+
+class Publisher(AtomicDEVS):
+    def __init__(self, name, topic, client, transform=lambda d, t: d):
+        super(Publisher, self).__init__(name)
+
+        self.topic = topic
+        self.node = client
+        self.state = {
+            "time": 0
+        }
+
+        self.inp = self.addInPort("inp")
+        self.transform = transform
+
+    def cleanup(self):
+        self.node.disconnect()
+
+    def extTransition(self, inputs):
+        self.state["time"] += self.elapsed
+        if self.inp in inputs:
+            data = self.transform(inputs[self.inp][0], self.state["time"])
+            self.node.publish(self.topic, data)
+        return self.state
+
+
+class Subscriber(AtomicDEVS):
+    def __init__(self, name):
+        super(Subscriber, self).__init__(name)
+
+        self.state = {
+            "data": None
+        }
+
+        self.inp = self.addInPort("inp")
+        self.out = self.addOutPort("out")
+        self.transform = lambda d: d
+
+    def timeAdvance(self):
+        if self.state["data"] is not None:
+            return 0
+        return INFINITY
+
+    def extTransition(self, inputs):
+        if self.inp in inputs:
+            self.state["data"] = self.transform(inputs[self.inp][0])
+        return self.state
+
+    def outputFnc(self):
+        if self.state["data"] is not None:
+            return { self.out: [self.state["data"]] }
+        return {}
+
+    def intTransition(self):
+        if self.state["data"] is not None:
+            self.state["data"] = None
+        return self.state
+
+
+class SubscriptionProcess:
+    def __init__(self, topics, simulator, client):
+        self.client = client
+        self.simulator = simulator
+        for topic in topics:
+            self.client.subscribe(topic)
+
+    def interrupt(self):
+        for topic, data in self.client.received.items():
+            if len(data) > 0:
+                event = data.pop(0)
+                # print("%s (%s)" % (topic, event))
+                self.simulator.realtime_interrupt("%s (%s)" % (topic, event))
+                break
+
+    def step(self):
+        return self.client.spin()
+
+    def terminate(self):
+        self.client.disconnect()

TEMPAT SAMPAH
twin/networking/__pycache__/__init__.cpython-38.pyc


TEMPAT SAMPAH
twin/networking/__pycache__/mqtt.cpython-38.pyc


+ 87 - 0
twin/networking/mqtt.py

@@ -0,0 +1,87 @@
+# To use the MQTT client, a message broker is required
+#  Mosquitto is a viable option for local development:
+#     C:\'Program Files'\mosquitto\mosquitto -v
+#  Alternatively, there is a free public broker available on:
+#     broker.emqx.io
+# Also, the paho-mqtt package needs to be installed.
+
+from paho.mqtt import client as mqtt_client
+from paho.mqtt.enums import CallbackAPIVersion
+
+import time
+from twin.networking import NetClient
+
+class MQTTClient(NetClient):
+    def __init__(self, name):
+        super().__init__()
+        self.broker = 'localhost'  # Free Public Broker: 'broker.emqx.io'
+        self.port = 1883
+        self.client_id = name
+        self.client = None
+        self.connected = False
+
+        self.connect()
+
+    def connect(self):
+        # Set Connecting Client ID
+        self.client = mqtt_client.Client(CallbackAPIVersion.VERSION2, self.client_id)
+        # client.username_pw_set(username, password)
+        self.client.on_connect = self.on_connect
+        self.client.on_disconnect = MQTTClient.on_disconnect
+        self.client.connect(self.broker, self.port, 0)
+
+    def on_connect(self, client, userdata, flags, rc, properties):
+        if rc == 0:
+            print("Connected to MQTT Broker!")
+            self.connected = True
+        else:
+            print("Failed to connect, return code %d\n" % rc)
+
+    @staticmethod
+    def on_disconnect(client, userdata, df, rc, properties):
+        FIRST_RECONNECT_DELAY = 1
+        RECONNECT_RATE = 2
+        MAX_RECONNECT_COUNT = 12
+        MAX_RECONNECT_DELAY = 60
+
+        print("Disconnected from MQTT Broker! Status:", rc)
+        if rc == 0: return
+
+        reconnect_count, reconnect_delay = 0, FIRST_RECONNECT_DELAY
+        while reconnect_count < MAX_RECONNECT_COUNT:
+            print("Reconnecting in %d seconds..." % reconnect_delay)
+            time.sleep(reconnect_delay)
+
+            try:
+                client.reconnect()
+                print("Reconnected successfully!")
+                return
+            except Exception as err:
+                print("%s. Reconnect failed. Retrying..." % err)
+
+            reconnect_delay *= RECONNECT_RATE
+            reconnect_delay = min(reconnect_delay, MAX_RECONNECT_DELAY)
+            reconnect_count += 1
+        print("Reconnect failed after %s attempts. Exiting..." % reconnect_count)
+
+    def on_message(self, client, userdata, msg):
+        topic = msg.topic
+        data = msg.payload.decode()
+        self.received[topic].append(data)
+
+    def disconnect(self):
+        self.client.disconnect()
+
+    def spin(self):
+        code = self.client.loop(0)
+        if code != 0:
+            raise ValueError("Spin failed with error %s" % code)
+
+    def publish(self, topic, msg):
+        self.client.publish(topic, str(msg))
+
+    def subscribe(self, topic):
+        self.received[topic] = []
+
+        self.client.subscribe(topic)
+        self.client.on_message = self.on_message