|
@@ -13,7 +13,7 @@ from CBD.simulator import Simulator
|
|
|
from CBD.realtime.threadingBackend import Platform
|
|
from CBD.realtime.threadingBackend import Platform
|
|
|
from copy import deepcopy
|
|
from copy import deepcopy
|
|
|
|
|
|
|
|
-__all__ = ['CBDRunner', 'prepare_cbd']
|
|
|
|
|
|
|
+__all__ = ['CBDRunner', 'prepare_cbd', 'CrossingDetection']
|
|
|
|
|
|
|
|
class CBDRunner(AtomicDEVS):
|
|
class CBDRunner(AtomicDEVS):
|
|
|
"""
|
|
"""
|
|
@@ -91,23 +91,66 @@ class CBDRunner(AtomicDEVS):
|
|
|
"""
|
|
"""
|
|
|
return self.state["CBD"].getBlockByName(pname).getSignal("OUT1")[-1].value
|
|
return self.state["CBD"].getBlockByName(pname).getSignal("OUT1")[-1].value
|
|
|
|
|
|
|
|
- def crossing_detection(self, signal, y, y1):
|
|
|
|
|
|
|
+ def crossing_detection(self, signal, y):
|
|
|
"""
|
|
"""
|
|
|
- Simple linear crossing root finder for a signal.
|
|
|
|
|
|
|
+ Crossing root finder for a signal.
|
|
|
|
|
|
|
|
Args:
|
|
Args:
|
|
|
signal (str): The port to detect.
|
|
signal (str): The port to detect.
|
|
|
y (float): The value to cross.
|
|
y (float): The value to cross.
|
|
|
- y1 (float): The first simulation value that was obtained.
|
|
|
|
|
|
|
|
|
|
Note:
|
|
Note:
|
|
|
This function will change in the future, presumably becoming the IVP
|
|
This function will change in the future, presumably becoming the IVP
|
|
|
- root finding algorithm.
|
|
|
|
|
|
|
+ root finding algorithm. Currently, the Illinois variant of Regula Falsi
|
|
|
|
|
+ is used.
|
|
|
"""
|
|
"""
|
|
|
|
|
+ F = lambda t: self.crossing_function(t - self.state["time"], signal)
|
|
|
t1 = self.state["time"]
|
|
t1 = self.state["time"]
|
|
|
t2 = t1 + self.state["delta_t"]
|
|
t2 = t1 + self.state["delta_t"]
|
|
|
- y2 = self.get_signal(signal)
|
|
|
|
|
- return (t2 - t1) / (y2 - y1) * (y - y1) + t1
|
|
|
|
|
|
|
+ return CrossingDetection.regula_falsi(t1, t2, y, F)
|
|
|
|
|
+
|
|
|
|
|
+ def crossing_function(self, h, signal, restore=True):
|
|
|
|
|
+ """
|
|
|
|
|
+ Computes the function value of the simulation after a specified delay.
|
|
|
|
|
+ This will be used in combination with :class:`CrossingDetection` to
|
|
|
|
|
+ find the roots of a function.
|
|
|
|
|
+
|
|
|
|
|
+ This function will first rewind the simulator such that :code:`h` can be
|
|
|
|
|
+ applied, before stepping until :code:`h` was found. Afterwards, the
|
|
|
|
|
+ simulation is possibly restored to the original state.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ h (float): The delay.
|
|
|
|
|
+ signal (str): The signal that must be read. When :code:`None`,
|
|
|
|
|
+ no signal will be read.
|
|
|
|
|
+ restore (bool): When :code:`True`, the simulation will be restored after
|
|
|
|
|
+ the value was found. Defaults to :code:`True`.
|
|
|
|
|
+ """
|
|
|
|
|
+ oh = self.get_delta()
|
|
|
|
|
+ self.simulator._rewind()
|
|
|
|
|
+ self.simulator._rewind()
|
|
|
|
|
+ self.set_delta(h)
|
|
|
|
|
+ p = self.simulator.getClock().getBlockByName("Past")
|
|
|
|
|
+ p.setValue(p.getValue() + oh)
|
|
|
|
|
+ self.simulator._do_single_step()
|
|
|
|
|
+ p.setValue(p.getValue() + oh)
|
|
|
|
|
+ self.simulator._do_single_step()
|
|
|
|
|
+
|
|
|
|
|
+ result = None
|
|
|
|
|
+ if signal is not None:
|
|
|
|
|
+ result = self.state["CBD"].getSignal(signal)[-1].value
|
|
|
|
|
+
|
|
|
|
|
+ if restore:
|
|
|
|
|
+ self.simulator._rewind()
|
|
|
|
|
+ self.simulator._rewind()
|
|
|
|
|
+ self.set_delta(oh)
|
|
|
|
|
+ p = self.simulator.getClock().getBlockByName("Past")
|
|
|
|
|
+ p.setValue(p.getValue() + oh)
|
|
|
|
|
+ self.simulator._do_single_step()
|
|
|
|
|
+ p.setValue(p.getValue() + oh)
|
|
|
|
|
+ self.simulator._do_single_step()
|
|
|
|
|
+
|
|
|
|
|
+ return result
|
|
|
|
|
|
|
|
def set_delta(self, val):
|
|
def set_delta(self, val):
|
|
|
"""
|
|
"""
|
|
@@ -125,6 +168,10 @@ class CBDRunner(AtomicDEVS):
|
|
|
return self.simulator.getClock().getInputSignal(-1, 'h').value
|
|
return self.simulator.getClock().getInputSignal(-1, 'h').value
|
|
|
|
|
|
|
|
def timeAdvance(self):
|
|
def timeAdvance(self):
|
|
|
|
|
+ """
|
|
|
|
|
+ See Also:
|
|
|
|
|
+ `PythonPDEVS documentation <https://msdl.uantwerpen.be/documentation/PythonPDEVS/ADEVS_int.html>`_
|
|
|
|
|
+ """
|
|
|
if self.state["stopped"]:
|
|
if self.state["stopped"]:
|
|
|
return INFINITY
|
|
return INFINITY
|
|
|
if self.state["request_compute"]:
|
|
if self.state["request_compute"]:
|
|
@@ -132,6 +179,10 @@ class CBDRunner(AtomicDEVS):
|
|
|
return max(0.0, self.state["delta_t"])
|
|
return max(0.0, self.state["delta_t"])
|
|
|
|
|
|
|
|
def outputFnc(self):
|
|
def outputFnc(self):
|
|
|
|
|
+ """
|
|
|
|
|
+ See Also:
|
|
|
|
|
+ `PythonPDEVS documentation <https://msdl.uantwerpen.be/documentation/PythonPDEVS/ADEVS_int.html>`_
|
|
|
|
|
+ """
|
|
|
res = {}
|
|
res = {}
|
|
|
cross = []
|
|
cross = []
|
|
|
if self.state["zero_crossing"] is not None:
|
|
if self.state["zero_crossing"] is not None:
|
|
@@ -146,6 +197,10 @@ class CBDRunner(AtomicDEVS):
|
|
|
return res
|
|
return res
|
|
|
|
|
|
|
|
def intTransition(self):
|
|
def intTransition(self):
|
|
|
|
|
+ """
|
|
|
|
|
+ See Also:
|
|
|
|
|
+ `PythonPDEVS documentation <https://msdl.uantwerpen.be/documentation/PythonPDEVS/ADEVS_int.html>`_
|
|
|
|
|
+ """
|
|
|
ta = self.timeAdvance()
|
|
ta = self.timeAdvance()
|
|
|
self.state["time"] += ta
|
|
self.state["time"] += ta
|
|
|
if self.state["request_output"]:
|
|
if self.state["request_output"]:
|
|
@@ -178,27 +233,23 @@ class CBDRunner(AtomicDEVS):
|
|
|
if (old[z] - y) * (value - y) < 0:
|
|
if (old[z] - y) * (value - y) < 0:
|
|
|
# A crossing will happen!
|
|
# A crossing will happen!
|
|
|
# print("CROSSING:", old[z], value, y)
|
|
# print("CROSSING:", old[z], value, y)
|
|
|
- cds.setdefault(self.crossing_detection(z, y, old[z]), []).append(z)
|
|
|
|
|
|
|
+ cds.setdefault(self.crossing_detection(z, y), []).append(z)
|
|
|
if len(cds) > 0:
|
|
if len(cds) > 0:
|
|
|
- oh = self.get_delta()
|
|
|
|
|
- self.simulator._rewind()
|
|
|
|
|
- self.simulator._rewind()
|
|
|
|
|
fzc = min(cds)
|
|
fzc = min(cds)
|
|
|
self.state["zero_crossing"] = cds[fzc]
|
|
self.state["zero_crossing"] = cds[fzc]
|
|
|
h = fzc - self.state["time"]
|
|
h = fzc - self.state["time"]
|
|
|
self.state["delta_t"] = h
|
|
self.state["delta_t"] = h
|
|
|
- self.set_delta(h)
|
|
|
|
|
- p = self.simulator.getClock().getBlockByName("Past")
|
|
|
|
|
- p.setValue(p.getValue() + oh)
|
|
|
|
|
- self.simulator._do_single_step()
|
|
|
|
|
- p.setValue(p.getValue() + oh)
|
|
|
|
|
- self.simulator._do_single_step()
|
|
|
|
|
- # print("CROSSING:", self.state["CBD"].getBlockByName("v").getSignal("OUT1")[-1])
|
|
|
|
|
|
|
+
|
|
|
|
|
+ self.crossing_function(h, None, False)
|
|
|
else:
|
|
else:
|
|
|
self.set_delta(float('inf'))
|
|
self.set_delta(float('inf'))
|
|
|
return self.state
|
|
return self.state
|
|
|
|
|
|
|
|
def extTransition(self, inputs):
|
|
def extTransition(self, inputs):
|
|
|
|
|
+ """
|
|
|
|
|
+ See Also:
|
|
|
|
|
+ `PythonPDEVS documentation <https://msdl.uantwerpen.be/documentation/PythonPDEVS/ADEVS_int.html>`_
|
|
|
|
|
+ """
|
|
|
self.state["time"] += self.elapsed
|
|
self.state["time"] += self.elapsed
|
|
|
|
|
|
|
|
if not self.state["stopped"]:
|
|
if not self.state["stopped"]:
|
|
@@ -271,105 +322,38 @@ def prepare_cbd(model, initials):
|
|
|
return cbd, Hname
|
|
return cbd, Hname
|
|
|
|
|
|
|
|
|
|
|
|
|
-class _CrossingDetection(AtomicDEVS):
|
|
|
|
|
|
|
+class CrossingDetection:
|
|
|
"""
|
|
"""
|
|
|
- Atomic DEVS model that allows for zero-crossing detection.
|
|
|
|
|
- This block is the generalized detector that allows detection of any
|
|
|
|
|
- crossing as well as the crossing of any range (this can be used in
|
|
|
|
|
- bang-bang control units).
|
|
|
|
|
|
|
+ Helper class that implements the crossing detection algorithms.
|
|
|
|
|
+
|
|
|
|
|
+ The following description concerns the default arguments for all functions.
|
|
|
|
|
+ On top of that, there may be some additional arguments defined, as listed in
|
|
|
|
|
+ the specific documentations below.
|
|
|
|
|
|
|
|
Args:
|
|
Args:
|
|
|
- name (str): The name of the block.
|
|
|
|
|
- ranges (dict): A dictionary where the keys are the names of the
|
|
|
|
|
- values to detect (they will become the input port
|
|
|
|
|
- names) and the values identify either a single value
|
|
|
|
|
- (for a rangeless detection) or a tuple of
|
|
|
|
|
- :code:`(low, high)` for which must be detected that
|
|
|
|
|
- the value exceeds that range.
|
|
|
|
|
- method: A callable that implements the root finding algorithm
|
|
|
|
|
- for the crossing. It takes 4 arguments
|
|
|
|
|
- :code:`p1, p2, y, f`, where :code:`p1` and :code:`p2`
|
|
|
|
|
- respectively represent the point before and after the
|
|
|
|
|
- crossing, :code:`y` is the value that was crossed; and
|
|
|
|
|
- :code:`f` identifies the function that does the crossing.
|
|
|
|
|
- This function is required when more complex root finding
|
|
|
|
|
- algorithms are to be used. This method should return a
|
|
|
|
|
- single value indicative of the time whence the crossing
|
|
|
|
|
- actually happened. Defaults to :func:`linear`.
|
|
|
|
|
- function: The function that is passed as :code:`f` in the method.
|
|
|
|
|
- **kwargs: Additional arguments to pass to the method or the function.
|
|
|
|
|
|
|
+ t1 (float): The x-value of the lower bound of the interval.
|
|
|
|
|
+ t2 (float): The x-value of the higher bound of the interval.
|
|
|
|
|
+ y (float): The value for which a crossing must be checked.
|
|
|
|
|
+ f: A function that should return the y-value for a given x (or t).
|
|
|
"""
|
|
"""
|
|
|
- def __init__(self, name, ranges, method="linear", function=None, **kwargs):
|
|
|
|
|
- super().__init__(name)
|
|
|
|
|
- self.ranges = ranges
|
|
|
|
|
- self.method = getattr(self, method) if isinstance(method, str) else method
|
|
|
|
|
- self.function = function
|
|
|
|
|
- self.kwargs = kwargs
|
|
|
|
|
-
|
|
|
|
|
- self.state = {
|
|
|
|
|
- "old": {},
|
|
|
|
|
- "new": {},
|
|
|
|
|
- "crossings": {},
|
|
|
|
|
- "time": 0.0
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- self.inputs = {}
|
|
|
|
|
- for key in self.ranges:
|
|
|
|
|
- self.inputs[key] = self.addInPort(key)
|
|
|
|
|
- self.state["old"][key] = None
|
|
|
|
|
- self.state["new"][key] = None
|
|
|
|
|
- if isinstance(self.ranges[key], (int, float)):
|
|
|
|
|
- self.ranges[key] = self.ranges[key], self.ranges[key]
|
|
|
|
|
- self.output = self.addOutPort("output")
|
|
|
|
|
-
|
|
|
|
|
- def timeAdvance(self):
|
|
|
|
|
- if len(self.state["crossings"]) > 0:
|
|
|
|
|
- return 0.0
|
|
|
|
|
- return INFINITY
|
|
|
|
|
-
|
|
|
|
|
- def intTransition(self):
|
|
|
|
|
- self.state["crossings"].clear()
|
|
|
|
|
- return self.state
|
|
|
|
|
-
|
|
|
|
|
- def outputFnc(self):
|
|
|
|
|
- return { self.output: self.state["crossings"] }
|
|
|
|
|
-
|
|
|
|
|
- def extTransition(self, inputs):
|
|
|
|
|
- self.state["time"] += self.elapsed
|
|
|
|
|
- for inp in inputs:
|
|
|
|
|
- name = inp.getPortName()
|
|
|
|
|
- self.state["old"][name] = self.state["new"][name]
|
|
|
|
|
- self.state["new"][name] = self.state["time"], inputs[inp]
|
|
|
|
|
-
|
|
|
|
|
- if self.state["old"][name] is None: continue
|
|
|
|
|
-
|
|
|
|
|
- # Detect Crossing
|
|
|
|
|
- y = None
|
|
|
|
|
- p1, p2 = self.state["old"][key], self.state["new"][key]
|
|
|
|
|
- if p1[1] >= self.ranges[name][0] > p2[1]:
|
|
|
|
|
- y = self.ranges[name][0]
|
|
|
|
|
- elif p1[1] <= self.ranges[name][1] < p2[1]:
|
|
|
|
|
- y = self.ranges[name][0]
|
|
|
|
|
- if y is not None:
|
|
|
|
|
- self.state["crossings"][name] = self.method(p1, p2, y, self.function, **self.kwargs)
|
|
|
|
|
- return self.state
|
|
|
|
|
-
|
|
|
|
|
@staticmethod
|
|
@staticmethod
|
|
|
- def linear(p1, p2, y, _=None, **kwargs):
|
|
|
|
|
|
|
+ def linear(t1, t2, y, f, **kwargs):
|
|
|
"""
|
|
"""
|
|
|
Finds the root of the crossing using linear interpolation.
|
|
Finds the root of the crossing using linear interpolation.
|
|
|
No additional function calls or arguments are used.
|
|
No additional function calls or arguments are used.
|
|
|
"""
|
|
"""
|
|
|
- t1, y1 = p1
|
|
|
|
|
- t2, y2 = p2
|
|
|
|
|
|
|
+ y1 = f(t1)
|
|
|
|
|
+ y2 = f(t2)
|
|
|
|
|
+ if y1 == y2:
|
|
|
|
|
+ return y1
|
|
|
return (t2 - t1) / (y2 - y1) * (y - y1) + t1
|
|
return (t2 - t1) / (y2 - y1) * (y - y1) + t1
|
|
|
|
|
|
|
|
@staticmethod
|
|
@staticmethod
|
|
|
- def regula_falsi(p1, p2, y, f, **kwargs):
|
|
|
|
|
|
|
+ def regula_falsi(t1, t2, y, f, **kwargs):
|
|
|
"""
|
|
"""
|
|
|
Implements the Illinois algorithm for finding the root for a crossing problem.
|
|
Implements the Illinois algorithm for finding the root for a crossing problem.
|
|
|
|
|
|
|
|
- Keyword Arguments:
|
|
|
|
|
|
|
+ Args:
|
|
|
eps (float): Half of the upper bound for the relative error.
|
|
eps (float): Half of the upper bound for the relative error.
|
|
|
Defaults to 1e-5.
|
|
Defaults to 1e-5.
|
|
|
n (int): The maximal amount of iterations to compute. Defaults to
|
|
n (int): The maximal amount of iterations to compute. Defaults to
|
|
@@ -380,12 +364,10 @@ class _CrossingDetection(AtomicDEVS):
|
|
|
"""
|
|
"""
|
|
|
eps = kwargs.get("eps", 1e-5)
|
|
eps = kwargs.get("eps", 1e-5)
|
|
|
n = kwargs.get("n", 5 * 1000 * 1000)
|
|
n = kwargs.get("n", 5 * 1000 * 1000)
|
|
|
- t1, y1 = p1
|
|
|
|
|
- t2, y2 = p2
|
|
|
|
|
- y1 -= y
|
|
|
|
|
- y2 -= y
|
|
|
|
|
|
|
+ y1 = f(t1) - y
|
|
|
|
|
+ y2 = f(t2) - y
|
|
|
assert y1 * y2 < 0, "No crossing possible in given range"
|
|
assert y1 * y2 < 0, "No crossing possible in given range"
|
|
|
- tn, yn = p1
|
|
|
|
|
|
|
+ tn, yn = t1, y1
|
|
|
side = 0
|
|
side = 0
|
|
|
for i in range(n):
|
|
for i in range(n):
|
|
|
if abs(t1 - t2) < eps * abs(t1 + t2): break
|
|
if abs(t1 - t2) < eps * abs(t1 + t2): break
|
|
@@ -409,4 +391,4 @@ if __name__ == '__main__':
|
|
|
import numpy as np
|
|
import numpy as np
|
|
|
def f(x):
|
|
def f(x):
|
|
|
return np.cos(x) - x ** 3
|
|
return np.cos(x) - x ** 3
|
|
|
- print(CrossingDetection.regula_falsi((0, f(0)), (1.5, f(1.5)), 0, f, eps=1e-5))
|
|
|
|
|
|
|
+ print(CrossingDetection.regula_falsi(0, 1.5, 0, f, eps=1e-5))
|