"""
This file contains the standard library for CBD building blocks.
"""
from CBD.CBD import BaseBlock, CBD, level
from CBD import naivelog
import math

__all__ = ['ConstantBlock', 'NegatorBlock', 'InverterBlock', 'AdderBlock', 'ProductBlock', 'ModuloBlock',
           'RootBlock', 'AbsBlock', 'IntBlock', 'ClampBlock', 'GenericBlock', 'MultiplexerBlock', 'LessThanBlock',
           'EqualsBlock', 'LessThanOrEqualsBlock', 'NotBlock', 'OrBlock', 'AndBlock', 'DelayBlock', 'LoggingBlock',
           'AddOneBlock', 'DerivatorBlock', 'IntegratorBlock', 'SplitBlock', 'Clock', 'TimeBlock', 'PowerBlock',
           'MaxBlock', 'MinBlock']


class ConstantBlock(BaseBlock):
    """
    The constant block will always output its constant value.

    Args:
        block_name (str):   The name of the block.
        value (numeric):    The value to output. Defaults to 0.0.
    """
    def __init__(self, block_name, value=0.0):
        BaseBlock.__init__(self, block_name, [], ["OUT1"])
        self.__value = value

    def getValue(self):
        """Get the current value."""
        return self.__value

    def setValue(self, value):
        """Change the constant value."""
        self.__value = value

    def compute(self, curIteration):
        """Appends the constant value to the output signal."""
        self.appendToSignal(self.getValue())

    def __repr__(self):
        return BaseBlock.__repr__(self) + " Value = " + str(self.getValue()) + "\n"


class NegatorBlock(BaseBlock):
    """
    The negator block will output the value of the input multiplied with -1.
    """
    def __init__(self, block_name):
        BaseBlock.__init__(self, block_name, ["IN1"], ["OUT1"])

    def compute(self, curIteration):
        """Appends the negated value of IN1 to the output signal."""
        self.appendToSignal(-self.getInputSignal(curIteration, "IN1").value)


class InverterBlock(BaseBlock):
    """
    The invertblock will output 1/IN.

    Args:
        block_name (str):   The name of the block.
        tolerance (float):  Inputs whose absolute value is smaller than this
                            threshold are rejected as a division by zero.
    """
    def __init__(self, block_name, tolerance=1e-30):
        BaseBlock.__init__(self, block_name, ["IN1"], ["OUT1"])
        self._tolerance = tolerance

    def compute(self, curIteration):
        """
        Appends 1/IN1 to the output signal.

        Raises:
            ZeroDivisionError: When the absolute value of the input is
                               smaller than the tolerance.
        """
        # Named 'value' so the builtin 'input' is not shadowed.
        value = self.getInputSignal(curIteration, "IN1").value
        if abs(value) < self._tolerance:
            raise ZeroDivisionError("InverterBlock received input less than {}.".format(self._tolerance))
        self.appendToSignal(1.0 / value)


class AdderBlock(BaseBlock):
    """
    The adderblock will add all the inputs.

    Args:
        block_name (str):       The name of the block.
        numberOfInputs (int):   The amount of input ports to set.
    """
    def __init__(self, block_name, numberOfInputs=2):
        BaseBlock.__init__(self, block_name, ["IN%d" % (x+1) for x in range(numberOfInputs)], ["OUT1"])
        self.__numberOfInputs = numberOfInputs

    def compute(self, curIteration):
        """Appends the sum of all inputs (0 when there are none) to the output signal."""
        self.appendToSignal(sum(
            self.getInputSignal(curIteration, "IN%d" % i).value
            for i in range(1, self.__numberOfInputs + 1)
        ))

    def getNumberOfInputs(self):
        """
        Gets the total number of input ports.
        """
        return self.__numberOfInputs


class ProductBlock(BaseBlock):
    """
    The product block will multiply all the inputs.

    Args:
        block_name (str):       The name of the block.
        numberOfInputs (int):   The amount of input ports to set.
    """
    def __init__(self, block_name, numberOfInputs=2):
        BaseBlock.__init__(self, block_name, ["IN%d" % (x+1) for x in range(numberOfInputs)], ["OUT1"])
        self.__numberOfInputs = numberOfInputs

    def compute(self, curIteration):
        """Appends the product of all inputs (1 when there are none) to the output signal."""
        result = 1
        for i in range(1, self.__numberOfInputs + 1):
            result *= self.getInputSignal(curIteration, "IN%d" % i).value
        self.appendToSignal(result)

    def getNumberOfInputs(self):
        """
        Gets the total number of input ports.
        """
        return self.__numberOfInputs


class ModuloBlock(BaseBlock):
    """
    A basic block that computes the IN1 modulo IN2.
    """
    def __init__(self, block_name):
        BaseBlock.__init__(self, block_name, ["IN1", "IN2"], ["OUT1"])

    def compute(self, curIteration):
        """Appends fmod(IN1, IN2) to the output signal."""
        # Use 'math.fmod' for validity with C w.r.t. negative values AND floats
        self.appendToSignal(math.fmod(self.getInputSignal(curIteration, "IN1").value,
                                      self.getInputSignal(curIteration, "IN2").value))


class RootBlock(BaseBlock):
    """
    A basic block that computes the IN2-th root from IN1.
    """
    def __init__(self, block_name):
        BaseBlock.__init__(self, block_name, ["IN1", "IN2"], ["OUT1"])

    def compute(self, curIteration):
        """Appends IN1 ** (1 / IN2) to the output signal."""
        self.appendToSignal(self.getInputSignal(curIteration, "IN1").value
                            ** (1 / self.getInputSignal(curIteration, "IN2").value))


class PowerBlock(BaseBlock):
    """
    A basic block that computes IN1 to the IN2-th power.
    """
    def __init__(self, block_name):
        BaseBlock.__init__(self, block_name, ["IN1", "IN2"], ["OUT1"])

    def compute(self, curIteration):
        """Appends IN1 ** IN2 to the output signal."""
        self.appendToSignal(self.getInputSignal(curIteration, "IN1").value
                            ** self.getInputSignal(curIteration, "IN2").value)


class AbsBlock(BaseBlock):
    """
    The abs block will output the absolute value of the input.
    """
    def __init__(self, block_name):
        BaseBlock.__init__(self, block_name, ["IN1"], ["OUT1"])

    def compute(self, curIteration):
        """Appends the absolute value of the input to the output signal."""
        self.appendToSignal(abs(self.getInputSignal(curIteration).value))


class IntBlock(BaseBlock):
    """
    The int block will output the integer value of the input.

    Note:
        The conversion uses Python's :func:`int`, which truncates toward
        zero. For negative non-integer inputs this differs from flooring
        (e.g. -1.5 becomes -1, not -2).
    """
    def __init__(self, block_name):
        BaseBlock.__init__(self, block_name, ["IN1"], ["OUT1"])

    def compute(self, curIteration):
        """Appends int(input) to the output signal."""
        self.appendToSignal(int(self.getInputSignal(curIteration).value))


class ClampBlock(BaseBlock):
    """
    The clamp block will clamp the input between min and max.

    Args:
        block_name (str):   The name of the block.
        min (numeric):      The minimal value.
        max (numeric):      The maximal value.
        use_const (bool):   When :code:`True`, the :attr:`min` and :attr:`max`
                            values will be used. Otherwise, the minimal and
                            maximal values are expected as inputs 2 and 3,
                            respectively.
    """
    def __init__(self, block_name, min=-1, max=1, use_const=True):
        super().__init__(block_name, ["IN1"] if use_const else ["IN1", "IN2", "IN3"], ["OUT1"])
        self._use_const = use_const
        self.min = min
        self.max = max

    def compute(self, curIteration):
        """Appends the input, limited to the [min, max] range, to the output signal."""
        if self._use_const:
            min_ = self.min
            max_ = self.max
        else:
            min_ = self.getInputSignal(curIteration, "IN2").value
            max_ = self.getInputSignal(curIteration, "IN3").value
        x = self.getInputSignal(curIteration, "IN1").value
        # Lower bound is applied first, so the upper bound wins if min_ > max_.
        self.appendToSignal(min(max(x, min_), max_))


class GenericBlock(BaseBlock):
    """
    The generic block will evaluate the operator on the input.

    Args:
        block_name (str):       The name of the block.
        block_operator (str):   The name (a string) of a Python function from
                                the math library which will be called when the
                                block is evaluated. By default, initialized to
                                :code:`None`.
    """
    def __init__(self, block_name, block_operator=None):
        # operator is the name (a string) of a Python function from the math library
        BaseBlock.__init__(self, block_name, ["IN1"], ["OUT1"])
        self.__block_operator = block_operator

    def getBlockOperator(self):
        """
        Gets the block operator.
        """
        return self.__block_operator

    def compute(self, curIteration):
        """Looks the operator up in :mod:`math` and appends its result on IN1 to the output signal."""
        a = self.getInputSignal(curIteration, "IN1").value
        self.appendToSignal(getattr(math, self.getBlockOperator())(a))

    def __repr__(self):
        # Named 'text' so the builtin 'repr' is not shadowed.
        text = BaseBlock.__repr__(self)
        if self.__block_operator is None:
            text += " No operator given\n"
        else:
            text += " Operator :: " + self.__block_operator + "\n"
        return text


class MultiplexerBlock(BaseBlock):
    """
    The multiplexer block will output the signal from IN1 if select == 0;
    otherwise the signal from IN2 is outputted.
    """
    def __init__(self, block_name):
        BaseBlock.__init__(self, block_name, ["IN1", "IN2", "select"], ["OUT1"])

    def compute(self, curIteration):
        """Appends the value of the selected input to the output signal."""
        select = self.getInputSignal(curIteration, "select").value
        self.appendToSignal(self.getInputSignal(curIteration, "IN1" if select == 0 else "IN2").value)


class MaxBlock(BaseBlock):
    """
    The max block will output the maximal value of all its inputs.

    Args:
        block_name (str):       The name of the block.
        numberOfInputs (int):   The amount of input ports to set.
    """
    def __init__(self, block_name, numberOfInputs=2):
        BaseBlock.__init__(self, block_name, ["IN%d" % (x+1) for x in range(numberOfInputs)], ["OUT1"])
        self.__numberOfInputs = numberOfInputs

    def compute(self, curIteration):
        """Appends the largest input value to the output signal."""
        self.appendToSignal(max(
            self.getInputSignal(curIteration, "IN%d" % i).value
            for i in range(1, self.__numberOfInputs + 1)
        ))

    def getNumberOfInputs(self):
        """
        Gets the total number of input ports.
        """
        return self.__numberOfInputs


class MinBlock(BaseBlock):
    """
    The min block will output the minimal value of all its inputs.

    Args:
        block_name (str):       The name of the block.
        numberOfInputs (int):   The amount of input ports to set.
    """
    def __init__(self, block_name, numberOfInputs=2):
        BaseBlock.__init__(self, block_name, ["IN%d" % (x+1) for x in range(numberOfInputs)], ["OUT1"])
        self.__numberOfInputs = numberOfInputs

    def compute(self, curIteration):
        """Appends the smallest input value to the output signal."""
        self.appendToSignal(min(
            self.getInputSignal(curIteration, "IN%d" % i).value
            for i in range(1, self.__numberOfInputs + 1)
        ))

    def getNumberOfInputs(self):
        """
        Gets the total number of input ports.
        """
        return self.__numberOfInputs


class SplitBlock(BaseBlock):
    """
    The split block will split a signal over multiple paths.
    While this block can generally be omitted, it may still be
    used for clarity and clean-ness of the resulting models.

    Args:
        block_name (str):       The name of the block.
        numberOfOutputs (int):  The amount of paths to split into.
    """
    def __init__(self, block_name, numberOfOutputs=2):
        BaseBlock.__init__(self, block_name, ["IN1"], ["OUT%d" % (i+1) for i in range(numberOfOutputs)])
        self.__numberOfOutputs = numberOfOutputs

    def compute(self, curIteration):
        """Appends the input value to every output signal."""
        value = self.getInputSignal(curIteration).value
        for i in range(self.__numberOfOutputs):
            self.appendToSignal(value, "OUT%d" % (i+1))

    def getNumberOfOutputs(self):
        """
        Gets the total number of output ports.
        """
        return self.__numberOfOutputs


class LessThanBlock(BaseBlock):
    """
    A simple block that will test if the IN1 is smaller than IN2 (output == 1 if true else 0)
    """
    def __init__(self, block_name):
        BaseBlock.__init__(self, block_name, ["IN1", "IN2"], ["OUT1"])

    def compute(self, curIteration):
        """Appends 1 when IN1 < IN2, otherwise 0."""
        gisv = lambda s: self.getInputSignal(curIteration, s).value
        self.appendToSignal(1 if gisv("IN1") < gisv("IN2") else 0)


class EqualsBlock(BaseBlock):
    """
    A simple block that will test if the IN1 is equal to IN2 (output == 1 if true else 0)
    """
    def __init__(self, block_name):
        BaseBlock.__init__(self, block_name, ["IN1", "IN2"], ["OUT1"])

    def compute(self, curIteration):
        """Appends 1 when IN1 == IN2, otherwise 0."""
        gisv = lambda s: self.getInputSignal(curIteration, s).value
        self.appendToSignal(1 if gisv("IN1") == gisv("IN2") else 0)


class LessThanOrEqualsBlock(BaseBlock):
    """
    A simple block that will test if the IN1 is smaller than or equals to IN2 (output == 1 if true else 0)
    """
    def __init__(self, block_name):
        BaseBlock.__init__(self, block_name, ["IN1", "IN2"], ["OUT1"])

    def compute(self, curIteration):
        """Appends 1 when IN1 <= IN2, otherwise 0."""
        gisv = lambda s: self.getInputSignal(curIteration, s).value
        self.appendToSignal(1 if gisv("IN1") <= gisv("IN2") else 0)


class NotBlock(BaseBlock):
    """
    A simple Not block that will set a 0 to 1 and vice versa
    """
    def __init__(self, block_name):
        BaseBlock.__init__(self, block_name, ["IN1"], ["OUT1"])

    def compute(self, curIteration):
        """Appends 0 when the input is truthy, otherwise 1."""
        result = 0 if self.getInputSignal(curIteration, "IN1").value else 1
        self.appendToSignal(result)


class OrBlock(BaseBlock):
    """
    A simple Or block with possibly multiple inputlines

    Args:
        block_name (str):       The name of the block.
        numberOfInputs (int):   The amount of input ports to set.
    """
    def __init__(self, block_name, numberOfInputs=2):
        BaseBlock.__init__(self, block_name, ["IN{0}".format(i) for i in range(1, numberOfInputs+1)], ["OUT1"])
        self.__numberOfInputs = numberOfInputs

    def compute(self, curIteration):
        """
        Appends the result of chaining all inputs with Python's :code:`or`.

        Note:
            The output is the first truthy input value as-is (not normalized
            to 1), or the last input value when none is truthy.
        """
        result = 0
        for i in range(1, self.__numberOfInputs+1):
            # 'or' short-circuits: inputs are no longer read once result is truthy.
            result = result or self.getInputSignal(curIteration, "IN"+str(i)).value
        self.appendToSignal(result)

    def getNumberOfInputs(self):
        """
        Gets the total number of input ports.
        """
        return self.__numberOfInputs


class AndBlock(BaseBlock):
    """
    A simple And block with possibly multiple inputlines

    Args:
        block_name (str):       The name of the block.
        numberOfInputs (int):   The amount of input ports to set.
    """
    def __init__(self, block_name, numberOfInputs=2):
        BaseBlock.__init__(self, block_name, ["IN{0}".format(i) for i in range(1, numberOfInputs+1)], ["OUT1"])
        self.__numberOfInputs = numberOfInputs

    def compute(self, curIteration):
        """
        Appends the result of chaining all inputs with Python's :code:`and`.

        Note:
            The output is the first falsy input value as-is, or the last
            input value when all are truthy (not normalized to 1).
        """
        result = 1
        for i in range(1, self.__numberOfInputs+1):
            # 'and' short-circuits: inputs are no longer read once result is falsy.
            result = result and self.getInputSignal(curIteration, "IN"+str(i)).value
        self.appendToSignal(result)

    def getNumberOfInputs(self):
        """
        Gets the total number of input ports.
        """
        return self.__numberOfInputs


class DelayBlock(BaseBlock):
    """
    A delay block that takes the last value from the list
    IC: Initial Condition
    """
    def __init__(self, block_name):
        BaseBlock.__init__(self, block_name, ["IN1", "IC"], ["OUT1"])

    def getDependencies(self, curIteration):
        """
        Helper for building the dependency graph.

        Dependencies are treated differently per iteration: at the first
        iteration (curIteration == 0) the block only depends on the IC;
        afterwards it reports no dependencies.
        """
        if curIteration == 0:
            return [self._linksIn["IC"].block]
        return []

    def compute(self, curIteration):
        """Appends the IC value at iteration 0, and the previous iteration's input afterwards."""
        if curIteration == 0:
            self.appendToSignal(self.getInputSignal(curIteration, "IC").value)
        else:
            self.appendToSignal(self.getInputSignal(curIteration - 1).value)


class LoggingBlock(BaseBlock):
    """
    A simple Logging block

    Args:
        block_name (str):   The name of the block.
        string (str):       The message to log.
        lev:                The level to log at (one of :code:`level.WARNING`,
                            :code:`level.ERROR` or :code:`level.FATAL`).
                            Defaults to :code:`level.WARNING`.
    """
    def __init__(self, block_name, string, lev=level.WARNING):
        BaseBlock.__init__(self, block_name, ["IN1"], [])
        self.__string = string
        self.__logger = naivelog.getLogger("WarningLog")
        self.__lev = lev

    def compute(self, curIteration):
        """
        Logs the message, prefixed with the clock time, when IN1 equals 1.

        Nothing is logged for any other input value, nor when the configured
        level is not WARNING, ERROR or FATAL.
        """
        if self.getInputSignal(curIteration, "IN1").value == 1:
            # Pick the logger method first, so the message is built in one place.
            if self.__lev == level.WARNING:
                log = self.__logger.warning
            elif self.__lev == level.ERROR:
                log = self.__logger.error
            elif self.__lev == level.FATAL:
                log = self.__logger.fatal
            else:
                return
            log("Time " + str(self.getClock().getTime(curIteration)) + ": " + self.__string)


class AddOneBlock(CBD):
    """
    Block adds a one to the input (used a lot for mux)
    """
    def __init__(self, block_name):
        CBD.__init__(self, block_name, ["IN1"], ["OUT1"])
        self.addBlock(ConstantBlock(block_name="OneConstant", value=1))
        self.addBlock(AdderBlock("PlusOne"))

        # OUT1 = IN1 + 1
        self.addConnection("IN1", "PlusOne")
        self.addConnection("OneConstant", "PlusOne")
        self.addConnection("PlusOne", "OUT1")


class DerivatorBlock(CBD):
    """
    The derivator block is a CBD that calculates the derivative.

    :Input Ports:
        - :code:`IN1`: The signal to differentiate.
        - :code:`delta_t`: The time delta.
        - :code:`IC`: The initial condition.

    The wiring computes :code:`(IN1 - delay) / delta_t`, where the delay block
    holds the previous :code:`IN1` and is initialized with
    :code:`IN1 - IC * delta_t`.
    """
    def __init__(self, block_name):
        CBD.__init__(self, block_name, ["IN1", "delta_t", "IC"], ["OUT1"])
        self.addBlock(ProductBlock(block_name="multIc"))
        self.addBlock(NegatorBlock(block_name="neg1"))
        self.addBlock(AdderBlock(block_name="sum1"))
        self.addBlock(DelayBlock(block_name="delay"))
        self.addBlock(NegatorBlock(block_name="neg2"))
        self.addBlock(AdderBlock(block_name="sum2"))
        self.addBlock(ProductBlock(block_name="mult"))
        self.addBlock(InverterBlock(block_name="inv"))

        # Initial condition of the delay: IN1 - IC * delta_t
        self.addConnection("IC", "multIc")
        self.addConnection("delta_t", "multIc")
        self.addConnection("multIc", "neg1")
        self.addConnection("neg1", "sum1")
        self.addConnection("IN1", "sum1")
        self.addConnection("sum1", "delay", input_port_name="IC")
        self.addConnection("IN1", "delay", input_port_name="IN1")

        # Difference between the current and the delayed input
        self.addConnection("delay", "neg2")
        self.addConnection("neg2", "sum2")
        self.addConnection("IN1", "sum2")

        # Divide the difference by delta_t
        self.addConnection("sum2", "mult")
        self.addConnection("delta_t", "inv")
        self.addConnection("inv", "mult")
        self.addConnection("mult", "OUT1")


class IntegratorBlock(CBD):
    """
    The integrator block is a CBD that calculates the integration.
    The block is implemented according to the backwards Euler rule.

    :Input Ports:
        - :code:`IN1`: The signal to integrate.
        - :code:`delta_t`: The time delta.
        - :code:`IC`: The initial condition.

    The wiring computes :code:`delayIn * delta_t + delayState`, where
    :code:`delayIn` holds the previous :code:`IN1` (initially 0) and
    :code:`delayState` holds the previous output (initially :code:`IC`).
    """
    def __init__(self, block_name):
        CBD.__init__(self, block_name, ["IN1", "delta_t", "IC"], ["OUT1"])
        # NOTE: alternative trapezoid-rule and forward-Euler wirings used to be
        #       kept here as commented-out code.
        # TODO: the trapezoid rule variant has a circularity issue that still
        #       needs fixing => other solver?

        # BACKWARD EULER:
        self.addBlock(ConstantBlock(block_name="zero", value=0))
        self.addBlock(DelayBlock(block_name="delayIn"))
        self.addBlock(ProductBlock(block_name="multDelta"))
        self.addBlock(DelayBlock(block_name="delayState"))
        self.addBlock(AdderBlock(block_name="sumState"))

        self.addConnection("zero", "delayIn", input_port_name="IC")
        self.addConnection("IN1", "delayIn", input_port_name="IN1")
        self.addConnection("delayIn", "multDelta")
        self.addConnection("delta_t", "multDelta")
        self.addConnection("multDelta", "sumState")
        self.addConnection("IC", "delayState", input_port_name="IC")
        self.addConnection("delayState", "sumState")
        self.addConnection("sumState", "delayState", input_port_name="IN1")
        self.addConnection("sumState", "OUT1")


class Clock(CBD):
    """
    System clock. **Must be present in a simulation model.**

    Args:
        block_name (str):   The name of the block.
        start_time (float): Time at which the simulation starts. Defaults to 0.

    :Input Ports:
        - :code:`h`: The delta in-between timesteps. For fixed-rate simulations,
          this must be linked up to a constant value (e.g. a :class:`ConstantBlock`).

    :Output Ports:
        - :code:`time`: The current simulation time.
        - :code:`rel_time`: The relative simulation time, ignoring the start time.
    """
    def __init__(self, block_name, start_time=0.0):
        # TODO: simplify if start_time is 0
        # TODO: rel_time must not depend on delay + h !!
        CBD.__init__(self, block_name, ["h"], ["time", "rel_time"])
        self.__start_time = start_time

        self.addBlock(ConstantBlock("IC", start_time))
        self.addBlock(DelayBlock("delay"))
        self.addBlock(AdderBlock("TSum"))
        self.addBlock(AdderBlock("STSum"))
        self.addBlock(NegatorBlock("STNeg"))

        # time: delayed accumulation of h, starting at start_time
        self.addConnection("h", "TSum")
        self.addConnection("delay", "TSum")
        self.addConnection("TSum", "delay", input_port_name='IN1')
        self.addConnection("delay", "time")
        self.addConnection("IC", "delay", input_port_name='IC')

        # rel_time: TSum - start_time
        self.addConnection("IC", "STNeg")
        self.addConnection("TSum", "STSum")
        self.addConnection("STNeg", "STSum")
        self.addConnection("STSum", "rel_time")

    def getTime(self, curIt):
        """
        Gets the current time of the clock.

        Args:
            curIt (int):    The iteration to get the time for.

        Returns:
            The start time at iteration 0 (or when nothing was computed yet);
            otherwise the value of the :code:`TSum` block at the previous
            iteration.
        """
        sig = self.getBlockByName("TSum").getSignal("OUT1")
        if curIt == 0 or len(sig) == 0:
            return self.__start_time
        return sig[curIt - 1].value

    def getRelativeTime(self, curIt):
        """
        Gets the relative simulation time (ignoring the start time).

        Args:
            curIt (int):    The iteration to get the relative time for.
        """
        return self.getTime(curIt) - self.__start_time


class TimeBlock(BaseBlock):
    """
    Obtains the current time of the simulation.

    Args:
        block_name (str):   The name of the block.

    Note:
        When manipulating and reading time values, it may be better to use the
        :class:`Clock` instead.
    """
    def __init__(self, block_name):
        BaseBlock.__init__(self, block_name, [], ["OUT1"])

    def compute(self, curIteration):
        """Appends the clock's time at the current iteration to the output signal."""
        time = self.getClock().getTime(curIteration)
        self.appendToSignal(time)