| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341 |
- import math
- from .Core import CBD
- from .util import PYTHON_VERSION
- if PYTHON_VERSION == 3:
- # Python 2 complient
- from functools import reduce
- # Superclass for possible additional solvers
- class Solver:
- """
- Superclass that can solve algebraic loops.
- Args:
- logger (Logger): The logger to use.
- """
- def __init__(self, logger):
- self._logger = logger
- def checkValidity(self, path, component):
- """
- Checks the validity of an algebraic loop.
- Args:
- path (str): The path of the top-level block.
- component (list): The blocks in the algebraic loop.
- """
- raise NotImplementedError()
- def constructInput(self, component, curIt):
- """
- Constructs input for the solver.
- Args:
- component (list): The blocks in the algebraic loop.
- curIt (int): The current iteration of the simulation.
- See Also:
- :func:`solve`
- """
- raise NotImplementedError()
- def solve(self, solverInput):
- """
- Solves the algebraic loop.
- Args:
- solverInput: The constructed input.
- See Also:
- :func:`constructInput`
- """
- raise NotImplementedError()
- class LinearSolver(Solver):
- """
- Solves linear algebraic loops using matrices.
- """
- def checkValidity(self, path, component):
- if not self.__isLinear(component):
- self._logger.fatal("Cannot solve non-linear algebraic loop.\nSelf: {}\nComponents: {}".format(path, component))
- def __isLinear(self, strongComponent):
- """Determines if an algebraic loop describes a linear equation or not.
- For a block to comprise the strong component, at least one of its dependencies must be in the strong
- component as well.
- Args:
- strongComponent (list): The detected loop, in a list (of BaseBlock instances)
- Returns:
- :class:`True` if the loop is linear, else :code:`False`.
- """
- # TO IMPLEMENT
- """
- A non-linear equation is generated when the following conditions occur:
- (1) there is a multiplication operation being performed between two unknowns.
- (2) there is an invertion operation being performed in an unknown.
- (3) some non-linear block belongs to the strong component
-
- The condition (1) can be operationalized by finding a product block that has two dependencies belonging to
- the strong component. This will immediatly tell us that it is a product between two unknowns.
- The condition (2) can be operationalized simply by finding an inverter block in the strong component.
- Because the inverter block only has one input, if it is in the strong component, it means that its only
- dependency is in the strong component.
- """
- # WON'T APPEAR: Constant, Sequence, Time, Logging
- # LINEAR: Negator, Adder, Delay, Input, Output, Wire
- # NON-LINEAR: Inverter, Modulo, Root, LT, EQ, LTE, Not, Or, And, MUX, Generic, ABS, Int, Power, Min, Max, Clamp
- # SEMI-LINEAR: Product // MUX?
- for block in strongComponent:
- # condition (1)
- if block.getBlockType() == "ProductBlock":
- dependenciesUnknown = [x for x in block.getDependencies(0) if x in strongComponent]
- if len(dependenciesUnknown) >= 2:
- return False
- # condition (2) and (3)
- if block.getBlockType() in ["InverterBlock", "ModuloBlock", "RootBlock", "LessThanBlock", "EqualsBlock",
- "LessThanOrEqualsBlock", "NotBlock", "OrBlock", "AndBlock", "MinBlock",
- "MaxBlock", "MultiplexerBlock", "GenericBlock", "AbsBlock", "IntBlock",
- "PowerBlock", "ClampBlock"]:
- return False
- return True
- def constructInput(self, strongComponent, curIteration):
- """
- Constructs input for a solver of systems of linear equations
- Input consists of two matrices:
- - M1: coefficient matrix, where each row represents an equation of the system
- - M2: result matrix, where each element is the result for the corresponding equation in M1
- """
- # Initialize matrices with zeros
- size = len(strongComponent)
- M1 = Matrix(size, size)
- M2 = [0] * size
- # block -> index of block
- indexdict = dict()
- for i, block in enumerate(strongComponent):
- indexdict[block] = i
- # Get low-level dependency
- resolveBlock = lambda possibleDep, output_port: possibleDep if not isinstance(possibleDep, CBD) else possibleDep.getBlockByName(output_port)
- # Get list of low-level dependencies from n inputs
- def getBlockDependencies2(block):
- return (resolveBlock(b, output_port) for (b, output_port) in [block.getBlockConnectedToInput(x) for x in block.getInputPortNames()])
- for i, block in enumerate(strongComponent):
- if block.getBlockType() == "AdderBlock":
- for external in [x for x in getBlockDependencies2(block) if x not in strongComponent]:
- M2[i] -= external.getSignal()[curIteration].value
- M1[i, i] = -1
- for compInStrong in [x for x in getBlockDependencies2(block) if x in strongComponent]:
- M1[i, indexdict[compInStrong]] += 1
- elif block.getBlockType() == "ProductBlock":
- # M2 can stay 0
- M1[i, i] = -1
- fact = reduce(lambda x, y: x * y, [x.getSignal()[curIteration].value for x in getBlockDependencies2(block) if x not in strongComponent])
- for compInStrong in [x for x in getBlockDependencies2(block) if x in strongComponent]:
- M1[i, indexdict[compInStrong]] += fact
- elif block.getBlockType() == "NegatorBlock":
- # M2 can stay 0
- M1[i, i] = -1
- possibleDep, output_port = block.getBlockConnectedToInput("IN1")
- M1[i, indexdict[resolveBlock(possibleDep, output_port)]] = - 1
- elif block.getBlockType() == "InputPortBlock":
- # M2 can stay 0
- M1[i, i] = 1
- possibleDep, output_port = block.parent.getBlockConnectedToInput(block.getBlockName())
- M1[i, indexdict[resolveBlock(possibleDep, output_port)]] = - 1
- elif block.getBlockType() == "OutputPortBlock" or block.getBlockType() == "WireBlock":
- # M2 can stay 0
- M1[i, i] = 1
- dblock = block.getDependencies(0)[0]
- if isinstance(dblock, CBD):
- oport = block.getLinksIn()['IN1'].output_port
- dblock = dblock.getBlockByName(oport).getLinksIn()['IN1'].block
- M1[i, indexdict[dblock]] = - 1
- elif block.getBlockType() == "DelayBlock":
- # If a delay is in a strong component, this is the first iteration
- # FIXME: turn this into a normal error?
- assert curIteration == 0
- # And so the dependency is the IC
- # M2 can stay 0 because we have an equation of the type -x = -ic <=> -x + ic = 0
- M1[i, i] = -1
- possibleDep, output_port = block.getBlockConnectedToInput("IC")
- dependency = resolveBlock(possibleDep, output_port)
- assert dependency in strongComponent
- M1[i, indexdict[dependency]] = 1
- else:
- self._logger.fatal("Unknown element '{}', please implement".format(block.getBlockType()))
- return M1, M2
- def solve(self, solverInput):
- M1, M2 = solverInput
- n = M1.rows
- indxc = [0] * n
- indxr = [0] * n
- ipiv = [0] * n
- icol = 0
- irow = 0
- for i in range(n):
- big = 0.0
- for j in range(n):
- if ipiv[j] != 1:
- for k in range(n):
- if ipiv[k] == 0:
- nb = math.fabs(M1[j, k])
- if nb >= big:
- big = nb
- irow = j
- icol = k
- elif ipiv[k] > 1:
- raise ValueError("GAUSSJ: Singular Matrix-1")
- ipiv[icol] += 1
- if irow != icol:
- for l in range(n):
- M1[irow, l], M1[icol, l] = M1[icol, l], M1[irow, l]
- M2[irow], M2[icol] = M2[icol], M2[irow]
- indxr[i] = irow
- indxc[i] = icol
- if M1[icol, icol] == 0.0:
- raise ValueError("GAUSSJ: Singular Matrix-2")
- pivinv = 1.0 / M1[icol, icol]
- M1[icol, icol] = 1.0
- for l in range(n):
- M1[icol, l] *= pivinv
- M2[icol] *= pivinv
- for ll in range(n):
- if ll != icol:
- dum = M1[ll, icol]
- M1[ll, icol] = 0.0
- for l in range(n):
- M1[ll, l] -= M1[icol, l] * dum
- M2[ll] -= M2[icol] * dum
- for l in range(n - 1, 0, -1):
- if indxr[l] != indxc[l]:
- for k in range(n):
- M1[k, indxr[l]], M1[k, indxc[l]] = M1[k, indxc[l]], M1[k, indxr[l]]
- return solverInput[1]
- class Matrix:
- """Custom, efficient matrix class. This class is used for efficiency purposes.
- - Using a while/for loop is slow.
- - Using :class:`[[0] * n] * n` will have n references to the same list.
- - Using :class:`[[0] * size for _ in range(size)]` can be 5 times slower
- than this class!
- Numpy could be used to even further increase efficiency, but this increases the
- required dependencies for external hardware systems (that may not provide these options).
- Note:
- Internally, the matrix is segmented into chunks of 500.000.000 items.
- """
- def __init__(self, rows, cols):
- self.rows = rows
- self.cols = cols
- self.size = rows * cols
- self.__max_list_size = 500 * 1000 * 1000
- self.data = [[0] * ((rows * cols) % self.__max_list_size)]
- for r in range(self.size // self.__max_list_size):
- self.data.append([0] * self.__max_list_size)
- def __getitem__(self, idx):
- absolute = idx[0] * self.cols + idx[1]
- outer = absolute // self.__max_list_size
- inner = absolute % self.__max_list_size
- return self.data[outer][inner]
- def __setitem__(self, idx, value):
- absolute = idx[0] * self.cols + idx[1]
- outer = absolute // self.__max_list_size
- inner = absolute % self.__max_list_size
- self.data[outer][inner] = value
- def __str__(self):
- res = ""
- for row in range(self.rows):
- if len(res) > 0:
- res += "\n"
- res += "["
- for col in range(self.cols):
- res += "\t%8.4f" % self[row, col]
- res += "\t]"
- return res
- try:
- import sympy
- # TODO: non-unique solutions?
- class SympySolver(Solver):
- def checkValidity(self, path, component):
- raise NotImplementedError("The Sympy Solver is not finished yet, so please refrain from using it.")
- def constructInput(self, component, curIt):
- eqs = {}
- for i, block in enumerate(component):
- args = []
- for x in self.__dependencies(block):
- if x not in component:
- args.append(x.getSignal()[curIt].value)
- else:
- args.append(sympy.symbols('x%d' % component.index(x)))
- eqs['x%d' % i] = self.__OPERATIONS[block.getBlockType()](args, block)
- return eqs
- def solve(self, solverInput):
- sol = []
- eqs = []
- answer = [0] * len(solverInput)
- for k, v in solverInput.items():
- x = sympy.symbols(k)
- sol.append(x)
- eqs.append(v - x)
- solution = sympy.nonlinsolve(eqs, sol)
- print(solverInput, solution)
- # TODO: Clamp, MUX, Split, LTE, Eq, LT, not, and, or, delay
- __OPERATIONS = {
- "AdderBlock": lambda l, _: sum(l),
- "ProductBlock": lambda l, _: reduce((lambda a, b: a * b), l),
- "NegatorBlock": lambda l, _: -l[0],
- "InverterBlock": lambda l, _: 1.0/l[0],
- "ModuloBlock": lambda l, _: l[0] % l[1],
- "RootBlock": lambda l, _: sympy.root(l[0], l[1]),
- "PowerBlock": lambda l, _: l[0] ** l[1],
- "AbsBlock": lambda l, _: abs(l[0]),
- "IntBlock": lambda l, _: sympy.floor(l[0]),
- "GenericBlock": lambda l, b: getattr(sympy, b.getBlockOperator())(l[0]),
- "MaxBlock": lambda l: sympy.Max(*l),
- "MinBlock": lambda l: sympy.Min(*l),
- }
- @staticmethod
- def __dependencies(block):
- blocks = []
- for s in block.getInputPortNames():
- b, op = block.getBlockConnectedToInput(s)
- if isinstance(b, CBD):
- b = b.getBlockByName(op)
- blocks.append(b)
- return blocks
- except: pass
|