| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223 |
- import numpy
- from operator import attrgetter
- ############### runtime classes
- class RunTimeEvent:
- def __init__(self, name, time):
- self.name = name
- self.time = time
- self.processed = False
- class Events:
- def __init__(self, events):
- self.events = sorted(events, key=attrgetter('time'))
-
- '''
- Returns the most recent event with the timestamp smaller than time.
- '''
- def getInputAt(self,time):
- event = None
- if self.events != []:
- # Claudio: this approach is assuming that the queue is sorted according the time of the events.
- for e in self.events:
- if e.time < time or numpy.isclose(e.time, time, 1e-05, 1e-08):
- event = e
- else:
- # stop at the first event that satisfies e.time > time
- break
- return event
-
- '''
- Removes event all other older events from the queue.
- '''
- def popEvent(self, event):
- if event == None:
- return
-
- assert event in self.events
- # Claudio: this approach is assuming that the queue is sorted according the time of the events.
- # first collect list of events to be removed.
- # then remove them.
- eventsToBeRemoved = []
- for e in self.events:
- eventsToBeRemoved.append(e)
- if e == event:
- break
- assert len(eventsToBeRemoved)>0
- for e in eventsToBeRemoved:
- self.events.remove(e)
-
- ############### static classes - invariants
- class Expression:
- def __init__(self):
- pass
- class FSAModel:
- def __init__(self,states,transitions):
- self.transitions = transitions
- self.states = states
- self.initialState = None
-
- def getTransitionFrom(self, state, event, elapsed):
- assert state != None
- assert elapsed >= 0.0
-
- # Priotiry goes to event.
- selectedTransition = None
-
- if event!=None:
- selectedTransition = self.getTriggeredTransition(state, event)
-
- # Then to after
- if (selectedTransition == None):
- selectedTransition = self.getTransitionAfter(state, elapsed)
-
- # And finally to default transitions
- if (selectedTransition == None):
- selectedTransition = self.getDefaultTransition(state)
-
- return selectedTransition
-
- def getDefaultTransition(self, state):
- assert state != None
- for t in self.transitions:
- if t.source == state and t.trigger == None:
- return t
-
- return None
-
-
- def getTriggeredTransition(self, state, event):
- assert event != None
- for t in self.transitions:
- if t.source == state and isinstance(t.trigger,Event) and t.trigger.name == event.name:
- return t
-
- return None
-
- def getTransitionAfter(self, state, elapsed):
- for t in self.transitions:
- if t.source == state and isinstance(t.trigger,After):
- if ExpressionVisitor(t.trigger.after).visit() <= elapsed:
- return t
- return None
- class ExpressionVisitor:
- def __init__(self, expression):
- self.expression = expression
-
- def visit(self):
- if(isinstance(self.expression,AtomValue)):
- return self.expression.value
- if(isinstance(self.expression,Operation)):
- left = ExpressionVisitor(self.expression.left).visit()
- right = ExpressionVisitor(self.expression.right).visit()
- if(self.expression.op == '+'):
- return left + right
- if(self.expression.op == '-'):
- return left - right
- if(self.expression.op == '*'):
- return left * right
- if(self.expression.op == '/'):
- return left / right
- if(self.expression.op == 'and'):
- return left and right
- if(self.expression.op == 'or'):
- return left or right
- if(isinstance(self.expression,Not)):
- return not ExpressionVisitor(self.expression.expression).visit()
- class State:
- def __init__(self, name, final = False):
- self.name = name
- self.final = final
-
- def getName(self):
- return self.name
-
- def __str__(self):
- return self.name
-
-
- class Operation(Expression):
- def __init__(self, left, right, op):
- self.op = op
- self.left = left
- self.right = right
-
- class Transition:
- def __init__(self,name, source,target):
- self.name = name
- self.source = source
- self.target = target
- self.trigger = None
- self.guard = None
-
- def __str__(self):
- return str(self.source) + "--" + self.name + ": " + str(self.trigger) + "-->" + str(self.target)
-
- class Trigger:
- def __init__(self):
- pass
- class Event(Trigger):
- def __init__(self,name):
- self.name = name
-
- def __str__(self):
- return self.name
-
- class After(Trigger):
- def __init__(self, expression):
- self.after = expression
-
- def __str__(self):
- return "after("+ str(self.after) + ")"
-
- class Guard:
- def __init__(self, expression):
- self.expression = expression
- class And(Operation):
- def __init__(self, lexpression, rexpression):
- self.left = lexpression
- self.right = rexpression
- self.op = "and"
- class Or(Operation):
- def __init__(self, lexpression, rexpression):
- self.left = lexpression
- self.right = rexpression
- self.op = "or"
- class Not(Expression):
- def __init__(self, expression):
- self.expression = expression
- class Variable(Expression):
- def __init__(self, varname):
- self.name = varname
- class AtomValue(Expression):
- def __init__(self, value):
- self.value = value
-
- def __str__(self):
- return str(self.value)
-
- class Integer(AtomValue):
- def __init__(self, value):
- self.value = value
-
- class Float(AtomValue):
- def __init__(self, value):
- self.value = value
- class String(AtomValue):
- def __init__(self, value):
- self.value = value
|