#!/usr/bin/python3
# This file was automatically generated from drawio2cbd with the command:
#   ../../drawio2cbd.py TrainSystem.xml TrainSystem.py -e TrainSystem -T 0.2 -t 360
"""CBD model of a PID-controlled train carrying a spring/damper-coupled passenger.

The module defines four hierarchical CBD blocks (``Time``, ``TrainSystem``,
``PIDController`` and ``Plant``) plus a small plotting helper.  Running the
file as a script simulates ``TrainSystem`` and plots its output signals.

NOTE(review): the formulas quoted in the docstrings below are read off the
block wiring and assume the CBD library blocks (AdderBlock, ProductBlock,
InverterBlock, NegatorBlock, IntegratorBlock, DerivatorBlock, DelayBlock)
have the arithmetic meaning their names suggest -- confirm against the CBD
library.
"""

# The import order of the original file is kept on purpose: the two star
# imports may define overlapping names, and the later import wins.
from ComputerBlock import ComputerBlock
from TrainCostModelBlock import *
from CBD.src.CBD import *

import matplotlib.pyplot as plt

# Default simulation step size; used as the default ``h`` of TrainSystem and
# as the ``delta_t`` passed to ``run`` in the script section.
DELTA_T = 0.2


def plot_signals(block, signals, title, ylabel='N'):
    """Plot the recorded traces of several output signals in one figure.

    :param block: object exposing ``getSignal(name)``, which must return an
        iterable of ``(time, value)`` pairs.
    :param signals: names of the signals to plot; each becomes one curve
        labelled with its name.
    :param title: title of the figure.
    :param ylabel: label of the y-axis (defaults to ``'N'``).

    Every curve is drawn against the time stamps of its own trace, so the
    traces do not need to share a time base or a length.
    The call blocks in ``plt.show()`` until the figure is closed when an
    interactive matplotlib backend is in use.
    """
    # Fetch every trace before touching matplotlib, so that a failing
    # getSignal() does not leave an empty figure behind.
    traces = []
    for signal in signals:
        tvpl = block.getSignal(signal)
        traces.append((signal, [t for t, _ in tvpl], [v for _, v in tvpl]))

    plt.figure()
    plt.title(title)
    plt.xlabel('time')
    plt.ylabel(ylabel)
    for name, times, values in traces:
        plt.plot(times, values, label=name)
    plt.legend()
    plt.show()


class Time(CBD):
    """Clock block built from a constant, an adder and a delay.

    Output ports:
        ``Out``      -- output of the delay block, whose input is
                        ``h + delay`` and whose initial condition is 0.
        ``OutDelta`` -- the constant ``h``.

    :param block_name: name of this block.
    :param h: increment fed into the accumulating adder (default 1).
    """

    def __init__(self, block_name, h=1):
        CBD.__init__(self, block_name,
                     input_ports=[],
                     output_ports=['Out', 'OutDelta'])

        # Create the blocks
        self.addBlock(ConstantBlock(block_name='zero', value=0))
        self.addBlock(ConstantBlock(block_name='h', value=h))
        self.addBlock(AdderBlock(block_name='sum'))
        self.addBlock(DelayBlock(block_name='delay'))

        # Connect the blocks
        self.addConnection('sum', 'delay')
        self.addConnection('zero', 'delay', input_port_name='IC')
        self.addConnection('h', 'sum')
        self.addConnection('delay', 'sum')
        self.addConnection('delay', 'Out')
        self.addConnection('h', 'OutDelta')


class TrainSystem(CBD):
    """Top-level closed loop: look-up table -> PID -> plant, with feedback.

    The adder ``sum`` receives the ``LookUp`` output and the negated plant
    velocity; its output drives the PID controller, whose output is the
    plant input ``F``.  The plant's ``x`` output also feeds an
    ``AboveThresholdBlock`` (threshold 0.4) connected to a
    ``StopSimulationBlock``.

    Output ports:
        ``ActualVelocity``     -- plant output ``v``.
        ``IdealVelocity``      -- output of the ``LookUp`` block.
        ``PeopleDisplacement`` -- plant output ``x``.
        ``Acceleration``       -- plant output ``a``.
        ``cost``               -- ``OutCost`` of the cost-function block.

    :param block_name: name of this block.
    :param h: step size handed to the internal ``Time`` block.
    :param Kp: proportional gain of the PID controller.
    :param Ki: integral gain of the PID controller.
    :param Kd: derivative gain of the PID controller.
    """

    def __init__(self, block_name, h=DELTA_T, Kp=200, Ki=0, Kd=0):
        CBD.__init__(self, block_name,
                     input_ports=[],
                     output_ports=['ActualVelocity', 'IdealVelocity',
                                   'PeopleDisplacement', 'Acceleration',
                                   'cost'])

        # Create the blocks
        self.addBlock(Time(block_name='time', h=h))
        self.addBlock(ComputerBlock(block_name='LookUp'))
        self.addBlock(AdderBlock(block_name='sum'))
        self.addBlock(PIDController(block_name='PID', Kp=Kp, Ki=Ki, Kd=Kd))
        self.addBlock(Plant(block_name='plant'))
        self.addBlock(NegatorBlock(block_name='neg'))
        self.addBlock(CostFunctionBlock(block_name='costBlock'))
        self.addBlock(AboveThresholdBlock(block_name='thresh', threshold=0.4))
        self.addBlock(StopSimulationBlock(block_name='stop'))

        # Connect the blocks
        self.addConnection('time', 'LookUp', output_port_name='Out')
        self.addConnection('sum', 'PID')
        self.addConnection('LookUp', 'sum')
        self.addConnection('time', 'PID',
                           input_port_name='delta_t',
                           output_port_name='OutDelta')
        self.addConnection('PID', 'plant', input_port_name='F')
        self.addConnection('time', 'plant',
                           input_port_name='delta_t',
                           output_port_name='OutDelta')
        self.addConnection('plant', 'neg', output_port_name='v')
        self.addConnection('neg', 'sum')
        self.addConnection('plant', 'ActualVelocity', output_port_name='v')
        self.addConnection('LookUp', 'IdealVelocity')
        self.addConnection('plant', 'PeopleDisplacement', output_port_name='x')
        self.addConnection('plant', 'Acceleration', output_port_name='a')
        self.addConnection('LookUp', 'costBlock', input_port_name='InVi')
        self.addConnection('time', 'costBlock',
                           input_port_name='InDelta',
                           output_port_name='OutDelta')
        self.addConnection('plant', 'costBlock',
                           input_port_name='InXPerson',
                           output_port_name='x')
        self.addConnection('plant', 'costBlock',
                           input_port_name='InVTrain',
                           output_port_name='v')
        self.addConnection('costBlock', 'cost', output_port_name='OutCost')
        self.addConnection('thresh', 'stop')
        self.addConnection('plant', 'thresh', output_port_name='x')


class PIDController(CBD):
    """PID controller assembled from integrator, derivator and gain blocks.

    As wired: ``OUT1 = Kp*IN1 + (Ki*integral(IN1) + Kd*derivative(IN1))``,
    with both the integrator and the derivator given an initial condition
    of 0 and the step size taken from the ``delta_t`` input port.

    Input ports:  ``IN1`` (signal to control on), ``delta_t`` (step size).
    Output ports: ``OUT1``.

    :param block_name: name of this block.
    :param Kp: proportional gain (default 0).
    :param Ki: integral gain (default 0).
    :param Kd: derivative gain (default 0).
    """

    def __init__(self, block_name, Kp=0, Ki=0, Kd=0):
        CBD.__init__(self, block_name,
                     input_ports=['IN1', 'delta_t'],
                     output_ports=['OUT1'])

        # These two blocks kept the drawio cell ids as their names; the ids
        # are bound to readable locals here, the block names are unchanged.
        derivator = '1MvUKixQIvwEmUJ5Sh7E-115'
        derivative_gain = '1MvUKixQIvwEmUJ5Sh7E-122'

        # Create the blocks
        self.addBlock(IntegratorBlock(block_name='int'))
        self.addBlock(ProductBlock(block_name='mult1'))
        self.addBlock(ConstantBlock(block_name='Kp', value=Kp))
        self.addBlock(AdderBlock(block_name='sum1'))
        self.addBlock(ProductBlock(block_name='mult2'))
        self.addBlock(ConstantBlock(block_name='Ki', value=Ki))
        self.addBlock(AdderBlock(block_name='sum2'))
        self.addBlock(DerivatorBlock(block_name=derivator))
        self.addBlock(ProductBlock(block_name=derivative_gain))
        self.addBlock(ConstantBlock(block_name='Kd', value=Kd))
        self.addBlock(ConstantBlock(block_name='zero', value=0))

        # Connect the blocks
        self.addConnection('IN1', 'mult1')
        self.addConnection('IN1', 'int')
        self.addConnection('IN1', derivator)
        self.addConnection('delta_t', 'int', input_port_name='delta_t')
        self.addConnection('delta_t', derivator, input_port_name='delta_t')
        self.addConnection('Kp', 'mult1')
        self.addConnection('mult1', 'sum1')
        self.addConnection('sum1', 'OUT1')
        self.addConnection('int', 'mult2')
        self.addConnection('Ki', 'mult2')
        self.addConnection('mult2', 'sum2')
        self.addConnection('sum2', 'sum1')
        self.addConnection(derivator, derivative_gain)
        self.addConnection('Kd', derivative_gain)
        self.addConnection(derivative_gain, 'sum2')
        self.addConnection('zero', derivator, input_port_name='IC')
        self.addConnection('zero', 'int', input_port_name='IC')


class Plant(CBD):
    """Train-plus-passenger dynamics driven by the traction input ``F``.

    As wired (all integrators start from 0 and use the ``delta_t`` input):

    * ``a = (F - 0.5*p*v**2 * CD*A) / (mt + mp)`` and ``v = integral(a)``;
    * ``x = integral(integral(-(mp*F/(mt + mp) + k*x + c*x') / mp))``,
      where ``x'`` is the output of the ``int_vp`` integrator.

    Input ports:  ``F``, ``delta_t``.
    Output ports: ``v``, ``x``, ``a``.

    :param block_name: name of this block.
    :param k: constant multiplied with ``x`` (default 300).
    :param c: constant multiplied with ``x'`` (default 150).
    :param CD: constant in the ``v**2`` term (default 0.6).
    :param p: constant in the ``v**2`` term (default 1.2).
    :param A: constant in the ``v**2`` term (default 9.12).
    :param mp: mass constant ``mp`` (default 73).
    :param mt: mass constant ``mt`` (default 5555).

    NOTE(review): by their names these look like spring/damper constants,
    drag coefficient, air density, frontal area and passenger/train mass --
    confirm against the model specification.
    """

    def __init__(self, block_name, k=300, c=150, CD=0.6, p=1.2, A=9.12,
                 mp=73, mt=5555):
        CBD.__init__(self, block_name,
                     input_ports=['F', 'delta_t'],
                     output_ports=['v', 'x', 'a'])

        # Create the blocks
        self.addBlock(ConstantBlock(block_name='mt', value=mt))
        self.addBlock(ConstantBlock(block_name='mp', value=mp))
        self.addBlock(AdderBlock(block_name='sum_M'))
        self.addBlock(InverterBlock(block_name='inv'))
        self.addBlock(ProductBlock(block_name='mul_M'))
        self.addBlock(ProductBlock(block_name='mul_p'))
        self.addBlock(ConstantBlock(block_name='p', value=p))
        # Written as a float literal so the value does not depend on the
        # interpreter's division semantics.
        self.addBlock(ConstantBlock(block_name='half', value=0.5))
        self.addBlock(ProductBlock(block_name='mul_pv'))
        self.addBlock(ProductBlock(block_name='square'))
        self.addBlock(ConstantBlock(block_name='CD', value=CD))
        self.addBlock(ProductBlock(block_name='mul_CD'))
        self.addBlock(ProductBlock(block_name='mul_above'))
        self.addBlock(ConstantBlock(block_name='A', value=A))
        self.addBlock(NegatorBlock(block_name='neg'))
        self.addBlock(AdderBlock(block_name='sum_div'))
        self.addBlock(IntegratorBlock(block_name='int'))
        self.addBlock(ConstantBlock(block_name='zero', value=0))
        self.addBlock(ConstantBlock(block_name='k', value=k))
        self.addBlock(ConstantBlock(block_name='c', value=c))
        self.addBlock(ProductBlock(block_name='mul_k'))
        self.addBlock(ProductBlock(block_name='mul_c'))
        self.addBlock(AdderBlock(block_name='sum_kc'))
        self.addBlock(ProductBlock(block_name='mul_FTM'))
        self.addBlock(ProductBlock(block_name='mul_mMF'))
        self.addBlock(AdderBlock(block_name='sum_x'))
        self.addBlock(InverterBlock(block_name='inv_mp'))
        self.addBlock(ProductBlock(block_name='prod_x'))
        self.addBlock(IntegratorBlock(block_name='int_vp'))
        self.addBlock(IntegratorBlock(block_name='int_xp'))
        self.addBlock(NegatorBlock(block_name='negx'))

        # Connect the blocks
        self.addConnection('F', 'sum_div')
        self.addConnection('F', 'mul_FTM')
        self.addConnection('mt', 'sum_M')
        self.addConnection('mp', 'sum_M')
        self.addConnection('sum_M', 'inv')
        self.addConnection('inv', 'mul_M')
        self.addConnection('half', 'mul_p')
        self.addConnection('mul_p', 'mul_pv')
        self.addConnection('square', 'mul_pv')
        self.addConnection('CD', 'mul_CD')
        self.addConnection('A', 'mul_CD')
        self.addConnection('mul_pv', 'mul_above')
        self.addConnection('mul_CD', 'mul_above')
        self.addConnection('mul_above', 'neg')
        self.addConnection('neg', 'sum_div')
        self.addConnection('sum_div', 'mul_M')
        self.addConnection('mul_M', 'int')
        self.addConnection('zero', 'int', input_port_name='IC')
        self.addConnection('delta_t', 'int', input_port_name='delta_t')
        self.addConnection('delta_t', 'int_vp', input_port_name='delta_t')
        self.addConnection('delta_t', 'int_xp', input_port_name='delta_t')
        self.addConnection('int', 'square')
        self.addConnection('int', 'v')
        self.addConnection('k', 'mul_k')
        self.addConnection('c', 'mul_c')
        self.addConnection('mul_k', 'sum_kc')
        self.addConnection('mul_c', 'sum_kc')
        self.addConnection('inv', 'mul_FTM')
        self.addConnection('mul_FTM', 'mul_mMF')
        self.addConnection('mp', 'mul_mMF')
        self.addConnection('mul_mMF', 'sum_x')
        self.addConnection('sum_kc', 'sum_x')
        self.addConnection('inv_mp', 'prod_x')
        self.addConnection('prod_x', 'int_vp')
        self.addConnection('zero', 'int_vp', input_port_name='IC')
        self.addConnection('int_vp', 'mul_c')
        self.addConnection('int_vp', 'int_xp')
        self.addConnection('zero', 'int_xp', input_port_name='IC')
        self.addConnection('int_xp', 'mul_k')
        self.addConnection('mul_M', 'a')
        self.addConnection('mp', 'inv_mp')
        self.addConnection('sum_x', 'negx')
        self.addConnection('negx', 'prod_x')
        self.addConnection('int_xp', 'x')
        # Despite their names, 'square' computes p*v and 'mul_p' computes
        # 0.5*v; their product in 'mul_pv' is 0.5*p*v**2.
        self.addConnection('p', 'square')
        self.addConnection('int', 'mul_p')


if __name__ == '__main__':
    cbd = TrainSystem("TrainSystem")

    # Run the simulation
    cbd.run(360, delta_t=DELTA_T)

    # process simulation results
    plot_signals(cbd, ['ActualVelocity', 'IdealVelocity'],
                 'Velocity of the Train')
    plot_signals(cbd, ['PeopleDisplacement', 'Acceleration'],
                 'People Displacement and Acceleration')