فهرست منبع

case study with algebraic adaptation done

Cláudio Gomes 8 سال پیش
والد
کامیت
687a862d5f

+ 19 - 5
SemanticAdaptationForFMI/FMIAbstraction/src/abstract_units/AbstractSimulationUnit.py

@@ -78,14 +78,28 @@ class AbstractSimulationUnit(object):
         Utils.trimList(self.__computed_time, toStep)
         Utils.trimList(self.__computed_time, toStep)
         raise "To be finished"
         raise "To be finished"
     
     
-    def __recordTime(self, current_time, step, current_iteration, step_size):
-        if current_iteration == 0:
+    def __recordTime(self, time, step, iteration, step_size):
+        assert step <= len(self.__computed_time)
+        if step == len(self.__computed_time):
+            assert iteration == 0
+            self.__computed_time.append([time + step_size])
+        elif step < len(self.__computed_time):
+            assert step == len(self.__computed_time) -1, "Makes no sense to rewrite past times, without rolling back first."
+            if iteration == len(self.__computed_time[step]):
+                self.__computed_time[step].append(time + step_size)
+            elif iteration < len(self.__computed_time[step]):
+                assert iteration == len(self.__computed_time[step]) - 1, "Weird use of the iteration records. Either rewrite the last iteration, or keep tracking them."
+                self.__computed_time[step][iteration] = time + step_size
+                
+        """
+        if iteration == 0:
             assert step == len(self.__computed_time)
             assert step == len(self.__computed_time)
-            self.__computed_time.append([current_time + step_size])
+            self.__computed_time.append([time + step_size])
         else:
         else:
             assert step == len(self.__computed_time) - 1
             assert step == len(self.__computed_time) - 1
-            assert current_iteration == len(self.__computed_time[step])
-            self.__computed_time[step].append(current_time + step_size)
+            assert iteration == len(self.__computed_time[step])
+            self.__computed_time[step].append(time + step_size)
+        """
        
        
     def doStep(self, time, step, current_iteration, step_size):
     def doStep(self, time, step, current_iteration, step_size):
         l.debug(">%s.doStep(t=%f, s=%d, i=%d, H=%f)", self._name,  time, step, current_iteration, step_size)
         l.debug(">%s.doStep(t=%f, s=%d, i=%d, H=%f)", self._name,  time, step, current_iteration, step_size)

+ 10 - 0
SemanticAdaptationForFMI/FMIAbstraction/src/abstract_units/Utils.py

@@ -25,12 +25,22 @@ class Utils(object):
     @staticmethod
     @staticmethod
     def copyValueToStateTrace(target, var, step, iteration, value):
     def copyValueToStateTrace(target, var, step, iteration, value):
         assert target.has_key(var)
         assert target.has_key(var)
+        assert step <= len(target[var])
         if step == len(target[var]):
         if step == len(target[var]):
             target[var].append([value])
             target[var].append([value])
+        elif step < len(target[var]):
+            assert step == len(target[var]) - 1, "Makes no sense to rewrite past steps, without rolling back first."
+            if iteration == len(target[var][step]):
+                target[var][step].append(value)
+            elif iteration < len(target[var][step]):
+                assert iteration == len(target[var][step]) - 1, "Weird use of the iteration records. Either rewrite the last iteration, or never rewrite them (in case you want to keep track of them."
+                target[var][step][iteration] = value
+        """
         elif iteration == len(target[var][step]):
         elif iteration == len(target[var][step]):
             target[var][step].append(value)
             target[var][step].append(value)
         else:
         else:
             target[var][step][iteration] = value
             target[var][step][iteration] = value
+        """
     
     
     @staticmethod
     @staticmethod
     def trimList(listToTrim, target_size):
     def trimList(listToTrim, target_size):

+ 163 - 0
SemanticAdaptationForFMI/FMIAbstraction/src/case_study/scenarios/ControlledScenario_AlgebraicAdaptation.py

@@ -0,0 +1,163 @@
+"""
+This scenario covers the following adaptations:
+- Step delay to the output of the controller
+- Algebraic loop adaptation for the power, window, and obstacle
+"""
+
+import logging
+
+from bokeh.plotting import figure, output_file, show
+
+from case_study.units.adaptations.InacurateControllerArmatureAdaptation_CT import InacurateControllerArmatureAdaptation_CT
+from case_study.units.de_based.DriverControllerStatechartFMU_CTInOut import DriverControllerStatechartFMU_CTInOut
+from case_study.units.de_based.EnvironmentStatechartFMU_CTInOut import EnvironmentStatechartFMU_CTInOut
+from case_study.units.adaptations.AlgebraicAdaptation_Power_Window_Obstacle import AlgebraicAdaptation_Power_Window_Obstacle
+
+NUM_RTOL = 1e-05
+NUM_ATOL = 1e-05
+
+l = logging.getLogger()
+l.setLevel(logging.DEBUG)
+
+
+cosim_step_size = 0.001
+num_internal_steps = 1
+stop_time = 6;
+
+environment = EnvironmentStatechartFMU_CTInOut("env", NUM_RTOL, NUM_ATOL)
+
+controller = DriverControllerStatechartFMU_CTInOut("controller", NUM_RTOL, NUM_ATOL)
+
+power_window_obstacle = AlgebraicAdaptation_Power_Window_Obstacle("strong_comp", NUM_RTOL, NUM_ATOL, cosim_step_size, num_internal_steps)
+
+armature_threshold = 5
+adapt_armature = InacurateControllerArmatureAdaptation_CT("arm_adapt", NUM_RTOL, NUM_ATOL, armature_threshold, True)
+
+environment.enterInitMode()
+controller.enterInitMode()
+power_window_obstacle.enterInitMode()
+adapt_armature.enterInitMode()
+
+step = 0
+iteration = 0
+
+envOut = environment.getValues(step, iteration, [environment.out_up, environment.out_down])
+
+power_window_obstacle.setValues(step, iteration, {
+                                 power_window_obstacle.omega: 0.0, 
+                                 power_window_obstacle.theta: 0.0, 
+                                 power_window_obstacle.i: 0.0,
+                                 power_window_obstacle.up: 0.0, # Delayed input
+                                 power_window_obstacle.down: 0.0    # Delayed input
+                                 })
+
+pOut = power_window_obstacle.getValues(step, iteration, [power_window_obstacle.i])
+
+adapt_armature.setValues(step, iteration, {adapt_armature.armature_current: pOut[power_window_obstacle.i],
+                                adapt_armature.out_obj: 0.0 })
+adaptArmOut = adapt_armature.getValues(step, iteration, [adapt_armature.out_obj])
+
+controller.setValues(step, iteration, {controller.in_dup : envOut[environment.out_up],
+                            controller.in_ddown : envOut[environment.out_down],
+                            controller.in_obj : adaptArmOut[adapt_armature.out_obj]})
+cOut = controller.getValues(step, iteration, [controller.out_up, controller.out_down])
+
+# Set the improved inputs to the components that got delayed inputs.
+power_window_obstacle.setValues(step, iteration, {
+                                                  power_window_obstacle.up: cOut[controller.out_up],
+                                                  power_window_obstacle.down: cOut[controller.out_down]})
+
+environment.exitInitMode()
+controller.exitInitMode()
+power_window_obstacle.exitInitMode()
+adapt_armature.exitInitMode()
+
+trace_i = [0.0]
+trace_omega = [0.0]
+trace_x = [0.0]
+trace_F = [0.0]
+times = [0.0]
+
+time = 0.0
+
+iteration = 0 # For now this is always zero
+
+for step in range(1, int(stop_time / cosim_step_size) + 1):
+    
+    """
+    We have delays the following inputs:
+    - control inputs to the power
+    - window tau input to the power
+    - window height input to the obstacle
+    
+    The order of computation in the scenario is as follows:
+    environment
+    power
+    obstacle
+    window
+    armature adaptation
+    controller
+    
+    """
+    
+    environment.doStep(time, step, iteration, cosim_step_size) 
+    envOut = environment.getValues(step, iteration, [environment.out_up, environment.out_down])
+    
+    # get power delayed inputs
+    cOut = controller.getValues(step-1, iteration, [controller.out_up, controller.out_down])
+    
+    power_window_obstacle.setValues(step, iteration, {
+                                     power_window_obstacle.up: cOut[controller.out_up], # Delayed input
+                                     power_window_obstacle.down: cOut[controller.out_down]    # Delayed input
+                                     })
+    power_window_obstacle.doStep(time, step, iteration, cosim_step_size)
+    pOut = power_window_obstacle.getValues(step, iteration, 
+                        [power_window_obstacle.i,
+                         power_window_obstacle.omega,
+                         power_window_obstacle.x,
+                         power_window_obstacle.F
+                         ])
+    
+    adapt_armature.setValues(step, iteration, 
+                             {adapt_armature.armature_current: pOut[power_window_obstacle.i]})
+    adapt_armature.doStep(time, step, iteration, cosim_step_size)
+    adaptArmOut = adapt_armature.getValues(step, iteration, [adapt_armature.out_obj])
+    
+    controller.setValues(step, iteration, {controller.in_dup : envOut[environment.out_up],
+                                controller.in_ddown : envOut[environment.out_down],
+                                controller.in_obj : adaptArmOut[adapt_armature.out_obj]})
+    controller.doStep(time, step, iteration, cosim_step_size)
+    
+    trace_omega.append(pOut[power_window_obstacle.omega])
+    trace_i.append(pOut[power_window_obstacle.i])
+    trace_x.append(pOut[power_window_obstacle.x])
+    trace_F.append(pOut[power_window_obstacle.F])
+    time += cosim_step_size
+    times.append(time)
+
+color_pallete = [
+                "#e41a1c",
+                "#377eb8",
+                "#4daf4a",
+                "#984ea3",
+                "#ff7f00",
+                "#ffff33",
+                "#a65628",
+                "#f781bf"
+                 ]
+
+output_file("./results.html", title="Results")
+p = figure(title="Plot", x_axis_label='time', y_axis_label='')
+p.line(x=times, y=trace_omega, legend="trace_omega", color=color_pallete[0])
+p.line(x=times, y=trace_i, legend="trace_i", color=color_pallete[1])
+show(p)
+
+output_file("./results_x.html", title="Results")
+p = figure(title="Plot", x_axis_label='time', y_axis_label='')
+p.line(x=times, y=trace_x, legend="trace_x", color=color_pallete[2])
+show(p)
+
+output_file("./results_F.html", title="Results")
+p = figure(title="Plot", x_axis_label='time', y_axis_label='')
+p.line(x=times, y=trace_F, legend="trace_F", color=color_pallete[3])
+show(p)

+ 221 - 0
SemanticAdaptationForFMI/FMIAbstraction/src/case_study/units/adaptations/AlgebraicAdaptation_Power_Window_Obstacle.py

@@ -0,0 +1,221 @@
+import logging
+
+import numpy
+
+from abstract_units.AbstractSimulationUnit import AbstractSimulationUnit, \
+    STEP_ACCEPT, INIT_MODE
+from case_study.units.ct_based.ObstacleFMU import ObstacleFMU
+from case_study.units.ct_based.PowerFMU import PowerFMU
+from case_study.units.ct_based.WindowFMU import WindowFMU
+
+l = logging.getLogger()
+
+class AlgebraicAdaptation_Power_Window_Obstacle(AbstractSimulationUnit):
+    """
+    This is the adaptation that realizes the strong coupling between the power,
+    window, and obstacle units.
+    """
+    
+    def __init__(self, name, num_rtol, num_atol, cosim_step_size, num_internal_steps):
+        
+        self._num_rtol = num_rtol
+        self._num_atol = num_atol
+        
+        self._max_iterations = 100
+        
+        self._power = PowerFMU("power", num_rtol, num_atol, cosim_step_size/num_internal_steps, 
+                     J=0.085, 
+                     b=5, 
+                     K=7.45, 
+                     R=0.15, 
+                     L=0.036,
+                     V_a=12)
+        
+        self.omega = self._power.omega
+        self.theta= self._power.theta
+        
+        
+        self._window = WindowFMU("window", num_rtol, num_atol, cosim_step_size/num_internal_steps,
+                     r=0.11, 
+                     b = 10)
+        
+        self._obstacle = ObstacleFMU("obstacle", num_rtol, num_atol, cosim_step_size/num_internal_steps, 
+                     c=1e5, 
+                     fixed_x=0.45)
+        
+        self.up = self._power.up
+        self.down = self._power.down
+        
+        input_vars = [self.down, self.up]
+        
+        self.i = self._power.i
+        self.omega = self._power.omega
+        self.theta = self._power.theta
+        self.x = self._window.x
+        self.F = self._obstacle.F
+        
+        state_vars = [self.i, self.omega, self.theta, self.x, self.F]
+        
+        algebraic_functions = {}
+        
+        AbstractSimulationUnit.__init__(self, name, algebraic_functions, state_vars, input_vars)
+    
+    def _isClose(self, a, b):
+        return numpy.isclose(a,b, self._num_rtol, self._num_atol)
+    
+    def _biggerThan(self, a, b):
+        return not numpy.isclose(a,b, self._num_rtol, self._num_atol) and a > b
+    
+    def _doInternalSteps(self, time, step, iteration, cosim_step_size):
+        l.debug(">%s._doInternalSteps(%f, %d, %d, %f)", self._name, time, step, iteration, cosim_step_size)
+        
+        assert self._biggerThan(cosim_step_size, 0), "cosim_step_size too small: {0}".format(cosim_step_size)
+        assert iteration == 0, "Fixed point iterations not supported outside of this component."
+        
+        converged = False
+        internal_iteration = 0
+        
+        cOut = self.getValues(step, iteration, [self.up, self.down])
+        wOut = self._window.getValues(step-1, iteration, [self._window.x, self._window.tau])
+        pOut = None
+        
+        while not converged and internal_iteration < self._max_iterations:
+            self._power.setValues(step, iteration, {
+                                     self._power.tau: wOut[self._window.tau], # Delayed input
+                                     self._power.up: cOut[self.up],
+                                     self._power.down: cOut[self.down]
+                                     })
+            self._power.doStep(time, step, iteration, cosim_step_size)
+            pOut = self._power.getValues(step, iteration, [self._power.omega, 
+                                                           self._power.theta, 
+                                                           self._power.i])
+            
+            self._obstacle.setValues(step, iteration, {self._obstacle.x: wOut[self._window.x]}) # Delayed input
+            self._obstacle.doStep(time, step, iteration, cosim_step_size)
+            oOut = self._obstacle.getValues(step, iteration, [self._obstacle.F])
+            
+            self._window.setValues(step, iteration, {self._window.omega_input: pOut[self._power.omega],
+                                        self._window.theta_input: pOut[self._power.theta],
+                                        self._window.F_obj: oOut[self._obstacle.F]
+                                        })
+            self._window.doStep(time, step, iteration, cosim_step_size)
+            wOut_corrected = self._window.getValues(step, iteration, [self._window.x, self._window.tau])
+            
+            l.debug("Iteration step completed:")
+            l.debug("wOut=%s;", wOut)
+            l.debug("wOut_corrected=%s.", wOut_corrected)
+            
+            
+            if self._isClose(wOut[self._window.x], wOut_corrected[self._window.x]) \
+                and self._isClose(wOut[self._window.tau], wOut_corrected[self._window.tau]):
+                converged = True
+            
+            internal_iteration = internal_iteration + 1
+            wOut = wOut_corrected
+                
+        if converged:
+            l.debug("Fixed point found after %d iterations", internal_iteration)
+        else:
+            l.debug("Fixed point not found after %d iterations", internal_iteration)
+            raise RuntimeError("Fixed point not found")
+        
+        AbstractSimulationUnit.setValues(self, step, iteration, 
+                                         {
+                                         self.i: pOut[self._power.i],
+                                         self.theta: pOut[self._power.theta],
+                                         self.omega: pOut[self._power.omega],
+                                         self.x: wOut[self._window.x],
+                                         self.F: oOut[self._obstacle.F]
+                                         });
+        
+        l.debug("<%s._doInternalSteps() = (%s, %d)", self._name, STEP_ACCEPT, cosim_step_size)
+        return (STEP_ACCEPT, cosim_step_size)
+    
+    def setValues(self, step, iteration, values):
+        l.debug(">%s.AlgebraicAdaptation_Power_Window_Obstacle.setValues(%d, %d, %s)", self._name, step, iteration, values)
+        
+        # Filter just the inputs.
+        inputs = {
+                  self.up: values[self.up],
+                  self.down: values[self.down]
+                  }
+        
+        AbstractSimulationUnit.setValues(self, step, iteration, inputs);
+        
+        if self._mode == INIT_MODE:
+            # Initialize the internal FMUs and compute the value of the armature.
+            
+            l.debug("Initializing strong component...")
+            
+            step = iteration = 0
+            
+            wOut_tau_delayed =  0.0;
+            wOut_x_delayed = 0.0;
+            
+            # Set power inputs (or initial state, given by values)
+            self._power.setValues(step, iteration, values)
+            self._power.setValues(step, iteration, {
+                                             self._power.tau: wOut_tau_delayed    
+                                             })
+            
+            # Get power initial outputs
+            pOut = self._power.getValues(step, iteration, [self._power.omega, self._power.theta, self._power.i])
+            
+            # Set obstacle initial inputs
+            # Assume they are zero because the outputs of the window are delayed.
+            self._obstacle.setValues(step, iteration, {self._obstacle.x: wOut_x_delayed})
+            # Get obstacle outputs
+            oOut = self._obstacle.getValues(step, iteration, [self._obstacle.F])
+            
+            # Set window inputs
+            self._window.setValues(step, iteration, {self._window.omega_input: pOut[self._power.omega],
+                                        self._window.theta_input: pOut[self._power.theta],
+                                        self._window.F_obj: oOut[self._obstacle.F]
+                                        })
+            # Get window outputs
+            wOut = self._window.getValues(step, iteration, [self._window.x, self._window.tau])
+            
+            # Set corrected power windows
+            self._power.setValues(step, iteration, {
+                                             self._power.tau: wOut[self._window.tau] # Delayed input from window
+                                             })
+            
+            # We know that convergence is easily achieved for this initialisation
+            assert self._isClose(wOut[self._window.tau], wOut_tau_delayed)
+            assert self._isClose(wOut[self._window.x], wOut_x_delayed)
+            
+            # Record the outputs
+            AbstractSimulationUnit.setValues(self, step, iteration, {
+                                                                     self.i: pOut[self._power.i],
+                                                                     self.theta: pOut[self._power.theta],
+                                                                     self.omega: pOut[self._power.omega],
+                                                                     self.x: wOut[self._window.x],
+                                                                     self.F: oOut[self._obstacle.F]
+                                                                     });
+            
+            l.debug("Strong component initialized.")
+            
+        l.debug("<%s.AlgebraicAdaptation_Power_Window_Obstacle.setValues()", self._name)
+    
+    
+    def enterInitMode(self):
+        l.debug(">%s.AlgebraicAdaptation_Power_Window_Obstacle.enterInitMode()", self._name)
+        
+        AbstractSimulationUnit.enterInitMode(self);
+        
+        self._power.enterInitMode()
+        self._window.enterInitMode()
+        self._obstacle.enterInitMode()
+        
+        l.debug("<%s.AlgebraicAdaptation_Power_Window_Obstacle.enterInitMode()", self._name)
+    
+    def exitInitMode(self):
+        l.debug(">%s.AlgebraicAdaptation_Power_Window_Obstacle.exitInitMode()", self._name)
+        
+        AbstractSimulationUnit.exitInitMode(self);
+        
+        self._power.exitInitMode()
+        self._window.exitInitMode()
+        self._obstacle.exitInitMode()
+        
+        l.debug("<%s.AlgebraicAdaptation_Power_Window_Obstacle.exitInitMode()", self._name)