|
@@ -1,211 +0,0 @@
|
|
|
-from dataclasses import dataclass
|
|
|
-
|
|
|
-from matplotlib import pyplot as plt
|
|
|
-
|
|
|
-from src.data_extraction import DataExtraction
|
|
|
-from src.utils import get_model_path, get_results_path
|
|
|
-
|
|
|
-from typing import Dict, Any
|
|
|
-
|
|
|
-from fmpy import extract, read_model_description
|
|
|
-from fmpy.fmi2 import FMU2Slave
|
|
|
-from fmpy.model_description import ModelDescription
|
|
|
-from tqdm import tqdm
|
|
|
-
|
|
|
-@dataclass
|
|
|
-class CalibrationSetup:
|
|
|
- min_leakage_opening: float
|
|
|
- max_leakage_opening: float
|
|
|
- step_leakage_opening: float
|
|
|
- min_rise_time: float
|
|
|
- max_rise_time: float
|
|
|
- step_rise_time: float
|
|
|
- start_motor_position: float
|
|
|
- start_time: float
|
|
|
- stop_time: float
|
|
|
- step_size: float
|
|
|
- degree: int
|
|
|
-
|
|
|
-
|
|
|
-class Calibration:
|
|
|
-
|
|
|
- def __init__(self, cal_setup: CalibrationSetup):
|
|
|
- self.cal_setup = cal_setup
|
|
|
- self.model_path = get_model_path("calibration")
|
|
|
-
|
|
|
- self.current_time = self.cal_setup.start_time
|
|
|
-
|
|
|
- self._fmu: FMU2Slave = None
|
|
|
- self._model_desc: ModelDescription = None
|
|
|
-
|
|
|
- self._input_refs: Dict[str, Dict[str, Any]] = {}
|
|
|
- self._output_refs: Dict[str, Dict[str, Any]] = {}
|
|
|
- self._parameter_refs: Dict[str, Dict[str, Any]] = {}
|
|
|
-
|
|
|
- self.data_handler = DataExtraction()
|
|
|
-
|
|
|
- def setup(self, rise_time_heating, rise_time_cooling, leakage_opening):
|
|
|
- self._load_fmu()
|
|
|
- self._load_input_references()
|
|
|
- self._load_parameter_references()
|
|
|
- self._load_output_references()
|
|
|
- self._fmu.enterInitializationMode()
|
|
|
- self.set_start_position(1)
|
|
|
- self.set_rise_time_heating(rise_time_heating)
|
|
|
- self.set_rise_time_cooling(rise_time_cooling)
|
|
|
- self.set_leakage_opening(leakage_opening)
|
|
|
- self._fmu.exitInitializationMode()
|
|
|
- self.current_time = self.cal_setup.start_time
|
|
|
-
|
|
|
- def perform_step(self) -> bool:
|
|
|
- if self.current_time < self.cal_setup.stop_time:
|
|
|
- self._fmu.doStep(
|
|
|
- currentCommunicationPoint=self.current_time,
|
|
|
- communicationStepSize=self.cal_setup.step_size,
|
|
|
- )
|
|
|
- self.current_time += self.cal_setup.step_size
|
|
|
- self.current_time = round(self.current_time, 2)
|
|
|
- return True
|
|
|
- return False
|
|
|
-
|
|
|
- def _load_fmu(self):
|
|
|
-
|
|
|
- if not self.model_path.exists():
|
|
|
- raise FileNotFoundError(f"FMU file not found at path: '{self.model_path}'")
|
|
|
-
|
|
|
- extract_dir = extract(self.model_path)
|
|
|
- self._model_desc = read_model_description(self.model_path)
|
|
|
-
|
|
|
- self._fmu = FMU2Slave(
|
|
|
- guid=self._model_desc.guid, unzipDirectory=extract_dir,
|
|
|
- modelIdentifier=self._model_desc.coSimulation.modelIdentifier
|
|
|
- )
|
|
|
- self._fmu.instantiate()
|
|
|
- self._fmu.setupExperiment(
|
|
|
- startTime=self.cal_setup.start_time, stopTime=self.cal_setup.stop_time
|
|
|
- )
|
|
|
-
|
|
|
- def _load_input_references(self):
|
|
|
- self._input_refs = {
|
|
|
- var.name: {"reference": var.valueReference}
|
|
|
- for var in self._model_desc.modelVariables
|
|
|
- if var.causality == "input"
|
|
|
- }
|
|
|
-
|
|
|
- def _load_parameter_references(self):
|
|
|
- self._parameter_refs = {
|
|
|
- var.name: {"reference": var.valueReference}
|
|
|
- for var in self._model_desc.modelVariables
|
|
|
- if var.causality == "parameter" and (var.name in ["riseTimeCooling","riseTimeHeating", "leakageOpening"])
|
|
|
- }
|
|
|
-
|
|
|
- def _load_output_references(self):
|
|
|
- self._output_refs = {
|
|
|
- var.name: {"reference": var.valueReference}
|
|
|
- for var in self._model_desc.modelVariables
|
|
|
- if var.causality == "output"
|
|
|
- }
|
|
|
-
|
|
|
- def set_start_position(self, value: float):
|
|
|
- ref = self._input_refs.get("start_position")
|
|
|
- self._fmu.setReal([ref["reference"]], [value])
|
|
|
-
|
|
|
- def set_rise_time_heating(self, value: float):
|
|
|
- ref = self._parameter_refs.get("riseTimeHeating")
|
|
|
- self._fmu.setReal([ref["reference"]], [value])
|
|
|
-
|
|
|
- def set_rise_time_cooling(self, value: float):
|
|
|
- ref = self._parameter_refs.get("riseTimeCooling")
|
|
|
- self._fmu.setReal([ref["reference"]], [value])
|
|
|
-
|
|
|
- def set_leakage_opening(self, value: float):
|
|
|
- ref = self._parameter_refs.get("leakageOpening")
|
|
|
- self._fmu.setReal([ref["reference"]], [value])
|
|
|
-
|
|
|
- def set_new_position(self, value: float):
|
|
|
- ref = self._input_refs.get("u")
|
|
|
- self._fmu.setReal([ref["reference"]], [value])
|
|
|
-
|
|
|
- def get_motor_position(self):
|
|
|
- ref = self._output_refs.get("y")
|
|
|
- return round(self._fmu.getReal([ref["reference"]])[0], 5)
|
|
|
-
|
|
|
- def run(self):
|
|
|
-
|
|
|
- rise_time_cooling = round(self.cal_setup.min_rise_time, 0)
|
|
|
-
|
|
|
- best_results = {
|
|
|
- "rsse": float('inf'),
|
|
|
- "data": None,
|
|
|
- "rise_time_heating": None,
|
|
|
- "rise_time_cooling": None,
|
|
|
- "leakage": None
|
|
|
- }
|
|
|
-
|
|
|
- total_iters = (int((self.cal_setup.max_rise_time - self.cal_setup.min_rise_time) / self.cal_setup.step_rise_time) + 1) ** 2
|
|
|
- p_bar = tqdm(total=total_iters, desc="Calibration Progress")
|
|
|
-
|
|
|
- while rise_time_cooling <= round(self.cal_setup.max_rise_time, 0):
|
|
|
- rise_time_heating = round(self.cal_setup.min_rise_time, 0)
|
|
|
- while rise_time_heating <= round(self.cal_setup.max_rise_time, 0):
|
|
|
- leakage = round(self.cal_setup.min_leakage_opening, 4)
|
|
|
- # while leakage <= round(self.cal_setup.max_rise_time, 4):
|
|
|
-
|
|
|
- self.setup(rise_time_heating, rise_time_cooling, leakage)
|
|
|
- performed_step = True
|
|
|
- self.set_new_position(0)
|
|
|
-
|
|
|
- sim_results = {}
|
|
|
- while performed_step:
|
|
|
-
|
|
|
- sim_results[round(self.current_time, 1)] = self.get_motor_position()
|
|
|
-
|
|
|
- if abs(self.current_time - 600) <= 0.0005:
|
|
|
- self.set_new_position(1)
|
|
|
-
|
|
|
- performed_step = self.perform_step()
|
|
|
-
|
|
|
- # self.plot(sim_results, rise_time, leakage)
|
|
|
-
|
|
|
- # leakage += self.cal_setup.step_leakage_opening
|
|
|
- # leakage = round(leakage, 4)
|
|
|
-
|
|
|
- rsse, filtered_data = self.data_handler.calculate_rsse(sim_results, self.cal_setup.degree)
|
|
|
- if rsse < best_results['rsse']:
|
|
|
- best_results = {
|
|
|
- "rsse": rsse,
|
|
|
- "data": filtered_data,
|
|
|
- "rise_time_heating": rise_time_heating,
|
|
|
- "rise_time_cooling": rise_time_cooling,
|
|
|
- "leakage": leakage
|
|
|
- }
|
|
|
-
|
|
|
- rise_time_heating += self.cal_setup.step_rise_time
|
|
|
- rise_time_heating = round(rise_time_heating, 0)
|
|
|
- p_bar.update(1)
|
|
|
-
|
|
|
- rise_time_cooling += self.cal_setup.step_rise_time
|
|
|
- rise_time_cooling = round(rise_time_cooling, 0)
|
|
|
-
|
|
|
- self.plot(best_results)
|
|
|
-
|
|
|
- def plot(self, best_results):
|
|
|
-
|
|
|
- times = self.data_handler.df['Time'].values
|
|
|
- sim_data = best_results["data"]
|
|
|
- measured_data = self.data_handler.get_degree_values(self.cal_setup.degree)
|
|
|
-
|
|
|
- plt.figure(figsize=(8, 4))
|
|
|
- plt.plot(times, sim_data, linewidth=2, label="Simulation")
|
|
|
- plt.plot(times, measured_data, linewidth=2, label="Measured")
|
|
|
-
|
|
|
- plt.xlabel("Time [s]")
|
|
|
- plt.ylabel("Actuator Position")
|
|
|
- plt.title(
|
|
|
- f"Simulation: riseTimeCooling={best_results['rise_time_cooling']} riseTimeHeating={best_results['rise_time_heating']} "
|
|
|
- f"leakage={best_results['leakage']} RSSE={best_results['rsse']:.3f} degrees={self.cal_setup.degree}"
|
|
|
- )
|
|
|
- plt.grid(True, linestyle="--", alpha=0.7)
|
|
|
- plt.legend()
|
|
|
- plt.tight_layout()
|
|
|
- plt.savefig(get_results_path(f"calibration_{self.cal_setup.degree}_degree"))
|