Source code for pypdevs.minimal

# Copyright 2015 Modelling, Simulation and Design Lab (MSDL) at 
# McGill University and the University of Antwerp (http://msdl.cs.mcgill.ca/)
# 
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
The minimal PythonPDEVS simulation kernel. It only supports simple Parallel DEVS simulation, without any fancy configuration options.
While it behaves exactly the same as the normal simulation kernel with default options, it is a lot faster due to skipping all features.
"""

from collections import defaultdict

# Uncomment this part to make a completely stand-alone simulation kernel
class BaseDEVS(object):
    def __init__(self, name):
        self.name = name
        self.IPorts = []
        self.OPorts = []
        self.ports = []
        self.parent = None
        self.time_last = (0.0, 0)
        self.time_next = (0.0, 1)
        self.my_input = {}

    def addPort(self, name, is_input):
        name = name if name is not None else "port%s" % len(self.ports)
        port = Port(is_input=is_input, name=name)
        if is_input:
            self.IPorts.append(port)
        else:
            self.OPorts.append(port)
        port.port_id = len(self.ports)
        self.ports.append(port)
        port.host_DEVS = self
        return port

    def addInPort(self, name=None):
        return self.addPort(name, True)

    def addOutPort(self, name=None):
        return self.addPort(name, False)

    def getModelName(self):
        return self.name

    def getModelFullName(self):
        return self.full_name

class AtomicDEVS(BaseDEVS):
    ID = 0

    def __init__(self, name):
        BaseDEVS.__init__(self, name)
        self.elapsed = 0.0
        self.state = None
        self.model_id = AtomicDEVS.ID
        AtomicDEVS.ID += 1

    def extTransition(self, inputs):
        return self.state

    def intTransition(self):
        return self.state

    def confTransition(self, inputs):
        self.state = self.intTransition()
        return self.extTransition(inputs)

    def timeAdvance(self):
        return float('inf')

    def outputFnc(self):
        return {}

class CoupledDEVS(BaseDEVS):
    def __init__(self, name):
        BaseDEVS.__init__(self, name)
        self.component_set = []

    def addSubModel(self, model):
        model.parent = self
        self.component_set.append(model)
        return model

    def connectPorts(self, p1, p2):
        p1.outline.append(p2)
        p2.inline.append(p1)

class RootDEVS(object):
    def __init__(self, components):
        from schedulers.schedulerHS import SchedulerHS as Scheduler
        self.component_set = components
        self.time_next = float('inf')
        self.scheduler = Scheduler(self.component_set, 1e-6, len(self.component_set))

class Port(object):
    def __init__(self, is_input, name=None):
        self.inline = []
        self.outline = []
        self.host_DEVS = None
        self.name = name

    def getPortname(self):
        return self.name

[docs]def directConnect(component_set): """ Perform a trimmed down version of the direct connection algorithm. It does not support transfer functions, but all the rest is the same. :param component_set: the iterable to direct connect :returns: the direct connected component_set """ new_list = [] for i in component_set: if isinstance(i, CoupledDEVS): component_set.extend(i.component_set) else: # Found an atomic model new_list.append(i) component_set = new_list # All and only all atomic models are now direct children of this model for i in component_set: # Remap the output ports for outport in i.OPorts: # The new contents of the line outport.routing_outline = set() worklist = list(outport.outline) for outline in worklist: # If it is a coupled model, we must expand this model if isinstance(outline.host_DEVS, CoupledDEVS): worklist.extend(outline.outline) else: outport.routing_outline.add(outline) outport.routing_outline = list(outport.routing_outline) return component_set
[docs]class Simulator(object): """ Minimal simulation kernel, offering only setTerminationTime and simulate. Use this Simulator instead of the normal one to use the minimal kernel. While it has a lot less features, its performance is much higher. The polymorphic scheduler is also used by default. """
[docs] def __init__(self, model): """ Constructor :param model: the model to simulate """ self.original_model = model if isinstance(model, CoupledDEVS): component_set = directConnect(model.component_set) ids = 0 for m in component_set: m.time_last = (-m.elapsed, 0) m.time_next = (-m.elapsed + m.timeAdvance(), 1) m.model_id = ids ids += 1 self.model = RootDEVS(component_set) elif isinstance(model, AtomicDEVS): for p in model.OPorts: p.routing_outline = [] model.time_last = (-model.elapsed, 0) model.time_next = (model.time_last[0] + model.timeAdvance(), 1) model.model_id = 0 self.model = RootDEVS([model]) self.setTerminationTime(float('inf'))
[docs] def setTerminationTime(self, time): """ Set the termination time of the simulation. :param time: simulation time at which simulation should terminate """ self.setTerminationCondition(lambda t, m: time <= t[0])
[docs] def setTerminationCondition(self, function): """ Set the termination condition of the simulation. :param function: termination condition to execute, taking the current simulated time and the model, returning a boolean (True to terminate) """ self.termination_function = function
[docs] def simulate(self): """ Perform the simulation """ scheduler = self.model.scheduler tn = scheduler.readFirst() while not self.termination_function(tn, self.original_model): # Generate outputs transitioning = defaultdict(int) for c in scheduler.getImminent(tn): transitioning[c] |= 1 outbag = c.outputFnc() for outport in outbag: p = outbag[outport] for inport in outport.routing_outline: inport.host_DEVS.my_input.setdefault(inport, []).extend(p) transitioning[inport.host_DEVS] |= 2 # Perform transitions for aDEVS, ttype in transitioning.iteritems(): if ttype == 1: aDEVS.state = aDEVS.intTransition() elif ttype == 2: aDEVS.elapsed = tn[0] - aDEVS.time_last[0] aDEVS.state = aDEVS.extTransition(aDEVS.my_input) elif ttype == 3: aDEVS.elapsed = 0. aDEVS.state = aDEVS.confTransition(aDEVS.my_input) aDEVS.time_next = (tn[0] + aDEVS.timeAdvance(), 1 if tn[0] > aDEVS.time_last[0] else tn[1] + 1) aDEVS.time_last = tn aDEVS.my_input = {} # Do reschedules scheduler.massReschedule(transitioning) tn = scheduler.readFirst() return tn[0]
def __getattr__(self, attr): """ Wrapper to inform users that they are using the minimal kernel if they zant to do some unsupported configuration option. """ if attr.startswith("set"): raise Exception("You are using the minimal simulation kernel, which does not support any configuration except for the termination time. Please switch to the normal simulation kernel to use this option.") else: raise AttributeError()