Pārlūkot izejas kodu

Started updating threading backend to be better understood and simpler

rparedis 3 gadi atpakaļ
vecāks
revīzija
3ed8605719

+ 4 - 0
doc/examples/Dashboard.rst

@@ -83,6 +83,10 @@ input fields for the variables :code:`A` (the amplitude) and :code:`B` (proporti
 and :func:`set_period` functions make use of the ability of setting a :class:`CBD.lib.std.ConstantBlock`'s value
 during runtime. Take a look at the corresponding documentations for more information.
 
+.. danger::
+    Do not alter the window closing protocol of the :code:`tkinter` root! It is automatically altered to ensure all
+    threads are closed.
+
 .. code-block:: python
 
     label = tk.Label(root, text="y = 1.00 * sin(1.00 * t)")

+ 2 - 0
examples/scripts/Dashboard/SinGen_experiment.py

@@ -5,6 +5,7 @@ import tkinter as tk
 from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
 
 fig = plt.figure(figsize=(15, 5), dpi=100)
+fig.tight_layout()
 ax = fig.add_subplot(111)
 ax.set_ylim((-1, 1))
 
@@ -52,6 +53,7 @@ from CBD.simulator import Simulator
 sim = Simulator(cbd)
 sim.setRealTime()
 sim.setRealTimePlatformTk(root)
+# sim.setTerminationTime(10)
 sim.setDeltaT(0.1)
 sim.run()
 root.mainloop()

+ 21 - 4
src/CBD/realtime/plotting.py

@@ -3,7 +3,8 @@ This module contains all classes and structures needed for live plotting of data
 polling an object. While specifically made for the CBD simulator, it is independent
 of any internal workings.
 """
-import math
+import time
+import threading
 try:
 	import matplotlib.pyplot as plt
 	import matplotlib.animation as animation
@@ -161,6 +162,7 @@ class PlotHandler:
 		self.object = object
 		self.kind = kind
 		self.figure = figure
+		self.interval = interval
 
 		self.kind._backend = backend
 		self.elm = self.kind.create(figure)
@@ -170,6 +172,9 @@ class PlotHandler:
 
 		# obtaining information
 		self.__get_data = lambda obj: obj.data_xy
+		self.__event_bus = []
+		self.__event_thread = threading.Thread(target=self.__event_thread_loop)
+		self.__event_thread.start()
 		self.__events = {
 			"preupdate": [],
 			"update": [],
@@ -181,7 +186,7 @@ class PlotHandler:
 			if Backend.compare("MPL", backend) or Backend.compare("SNS", backend):
 				self.__ani = animation.FuncAnimation(figure[0], lambda _: self.update(),
 				                                     interval=interval, frames=frames)
-				figure[0].canvas.mpl_connect('close_event', lambda evt: self.__close_event())
+				figure[0].canvas.mpl_connect('close_event', lambda _: self.__close_event())
 			elif Backend.compare("BOKEH", backend):
 				curdoc().add_periodic_callback(lambda: self.update(), interval)
 				# TODO (is this even possible?):
@@ -214,8 +219,10 @@ class PlotHandler:
 		"""
 		if name not in self.__events:
 			raise ValueError("Invalid signal '%s' in PlotHandler." % name)
-		for evt in self.__events[name]:
-			evt(*args)
+		# print("SIGNAL:", name)
+		self.__event_bus.append((name, args))
+		# for evt in self.__events[name]:
+		# 	evt(*args)
 
 	def connect(self, name, function):
 		"""
@@ -246,6 +253,14 @@ class PlotHandler:
 			raise ValueError("Invalid signal '%s' in PlotHandler." % name)
 		self.__events[name].append(function)
 
+	def __event_thread_loop(self):
+		while not self.__finished:
+			while len(self.__event_bus) > 0:
+				name, args = self.__event_bus.pop()
+				for evt in self.__events[name]:
+					evt(*args)
+			time.sleep(self.interval / 1000)
+
 	def set_data_getter(self, function):
 		"""
 		Sets the data getter function call. By default, the :code:`data_xy` attribute
@@ -298,6 +313,7 @@ class PlotHandler:
 			self.__opened = False
 			self.stop()
 			self.signal('closed')
+			# self.__event_thread.join()
 
 	def is_opened(self):
 		"""
@@ -309,6 +325,7 @@ class PlotHandler:
 		"""
 		Updates the plot information.
 		"""
+		# print("update")
 		data = self.get_data()
 		if not self.__hidden:
 			self.signal('preupdate', data)

+ 13 - 48
src/CBD/realtime/threadingBackend.py

@@ -16,6 +16,7 @@
 from ..util import enum
 import threading
 
+
 class Platform:
     """
     Identifies the platform to use in real-time simulation.
@@ -77,8 +78,7 @@ class Platform:
     """
 
 
-
-class ThreadingBackend(object):
+class ThreadingBackend:
     """
     Wrapper around the actual threading backend.
     It will also handle interrupts and the passing of them to the calling thread.
@@ -97,16 +97,16 @@ class ThreadingBackend(object):
         self.value_lock = threading.Lock()
         if subsystem.lower() == Platform.THREADING:
             from .threadingPython import ThreadingPython
-            self.subsystem = ThreadingPython()
+            self._subsystem = ThreadingPython()
         elif subsystem.lower() == Platform.TKINTER:
             from .threadingTkInter import ThreadingTkInter
-            self.subsystem = ThreadingTkInter(*args)
+            self._subsystem = ThreadingTkInter(*args)
         elif subsystem.lower() == Platform.GAMELOOP:
             from .threadingGameLoop import ThreadingGameLoop
-            self.subsystem = ThreadingGameLoop()
+            self._subsystem = ThreadingGameLoop()
         elif subsystem.lower() == Platform.GLA:
             from .threadingGameLoopAlt import ThreadingGameLoopAlt
-            self.subsystem = ThreadingGameLoopAlt()
+            self._subsystem = ThreadingGameLoopAlt()
         else:
             raise Exception("Realtime subsystem not found: " + str(subsystem))
 
@@ -115,46 +115,11 @@ class ThreadingBackend(object):
         A non-blocking call, which will call the :code:`func` parameter after
         :code:`time` seconds. It will use the provided backend to do this.
 
-        :param time: time to wait in seconds, a float is possible
-        :param func: the function to call after the time has passed
-        """
-        self.subsystem.wait(time, func)
-
-    def interrupt(self, value):
-        """
-        Interrupt a running wait call.
-
-        :param value: the value that interrupts
-        """
-        self.interrupted_value = value
-        self.subsystem.interrupt()
-
-    def setInterrupt(self, value):
-        """
-        Sets the value of the interrupt. This should not be used manually and is
-        only required to prevent the asynchronous combo generator from making
-        :func:`interrupt` calls.
-        
-        :param value: value with which the interrupt variable should be set
-        """
-        with self.value_lock:
-            if self.interrupted_value is None:
-                self.interrupted_value = value
-                return True
-            else:
-                # The interrupt was already set, indicating a collision!
-                return False
-
-    def getInterrupt(self):
-        """
-        Return the value of the interrupt and clear it internally.
-
-        :returns: the interrupt
+        Args:
+            time (float):       Time to wait in seconds.
+            func (callable):    The function to call after the time has passed.
         """
-        with self.value_lock:
-            val = self.interrupted_value
-            self.interrupted_value = None
-        return val
+        self._subsystem.wait(time, func)
 
     def step(self, time=0.0):
         """
@@ -164,7 +129,7 @@ class ThreadingBackend(object):
             time (float):   The current simulation time. Only used if the alternative
                             gameloop backend is used.
         """
-        if hasattr(self.subsystem, "time"):
-            self.subsystem.step(time)
+        if self._subsystem.__class__ == "ThreadingGameLoopAlt":
+            self._subsystem.step(time)
         else:
-            self.subsystem.step()
+            self._subsystem.step()

+ 10 - 23
src/CBD/realtime/threadingPython.py

@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import time
 from threading import Event, Thread, Lock
 from . import accurate_time as time
 
@@ -20,43 +21,29 @@ class ThreadingPython(object):
     """
     Simple Python threads subsystem
     """
-    def __init__(self):
+    def wait(self, time, func):
         """
-        Constructor
-        """
-        self.evt = Event()
-        self.evt_lock = Lock()
-
-    def wait(self, delay, func):
-        """
-        Wait for the specified time, or faster if interrupted
+        Wait for the specified time.
 
-        :param delay: time to wait
-        :param func: the function to call
+        Args:
+            time (float):       Time to wait.
+            func (callable):    The function to call.
         """
         #NOTE this call has a granularity of 5ms in Python <= 2.7.x in the worst case, so beware!
         #     the granularity seems to be much better in Python >= 3.x
-        p = Thread(target=ThreadingPython.callFunc, args=[self, delay, func])
+        p = Thread(target=ThreadingPython.callFunc, args=[self, time, func])
         p.daemon = True
         p.start()
 
-    def interrupt(self):
-        """
-        Interrupt the waiting thread
-        """
-        self.evt.set()
-
     def callFunc(self, delay, func):
         """
         Function to call on a seperate thread: will block for the
-        specified time and call the function afterwards
+        specified time and call the function afterwards.
 
         Args:
             delay (float):  The wait delay.
             func:           The function to call. No arguments can be
                             used and no return values are needed.
         """
-        with self.evt_lock:
-            self.evt.wait(delay)
-            func()
-            self.evt.clear()
+        time.sleep(delay)
+        func()

+ 12 - 63
src/CBD/realtime/threadingTkInter.py

@@ -13,74 +13,23 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-def tkMainThreadPoller(tk, queue):
+class ThreadingTkInter:
     """
-    The polling function to register with Tk at the start. This will do the actual scheduling in Tk.
+    Tk Inter subsystem for realtime simulation.
 
-    :param tk: the Tk instance to use
-    :param queue: the queue to check
-    """
-    global tkRunningID
-    while 1:
-        try:
-            time, func = queue.popleft()
-            tkRunningID = tk.after(time, func)
-        except TypeError:
-            # Was an invalidation call
-            try:
-                if tkRunningID is not None:
-                    tk.after_cancel(tkRunningID)
-            except IndexError:
-                # Nothing to cancel
-                pass
-            tkRunningID = None
-        except IndexError:
-            break
-    tk.after(10, tkMainThreadPoller, tk, queue)
-
-class ThreadingTkInter(object):
-    """
-    Tk Inter subsystem for realtime simulation
+    Args:
+        tk: TkInter root element.
     """
     def __init__(self, tk):
-        """
-        Constructor
-
-        :param queue: the queue object that is also used by the main thread to put events on the main Tk object
-        """
-        self.runningID = None
-        self.last_infinity = False
-        self.func = None
-        import collections
-        queue = collections.deque()
-        self.queue = queue
-        tk.after(10, tkMainThreadPoller, tk, queue)
+        self.tk = tk
 
-    def unlock(self):
+    def wait(self, time, func):
         """
-        Unlock the waiting thread
-        """
-        # Don't get it normally, as it would seem like a method call
-        getattr(self, "func")()
-
-    def wait(self, t, func):
-        """
-        Wait for the specified time, or faster if interrupted
+        Wait for the specified time.
 
-        :param t: time to wait
-        :param func: the function to call
-        """
-        if t == float('inf'):
-            self.last_infinity = True
-        else:
-            self.last_infinity = False
-            self.func = func
-            self.queue.append((int(t*1000), self.unlock))
-
-    def interrupt(self):
-        """
-        Interrupt the waiting thread
+        Args:
+            time (float):       Time to wait.
+            func (callable):    The function to call.
         """
-        if not self.last_infinity:
-            self.queue.append(None)
-        self.unlock()
+        if time == float("inf"): return   # Undefined behaviour for CBD
+        self.tk.after(int(time * 1000), func)

+ 36 - 10
src/CBD/simulator.py

@@ -1,3 +1,4 @@
+import sys
 import time
 import threading
 from . import naivelog
@@ -99,6 +100,8 @@ class Simulator:
 
 		self.__lasttime = None
 
+		self.__event_bus = []
+		# self.__event_thread = threading.Thread(target=self.__event_thread_loop)
 		self.__events = {
 			"started": [],
 			"finished": [],
@@ -144,6 +147,11 @@ class Simulator:
 			self.__realtime_start_time = time.time()
 			self.__lasttime = 0.0
 
+		# self.__event_thread.start()
+		# self.__threading_backend.wait(0, self.__event_thread_loop)
+		# self.__event_thread_loop()
+		self.__threading_backend_args[0].after(3000, lambda: print("hey"))
+		self.__threading_backend.wait(2, lambda: print("ho"))
 		self.signal("started")
 		if self.__realtime:
 			# Force execute the first iteration now. This way iteration n (n > 0) is
@@ -161,12 +169,13 @@ class Simulator:
 		"""
 		Terminate the simulation.
 		"""
-		self.__finished = True
 		if not self.__progress:
 			# Whenever the progress bar is initialized, wait until it ends
 			self.__progress_finished = True
 		self.__tracer.stopTracers()
 		self.signal("finished")
+		self.__finished = True
+		# self.__event_thread.join()
 
 	def __check(self):
 		"""
@@ -179,7 +188,7 @@ class Simulator:
 		"""
 		ret = self.__stop_requested
 		if self.__termination_condition is not None:
-			ret = self.__termination_condition(self.model, self.__sim_data[2])
+			ret = ret or self.__termination_condition(self.model, self.__sim_data[2])
 		return ret or self.__termination_time <= self.getTime()
 
 	def stop(self):
@@ -398,6 +407,13 @@ class Simulator:
 		self.__threading_backend_subsystem = subsystem
 		self.__threading_backend_args = args
 
+		# TODO: allow the user to also do things here?
+		# if subsystem == Platform.TKINTER:
+		# 	def cls(s, t):
+		# 		s.__finish()
+		# 		t.destroy()
+		# 	args[0].protocol("WM_DELETE_WINDOW", lambda a=self, b=args[0]: cls(a, b))
+
 	def setRealTimePlatformThreading(self):
 		"""
 		Wrapper around the :func:`setRealTimePlatform` call to automatically
@@ -480,7 +496,8 @@ class Simulator:
 		"""
 		self.signal("prestep")
 		curIt = self.__sim_data[2]
-		self.__tracer.traceNewIteration(curIt, self.getTime())
+		# self.__threading_backend.wait(10, lambda: self.__tracer.traceNewIteration(curIt, self.getTime()))
+
 		# Efficiency reasons: dep graph only changes at these times
 		#   in the given set of library blocks.
 		# TODO: Must be set to "every time" instead.
@@ -563,7 +580,7 @@ class Simulator:
 				block = component[0]  # the strongly connected component has a single element
 				if curIteration == 0 or self.__scheduler.mustCompute(block, self.getTime()):
 					block.compute(curIteration)
-					self.__tracer.traceCompute(curIteration, block)
+					# self.__threading_backend.wait(10, lambda: self.__tracer.traceCompute(curIteration, block))
 			else:
 				# Detected a strongly connected component
 				self.__solver.checkValidity(self.model.getPath(), component)
@@ -573,7 +590,7 @@ class Simulator:
 					if curIteration == 0 or self.__scheduler.mustCompute(block, self.getTime()):
 						blockIndex = component.index(block)
 						block.appendToSignal(solutionVector[blockIndex])
-						self.__tracer.traceCompute(curIteration, block)
+						# self.__threading_backend.wait(10, lambda: self.__tracer.traceCompute(curIteration, block))
 
 	def __hasCycle(self, component, depGraph):
 		"""
@@ -635,9 +652,8 @@ class Simulator:
 							whenever the event is raised.
 
 		Warning:
-			The more computationally expensive the set of connected signals is, the less
-			precise real-time simulation will be. A signal is meant to have a little hook
-			on when certain events happen. It is **not** meant for complex data analysis.
+			While these functions will be handled on a different thread, it heavy
+			computations will eventually result in a delay of all events.
 		"""
 		if name not in self.__events:
 			raise ValueError("Invalid signal '%s' in Simulator." % name)
@@ -670,8 +686,18 @@ class Simulator:
 		"""
 		if name not in self.__events:
 			raise ValueError("Invalid signal '%s' in Simulator." % name)
-		for evt in self.__events[name]:
-			evt(*args)
+		self.__event_bus.append((name, args))
+
+	def __event_thread_loop(self):
+		print("looping")
+		if not self.__finished:
+			while len(self.__event_bus) > 0:
+				name, args = self.__event_bus.pop()
+				for evt in self.__events[name]:
+					evt(*args)
+			# time.sleep(0.05)
+			print("hello there")
+			self.__threading_backend.wait(0.05, self.__event_thread_loop)
 
 	def setCustomTracer(self, *tracer):
 		"""

+ 9 - 2
src/CBD/tracers/baseTracer.py

@@ -2,6 +2,8 @@ import sys
 import time
 from .color import COLOR
 
+# TODO: do this on another thread to prevent simulation slowdown
+#       this can possibly be done using the pubsub architecture
 class BaseTracer:
 	"""
 	Base class for all tracers.
@@ -22,6 +24,7 @@ class BaseTracer:
 		self.filename = filename
 		self.file = None
 		self.width = 80
+		self.__active = False
 
 	def openFile(self, recover=False):
 		"""
@@ -58,7 +61,9 @@ class BaseTracer:
 		See Also:
 			:func:`openFile`
 		"""
-		self.openFile(recover)
+		if not self.__active:
+			self.__active = True
+			self.openFile(recover)
 
 	def stopTracer(self):
 		"""
@@ -67,7 +72,9 @@ class BaseTracer:
 		See Also:
 			:func:`closeFile`
 		"""
-		self.closeFile()
+		if self.__active:
+			self.__active = False
+			self.closeFile()
 
 	def traceNewIteration(self, curIt, time):
 		"""