|
|
@@ -15,6 +15,156 @@ from copy import deepcopy
|
|
|
|
|
|
__all__ = ['CBDRunner', 'prepare_cbd', 'CrossingDetection']
|
|
|
|
|
|
+import math
|
|
|
+class CrossingDetection:
|
|
|
+ """
|
|
|
+ Helper class that implements the crossing detection algorithms.
|
|
|
+
|
|
|
+ The following description concerns the default arguments for all functions.
|
|
|
+ On top of that, there may be some additional arguments defined, as listed in
|
|
|
+ the specific documentations below.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ t1 (float): The x-value of the lower bound of the interval.
|
|
|
+ t2 (float): The x-value of the higher bound of the interval.
|
|
|
+ y (float): The value for which a crossing must be checked.
|
|
|
+ f: A function that should return the y-value for a given x (or t).
|
|
|
+ """
|
|
|
+ @staticmethod
|
|
|
+ def linear(t1, t2, y, f, **kwargs):
|
|
|
+ """
|
|
|
+ Finds the root of the crossing using linear interpolation.
|
|
|
+ No additional function calls or arguments are used.
|
|
|
+ """
|
|
|
+ y1 = f(t1)
|
|
|
+ y2 = f(t2)
|
|
|
+ if y1 == y2:
|
|
|
+ return y1
|
|
|
+ return (t2 - t1) / (y2 - y1) * (y - y1) + t1
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def regula_falsi(t1, t2, y, f, **kwargs):
|
|
|
+ """
|
|
|
+ Implements the Illinois algorithm for finding the root for a crossing problem.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ eps (float): Half of the upper bound for the relative error.
|
|
|
+ Defaults to 1e-5.
|
|
|
+ n (int): The maximal amount of iterations to compute. Defaults to
|
|
|
+ 5 million iterations.
|
|
|
+
|
|
|
+ See Also:
|
|
|
+ https://en.wikipedia.org/wiki/Regula_falsi
|
|
|
+ """
|
|
|
+ eps = kwargs.get("eps", 1e-5)
|
|
|
+ n = kwargs.get("n", 5 * 1000 * 1000)
|
|
|
+ y1 = f(t1) - y
|
|
|
+ y2 = f(t2) - y
|
|
|
+ assert y1 * y2 < 0, "No crossing possible in given range"
|
|
|
+ tn, yn = t1, y1
|
|
|
+ side = 0
|
|
|
+ for i in range(n):
|
|
|
+ if abs(t1 - t2) < eps * abs(t1 + t2): break
|
|
|
+ tn = (y1 * t2 - y2 * t1) / (y1 - y2)
|
|
|
+ yn = f(tn) - y
|
|
|
+ if yn * y2 > 0:
|
|
|
+ t2, y2 = tn, yn
|
|
|
+ if side == -1:
|
|
|
+ y1 /= 2
|
|
|
+ side = -1
|
|
|
+ elif yn * y1 > 0:
|
|
|
+ t1, y1 = tn, yn
|
|
|
+ if side == 1:
|
|
|
+ y2 /= 2
|
|
|
+ side = 1
|
|
|
+ else:
|
|
|
+ break
|
|
|
+ return tn
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def ITP(t1, t2, y, f, **kwargs):
|
|
|
+ r"""Implements the Interpolation-Truncation-Projection algorithm for finding
|
|
|
+ the root of a function.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ eps (float): Minimal interval size. Defaults to 1e-5.
|
|
|
+ k1 (float): First truncation size hyperparameter. Must be in the
|
|
|
+ range of :math:`(0, \infty)`. Defaults to 0.1.
|
|
|
+ k2 (float): Second truncation size hyperparameter. Must be in the
|
|
|
+ range of :math:`[1, 1 + \frac{1}{2}(1 + \sqrt{5})]`.
|
|
|
+ Defaults to 1.5.
|
|
|
+ n0 (float): Slack variable to control the size of the interval for
|
|
|
+ the projection step. Must be in :math:`[0, \infty)`.
|
|
|
+ When 0, the average number of iterations will be less
|
|
|
+ than that of the bisection method. Defaults to 0.
|
|
|
+
|
|
|
+ See Also:
|
|
|
+ https://en.wikipedia.org/wiki/ITP_method
|
|
|
+ """
|
|
|
+ k1 = kwargs.get("k1", 0.1)
|
|
|
+ k2 = kwargs.get("k2", 1.5)
|
|
|
+ eps = kwargs.get("eps", 1e-5)
|
|
|
+ n0 = kwargs.get("n0", 0)
|
|
|
+
|
|
|
+ assert 0 < k1, "For ITP, k1 must be strictly positive."
|
|
|
+ assert 1 <= k2 <= (1 + (1. + 5 ** 0.5) / 2.), "For ITP, k2 must be in [1, 1 + phi]."
|
|
|
+ assert 0 <= n0, "For ITP, n0 must be positive or zero."
|
|
|
+
|
|
|
+ sign = lambda x: 1 if x > 0 else (-1 if x < 0 else 0)
|
|
|
+
|
|
|
+ a = t1
|
|
|
+ b = t2
|
|
|
+ ya = f(a) - y
|
|
|
+ yb = f(b) - y
|
|
|
+
|
|
|
+ if ya == 0:
|
|
|
+ return a
|
|
|
+ if yb == 0:
|
|
|
+ return b
|
|
|
+ assert ya * yb < 0, "No crossing possible in given range"
|
|
|
+
|
|
|
+ # Preprocessing
|
|
|
+ nh = math.ceil(math.log2((b - a) / (2 * eps)))
|
|
|
+ nm = nh + n0
|
|
|
+ j = 0
|
|
|
+
|
|
|
+ while (b - a) > (2 * eps):
|
|
|
+ xh = (a + b) / 2
|
|
|
+ r = eps * 2 ** (nm - j) - (b - a) / 2
|
|
|
+ d = k1 * (b - a) ** k2
|
|
|
+
|
|
|
+ # Interpolation
|
|
|
+ xf = (yb * a - ya * b) / (yb - ya)
|
|
|
+
|
|
|
+ # Truncation
|
|
|
+ s = sign(xh - xf)
|
|
|
+ if d <= abs(xh - xf):
|
|
|
+ xt = xf + s * d
|
|
|
+ else:
|
|
|
+ xt = xh
|
|
|
+
|
|
|
+ # Projection
|
|
|
+ if abs(xt - xh) <= r:
|
|
|
+ xI = xt
|
|
|
+ else:
|
|
|
+ xI = xh - s * r
|
|
|
+
|
|
|
+ # Update Interval
|
|
|
+ yI = f(xI) - y
|
|
|
+ if (ya - yb) * yI < 0:
|
|
|
+ b = xI
|
|
|
+ yb = yI
|
|
|
+ elif (ya - yb) * yI > 0:
|
|
|
+ a = xI
|
|
|
+ ya = yI
|
|
|
+ else:
|
|
|
+ a = xI
|
|
|
+ b = xI
|
|
|
+ j += 1
|
|
|
+
|
|
|
+ return (a + b) / 2
|
|
|
+
|
|
|
+from CBD.converters.CBDDraw import draw
|
|
|
class CBDRunner(AtomicDEVS):
|
|
|
"""
|
|
|
Atomic DEVS model that can be used to execute a CBD model.
|
|
|
@@ -27,12 +177,18 @@ class CBDRunner(AtomicDEVS):
|
|
|
In this paused mode, the model will not
|
|
|
progress. Useful when in combination with
|
|
|
multiple ODEs. Defaults to :code:`False`.
|
|
|
+ algo: The root finding algorithm function. See
|
|
|
+ :class:`CrossingDetection` for more info.
|
|
|
+ **kwargs: Optional parameters for the zero-crossing
|
|
|
+ detection algorithm.
|
|
|
"""
|
|
|
- def __init__(self, name, cbd, initials=None, stopped=False, crossings=None):
|
|
|
+ def __init__(self, name, cbd, initials=None, stopped=False, crossings=None, algo=CrossingDetection.ITP, **kwargs):
|
|
|
AtomicDEVS.__init__(self, name)
|
|
|
self.initials = {} if initials is None else initials
|
|
|
self.original_model, self.Hname = prepare_cbd(cbd.clone(), initials)
|
|
|
self.crossings = {} if crossings is None else crossings
|
|
|
+ self.algo = algo
|
|
|
+ self.kwargs = kwargs
|
|
|
|
|
|
self.state = {
|
|
|
"CBD": None,
|
|
|
@@ -43,12 +199,12 @@ class CBDRunner(AtomicDEVS):
|
|
|
"request_compute": True,
|
|
|
"zero_crossing": None,
|
|
|
"time": 0.0,
|
|
|
- "stopped": stopped,
|
|
|
- # "zcd": { k: None for k in self.crossings }
|
|
|
+ "stopped": stopped
|
|
|
}
|
|
|
|
|
|
self.simulator = None
|
|
|
self.reinit()
|
|
|
+ draw(self.state["CBD"], name + ".dot")
|
|
|
|
|
|
self.inputs = {}
|
|
|
for inp in filter(lambda b: b.getBlockType() == "InputPortBlock", cbd.getBlocks()):
|
|
|
@@ -91,23 +247,23 @@ class CBDRunner(AtomicDEVS):
|
|
|
"""
|
|
|
return self.state["CBD"].getBlockByName(pname).getSignal("OUT1")[-1].value
|
|
|
|
|
|
- def crossing_detection(self, signal, y):
|
|
|
+ def crossing_detection(self, signal, y, y0):
|
|
|
"""
|
|
|
Crossing root finder for a signal.
|
|
|
|
|
|
Args:
|
|
|
signal (str): The port to detect.
|
|
|
y (float): The value to cross.
|
|
|
-
|
|
|
- Note:
|
|
|
- This function will change in the future, presumably becoming the IVP
|
|
|
- root finding algorithm. Currently, the Illinois variant of Regula Falsi
|
|
|
- is used.
|
|
|
+ y0 (float): Y-value of the lower end of the interval.
|
|
|
"""
|
|
|
- F = lambda t: self.crossing_function(t - self.state["time"], signal)
|
|
|
+ def F(t):
|
|
|
+ h = t - self.state["time"]
|
|
|
+ if h == 0:
|
|
|
+ return y0
|
|
|
+ return self.crossing_function(h, signal)
|
|
|
t1 = self.state["time"]
|
|
|
t2 = t1 + self.state["delta_t"]
|
|
|
- return CrossingDetection.regula_falsi(t1, t2, y, F)
|
|
|
+ return self.algo(t1, t2, y, F, **self.kwargs)
|
|
|
|
|
|
def crossing_function(self, h, signal, restore=True):
|
|
|
"""
|
|
|
@@ -231,15 +387,18 @@ class CBDRunner(AtomicDEVS):
|
|
|
value = self.get_signal(z)
|
|
|
# print("POST", z, self.state["CBD"].getBlockByName(z).getSignal("OUT1")[-1])
|
|
|
if (old[z] - y) * (value - y) < 0:
|
|
|
+ # print(old[z], value, y, z, self.state["time"], self.state["delta_t"])
|
|
|
# A crossing will happen!
|
|
|
# print("CROSSING:", old[z], value, y)
|
|
|
- cds.setdefault(self.crossing_detection(z, y), []).append(z)
|
|
|
+ cds.setdefault(self.crossing_detection(z, y, old[z]), []).append(z)
|
|
|
if len(cds) > 0:
|
|
|
fzc = min(cds)
|
|
|
self.state["zero_crossing"] = cds[fzc]
|
|
|
h = fzc - self.state["time"]
|
|
|
self.state["delta_t"] = h
|
|
|
|
|
|
+ # print("CROSSING HAPPENED")
|
|
|
+
|
|
|
self.crossing_function(h, None, False)
|
|
|
else:
|
|
|
self.set_delta(float('inf'))
|
|
|
@@ -321,74 +480,11 @@ def prepare_cbd(model, initials):
|
|
|
cbd.addConnection(Mname, clock, input_port_name='h')
|
|
|
return cbd, Hname
|
|
|
|
|
|
-
|
|
|
-class CrossingDetection:
|
|
|
- """
|
|
|
- Helper class that implements the crossing detection algorithms.
|
|
|
-
|
|
|
- The following description concerns the default arguments for all functions.
|
|
|
- On top of that, there may be some additional arguments defined, as listed in
|
|
|
- the specific documentations below.
|
|
|
-
|
|
|
- Args:
|
|
|
- t1 (float): The x-value of the lower bound of the interval.
|
|
|
- t2 (float): The x-value of the higher bound of the interval.
|
|
|
- y (float): The value for which a crossing must be checked.
|
|
|
- f: A function that should return the y-value for a given x (or t).
|
|
|
- """
|
|
|
- @staticmethod
|
|
|
- def linear(t1, t2, y, f, **kwargs):
|
|
|
- """
|
|
|
- Finds the root of the crossing using linear interpolation.
|
|
|
- No additional function calls or arguments are used.
|
|
|
- """
|
|
|
- y1 = f(t1)
|
|
|
- y2 = f(t2)
|
|
|
- if y1 == y2:
|
|
|
- return y1
|
|
|
- return (t2 - t1) / (y2 - y1) * (y - y1) + t1
|
|
|
-
|
|
|
- @staticmethod
|
|
|
- def regula_falsi(t1, t2, y, f, **kwargs):
|
|
|
- """
|
|
|
- Implements the Illinois algorithm for finding the root for a crossing problem.
|
|
|
-
|
|
|
- Args:
|
|
|
- eps (float): Half of the upper bound for the relative error.
|
|
|
- Defaults to 1e-5.
|
|
|
- n (int): The maximal amount of iterations to compute. Defaults to
|
|
|
- 5 million iterations.
|
|
|
-
|
|
|
- See Also:
|
|
|
- https://en.wikipedia.org/wiki/Regula_falsi
|
|
|
- """
|
|
|
- eps = kwargs.get("eps", 1e-5)
|
|
|
- n = kwargs.get("n", 5 * 1000 * 1000)
|
|
|
- y1 = f(t1) - y
|
|
|
- y2 = f(t2) - y
|
|
|
- assert y1 * y2 < 0, "No crossing possible in given range"
|
|
|
- tn, yn = t1, y1
|
|
|
- side = 0
|
|
|
- for i in range(n):
|
|
|
- if abs(t1 - t2) < eps * abs(t1 + t2): break
|
|
|
- tn = (y1 * t2 - y2 * t1) / (y1 - y2)
|
|
|
- yn = f(tn) - y
|
|
|
- if yn * y2 > 0:
|
|
|
- t2, y2 = tn, yn
|
|
|
- if side == -1:
|
|
|
- y1 /= 2
|
|
|
- side = -1
|
|
|
- elif yn * y1 > 0:
|
|
|
- t1, y1 = tn, yn
|
|
|
- if side == 1:
|
|
|
- y2 /= 2
|
|
|
- side = 1
|
|
|
- else:
|
|
|
- break
|
|
|
- return tn
|
|
|
-
|
|
|
if __name__ == '__main__':
|
|
|
import numpy as np
|
|
|
def f(x):
|
|
|
return np.cos(x) - x ** 3
|
|
|
- print(CrossingDetection.regula_falsi(0, 1.5, 0, f, eps=1e-5))
|
|
|
+ print(CrossingDetection.regula_falsi(0, 1.5, 0, f))
|
|
|
+ # def f(x):
|
|
|
+ # return x ** 3 - 2*x
|
|
|
+ print(CrossingDetection.ITP(0, 1.5, 0, f, k1=0.1, k2=1.5))
|