Kaynağa Gözat

remaining adaptaitons implemented

Cláudio Gomes 8 yıl önce
ebeveyn
işleme
054cac7074

+ 5 - 6
SemanticAdaptationForFMI/FMIAbstraction/src/abstract_units/CTSimulationUnit.py

@@ -60,10 +60,6 @@ class CTSimulationUnit(AbstractSimulationUnit):
         (do_next_step, micro_step_size) = (True, 0.0)
         
         while do_next_step:
-            
-            internal_time += micro_step_size
-            l.debug("internal_time=%f", internal_time)
-            
             # TODO Any input extrapolation would be done here by changing the input_buffer
             
             # flip the buffers
@@ -75,10 +71,13 @@ class CTSimulationUnit(AbstractSimulationUnit):
             next_state = state_buffers[next_state_idx]
             
             (do_next_step, micro_step_size) = self._doNumericalStep(next_state, previous_state, input_buffer, internal_time, step, iteration, next_cosim_time)
-        
+            
+            internal_time += micro_step_size
+            l.debug("internal_time=%f", internal_time)
+            
         
         # Commit the new state
-        self.setValues(step, iteration, state_buffers[previous_state_idx])
+        self.setValues(step, iteration, state_buffers[next_state_idx])
         
         l.debug("<%s._doInternalSteps() = (%s, %d)", self._name, STEP_ACCEPT, step_size)
         return (STEP_ACCEPT, step_size)

+ 6 - 4
SemanticAdaptationForFMI/FMIAbstraction/src/abstract_units/CTSimulationUnit_Euler.py

@@ -28,12 +28,14 @@ class CTSimulationUnit_Euler(CTSimulationUnit):
         
         self.__start_step_size = start_step_size
     
-    def _doNumericalStep(self, next_x, previous_x, inputs, time, step, iteration, STEP_TIME):
+    def _doNumericalStep(self, next_x, previous_x, inputs, time, step, iteration, stop_time):
         l.debug(">%s._doNumericalStep(%s, %s, %s, %f, %d, %d, %f)", 
-                self._name, next_x, previous_x, inputs, time, step, iteration, STEP_TIME)
+                self._name, next_x, previous_x, inputs, time, step, iteration, stop_time)
         
-        hit_stop_time = self._biggerThan(time + self.__start_step_size, STEP_TIME)
-        step_size = STEP_TIME - time if hit_stop_time else self.__start_step_size
+        hit_stop_time = self._biggerThan(time + self.__start_step_size, stop_time)
+        step_size = stop_time - time if hit_stop_time else self.__start_step_size
+        l.debug(">%s.step_size=%f", 
+                self._name, step_size)
         for state_var in self._state_derivatives.keys():
             state_derivative_function = self._state_derivatives[state_var]
             der_state_var = state_derivative_function(previous_x, inputs)

+ 275 - 0
SemanticAdaptationForFMI/FMIAbstraction/src/case_study/scenarios/ControlledScenario_AccurateCrossingAdaptation.py

@@ -0,0 +1,275 @@
+"""
+This scenario covers the following adaptations:
+- Step delay and decouple adaptation to all power outputs and to the obstacle output, to break the algebraic loops.
+- Accurate threshold crossing adaptation to the armature current output.
+
+The master algorithm's policy is as follows:
+Each time he asks an FMU to do a step, the FMU may either accept or reject the step.
+If the FMU accepts the step, the master moves onto the next one.
+If the FMU does not accept the state, the master rollsback every other FMU and retries with the step proposed by the FMU.
+
+After a while, if all FMU continuously accept the steps, the master will gradually increase the step.
+"""
+
+import logging
+
+from bokeh.plotting import figure, output_file, show
+
+from abstract_units.AbstractSimulationUnit import STEP_DISCARD
+from case_study.units.adaptations.AccurateControllerArmatureAdaptation import AccurateControllerArmatureAdaptation
+from case_study.units.ct_based.ObstacleFMU import ObstacleFMU
+from case_study.units.ct_based.PowerFMU import PowerFMU
+from case_study.units.ct_based.WindowFMU import WindowFMU
+from case_study.units.de_based.DriverControllerStatechartFMU_CTInOut import DriverControllerStatechartFMU_CTInOut
+from case_study.units.de_based.EnvironmentStatechartFMU_CTInOut import EnvironmentStatechartFMU_CTInOut
+
+
+NUM_RTOL = 1e-08
+NUM_ATOL = 1e-08
+
+l = logging.getLogger()
+l.setLevel(logging.DEBUG)
+
+START_STEP_SIZE = 0.001
+MAX_STEP_SIZE = 0.01
+MIN_STEPS_BEFORE_INCREASE_STEP_SIZE = 10
+STEP_SIZE_INCREASE_RATE = 10 # e.g, 10%
+
+FMU_START_RATE = 10
+STOP_TIME = 6;
+
+environment = EnvironmentStatechartFMU_CTInOut("env", NUM_RTOL, NUM_ATOL)
+
+controller = DriverControllerStatechartFMU_CTInOut("controller", NUM_RTOL, NUM_ATOL)
+
+power = PowerFMU("power", NUM_RTOL, NUM_ATOL, START_STEP_SIZE/FMU_START_RATE, 
+                     J=0.085, 
+                     b=5, 
+                     K=7.45, 
+                     R=0.15, 
+                     L=0.036,
+                     V_a=12)
+
+armature_threshold = 5
+adapt_armature = AccurateControllerArmatureAdaptation("arm_adapt", NUM_RTOL, NUM_ATOL, armature_threshold, True)
+
+window = WindowFMU("window", NUM_RTOL, NUM_ATOL, START_STEP_SIZE/FMU_START_RATE,
+                     r=0.11, 
+                     b = 10)
+
+obstacle = ObstacleFMU("obstacle", NUM_RTOL, NUM_ATOL, START_STEP_SIZE/FMU_START_RATE, 
+                     c=1e5, 
+                     fixed_x=0.45)
+
+environment.enterInitMode()
+controller.enterInitMode()
+power.enterInitMode()
+adapt_armature.enterInitMode()
+window.enterInitMode()
+obstacle.enterInitMode()
+
+step = 0
+iteration = 0
+
+# Set environment initial inputs
+# Nothing to do
+
+# Get environment initial outputs
+envOut = environment.getValues(step, iteration, [environment.out_up, environment.out_down])
+
+# Set power initial state
+power.setValues(step, iteration, {
+                                 power.omega: 0.0, 
+                                 power.theta: 0.0, 
+                                 power.i: 0.0,
+                                 power.tau: 0.0, # Delayed input
+                                 power.up: 0.0, # Delayed input
+                                 power.down: 0.0    # Delayed input
+                                 })
+# Get power initial outputs
+pOut = power.getValues(step, iteration, [power.omega, power.theta, power.i])
+
+# Set obstacle initial inputs
+# Assume they are zero because the outputs of the window are delayed.
+obstacle.setValues(step, iteration, {obstacle.x: 0.0})
+# Get obstacle outputs
+oOut = obstacle.getValues(step, iteration, [obstacle.F])
+
+# Set window inputs
+window.setValues(step, iteration, {window.omega_input: pOut[power.omega],
+                            window.theta_input: pOut[power.theta],
+                            window.F_obj: oOut[obstacle.F]
+                            })
+# Get window outputs
+wOut = window.getValues(step, iteration, [window.x, window.tau])
+
+# Set adapt armature inputs and initial state
+adapt_armature.setValues(step, iteration, {adapt_armature.armature_current: pOut[power.i],
+                                adapt_armature.out_obj: 0.0 })
+# Get adapt armature outputs
+adaptArmOut = adapt_armature.getValues(step, iteration, [adapt_armature.out_obj])
+
+# Set the initial inputs to the controller
+controller.setValues(step, iteration, {controller.in_dup : envOut[environment.out_up],
+                            controller.in_ddown : envOut[environment.out_down],
+                            controller.in_obj : adaptArmOut[adapt_armature.out_obj]})
+# Get the controller output
+cOut = controller.getValues(step, iteration, [controller.out_up, controller.out_down])
+
+# Set the improved inputs to the components that got delayed inputs.
+power.setValues(step, iteration, {power.tau: wOut[window.tau], 
+                       power.up: cOut[controller.out_up],
+                       power.down: cOut[controller.out_down]})
+obstacle.setValues(step, iteration, {obstacle.x: wOut[window.x]})
+
+environment.exitInitMode()
+controller.exitInitMode()
+power.exitInitMode()
+adapt_armature.exitInitMode()
+window.exitInitMode()
+obstacle.exitInitMode()
+
+trace_i = [0.0]
+trace_omega = [0.0]
+trace_x = [0.0]
+trace_F = [0.0]
+times = [0.0]
+
+time = 0.0
+
+# This is always zero because we do not keep track of intermediate values in fixed point iterations
+iteration = 0 
+
+step_size = START_STEP_SIZE
+step = 1 # Step 0 has already been done by the initialization.
+last_rollback_step = 0
+
+while time < STOP_TIME:
+    """
+    We have delays the following inputs:
+    - control inputs to the power
+    - window tau input to the power
+    - window height input to the obstacle
+    
+    The order of computation in the scenario is as follows:
+    environment
+    power
+    obstacle
+    window
+    armature adaptation
+    controller
+    
+    """
+    step_completed = False
+    while not step_completed:
+        l.debug("Start step: %d at time %f", step, time)
+        """
+        We know that the only FMU who can reject the step size is the accurate zero crossing adaptation.
+        So, I'll be lazy, and just take into account the output given by that FMU.
+        Unless there is more information, the output of each FMU should be taken into account.
+        """
+        environment.doStep(time, step, iteration, step_size) 
+        envOut = environment.getValues(step, iteration, [environment.out_up, environment.out_down])
+        
+        cOut = controller.getValues(step-1, iteration, [controller.out_up, controller.out_down])
+        wOut = window.getValues(step-1, iteration, [window.x, window.tau])
+        
+        power.setValues(step, iteration, {
+                                         power.tau: wOut[window.tau], # Delayed input
+                                         power.up: cOut[controller.out_up], # Delayed input
+                                         power.down: cOut[controller.out_down]    # Delayed input
+                                         })
+        power.doStep(time, step, iteration, step_size)
+        pOut = power.getValues(step, iteration, [power.omega, power.theta, power.i])
+        
+        """
+        We move the evaluation of the adapt armature component to as early as possible
+        because we know that it is the component that is most likely to reject the step size.
+        """
+        
+        # Set adapt armature inputs and initial state
+        adapt_armature.setValues(step, iteration, {adapt_armature.armature_current: pOut[power.i]})
+        (step_info, proposed_step_size) = adapt_armature.doStep(time, step, iteration, step_size)
+        
+        if step_info == STEP_DISCARD:
+            l.debug("Rolling back step: %d at time %f", step, time)
+            """
+            In this abstraction, we don't need to do much.
+            But we real FMI, we would have to restore the state of the FMUs.
+            """
+            step_completed = False
+            last_rollback_step = step
+            l.debug("Decreasing step size from %f to %f...", step_size, proposed_step_size)            
+            step_size = proposed_step_size
+        else:
+            step_completed = True
+    
+    # At this point, we know that the adapt_armature FMU has accepted the step size, so we can proceed.
+    
+    # Get adapt armature outputs
+    adaptArmOut = adapt_armature.getValues(step, iteration, [adapt_armature.out_obj])
+    
+    # The outputs of the window are delayed.
+    obstacle.setValues(step, iteration, {obstacle.x: wOut[window.x]}) # Delayed input
+    obstacle.doStep(time, step, iteration, step_size)
+    oOut = obstacle.getValues(step, iteration, [obstacle.F])
+    
+    window.setValues(step, iteration, {window.omega_input: pOut[power.omega],
+                                window.theta_input: pOut[power.theta],
+                                window.F_obj: oOut[obstacle.F]
+                                })
+    window.doStep(time, step, iteration, step_size)
+    wOut = window.getValues(step, iteration, [window.x, window.tau])
+    
+    
+    # Set the initial inputs to the controller
+    controller.setValues(step, iteration, {controller.in_dup : envOut[environment.out_up],
+                                controller.in_ddown : envOut[environment.out_down],
+                                controller.in_obj : adaptArmOut[adapt_armature.out_obj]})
+    controller.doStep(time, step, iteration, step_size)
+    
+    if (step - last_rollback_step) > MIN_STEPS_BEFORE_INCREASE_STEP_SIZE \
+                and step_size < MAX_STEP_SIZE:
+        new_step_size = step_size + step_size * (STEP_SIZE_INCREASE_RATE/100.0)
+        l.debug("Increasing step size from %f to %f...", step_size, new_step_size)            
+        step_size = new_step_size
+        if step_size > MAX_STEP_SIZE:
+            l.debug("Max step size attained: %f.", MAX_STEP_SIZE)  
+            step_size = MAX_STEP_SIZE
+    
+    
+    step += 1
+    time += step_size
+    times.append(time)
+    
+    trace_omega.append(pOut[power.omega])
+    trace_i.append(pOut[power.i])
+    trace_x.append(wOut[window.x])
+    trace_F.append(oOut[obstacle.F])
+
+color_pallete = [
+                "#e41a1c",
+                "#377eb8",
+                "#4daf4a",
+                "#984ea3",
+                "#ff7f00",
+                "#ffff33",
+                "#a65628",
+                "#f781bf"
+                 ]
+
+output_file("./results.html", title="Results")
+p = figure(title="Plot", x_axis_label='time', y_axis_label='')
+p.line(x=times, y=trace_omega, legend="trace_omega", color=color_pallete[0])
+p.line(x=times, y=trace_i, legend="trace_i", color=color_pallete[1])
+show(p)
+
+output_file("./results_x.html", title="Results")
+p = figure(title="Plot", x_axis_label='time', y_axis_label='')
+p.line(x=times, y=trace_x, legend="trace_x", color=color_pallete[2])
+show(p)
+
+output_file("./results_F.html", title="Results")
+p = figure(title="Plot", x_axis_label='time', y_axis_label='')
+p.line(x=times, y=trace_F, legend="trace_F", color=color_pallete[3])
+show(p)

+ 96 - 0
SemanticAdaptationForFMI/FMIAbstraction/src/case_study/units/adaptations/AccurateControllerArmatureAdaptation.py

@@ -0,0 +1,96 @@
+import logging
+
+import numpy
+
+from abstract_units.AbstractSimulationUnit import AbstractSimulationUnit, \
+    STEP_ACCEPT, STEP_DISCARD
+
+
+l = logging.getLogger()
+
+class AccurateControllerArmatureAdaptation(AbstractSimulationUnit):
+    """
+    This is the adaptation of the armature signal coming from the power system and into the controller statechart.
+    The input will be the armature continuous signal.
+    The output will be a pulse signal obj that can either be 0, or 1 whenever the absolute value of 
+        the armature signal exceeds a certain threshold.
+    The discontinuity is accurately located by rejecting the step size whenever it is larger than a given tolerance.
+    
+    """
+    
+    def __init__(self, name, num_rtol, num_atol, threshold, upward):
+        
+        self._num_rtol = num_rtol
+        self._num_atol = num_atol
+        
+        assert upward, "Not implemented yet."
+        
+        self.__crossUpward = upward
+        
+        self.__threshold = threshold
+        
+        self.armature_current = "armature_current"
+        self.out_obj = "out_obj"
+        input_vars = [self.armature_current]
+        state_vars = [self.out_obj]
+        
+        algebraic_functions = {}
+        AbstractSimulationUnit.__init__(self, name, algebraic_functions, state_vars, input_vars)
+    
+    def _isClose(self, a, b):
+        return numpy.isclose(a,b, self._num_rtol, self._num_atol)
+    
+    def _biggerThan(self, a, b):
+        return not numpy.isclose(a,b, self._num_rtol, self._num_atol) and a > b
+    
+    def _doInternalSteps(self, time, step, iteration, step_size):
+        l.debug(">%s._doInternalSteps(%f, %d, %d, %f)", self._name, time, step, iteration, step_size)
+        
+        assert self._biggerThan(step_size, 0), "step_size too small: {0}".format(step_size)
+        assert iteration == 0, "Fixed point iterations not supported yet."
+        
+        current_input = self.getValues(step, iteration, 
+                                       self._getInputVars())[self.armature_current]
+        previous_input = self.getValues(step-1, iteration, 
+                                        self._getInputVars())[self.armature_current]
+        
+        output_value = 0
+        
+        l.debug("%s.previous_input=%f", self._name, previous_input)
+        l.debug("%s.current_input=%f", self._name, current_input)
+        
+        step_info = STEP_ACCEPT
+        proposed_step_size = step_size
+        
+        if self.__crossUpward:
+            if (not self._biggerThan(previous_input, self.__threshold)) \
+                    and self._biggerThan(current_input, self.__threshold):
+                l.debug("Crossing detected from %f to %f", 
+                        previous_input, current_input)
+                l.debug("But step too large.")
+                step_info = STEP_DISCARD
+                 
+                # Convert this into a zero crossing problem
+                negative_value = previous_input - self.__threshold
+                positive_value = current_input - self.__threshold
+                
+                # Estimate the step size by regula-falsi
+                proposed_step_size = (step_size * (- negative_value)) / (positive_value - negative_value)
+                assert proposed_step_size > 0.0, "Problem with the signals and tolerances. All hope is lost."
+                
+            elif (not self._biggerThan(previous_input, self.__threshold))\
+                and self._isClose(current_input, self.__threshold):
+                l.debug("Crossing detected from %f to %f", 
+                        previous_input, current_input)
+                l.debug("And step size is acceptable.")
+                output_value = 1
+                
+        
+        l.debug("%s.output_value=%s", self._name, output_value)
+        self.setValues(step, iteration, {self.out_obj: output_value})
+        
+        l.debug("<%s._doInternalSteps() = (%s, %d)", 
+                self._name, step_info, proposed_step_size)
+        return (step_info, proposed_step_size)
+    
+