| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121 |
- from dataclasses import dataclass
- from matplotlib import patches
- from cbd.core import CBD, Signal
- from cbd.simulator import Simulator
- from cbd.lib.std import ProductBlock, AdderBlock, IntegratorBlock, ConstantBlock
- import matplotlib.pyplot as plt
- import arklog
- import logging
class SpringMassDamper(CBD):
    """
    Causal Block Diagram of a Spring Mass Damper system.

    The adder sums three products: 1 * 100, y' * (-2) and y * (-100). Its result
    (y") is fed through two chained integrators, both receiving the constant 0
    on their "IC" port, and the output of the second integrator is exposed on
    the output port "y".

                   ┌───────┐             ┌───────┐             ┌───────┐
    ──|1|──(100)──>│       │             │       │             │       │
                   │       │             │       │             │       │
            ┌─────>│   Σ   ├────|y"|────>│   ∫   ├────|y'|────>│   ∫   ├────|y|────>
            │      │       │             │       │      │      │       │      │
            │  ┌──>│       │             │       │      │      │       │      │
            │  │   └───────┘             └───────┘      │      └───────┘      │
            │  │                                        │                     │
            │  └────────────────|-2y'|──────(-2)────────┘                     │
            │                                                                 │
            └─────────────────────────────────|-100y|────────────(-100)───────┘
    """

    def __init__(self, block_name: str):
        """Create all blocks of the diagram and wire them together.

        :param block_name: Name forwarded to the CBD base class.
        """
        super().__init__(block_name, output_ports=["y"])

        # Block names count from 1 to stay consistent with the CBD library.
        components = (
            ProductBlock("prod1"),
            ProductBlock("prod2"),
            ProductBlock("prod3"),
            AdderBlock("sum1", numberOfInputs=3),
            IntegratorBlock("int1"),
            IntegratorBlock("int2"),
            ConstantBlock("const1", 100),
            ConstantBlock("const2", -2),
            ConstantBlock("const3", -100),
            ConstantBlock("const4", 0),
            ConstantBlock("const5", 1),
        )
        for component in components:
            self.add_block(component)

        # (source block, target, input port on the target). Every connection
        # leaves its source through "OUT1". A target port of None means the
        # connection is made without naming an input port (used for the
        # diagram's own output port "y").
        wiring = (
            # Forcing term: 1 * 100 into the first adder input.
            ("const5", "prod1", "IN1"),
            ("const1", "prod1", "IN2"),
            ("prod1", "sum1", "IN1"),
            # Integrator chain: y" -> y' -> y.
            ("sum1", "int1", "IN1"),
            ("int1", "int2", "IN1"),
            # Feedback of y' scaled by -2 into the second adder input.
            ("int1", "prod2", "IN1"),
            ("const2", "prod2", "IN2"),
            ("prod2", "sum1", "IN2"),
            # Expose y and feed it back scaled by -100 into the third adder input.
            ("int2", "y", None),
            ("int2", "prod3", "IN1"),
            ("const3", "prod3", "IN2"),
            ("prod3", "sum1", "IN3"),
            # Both integrators receive the constant 0 on their "IC" port.
            ("const4", "int1", "IC"),
            ("const4", "int2", "IC"),
        )
        for source, target, target_port in wiring:
            if target_port is None:
                self.add_connection(source, target, output_port_name="OUT1")
            else:
                self.add_connection(source, target, output_port_name="OUT1", input_port_name=target_port)
def plot_experiment(signals: [Signal]):
    """Plot the mass position against time and mark the area of interest.

    The figure is displayed with ``plt.show()``; nothing is returned.

    :param signals: Samples to draw. Each sample's ``time`` is used for the
        x axis and its ``value`` for the y axis.
    """
    logging.debug("Generating experiment plot.")

    # Walk the samples once, then split them into the two plotted series.
    points = [(sample.time, sample.value) for sample in signals]
    time_values = [moment for moment, _ in points]
    y_values = [position for _, position in points]

    position_colour = (1.00, 0.55, 0.22)
    highlight_colour = (0.20, 0.20, 0.80)

    _figure, ax = plt.subplots(figsize=(5, 5), dpi=100)
    ax.plot(time_values, y_values, label="Mass Position", color=position_colour)
    plt.xlabel("Time (s)", color="black")
    plt.ylabel("Distance (m)", color="black")
    plt.title("Spring Mass Damper System")

    # Unfilled rectangle anchored at (5, 0.9), 2 wide and 0.2 high.
    area_of_interest_patch = patches.Rectangle(
        (5, 0.9),
        2,
        0.2,
        linewidth=1,
        edgecolor=highlight_colour,
        facecolor="none",
    )
    ax.add_patch(area_of_interest_patch)

    # The rectangle itself carries no label, so a proxy patch with the same
    # colours is appended to the legend handles.
    legend_handles, _labels = ax.get_legend_handles_labels()
    legend_handles.append(
        patches.Patch(
            edgecolor=area_of_interest_patch.get_edgecolor(),
            facecolor=area_of_interest_patch.get_facecolor(),
            label="Area of Interest",
        )
    )
    ax.legend(handles=legend_handles, loc="upper right")
    plt.show()
- @dataclass(frozen=True, slots=True)
- class TimeFrame:
- """Experiment time frame. Denotes a start and stop time as floating point numbers representing seconds."""
- start_time: float
- stop_time: float
def oscillation_difference(signals: list[Signal], time_frame: TimeFrame, threshold: float) -> bool:
    """Check if the equilibrium is reached within a tolerance threshold during a time frame.

    Only samples whose ``time`` lies in the closed interval
    ``[time_frame.start_time, time_frame.stop_time]`` are taken into account.

    :param signals: Samples exposing ``time`` and ``value`` attributes.
    :param time_frame: Window to inspect; both bounds are inclusive.
    :param threshold: Exclusive upper bound on the allowed spread of values.
    :return: True when the difference between the largest and smallest value
        inside the window is strictly smaller than ``threshold``.
    :raises ValueError: If no sample falls inside the time frame.
    """
    values_in_frame = [
        signal.value
        for signal in signals
        if time_frame.start_time <= signal.time <= time_frame.stop_time
    ]
    # min()/max() on an empty list raise an unhelpful "arg is an empty sequence"
    # error, so report the actual problem instead.
    if not values_in_frame:
        raise ValueError(
            f"No signals found within the time frame "
            f"[{time_frame.start_time}, {time_frame.stop_time}]; cannot evaluate the oscillation."
        )
    minimum = min(values_in_frame)
    maximum = max(values_in_frame)
    logging.debug(f"{minimum=}")
    logging.debug(f"{maximum=}")
    return maximum - minimum < threshold
def run_experiment(time_step: float = 0.0001, run_time_seconds: float = 7.0) -> None:
    """Simulate the Spring Mass Damper, plot its output and log whether it settles.

    :param time_step: Value handed to the simulator's ``set_delta_t``; its
        inverse is logged as the simulation frequency in Hz.
    :param run_time_seconds: Value handed to the simulator's ``run``.
    """
    smd = SpringMassDamper("SpringMassDamper")
    logging.info(f"Running '{smd}' experiment for {run_time_seconds} seconds at {1 / time_step} Hz.")
    sim = Simulator(smd)
    sim.set_delta_t(time_step)
    sim.run(run_time_seconds)
    signals = smd.get_signal_history("y")
    # NOTE(review): plot_experiment ends in plt.show(), which may block until
    # the window is closed depending on the matplotlib backend — confirm.
    plot_experiment(signals)
    in_tolerance = oscillation_difference(signals, TimeFrame(5.0, 7.0), threshold=0.1)
    # str.replace returns a new string and the original code discarded it, so
    # a failing experiment was still reported as "within". Pick the message
    # explicitly instead.
    if in_tolerance:
        tolerance_output = "The experiment falls within the specified parameters."
    else:
        tolerance_output = "The experiment falls outside the specified parameters."
    logging.info(tolerance_output)
if __name__ == "__main__":
    # Script entry point: runs only when the file is executed directly, not
    # when it is imported.
    # NOTE(review): arklog.set_config_logging() presumably installs the logging
    # configuration used by the logging calls in this file — confirm against arklog.
    arklog.set_config_logging()
    # Run the experiment with its default time step and run time.
    run_experiment()
|