| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262 |
- """
- This is CBD simulator controller exposing the most relevant methods of
- a CBD simulator (See CBDMultipleOutput.Source.CBD.run) to allow for control from a SCCD.
- date: Oct 2015
- author (responsible): Claudio Gomes
- """
- import math
- from CBDMultipleOutput.Source.CBD import Clock, DepGraph
- from CBDMultipleOutput.Source.CBD import CBD
- class CBDController:
- """The CBD contoller acts as a facade to the simulator methods that are in the CBD objects.
- The original design decision to keep the methods in the CBD Domain classes
- (as opposed to, for instance, keep them in a visitor/interpreter was not the decision of the current author.)"""
-
- def __init__(self, cbdModel, delta_t):
- self.model = cbdModel
- self.model.setDeltaT(delta_t)
- self.delta = delta_t
-
- def initSimulation(self):
- self.model.resetSignals();
- self.model.setClock(Clock(self.delta))
-
- def createDepGraph(self, iteratonStep):
- return self.__createDepGraph(iteratonStep)
-
- def createStrongComponents(self, depGraph, iteratonStep):
- return depGraph.getStrongComponents(iteratonStep)
-
- def componentIsCycle(self, component, depGraph):
- return self.__hasCycle(component, depGraph)
-
- def computeNextBlock(self, comp, iteratonStep):
- block = comp[0] # the strongly connected component has a single element
- block.compute(iteratonStep)
-
- def computeNextAlgebraicLoop(self, comp, iteratonStep):
- currentComponent = comp
- # Detected a strongly connected component
- if not self.__isLinear(currentComponent):
- raise NotImplementedError("Cannot solve non-linear algebraic loop")
- solverInput = self.__constructLinearInput(currentComponent, iteratonStep)
- self.__gaussjLinearSolver(solverInput)
- solutionVector = solverInput[1]
- for block in currentComponent:
- blockIndex = currentComponent.index(block)
- block.appendToSignal(solutionVector[blockIndex])
-
- def advanceTimeStep(self):
- self.model.getClock().step()
-
- def getCurrentSimulationTime(self):
- return self.model.getClock().getTime()
-
- def __isLinear(self, strongComponent):
- """
- Determines if an algebraic loop describes a linear equation or not
- As input you get the detected loop in a list.
- If the loop is linear return True
- Else: call exit(1) to exit the simulation with exit code 1
-
- The strong component parameter is a list of blocks that comprise the strong component.
- For a block to comprise the strong component, at least one of its dependencies must be in the strong component as well.
- A non-linear equation is generated when the following conditions occur:
- (1) there is a multiplication operation being performed between two unknowns.
- (2) there is an invertion operatioon being performed in an unknown.
- (3) some non linear block belongs to the strong component
-
- The condition (1) can be operationalized by finding a product block that has two dependencies belonging to the strong component.
- This will immediatly tell us that it is a product between two unknowns.
- The condition (2) can be operationalized simply by finding an inverter block in the strong component.
- Because the inverter block only has one input, if it is in the strong component, it means that its only dependency is in the strong component.
-
- """
- for block in strongComponent:
- if not block.getBlockType() == "AdderBlock" and not block.getBlockType() == "NegatorBlock":
- # condition (1)
- if block.getBlockType() == "ProductBlock":
- dependenciesUnknown = [x for x in block.getDependencies(0) if x in strongComponent ]
- if (len(dependenciesUnknown) == 2):
- return False
-
- # conditions (2) and (3)
- if block.getBlockType() in ["InverterBlock", "LessThanBlock", "ModuloBlock", "RootBlock", "EqualsBlock", "NotBlock", "OrBlock", "AndBlock", "SequenceBlock"]:
- return False
-
- return True
-
- def __hasCycle(self, component, depGraph):
- """
- Determine whether a component is cyclic
- """
- assert len(component)>=1, "A component should have at least one element"
- if len(component)>1:
- return True
- else: # a strong component of size one may still have a cycle: a self-loop
- if depGraph.hasDependency(component[0], component[0]):
- return True
- else:
- return False
- def __constructLinearInput(self, strongComponent, curIteration):
- """
- Constructs input for a solver of systems of linear equations
- Input consists of two matrices:
- M1: coefficient matrix, where each row represents an equation of the system
- M2: result matrix, where each element is the result for the corresponding equation in M1
- """
- size = len(strongComponent)
- row = []
- M1 = []
- M2 = []
- # Initialize matrices with zeros
- i = 0
- while (i < size):
- j = 0
- row = []
- while (j < size):
- row.append(0)
- j += 1
- M1.append(row)
- M2.append(0)
- i += 1
- # block -> index of block
- indexdict = dict()
- for (i,block) in enumerate(strongComponent):
- indexdict[block] = i
- resolveBlock = lambda possibleDep, output_port: possibleDep if not isinstance(possibleDep, CBD) else possibleDep.getBlockByName(output_port)
- def getBlockDependencies2(block):
- return [ resolveBlock(b,output_port) for (b, output_port) in [block.getBlockConnectedToInput("IN1"), block.getBlockConnectedToInput("IN2")]]
- for (i, block) in enumerate(strongComponent):
- if block.getBlockType() == "AdderBlock":
- for external in [ x for x in getBlockDependencies2(block) if x not in strongComponent ]:
- M2[i] -= external.getSignal()[curIteration].value
- M1[i][i] = -1
- for compInStrong in [ x for x in getBlockDependencies2(block) if x in strongComponent ]:
- M1[i][indexdict[compInStrong]] = 1
- elif block.getBlockType() == "ProductBlock":
- #M2 can stay 0
- M1[i][i] = -1
- M1[i][indexdict[[ x for x in getBlockDependencies2(block) if x in strongComponent ][0]]] = reduce(lambda x,y: x*y, [ x.getSignal()[curIteration].value for x in getBlockDependencies2(block) if x not in strongComponent ])
- elif block.getBlockType() == "NegatorBlock":
- #M2 can stay 0
- M1[i][i] = -1
- possibleDep, output_port = block.getBlockConnectedToInput("IN1")
- M1[i][indexdict[resolveBlock(possibleDep, output_port)]] = - 1
- elif block.getBlockType() == "InputPortBlock":
- #M2 can stay 0
- M1[i][i] = 1
- possibleDep, output_port = block.parent.getBlockConnectedToInput(block.getBlockName())
- M1[i][indexdict[resolveBlock(possibleDep, output_port)]] = - 1
- elif block.getBlockType() == "OutputPortBlock" or block.getBlockType() == "WireBlock":
- #M2 can stay 0
- M1[i][i] = 1
- M1[i][indexdict[block.getDependencies(0)[0]]] = - 1
- elif block.getBlockType() == "DelayBlock":
- # If a delay is in a strong component, this is the first iteration
- assert curIteration == 0
- # And so the dependency is the IC
- # M2 can stay 0 because we have an equation of the type -x = -ic <=> -x + ic = 0
- M1[i][i] = -1
- possibleDep, output_port = block.getBlockConnectedToInput("IC")
- dependency = resolveBlock(possibleDep, output_port)
- assert dependency in strongComponent
- M1[i][indexdict[dependency]] = 1
- else:
- self.__logger.fatal("Unknown element, please implement")
- return [M1, M2]
-
- def __gaussjLinearSolver(self, solverInput):
- M1 = solverInput[0]
- M2 = solverInput[1]
- n = len(M1)
- indxc = self.__ivector(n)
- indxr = self.__ivector(n)
- ipiv = self.__ivector(n)
- icol = 0
- irow = 0
- for i in range(n):
- big = 0.0
- for j in range(n):
- if (ipiv[j] != 1):
- for k in range(n):
- if (ipiv[k] == 0):
- if (math.fabs(M1[j][k]) >= big):
- big = math.fabs(M1[j][k])
- irow = j
- icol = k
- elif (ipiv[k] > 1):
- raise ValueError("GAUSSJ: Singular Matrix-1")
- ipiv[icol] += 1
- if (irow != icol):
- for l in range(n):
- (M1[irow][l], M1[icol][l]) = self.__swap(M1[irow][l], M1[icol][l])
- (M2[irow], M2[icol]) = self.__swap(M2[irow], M2[icol])
- indxr[i] = irow
- indxc[i] = icol
- if (M1[icol][icol] == 0.0):
- raise ValueError("GAUSSJ: Singular Matrix-2")
- pivinv = 1.0/M1[icol][icol]
- M1[icol][icol] = 1.0
- for l in range(n):
- M1[icol][l] *= pivinv
- M2[icol] *= pivinv
- for ll in range(n):
- if (ll != icol):
- dum = M1[ll][icol]
- M1[ll][icol] = 0.0
- for l in range(n):
- M1[ll][l] -= M1[icol][l] * dum
- M2[ll] -= M2[icol] * dum
- l = n
- while (l > 0):
- l -= 1
- if (indxr[l] != indxc[l]):
- for k in range(n):
- (M1[k][indxr[l]], M1[k][indxc[l]]) = self.__swap(M1[k][indxr[l]], M1[k][indxc[l]])
- def __createDepGraph(self, curIteration):
- """
- Create a dependency graph of the CBD model.
- Use the curIteration to differentiate between the first and other iterations
- Watch out for dependencies with sub-models.
- """
- blocks = self.model.getBlocks()
- depGraph = DepGraph()
- for block in blocks:
- depGraph.addMember(block);
-
- def recSetInternalDependencies(blocks, curIteration):
- for block in blocks:
- for dep in block.getDependencies(curIteration):
- depGraph.setDependency(block, dep, curIteration)
- if isinstance(block, CBD):
- recSetInternalDependencies(block.getBlocks(), curIteration)
-
- recSetInternalDependencies(blocks, curIteration);
-
- return depGraph
-
- def __ivector(self, n):
- v = []
- for i in range(n):
- v.append(0)
- return v
- def __swap(self, a, b):
- return (b, a)
-
|