||
- """
- This file contains the standard library for CBD building blocks.
- """
- from CBD.CBD import BaseBlock, CBD, level
- from CBD import naivelog
- import math
# Public names exported by a wildcard import of this module.
__all__ = [
    'ConstantBlock',
    'NegatorBlock',
    'InverterBlock',
    'AdderBlock',
    'ProductBlock',
    'ModuloBlock',
    'RootBlock',
    'AbsBlock',
    'IntBlock',
    'ClampBlock',
    'GenericBlock',
    'MultiplexerBlock',
    'LessThanBlock',
    'EqualsBlock',
    'LessThanOrEqualsBlock',
    'NotBlock',
    'OrBlock',
    'AndBlock',
    'DelayBlock',
    'LoggingBlock',
    'AddOneBlock',
    'DerivatorBlock',
    'IntegratorBlock',
    'SplitBlock',
    'Clock',
    'TimeBlock',
    'PowerBlock',
    'MaxBlock',
    'MinBlock',
]
class ConstantBlock(BaseBlock):
    """
    Source block that emits the same stored value on every iteration.

    Args:
        block_name (str):   The name of the block.
        value:              The value to output. Defaults to 0.0.
    """
    def __init__(self, block_name, value=0.0):
        # No input ports, a single output port.
        BaseBlock.__init__(self, block_name, [], ["OUT1"])
        self.__value = value

    def compute(self, curIteration):
        """Append the stored value to the output signal."""
        constant = self.getValue()
        self.appendToSignal(constant)

    def getValue(self):
        """Return the value that is currently being output."""
        return self.__value

    def setValue(self, value):
        """Replace the value that will be output from now on."""
        self.__value = value

    def __repr__(self):
        pieces = (BaseBlock.__repr__(self), " Value = ", str(self.getValue()), "\n")
        return "".join(pieces)
class NegatorBlock(BaseBlock):
    """
    Outputs the arithmetic negation of the value on IN1.

    Args:
        block_name (str):   The name of the block.
    """
    def __init__(self, block_name):
        BaseBlock.__init__(self, block_name, ["IN1"], ["OUT1"])

    def compute(self, curIteration):
        """Append ``-IN1`` for the given iteration to the output signal."""
        operand = self.getInputSignal(curIteration, "IN1").value
        self.appendToSignal(-operand)
class InverterBlock(BaseBlock):
    """
    Outputs the reciprocal ``1/IN1`` of its input.

    Args:
        block_name (str):   The name of the block.
        tolerance (float):  Inputs whose absolute value is below this
                            threshold are treated as zero. Defaults to 1e-30.
    """
    def __init__(self, block_name, tolerance=1e-30):
        BaseBlock.__init__(self, block_name, ["IN1"], ["OUT1"])
        self._tolerance = tolerance

    def compute(self, curIteration):
        """
        Append ``1.0 / IN1`` to the output signal.

        Raises:
            ZeroDivisionError: If ``abs(IN1)`` is smaller than the tolerance.
        """
        denominator = self.getInputSignal(curIteration, "IN1").value
        too_small = abs(denominator) < self._tolerance
        if too_small:
            raise ZeroDivisionError("InverterBlock received input less than {}.".format(self._tolerance))
        self.appendToSignal(1.0 / denominator)
class AdderBlock(BaseBlock):
    """
    Outputs the sum of all of its inputs.

    Args:
        block_name (str):       The name of the block.
        numberOfInputs (int):   The amount of input ports to set
                                (named IN1, IN2, ...). Defaults to 2.
    """
    def __init__(self, block_name, numberOfInputs=2):
        input_ports = ["IN%d" % n for n in range(1, numberOfInputs + 1)]
        BaseBlock.__init__(self, block_name, input_ports, ["OUT1"])
        self.__numberOfInputs = numberOfInputs

    def getNumberOfInputs(self):
        """Return how many input ports this block was created with."""
        return self.__numberOfInputs

    def compute(self, curIteration):
        """Accumulate IN1..INn, starting from 0, and append the total."""
        total = 0
        port_names = ("IN%d" % n for n in range(1, self.__numberOfInputs + 1))
        for port in port_names:
            total += self.getInputSignal(curIteration, port).value
        self.appendToSignal(total)
class ProductBlock(BaseBlock):
    """
    Outputs the product of all of its inputs.

    Args:
        block_name (str):       The name of the block.
        numberOfInputs (int):   The amount of input ports to set
                                (named IN1, IN2, ...). Defaults to 2.
    """
    def __init__(self, block_name, numberOfInputs=2):
        input_ports = ["IN%d" % n for n in range(1, numberOfInputs + 1)]
        BaseBlock.__init__(self, block_name, input_ports, ["OUT1"])
        self.__numberOfInputs = numberOfInputs

    def getNumberOfInputs(self):
        """Return how many input ports this block was created with."""
        return self.__numberOfInputs

    def compute(self, curIteration):
        """Multiply IN1..INn together, starting from 1, and append the result."""
        product = 1
        port_names = ("IN%d" % n for n in range(1, self.__numberOfInputs + 1))
        for port in port_names:
            product *= self.getInputSignal(curIteration, port).value
        self.appendToSignal(product)
class ModuloBlock(BaseBlock):
    """
    Outputs the remainder of IN1 divided by IN2.

    Args:
        block_name (str):   The name of the block.
    """
    def __init__(self, block_name):
        BaseBlock.__init__(self, block_name, ["IN1", "IN2"], ["OUT1"])

    def compute(self, curIteration):
        """Append ``math.fmod(IN1, IN2)`` to the output signal."""
        dividend = self.getInputSignal(curIteration, "IN1").value
        divisor = self.getInputSignal(curIteration, "IN2").value
        # 'math.fmod' rather than '%': matches C w.r.t. negative values AND floats.
        remainder = math.fmod(dividend, divisor)
        self.appendToSignal(remainder)
class RootBlock(BaseBlock):
    """
    Outputs the IN2-th root of IN1.

    Args:
        block_name (str):   The name of the block.
    """
    def __init__(self, block_name):
        BaseBlock.__init__(self, block_name, ["IN1", "IN2"], ["OUT1"])

    def compute(self, curIteration):
        """Append ``IN1 ** (1 / IN2)`` to the output signal."""
        radicand = self.getInputSignal(curIteration, "IN1").value
        degree = self.getInputSignal(curIteration, "IN2").value
        self.appendToSignal(radicand ** (1 / degree))
class PowerBlock(BaseBlock):
    """
    Outputs IN1 raised to the power IN2.

    Args:
        block_name (str):   The name of the block.
    """
    def __init__(self, block_name):
        BaseBlock.__init__(self, block_name, ["IN1", "IN2"], ["OUT1"])

    def compute(self, curIteration):
        """Append ``IN1 ** IN2`` to the output signal."""
        base = self.getInputSignal(curIteration, "IN1").value
        exponent = self.getInputSignal(curIteration, "IN2").value
        self.appendToSignal(base ** exponent)
class AbsBlock(BaseBlock):
    """
    Outputs the absolute value of its input.

    Args:
        block_name (str):   The name of the block.
    """
    def __init__(self, block_name):
        BaseBlock.__init__(self, block_name, ["IN1"], ["OUT1"])

    def compute(self, curIteration):
        """Append ``abs(input)`` to the output signal."""
        # The input is read through getInputSignal's default port.
        incoming = self.getInputSignal(curIteration).value
        self.appendToSignal(abs(incoming))
class IntBlock(BaseBlock):
    """
    Outputs the input converted to an integer with :func:`int`.

    Note:
        :func:`int` truncates towards zero, so negative non-integral inputs
        are rounded up (e.g. -1.5 becomes -1), not floored.

    Args:
        block_name (str):   The name of the block.
    """
    def __init__(self, block_name):
        BaseBlock.__init__(self, block_name, ["IN1"], ["OUT1"])

    def compute(self, curIteration):
        """Append ``int(input)`` to the output signal."""
        # The input is read through getInputSignal's default port.
        incoming = self.getInputSignal(curIteration).value
        self.appendToSignal(int(incoming))
class ClampBlock(BaseBlock):
    """
    Limits the value on IN1 to the range between a lower and an upper bound.

    Args:
        block_name (str):   The name of the block.
        min (numeric):      The minimal value. Defaults to -1.
        max (numeric):      The maximal value. Defaults to 1.
        use_const (bool):   When :code:`True`, the :attr:`min` and :attr:`max`
                            values will be used and the block only has IN1.
                            Otherwise, the minimal and maximal values are
                            read from inputs IN2 and IN3, respectively.
    """
    def __init__(self, block_name, min=-1, max=1, use_const=True):
        if use_const:
            input_ports = ["IN1"]
        else:
            input_ports = ["IN1", "IN2", "IN3"]
        super().__init__(block_name, input_ports, ["OUT1"])
        self.min = min
        self.max = max
        self._use_const = use_const

    def compute(self, curIteration):
        """Append ``min(max(IN1, lower), upper)`` to the output signal."""
        if self._use_const:
            lower, upper = self.min, self.max
        else:
            lower = self.getInputSignal(curIteration, "IN2").value
            upper = self.getInputSignal(curIteration, "IN3").value
        candidate = self.getInputSignal(curIteration, "IN1").value
        # Raise to the lower bound first, then cap at the upper bound.
        raised = max(candidate, lower)
        self.appendToSignal(min(raised, upper))
class GenericBlock(BaseBlock):
    """
    Applies a single-argument function from the :mod:`math` module to IN1.

    Args:
        block_name (str):       The name of the block.
        block_operator (str):   The name (a string) of the :mod:`math`
                                function that is called when the block is
                                evaluated. Defaults to :code:`None`; an
                                operator must be given before the block can
                                be computed.
    """
    def __init__(self, block_name, block_operator=None):
        BaseBlock.__init__(self, block_name, ["IN1"], ["OUT1"])
        self.__block_operator = block_operator

    def getBlockOperator(self):
        """
        Gets the block operator (the name of the math function, or None).
        """
        return self.__block_operator

    def compute(self, curIteration):
        """
        Append ``math.<operator>(IN1)`` to the output signal.

        Raises:
            TypeError: If no operator name was configured.
            AttributeError: If :mod:`math` has no attribute with that name.
        """
        operand = self.getInputSignal(curIteration, "IN1").value
        operator_name = self.getBlockOperator()
        if operator_name is None:
            # getattr(math, None) would raise TypeError as well, but with a
            # message about attribute names that hides the real problem.
            raise TypeError("GenericBlock has no operator; pass the name of a "
                            "math function as block_operator.")
        self.appendToSignal(getattr(math, operator_name)(operand))

    def __repr__(self):
        description = BaseBlock.__repr__(self)
        if self.__block_operator is None:
            description += " No operator given\n"
        else:
            description += " Operator :: " + self.__block_operator + "\n"
        return description
class MultiplexerBlock(BaseBlock):
    """
    Forwards one of two inputs, chosen by the ``select`` input: IN1 when
    ``select == 0``, IN2 for any other value.

    Args:
        block_name (str):   The name of the block.
    """
    def __init__(self, block_name):
        BaseBlock.__init__(self, block_name, ["IN1", "IN2", "select"], ["OUT1"])

    def compute(self, curIteration):
        """Append the value of the selected input to the output signal."""
        selector = self.getInputSignal(curIteration, "select").value
        if selector == 0:
            chosen_port = "IN1"
        else:
            chosen_port = "IN2"
        self.appendToSignal(self.getInputSignal(curIteration, chosen_port).value)
class MaxBlock(BaseBlock):
    """
    Outputs the largest value among all of its inputs.

    Args:
        block_name (str):       The name of the block.
        numberOfInputs (int):   The amount of input ports to set
                                (named IN1, IN2, ...). Defaults to 2.
    """
    def __init__(self, block_name, numberOfInputs=2):
        input_ports = ["IN%d" % n for n in range(1, numberOfInputs + 1)]
        BaseBlock.__init__(self, block_name, input_ports, ["OUT1"])
        self.__numberOfInputs = numberOfInputs

    def getNumberOfInputs(self):
        """Return how many input ports this block was created with."""
        return self.__numberOfInputs

    def compute(self, curIteration):
        """Collect IN1..INn and append their maximum to the output signal."""
        candidates = [
            self.getInputSignal(curIteration, "IN%d" % n).value
            for n in range(1, self.__numberOfInputs + 1)
        ]
        self.appendToSignal(max(candidates))
class MinBlock(BaseBlock):
    """
    Outputs the smallest value among all of its inputs.

    Args:
        block_name (str):       The name of the block.
        numberOfInputs (int):   The amount of input ports to set
                                (named IN1, IN2, ...). Defaults to 2.
    """
    def __init__(self, block_name, numberOfInputs=2):
        input_ports = ["IN%d" % n for n in range(1, numberOfInputs + 1)]
        BaseBlock.__init__(self, block_name, input_ports, ["OUT1"])
        self.__numberOfInputs = numberOfInputs

    def getNumberOfInputs(self):
        """Return how many input ports this block was created with."""
        return self.__numberOfInputs

    def compute(self, curIteration):
        """Collect IN1..INn and append their minimum to the output signal."""
        candidates = [
            self.getInputSignal(curIteration, "IN%d" % n).value
            for n in range(1, self.__numberOfInputs + 1)
        ]
        self.appendToSignal(min(candidates))
class SplitBlock(BaseBlock):
    """
    Copies one input signal onto several output ports.

    While this block can generally be omitted, it may still be
    used for clarity and clean-ness of the resulting models.

    Args:
        block_name (str):       The name of the block.
        numberOfOutputs (int):  The amount of paths to split into
                                (ports OUT1, OUT2, ...). Defaults to 2.
    """
    def __init__(self, block_name, numberOfOutputs=2):
        output_ports = ["OUT%d" % n for n in range(1, numberOfOutputs + 1)]
        BaseBlock.__init__(self, block_name, ["IN1"], output_ports)
        self.__numberOfOutputs = numberOfOutputs

    def getNumberOfOutputs(self):
        """Return how many output ports this block was created with."""
        return self.__numberOfOutputs

    def compute(self, curIteration):
        """Append the input value to every output port, OUT1 first."""
        # The input is read once, through getInputSignal's default port.
        incoming = self.getInputSignal(curIteration).value
        for n in range(1, self.__numberOfOutputs + 1):
            self.appendToSignal(incoming, "OUT%d" % n)
class LessThanBlock(BaseBlock):
    """
    Comparison block: outputs 1 when IN1 is smaller than IN2, else 0.

    Args:
        block_name (str):   The name of the block.
    """
    def __init__(self, block_name):
        BaseBlock.__init__(self, block_name, ["IN1", "IN2"], ["OUT1"])

    def compute(self, curIteration):
        """Append 1 if ``IN1 < IN2`` holds, otherwise 0."""
        left = self.getInputSignal(curIteration, "IN1").value
        right = self.getInputSignal(curIteration, "IN2").value
        if left < right:
            verdict = 1
        else:
            verdict = 0
        self.appendToSignal(verdict)
class EqualsBlock(BaseBlock):
    """
    Comparison block: outputs 1 when IN1 equals IN2, else 0.

    Args:
        block_name (str):   The name of the block.
    """
    def __init__(self, block_name):
        BaseBlock.__init__(self, block_name, ["IN1", "IN2"], ["OUT1"])

    def compute(self, curIteration):
        """Append 1 if ``IN1 == IN2`` holds, otherwise 0."""
        left = self.getInputSignal(curIteration, "IN1").value
        right = self.getInputSignal(curIteration, "IN2").value
        if left == right:
            verdict = 1
        else:
            verdict = 0
        self.appendToSignal(verdict)
class LessThanOrEqualsBlock(BaseBlock):
    """
    Comparison block: outputs 1 when IN1 is smaller than or equal to IN2,
    else 0.

    Args:
        block_name (str):   The name of the block.
    """
    def __init__(self, block_name):
        BaseBlock.__init__(self, block_name, ["IN1", "IN2"], ["OUT1"])

    def compute(self, curIteration):
        """Append 1 if ``IN1 <= IN2`` holds, otherwise 0."""
        left = self.getInputSignal(curIteration, "IN1").value
        right = self.getInputSignal(curIteration, "IN2").value
        if left <= right:
            verdict = 1
        else:
            verdict = 0
        self.appendToSignal(verdict)
class NotBlock(BaseBlock):
    """
    Logical negation: outputs 0 for a truthy input and 1 for a falsy one.

    Args:
        block_name (str):   The name of the block.
    """
    def __init__(self, block_name):
        BaseBlock.__init__(self, block_name, ["IN1"], ["OUT1"])

    def compute(self, curIteration):
        """Append the logical inverse (0 or 1) of IN1."""
        incoming = self.getInputSignal(curIteration, "IN1").value
        if incoming:
            negated = 0
        else:
            negated = 1
        self.appendToSignal(negated)
class OrBlock(BaseBlock):
    """
    Logical Or over a configurable number of input lines.

    The inputs are combined with Python's ``or``: the output is the value of
    the first truthy input (not normalised to 1), or the last input's value
    when none is truthy. Inputs after the first truthy one are not read.

    Args:
        block_name (str):       The name of the block.
        numberOfInputs (int):   The amount of input ports to set
                                (named IN1, IN2, ...). Defaults to 2.
    """
    def __init__(self, block_name, numberOfInputs=2):
        input_ports = ["IN{0}".format(n) for n in range(1, numberOfInputs + 1)]
        BaseBlock.__init__(self, block_name, input_ports, ["OUT1"])
        self.__numberOfInputs = numberOfInputs

    def getNumberOfInputs(self):
        """Return how many input ports this block was created with."""
        return self.__numberOfInputs

    def compute(self, curIteration):
        """Append the ``or``-combination of IN1..INn (0 when there are no inputs)."""
        outcome = 0
        for n in range(1, self.__numberOfInputs + 1):
            outcome = self.getInputSignal(curIteration, "IN" + str(n)).value
            if outcome:
                # Short-circuit: the first truthy input decides the result.
                break
        self.appendToSignal(outcome)
class AndBlock(BaseBlock):
    """
    Logical And over a configurable number of input lines.

    The inputs are combined with Python's ``and``: the output is the value of
    the first falsy input, or the last input's value (not normalised to 1)
    when all are truthy. Inputs after the first falsy one are not read.

    Args:
        block_name (str):       The name of the block.
        numberOfInputs (int):   The amount of input ports to set
                                (named IN1, IN2, ...). Defaults to 2.
    """
    def __init__(self, block_name, numberOfInputs=2):
        input_ports = ["IN{0}".format(n) for n in range(1, numberOfInputs + 1)]
        BaseBlock.__init__(self, block_name, input_ports, ["OUT1"])
        self.__numberOfInputs = numberOfInputs

    def getNumberOfInputs(self):
        """Return how many input ports this block was created with."""
        return self.__numberOfInputs

    def compute(self, curIteration):
        """Append the ``and``-combination of IN1..INn (1 when there are no inputs)."""
        outcome = 1
        for n in range(1, self.__numberOfInputs + 1):
            outcome = self.getInputSignal(curIteration, "IN" + str(n)).value
            if not outcome:
                # Short-circuit: the first falsy input decides the result.
                break
        self.appendToSignal(outcome)
class DelayBlock(BaseBlock):
    """
    Delays its input by one iteration.

    At iteration 0 the value of the ``IC`` (Initial Condition) port is
    output; afterwards the block outputs the input value of the previous
    iteration.

    Args:
        block_name (str):   The name of the block.
    """
    def __init__(self, block_name):
        BaseBlock.__init__(self, block_name, ["IN1", "IC"], ["OUT1"])

    def getDependencies(self, curIteration):
        """
        Return the blocks this block depends on within ``curIteration``.

        Only the very first iteration has a same-iteration dependency (the
        block linked to ``IC``); later iterations read the previous
        iteration's input and therefore report no dependencies.
        """
        dependencies = []
        if curIteration == 0:
            dependencies.append(self._linksIn["IC"].block)
        return dependencies

    def compute(self, curIteration):
        """Append the initial condition (iteration 0) or the previous input."""
        if curIteration == 0:
            source = self.getInputSignal(curIteration, "IC")
        else:
            # Previous iteration, read through getInputSignal's default port.
            source = self.getInputSignal(curIteration - 1)
        self.appendToSignal(source.value)
class LoggingBlock(BaseBlock):
    """
    Sink block that logs a fixed message whenever its input equals 1.

    Args:
        block_name (str):   The name of the block.
        string (str):       The message to log.
        lev:                One of :code:`level.WARNING`, :code:`level.ERROR`
                            or :code:`level.FATAL`; any other level logs
                            nothing. Defaults to :code:`level.WARNING`.
    """
    def __init__(self, block_name, string, lev=level.WARNING):
        # One input port, no output ports.
        BaseBlock.__init__(self, block_name, ["IN1"], [])
        self.__string = string
        self.__logger = naivelog.getLogger("WarningLog")
        self.__lev = lev

    def compute(self, curIteration):
        """Log the message, prefixed with the clock time, when IN1 == 1."""
        triggered = self.getInputSignal(curIteration, "IN1").value == 1
        if not triggered:
            return

        # Pick the logger method for the configured severity.
        severity = self.__lev
        if severity == level.WARNING:
            emit = self.__logger.warning
        elif severity == level.ERROR:
            emit = self.__logger.error
        elif severity == level.FATAL:
            emit = self.__logger.fatal
        else:
            return

        emit("Time " + str(self.getClock().getTime(curIteration)) + ": " + self.__string)
class AddOneBlock(CBD):
    """
    Composite block that outputs its input plus one (used a lot for mux).

    Internally a :class:`ConstantBlock` with value 1 and the input IN1 are
    both fed into an :class:`AdderBlock`, whose result drives OUT1.

    Args:
        block_name (str):   The name of the block.
    """
    def __init__(self, block_name):
        CBD.__init__(self, block_name, ["IN1"], ["OUT1"])

        one = ConstantBlock(block_name="OneConstant", value=1)
        self.addBlock(one)
        adder = AdderBlock("PlusOne")
        self.addBlock(adder)

        # Connections are made in this exact order.
        links = (
            ("IN1", "PlusOne"),
            ("OneConstant", "PlusOne"),
            ("PlusOne", "OUT1"),
        )
        for source, target in links:
            self.addConnection(source, target)
class DerivatorBlock(CBD):
    """
    Composite block that approximates the derivative of IN1.

    The wiring computes ``OUT1 = (IN1 - delay) * (1 / delta_t)``, where
    ``delay`` is the previous value of IN1. At the first iteration the delay
    is seeded with ``IN1 - IC * delta_t``, so the first output equals IC.

    :Input Ports:
        - :code:`IN1`: The signal to differentiate.
        - :code:`delta_t`: The step size.
        - :code:`IC`: The initial condition (the derivative's first value).

    Args:
        block_name (str):   The name of the block.
    """
    def __init__(self, block_name):
        CBD.__init__(self, block_name, ["IN1", "delta_t", "IC"], ["OUT1"])

        # Sub-blocks, created and added one by one in this order.
        parts = (
            (ProductBlock, "multIc"),
            (NegatorBlock, "neg1"),
            (AdderBlock, "sum1"),
            (DelayBlock, "delay"),
            (NegatorBlock, "neg2"),
            (AdderBlock, "sum2"),
            (ProductBlock, "mult"),
            (InverterBlock, "inv"),
        )
        for block_type, name in parts:
            self.addBlock(block_type(block_name=name))

        # (source, target, extra keyword arguments); the order is preserved.
        wiring = (
            # Seed for the delay: IN1 - IC * delta_t
            ("IC", "multIc", {}),
            ("delta_t", "multIc", {}),
            ("multIc", "neg1", {}),
            ("neg1", "sum1", {}),
            ("IN1", "sum1", {}),
            ("sum1", "delay", {"input_port_name": "IC"}),
            ("IN1", "delay", {"input_port_name": "IN1"}),
            # Difference with the previous input: IN1 - delay
            ("delay", "neg2", {}),
            ("neg2", "sum2", {}),
            ("IN1", "sum2", {}),
            # Scale by 1 / delta_t
            ("sum2", "mult", {}),
            ("delta_t", "inv", {}),
            ("inv", "mult", {}),
            ("mult", "OUT1", {}),
        )
        for source, target, options in wiring:
            self.addConnection(source, target, **options)
class IntegratorBlock(CBD):
    """
    Composite block that numerically integrates IN1.

    The active wiring is the one this file labels "backward Euler". With the
    :class:`DelayBlock` semantics it produces::

        OUT1[0] = IC + 0 * delta_t[0]
        OUT1[k] = OUT1[k-1] + IN1[k-1] * delta_t[k]

    :Input Ports:
        - :code:`IN1`: The signal to integrate.
        - :code:`delta_t`: The step size.
        - :code:`IC`: The initial condition (the integral's first value).

    Args:
        block_name (str):   The name of the block.
    """
    def __init__(self, block_name):
        CBD.__init__(self, block_name, ["IN1", "delta_t", "IC"], ["OUT1"])

        # NOTE: trapezoid-rule and forward-Euler wirings were sketched here
        # before as commented-out code.
        # TODO: fix the circularity issue of the trapezoid rule => other solver?

        # Sub-blocks, created and added one by one in this order.
        zero = ConstantBlock(block_name="zero", value=0)
        self.addBlock(zero)
        for block_type, name in (
            (DelayBlock, "delayIn"),
            (ProductBlock, "multDelta"),
            (DelayBlock, "delayState"),
            (AdderBlock, "sumState"),
        ):
            self.addBlock(block_type(block_name=name))

        # (source, target, extra keyword arguments); the order is preserved.
        wiring = (
            # Previous input (0 at the first iteration) times the step size.
            ("zero", "delayIn", {"input_port_name": "IC"}),
            ("IN1", "delayIn", {"input_port_name": "IN1"}),
            ("delayIn", "multDelta", {}),
            ("delta_t", "multDelta", {}),
            ("multDelta", "sumState", {}),
            # Previous state (IC at the first iteration) is added on top.
            ("IC", "delayState", {"input_port_name": "IC"}),
            ("delayState", "sumState", {}),
            ("sumState", "delayState", {"input_port_name": "IN1"}),
            ("sumState", "OUT1", {}),
        )
        for source, target, options in wiring:
            self.addConnection(source, target, **options)
class Clock(CBD):
    """
    System clock. **Must be present in a simulation model.**

    Args:
        block_name (str):   The name of the block.
        start_time (float): Time at which the simulation starts. Defaults to 0.

    :Input Ports:
        - :code:`h`: The delta in-between timesteps. For fixed-rate simulations,
          this must be linked up to a constant value (e.g. a :class:`ConstantBlock`).

    :Output Ports:
        - :code:`time`: The current simulation time.
        - :code:`rel_time`: The relative simulation time, ignoring the start time.
    """
    def __init__(self, block_name, start_time=0.0):
        # TODO: simplify if start_time is 0
        # TODO: rel_time must not depend on delay + h !!
        CBD.__init__(self, block_name, ["h"], ["time", "rel_time"])
        self.__start_time = start_time

        # Sub-blocks, created and added one by one in this order.
        self.addBlock(ConstantBlock("IC", start_time))
        for block_type, name in (
            (DelayBlock, "delay"),
            (AdderBlock, "TSum"),
            (AdderBlock, "STSum"),
            (NegatorBlock, "STNeg"),
        ):
            self.addBlock(block_type(name))

        # (source, target, extra keyword arguments); the order is preserved.
        wiring = (
            # Time accumulator: TSum = h + delay, fed back through the delay.
            ("h", "TSum", {}),
            ("delay", "TSum", {}),
            ("TSum", "delay", {"input_port_name": 'IN1'}),
            ("delay", "time", {}),
            ("IC", "delay", {"input_port_name": 'IC'}),
            # Relative time: TSum minus the start time.
            ("IC", "STNeg", {}),
            ("TSum", "STSum", {}),
            ("STNeg", "STSum", {}),
            ("STSum", "rel_time", {}),
        )
        for source, target, options in wiring:
            self.addConnection(source, target, **options)

    def getTime(self, curIt):
        """
        Gets the current time of the clock.

        Returns the start time at iteration 0 or when nothing has been
        computed yet; otherwise the accumulated time stored by the internal
        ``TSum`` block for the previous iteration.

        Args:
            curIt (int):    The iteration to obtain the time for.
        """
        history = self.getBlockByName("TSum").getSignal("OUT1")
        if curIt != 0 and len(history) != 0:
            return history[curIt - 1].value
        return self.__start_time

    def getRelativeTime(self, curIt):
        """
        Gets the relative simulation time (ignoring the start time).

        Args:
            curIt (int):    The iteration to obtain the time for.
        """
        absolute = self.getTime(curIt)
        return absolute - self.__start_time
class TimeBlock(BaseBlock):
    """
    Source block that outputs the current simulation time.

    Args:
        block_name (str):   The name of the block.

    Note:
        When manipulating and reading time values, it may be better to use the
        :class:`Clock` instead.
    """
    def __init__(self, block_name):
        # No input ports, a single output port.
        BaseBlock.__init__(self, block_name, [], ["OUT1"])

    def compute(self, curIteration):
        """Append the clock's time for this iteration to the output signal."""
        clock = self.getClock()
        self.appendToSignal(clock.getTime(curIteration))
|