| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228 |
- #!/usr/bin/python3
- # This file was automatically generated from drawio2cbd with the command:
- # ../../drawio2cbd.py TrainSystem.xml TrainSystem.py -e TrainSystem -T 0.2 -t 360
- from ComputerBlock import ComputerBlock
- from TrainCostModelBlock import *
- from CBD.src.CBD import *
- DELTA_T = 0.2
- import matplotlib.pyplot as plt
def plot_signals(block, signals, title):
    """Plot the recorded traces of several signals in a single figure.

    Args:
        block: Object exposing ``getSignal(name)``; each call must yield
            ``(time, value)`` pairs for the requested signal.
        signals: Names of the signals to draw. Each one becomes a line
            labelled with its name.
        title: Title displayed above the plot.

    Returns:
        None. The figure is shown with ``plt.show()``.
    """
    # Fetch every trace before touching matplotlib, and keep each signal's
    # own time axis next to its values. Previously a single shared ``times``
    # list was overwritten on every iteration, so all lines were drawn
    # against the last signal's time stamps.
    traces = []
    for name in signals:
        samples = list(block.getSignal(name))
        times = [t for t, _ in samples]
        values = [v for _, v in samples]
        traces.append((name, times, values))

    # Plot
    plt.figure()
    plt.title(title)
    plt.xlabel('time')
    plt.ylabel('N')
    for name, times, values in traces:
        plt.plot(times, values, label=name)
    if traces:
        # A legend without any labelled line only triggers a warning.
        plt.legend()
    plt.show()
class Time(CBD):
    """CBD that produces a step-wise accumulated time value and the step size.

    Wiring (reading block types by their names):
        * ``sum`` adds the constant ``h`` to the output of ``delay``.
        * ``delay`` receives ``sum`` as input and the constant 0 as its
          initial condition (``IC``).

    Output ports:
        Out: output of ``delay``.
        OutDelta: the constant ``h``.

    Args:
        block_name: Name given to this CBD.
        h: Value of the ``h`` constant block (default 1).
    """

    def __init__(self, block_name, h=1):
        super().__init__(block_name, input_ports=[], output_ports=['Out', 'OutDelta'])

        # Create the blocks
        for child in (
            ConstantBlock(block_name='zero', value=0),
            ConstantBlock(block_name='h', value=h),
            AdderBlock(block_name='sum'),
            DelayBlock(block_name='delay'),
        ):
            self.addBlock(child)

        # Connect the blocks (same order as the generated model, because
        # unnamed input ports are filled in connection order)
        link = self.addConnection
        link('sum', 'delay')
        link('zero', 'delay', input_port_name='IC')
        link('h', 'sum')
        link('delay', 'sum')
        link('delay', 'Out')
        link('h', 'OutDelta')
class TrainSystem(CBD):
    """Top-level closed-loop model: clock, look-up, PID controller and plant.

    Signal flow, as wired below:
        * ``time.Out`` drives ``LookUp``; ``LookUp`` is exported as
          ``IdealVelocity``.
        * ``sum`` adds ``LookUp`` and the negated plant output ``v``; the
          result feeds ``PID``, whose output is the plant input ``F``.
        * ``time.OutDelta`` is distributed as ``delta_t`` to ``PID`` and
          ``plant`` and as ``InDelta`` to ``costBlock``.
        * ``costBlock`` receives ``LookUp`` (``InVi``), plant ``x``
          (``InXPerson``) and plant ``v`` (``InVTrain``); its ``OutCost``
          is exported as ``cost``.
        * plant ``x`` goes through ``thresh`` (threshold 0.4) into ``stop``.

    Output ports:
        ActualVelocity: plant ``v``.
        IdealVelocity: ``LookUp`` output.
        PeopleDisplacement: plant ``x``.
        Acceleration: plant ``a``.
        cost: ``costBlock.OutCost``.

    Args:
        block_name: Name given to this CBD.
        h: Step value handed to the ``Time`` block (default ``DELTA_T``).
        Kp, Ki, Kd: Gains forwarded to the ``PIDController``.
    """

    def __init__(self, block_name, h=DELTA_T, Kp=200, Ki=0, Kd=0):
        super().__init__(
            block_name,
            input_ports=[],
            output_ports=[
                'ActualVelocity',
                'IdealVelocity',
                'PeopleDisplacement',
                'Acceleration',
                'cost',
            ],
        )

        # Create the blocks
        for child in (
            Time(block_name='time', h=h),
            ComputerBlock(block_name='LookUp'),
            AdderBlock(block_name='sum'),
            PIDController(block_name='PID', Kp=Kp, Ki=Ki, Kd=Kd),
            Plant(block_name='plant'),
            NegatorBlock(block_name='neg'),
            CostFunctionBlock(block_name='costBlock'),
            AboveThresholdBlock(block_name='thresh', threshold=0.4),
            StopSimulationBlock(block_name='stop'),
        ):
            self.addBlock(child)

        # Connect the blocks (same order as the generated model, because
        # unnamed input ports are filled in connection order)
        link = self.addConnection
        link('time', 'LookUp', output_port_name='Out')
        link('sum', 'PID')
        link('LookUp', 'sum')
        link('time', 'PID', input_port_name='delta_t', output_port_name='OutDelta')
        link('PID', 'plant', input_port_name='F')
        link('time', 'plant', input_port_name='delta_t', output_port_name='OutDelta')
        link('plant', 'neg', output_port_name='v')
        link('neg', 'sum')
        link('plant', 'ActualVelocity', output_port_name='v')
        link('LookUp', 'IdealVelocity')
        link('plant', 'PeopleDisplacement', output_port_name='x')
        link('plant', 'Acceleration', output_port_name='a')
        link('LookUp', 'costBlock', input_port_name='InVi')
        link('time', 'costBlock', input_port_name='InDelta', output_port_name='OutDelta')
        link('plant', 'costBlock', input_port_name='InXPerson', output_port_name='x')
        link('plant', 'costBlock', input_port_name='InVTrain', output_port_name='v')
        link('costBlock', 'cost', output_port_name='OutCost')
        link('thresh', 'stop')
        link('plant', 'thresh', output_port_name='x')
class PIDController(CBD):
    """PID controller assembled from product, adder, integrator and derivator blocks.

    Wiring (reading block types by their names):
        * ``mult1``  = ``IN1`` * ``Kp``
        * ``mult2``  = integrator(``IN1``) * ``Ki``
        * derivative branch = derivator(``IN1``) * ``Kd``
        * ``sum2``   = ``mult2`` + derivative branch
        * ``sum1``   = ``mult1`` + ``sum2``  ->  ``OUT1``

    The integrator and the derivator both take ``delta_t`` from the input
    port of the same name and the constant 0 as initial condition.

    Args:
        block_name: Name given to this CBD.
        Kp, Ki, Kd: Values of the three gain constants (default 0 each).
    """

    def __init__(self, block_name, Kp=0, Ki=0, Kd=0):
        super().__init__(block_name, input_ports=['IN1', 'delta_t'], output_ports=['OUT1'])

        # The generator assigned opaque diagram IDs to these two blocks;
        # the names are kept verbatim and only given readable aliases here.
        derivator = '1MvUKixQIvwEmUJ5Sh7E-115'
        derivative_gain = '1MvUKixQIvwEmUJ5Sh7E-122'

        # Create the blocks
        for child in (
            IntegratorBlock(block_name='int'),
            ProductBlock(block_name='mult1'),
            ConstantBlock(block_name='Kp', value=Kp),
            AdderBlock(block_name='sum1'),
            ProductBlock(block_name='mult2'),
            ConstantBlock(block_name='Ki', value=Ki),
            AdderBlock(block_name='sum2'),
            DerivatorBlock(block_name=derivator),
            ProductBlock(block_name=derivative_gain),
            ConstantBlock(block_name='Kd', value=Kd),
            ConstantBlock(block_name='zero', value=0),
        ):
            self.addBlock(child)

        # Connect the blocks (same order as the generated model, because
        # unnamed input ports are filled in connection order)
        link = self.addConnection
        link('IN1', 'mult1')
        link('IN1', 'int')
        link('IN1', derivator)
        link('delta_t', 'int', input_port_name='delta_t')
        link('delta_t', derivator, input_port_name='delta_t')
        link('Kp', 'mult1')
        link('mult1', 'sum1')
        link('sum1', 'OUT1')
        link('int', 'mult2')
        link('Ki', 'mult2')
        link('mult2', 'sum2')
        link('sum2', 'sum1')
        link(derivator, derivative_gain)
        link('Kd', derivative_gain)
        link(derivative_gain, 'sum2')
        link('zero', derivator, input_port_name='IC')
        link('zero', 'int', input_port_name='IC')
class Plant(CBD):
    """Plant model driven by a force ``F``; outputs ``v``, ``x`` and ``a``.

    Equations implied by the wiring (reading block types by their names;
    ``M = mt + mp``):

        a      = (F - 1/2 * p * v**2 * CD * A) / M        (``mul_M``)
        v      = integral of a, initial value 0            (``int``)
        vp'    = -(mp * F / M + k * xp + c * vp) / mp      (``prod_x``)
        vp     = integral of vp', initial value 0          (``int_vp``)
        xp     = integral of vp, initial value 0           (``int_xp``)

    Input ports:
        F: driving force.
        delta_t: step size forwarded to all three integrators.

    Output ports:
        v: output of ``int``.
        x: output of ``int_xp``.
        a: output of ``mul_M``.

    Args:
        block_name: Name given to this CBD.
        k, c, CD, p, A, mp, mt: Values of the equally named constant blocks
            (defaults 300, 150, 0.6, 1.2, 9.12, 73 and 5555).
    """

    def __init__(self, block_name, k=300, c=150, CD=0.6, p=1.2, A=9.12, mp=73, mt=5555):
        super().__init__(block_name, input_ports=['F', 'delta_t'], output_ports=['v', 'x', 'a'])

        # Create the blocks
        for child in (
            ConstantBlock(block_name='mt', value=mt),
            ConstantBlock(block_name='mp', value=mp),
            AdderBlock(block_name='sum_M'),
            InverterBlock(block_name='inv'),
            ProductBlock(block_name='mul_M'),
            ProductBlock(block_name='mul_p'),
            ConstantBlock(block_name='p', value=p),
            ConstantBlock(block_name='half', value=1/2),
            ProductBlock(block_name='mul_pv'),
            ProductBlock(block_name='square'),
            ConstantBlock(block_name='CD', value=CD),
            ProductBlock(block_name='mul_CD'),
            ProductBlock(block_name='mul_above'),
            ConstantBlock(block_name='A', value=A),
            NegatorBlock(block_name='neg'),
            AdderBlock(block_name='sum_div'),
            IntegratorBlock(block_name='int'),
            ConstantBlock(block_name='zero', value=0),
            ConstantBlock(block_name='k', value=k),
            ConstantBlock(block_name='c', value=c),
            ProductBlock(block_name='mul_k'),
            ProductBlock(block_name='mul_c'),
            AdderBlock(block_name='sum_kc'),
            ProductBlock(block_name='mul_FTM'),
            ProductBlock(block_name='mul_mMF'),
            AdderBlock(block_name='sum_x'),
            InverterBlock(block_name='inv_mp'),
            ProductBlock(block_name='prod_x'),
            IntegratorBlock(block_name='int_vp'),
            IntegratorBlock(block_name='int_xp'),
            NegatorBlock(block_name='negx'),
        ):
            self.addBlock(child)

        # Connect the blocks (same order as the generated model, because
        # unnamed input ports are filled in connection order)
        link = self.addConnection
        link('F', 'sum_div')
        link('F', 'mul_FTM')
        link('mt', 'sum_M')
        link('mp', 'sum_M')
        link('sum_M', 'inv')
        link('inv', 'mul_M')
        link('half', 'mul_p')
        link('mul_p', 'mul_pv')
        link('square', 'mul_pv')
        link('CD', 'mul_CD')
        link('A', 'mul_CD')
        link('mul_pv', 'mul_above')
        link('mul_CD', 'mul_above')
        link('mul_above', 'neg')
        link('neg', 'sum_div')
        link('sum_div', 'mul_M')
        link('mul_M', 'int')
        link('zero', 'int', input_port_name='IC')
        link('delta_t', 'int', input_port_name='delta_t')
        link('delta_t', 'int_vp', input_port_name='delta_t')
        link('delta_t', 'int_xp', input_port_name='delta_t')
        link('int', 'square')
        link('int', 'v')
        link('k', 'mul_k')
        link('c', 'mul_c')
        link('mul_k', 'sum_kc')
        link('mul_c', 'sum_kc')
        link('inv', 'mul_FTM')
        link('mul_FTM', 'mul_mMF')
        link('mp', 'mul_mMF')
        link('mul_mMF', 'sum_x')
        link('sum_kc', 'sum_x')
        link('inv_mp', 'prod_x')
        link('prod_x', 'int_vp')
        link('zero', 'int_vp', input_port_name='IC')
        link('int_vp', 'mul_c')
        link('int_vp', 'int_xp')
        link('zero', 'int_xp', input_port_name='IC')
        link('int_xp', 'mul_k')
        link('mul_M', 'a')
        link('mp', 'inv_mp')
        link('sum_x', 'negx')
        link('negx', 'prod_x')
        link('int_xp', 'x')
        # ``p`` joins ``square`` and ``int`` joins ``mul_p``, so the product
        # ``mul_pv`` still evaluates to 1/2 * p * v**2.
        link('p', 'square')
        link('int', 'mul_p')
if __name__ == '__main__':
    # Build the model with its default step size and controller gains.
    model = TrainSystem("TrainSystem")

    # Run the simulation
    model.run(360, delta_t=DELTA_T)

    # Process simulation results: one figure per group of related signals.
    figures = (
        (['ActualVelocity', 'IdealVelocity'], 'Velocity of the Train'),
        (['PeopleDisplacement', 'Acceleration'], 'People Displacement and Acceleration'),
    )
    for signal_names, figure_title in figures:
        plot_signals(model, signal_names, figure_title)
|