Browse Source

statechart FMUs

Cláudio Gomes 8 years ago
parent
commit
5830a5af0f

+ 0 - 1
ModelicaModels/Obstacle.mo

@@ -2,7 +2,6 @@ model Obstacle
   parameter Real c = 1e5;
   parameter Real fixed_x = 0.45;
   input Real x;
-  input Real v;
   output Real F;
 equation
   F = if x > fixed_x then c*(x - fixed_x) else 0;

+ 0 - 1
ModelicaModels/UncontrolledScenario.mo

@@ -9,7 +9,6 @@ equation
   window.theta_input = power.theta;
   power.tau = -(window.tau + obstacle.F*window.r);
   obstacle.x = window.x;
-  obstacle.v = window.v;
   annotation(
     experiment(StartTime = 0, StopTime = 10, Tolerance = 1e-06, Interval = 0.004));
 end UncontrolledScenario;

+ 56 - 0
SemanticAdaptationForFMI/FMIAbstraction/src/sampleunits/DriverControllerStatechartFMU.py

@@ -0,0 +1,56 @@
+from units.StatechartSimulationUnit import StatechartSimulationUnit
+
+
+class DriverControllerStatechartFMU(StatechartSimulationUnit):
+    """
+    This is a simple controller for the power window case study.
+    It takes input events (dup, ddown, dstop, obj) representing the wishes of the driver 
+        and whether an object has been detected,
+        and produces output events (up, down, stop), 
+        representing instructions for the power system.
+    It has the states
+        Neutral    (initial)
+        Up
+        Down
+        Object
+    And transitions
+        Neutral --dup/up--> Up
+        Neutral --ddown/down--> Down
+        Up --dstop/stop--> Neutral
+        Up --ddown/down--> Down
+        Up --obj/down--> Object
+        Down --dstop/stop--> Neutral
+        Down --dup/up--> Up 
+        Object --after(1)/stop--> Neutral
+    """
+    
+    def __init__(self, name, num_rtol, num_atol):
+        
+        def state_transition(current_state, input_event, elapsed):
+            # Check for transitions enabled by input_event
+            if input_event != None:
+                if current_state=="Neutral":
+                    if input_event == "dup":
+                        return ("up","Up",True,StatechartSimulationUnit.TRIGGER_INPUT)
+                    if input_event == "ddown":
+                        return ("down","Down",True,StatechartSimulationUnit.TRIGGER_INPUT)
+                if current_state=="Up":
+                    if input_event == "dstop":
+                        return ("stop","Neutral",True,StatechartSimulationUnit.TRIGGER_INPUT)
+                    if input_event == "ddown":
+                        return ("down","Down",True,StatechartSimulationUnit.TRIGGER_INPUT)
+                    if input_event == "obj":
+                        return ("down","Object",True,StatechartSimulationUnit.TRIGGER_INPUT)
+                if current_state=="Down":
+                    if input_event == "dstop":
+                        return ("stop","Neutral",True,StatechartSimulationUnit.TRIGGER_INPUT)
+                    if input_event == "dup":
+                        return ("up","Up",True,StatechartSimulationUnit.TRIGGER_INPUT)
+            # Check for after transitions
+            if current_state=="Object":
+                if self._biggerThan(elapsed, 1.0):
+                    return ("stop","Neutral",True,StatechartSimulationUnit.TRIGGER_AFTER)
+            # No transition taken
+            return (None, current_state, False, None)
+        
+        StatechartSimulationUnit.__init__(self, name, num_rtol, num_atol, state_transition)

+ 24 - 0
SemanticAdaptationForFMI/FMIAbstraction/src/sampleunits/EnvironmentStatechartFMU.py

@@ -0,0 +1,24 @@
+from units.StatechartSimulationUnit import StatechartSimulationUnit
+
+
+class EnvironmentStatechartFMU(StatechartSimulationUnit):
+    """
+    This is a simple environment for the power window case study.
+    It has the states
+        Neutral    (initial)
+        Up
+        Down
+    And the transitions are self explanatory.
+    """
+    
+    def __init__(self, name, num_rtol, num_atol):
+        
+        def state_transition(current_state, input_event, elapsed):
+            # Check for after transitions
+            if current_state=="Neutral":
+                if self._biggerThan(elapsed, 0.5):
+                    return ("dup","Up",True,StatechartSimulationUnit.TRIGGER_AFTER)
+            # No transition taken
+            return (None, current_state, False, None)
+        
+        StatechartSimulationUnit.__init__(self, name, num_rtol, num_atol, state_transition)

+ 1 - 0
SemanticAdaptationForFMI/FMIAbstraction/src/scenarios/.gitignore

@@ -1,2 +1,3 @@
 /results.html
 /results_x.html
+/results_F.html

+ 1 - 1
SemanticAdaptationForFMI/FMIAbstraction/src/scenarios/UncontrolledScenario.py

@@ -88,7 +88,7 @@ for step in range(1, int(stop_time / cosim_step_size) + 1):
     
     # set old values for window (this is not ideal)
     pOut = power.getValues(step-1, 0, [power.omega, power.theta, power.i])
-
+    
     window.setValues(step, 0, {window.omega_input: pOut[power.omega],
                             window.theta_input: pOut[power.theta]})
     window.doStep(time, step, 0, cosim_step_size)

+ 137 - 0
SemanticAdaptationForFMI/FMIAbstraction/src/units/StatechartSimulationUnit.py

@@ -0,0 +1,137 @@
+'''
+Created on Mar 5, 2016
+
+@author: claudio gomes
+'''
+import logging
+
+import numpy
+
+from units.AbstractSimulationUnit import AbstractSimulationUnit, STEP_ACCEPT
+
+l = logging.getLogger()
+
+class StatechartSimulationUnit(AbstractSimulationUnit):
+    
+    TRIGGER_AFTER = "After"
+    TRIGGER_INPUT = "Input"
+    TRIGGER_DEFAULT = "Default"
+    
+    """
+    Represents the controller FMU in a statechart form.
+    It has a single string input and a single string output, denoted event_in/out.
+    The string can be None, to denote that no event occurs.
+    When an event occurs, the input gets the string representation of the event.
+    When no event occurs, it simply updated its internal time counters.
+    Due to after(x) transitions, there can be events produced after a doStep is performed.
+    Also, if an event occurs and its time does not coincide exactly with the next co-sim time, 
+        then the FMU rejects the step size and returns the step that he actually did.
+        
+    This class is generic and is meant to be subclassed by concrete implementations of statecharts.
+    
+    It currently does not support rejecting the step size in order to precisely hit an event time.
+    The cosim step size will just have to divide every after transition threshold.
+    
+    Example interaction:_______________
+    sFMU = StatechartFMU(...)
+        At this point, the sFMU should be in the initial state
+    sFMU.enterInitMode()
+    sFMU.setValues(...,"someEvent")
+        This tells the FMU that there is an event to be processed (can be None).
+        Even if it is None, it needs to be called with None.
+    "someReaction" = sFMU.getValues(...)
+        Because there was no doStep call in between, the event returned here can only be None.
+    sFMU.exitInitMode()
+    
+    sFMU.setValues(..., None)
+    sFMU.doStep(..., H)
+        The sFMU will check if there are input events and compute any output event that may be due, 
+            possibly making ALL state transitions that are enabled by None.
+            This computation is part of the output functions.
+            Also, if multiple events are produced, then only the last one will be taken into account.
+        When no transitions are enable, the sFMU advances its elapsed time.
+    
+    "someReaction" = sFMU.getValues(...)
+        The last event processed is in the output variable, ready to be collected.
+    ______________________________
+    """
+    
+    def __init__(self, name, num_rtol, num_atol, state_transition_function):
+        """
+        The state transition functions hold the encoding of the statechart.
+        They are called with the current state and the elapsed time on that state.
+        """
+        
+        self._state_transition_function = state_transition_function
+
+        self._num_rtol = num_rtol
+        self._num_atol = num_atol
+        
+        self.in_event = "__in_event"
+        self.out_event = "__out_event"
+        self.__current_state = "state"
+        
+        self.__last_transition_time = 0.0
+        
+        AbstractSimulationUnit.__init__(self, name, {}, [self.__current_state, self.out_event], [self.__in_event])
+    
+    def _isClose(self, a, b):
+        return numpy.isclose(a,b, self._num_rtol, self._num_atol)
+    
+    def _biggerThan(self, a, b):
+        return not numpy.isclose(a,b, self._num_rtol, self._num_atol) and a > b
+    
+    
+    """
+    The _doStepFunction function will compute state transitions as long as they are enable
+        and place the events that each transition taken may produce in the self.__outputEvent.
+    """
+    def _doStepFunction(self, time, state, inputs):
+        """
+        state is {self.__current_state : __current_state}
+        input is {self.input : input}
+        """
+        l.debug(">%s._doStepFunction(%f, %d, %d, %f)", self.name, state, inputs)
+        
+        transition_taken = True
+        (old_state, _) = state[self.__current_state]
+        inputToConsume = inputs[self.__in_event]
+        output_event = None
+        while transition_taken:
+            (out_event, new_state, transition_taken, trigger) = self._state_transition_function(old_state, inputToConsume, self.__last_transition_time - time)
+            if transition_taken:
+                l.debug("Transition taken from %s to %s because of %s. Produced event: %s.", old_state, new_state, trigger, out_event)
+                output_event = out_event if out_event != None else output_event
+                old_state = new_state
+                self.__last_transition_time = time
+                if trigger==StatechartSimulationUnit.TRIGGER_INPUT:
+                    inputToConsume = None
+            else:
+                l.debug("No transition taken.")
+        l.debug("Finished all enabled transitions.")
+        
+        l.debug("state=%s", old_state)
+        l.debug("output_event=%s", output_event)
+        
+        l.debug("<%s._doStepFunction()", self.name)
+        return (old_state, output_event)
+    
+    
+    def _doInternalSteps(self, time, step, iteration, cosim_step_size):
+        l.debug(">%s._doInternalSteps(%f, %d, %d, %f)", self._name, time, step, iteration, cosim_step_size)
+        
+        assert self._biggerThan(cosim_step_size, 0), "cosim_step_size too small: {0}".format(cosim_step_size)
+        assert iteration == 0, "Fixed point iterations involving this component are not supported."
+        
+        state_snapshot = self.getValues(step-1, iteration, self._getStateVars())
+        input_snapshot = self.getValues(step, iteration, self._getInputVars())
+        
+        (next_state, output_event) = self._doStepFunction(time+cosim_step_size, state_snapshot, input_snapshot)
+        
+        # Commit the new state
+        self.setValues(step, iteration, {self.__current_state : next_state, self.out_event : output_event})
+        
+        l.debug("<%s._doInternalSteps() = (%s, %d)", self._name, STEP_ACCEPT, cosim_step_size)
+        return (STEP_ACCEPT, cosim_step_size)
+
+