# Copyright 2020 Modelling, Simulation and Design Lab (MSDL) at
# McGill University and the University of Antwerp (http://msdl.cs.mcgill.ca/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Set of PythonPDEVS blocks for routing messages."""
from pypdevs.DEVS import AtomicDEVS
from pypdevs.infinity import INFINITY
[docs]class Finish(AtomicDEVS):
"""Destroys a message from the simulation.
Same as a collector (:mod:`pypdevsbbl.generic.collectors`), only no
data was gathered. The routing is just terminated.
Note:
This block is only added for a better user experience.
``Python`` has no necessity for garbage collection within
the `PythonPDEVS` simulator. It's a better practice to use
this block, but it is not enforced nor required.
.. image:: ../img/routing/finish.svg
:align: center
Args:
name (str): The name of the block.
Input Ports:
input (any): The message to be destroyed.
"""
def __init__(self, name):
super(Finish, self).__init__(name)
self.input = self.addInPort("input")
[docs] def extTransition(self, inputs):
if self.input in inputs:
del inputs[self.input]
return None
[docs]class Halt(AtomicDEVS):
"""Provides an end-point for the simulation.
Similar to a :class:`Finish` block, but the value will be
stored instead of destroyed. It can be used to indicate the
end of a simulation, or as a simple single-value collector.
Example:
.. code-block:: python
sim = Simulator(model) # -- [1]
sim.setTerminationCondition(Halt.anyHalted) # -- [2]
sim.simulate()
1. :attr:`model` is a :class:`CoupledDEVS` which may contain a :class:`Halt` block.
2. Let's set a termination condition on any :class:`Halt` block. Note that this is
decreases the efficiency, as mentioned in the `PythonPDEVS documentation
<https://msdl.uantwerpen.be/documentation/PythonPDEVS/examples_classic.html#termination>`_.
Note:
It is preferred only to use a :class:`Halt` block and its corresponding termination
condition when absolutely necessary, but know that this will decrease the overall
efficieny of the simulation. More often than not, usage of this block can and should
be omitted.
.. image:: ../img/routing/halt.svg
:align: center
Args:
name (str): The name of the block.
:State: The item that arrived.
Input Ports:
input (any): The item to store.
"""
def __init__(self, name):
super(Halt, self).__init__(name)
self.state = None
self.input = self.addInPort("input")
[docs] def extTransition(self, inputs):
if self.input in inputs:
self.state = inputs[self.input]
return self.state
[docs] def isHalted(self):
"""Whether or not an input was received.
Helper function to pass to the simulator as termination condition.
"""
return self.state is not None
[docs] @staticmethod
def anyHalted(time, model):
"""Helper function to check that there is at least one :class:`Halt` block halted.
See the above-mentioned example on how to use this method.
"""
return any((x.isHalted() for x in model.component_set if isinstance(x, Halt)))
[docs]class ChooseOutput(AtomicDEVS):
"""Choose a specific output to be opened.
All other outputs will be closed and messages arrived there are
discarded. This acts like a "path" that is enabled. The output
ports can be indexed with an integer in [0, n).
See Also:
:class:`Pick`
.. image:: ../img/routing/choose-output.svg
:align: center
Args:
name (str): The name of the block.
n (int): The amount of output ports there are available
in this block. Defaults to 2.
selected (int): The initial output port that is enabled.
State:
selected (int): The currently selected output port to be used.
item (any): The item that needs to be sent through a specific
output.
Input Ports:
select (int): The index of the new input port to be selected.
When not in [0, n), a `round robin` method is
applied.
input (any): The item that needs to be routed to the currently
enabled output port.
Output Ports:
outputs (list): A list of the output ports to choose from. These
ports are named `output-N`, where `N` represents
their identifier.
"""
def __init__(self, name: str, n: int=2, selected: int=0):
super(ChooseOutput, self).__init__(name)
self.state = {
"selected": selected % n,
"item": None
}
self.outputs = []
for i in range(n):
self.outputs.append(self.addOutPort("output-%i" % i))
self.select = self.addInPort("select")
self.input = self.addInPort("input")
[docs] def timeAdvance(self):
if self.state["item"] is not None:
return 0
return INFINITY
[docs] def outputFnc(self):
if self.state["item"] is not None:
return { self.outputs[self.state["selected"]]: self.state["item"] }
return {}
[docs] def intTransition(self):
self.state["item"] = None
return self.state
[docs] def extTransition(self, inputs):
if self.select in inputs:
val = inputs[self.select] % len(self.outputs)
self.state["selected"] = val
if self.input in inputs:
self.state["item"] = inputs[self.input]
return self.state
[docs]class Pick(AtomicDEVS):
"""Marks certain paths as available.
This block helps the :class:`ChooseOutput` block to balance the load
if each output of that block can be busy. Whenever such an event occurs,
any free output is chosen with the :class:`Pick` block. It is up to the
user to prevent access to the :class:`ChooseOutput` block if no path is
free.
Note:
Whenever no free path is found, no new value is outputted and therefore,
the :class:`ChooseOutput` will select the last chosen path.
See Also:
:class:`ChooseOutput`
.. image:: ../img/routing/pick.svg
:align: center
Args:
name (str): The name of the block.
n (numeric): The amount of paths to choose from.
contents (iter): A sorted iterable of size :attr:`n`, consisting of
a set of booleans. Each value corresponds to the
freedom of the path. E.g. ``[True, False, True]``
marks path 1 to be busy and paths 0 and 2 to be free.
When ``None``, all paths are marked as free. Defaults
to ``None``.
select (def): Function that selects a path from a list of available
paths. As an argument, a list of pairs is given in the
form of ``(port, idle time)``. By default, the first
available path is chosen.
fc (def): Function that checks if the claim input may mark the
path as busy. Defaults to always ``True``.
lif (bool): Indicative that the last port the fallback port is or
not. This port cannot be marked busy.
State:
claim (bool): When ``True``, the last selected path was marked as busy.
input-n (numeric): The freedom of the `n` th input port and its idle time.
Input Ports:
claim (any): Marks the currently selected path to be busy.
inputs (list): A list of the input ports to choose from. These ports are
named `input-N`, where `N` represents their identifier.
When a message arrives, the corresponding path is marked
as "free".
Output Ports:
output (int): The index of the path to select. Should be used as-is as
an input to the corresponding :class:`ChooseOutput` block.
free (bool): Outputs ``True`` when there is at least one free path.
"""
def __init__(self, name, n, contents=None, select=lambda x: x[0], fc=lambda x: True, lif=False):
assert contents is None or len(contents) == n
super(Pick, self).__init__(name)
self.select = select
self.fc = fc
self.lif = lif
self.state = {
"changed": True
}
self.inputs = []
for i in range(n):
self.state["input-%i" % i] = [True if contents is None else contents[i], 0.0]
self.inputs.append(self.addInPort("input-%i" % i))
self.state["port"] = self.next()
self.claim = self.addInPort("claim")
self.output = self.addOutPort("output")
self.free = self.addOutPort("free")
[docs] def next(self):
"""Gets the next available port."""
ports = [(x, self.state[x.name][1]) for x in self.inputs if self.state[x.name][0]]
if len(ports) > 0:
return self.select(ports)[0]
return None
[docs] def timeAdvance(self):
if self.state["changed"] and self.state["port"] is not None:
return 0.0
return INFINITY
[docs] def outputFnc(self):
if self.state["port"] is not None:
name = self.state["port"].name
if self.state[name][0]:
return {
self.output: int(name[6:]),
self.free: True
}
return {}
[docs] def intTransition(self):
self.state["changed"] = False
return self.state
[docs] def extTransition(self, inputs):
for port in self.inputs:
if self.state[port.name][0]:
self.state[port.name][1] += self.elapsed
for ip in inputs:
if ip is self.claim and self.fc(inputs[self.claim]):
if not self.lif or (self.lif and self.state["port"] != self.inputs[-1]):
self.state[self.state["port"].name][0] = False
self.state["changed"] = True
else:
self.state[ip.name] = [True, 0.0]
self.state["changed"] = True
self.state["port"] = self.next()
return self.state
[docs]class Guard(AtomicDEVS):
"""Shields sections to a limited amount of resources.
Items that request access should arrive on the `input` port,
whereas items that release their access should arrive on the
`leave` port. When no access is granted, the message will be
outputted on the `blocked` port and otherwise, it is passed
on to the `guarded` port. Items that leave are also outputted
on the `unguarded` port.
.. image:: ../img/routing/guard.svg
:align: center
Args:
name (str): The name of the block.
n (int): Amount of available resources at the start.
Defaults to 0.
w (def): Weight function for items that enter the block.
Takes the item itself as an argument and should
return an integer. It may not alter the item.
u (def): Weight function for items that leave the block.
Takes the item itself as an argument and should
return an integer. It may not alter the item.
State:
n (int): Amount of available resources.
enter (any): The item that requested access to the resources.
leave (any): The item that releases access to resources.
Input Ports:
input (any): The items that request access to resources.
leave (any): The items that release access to resources.
Output Ports:
blocked (any): The item that requested access will be outputted
here if there are not enough resources for it.
guarded (any): Items that were granted access to resources.
unguarded (any): Items that released access to the resources.
"""
def __init__(self, name: str, n: int=0, w = lambda i: 1, u = lambda i: 1):
super(Guard, self).__init__(name)
self.state = {
"n": n,
"enter": None,
"leave": None
}
self.w = w
self.u = u
self.input = self.addInPort("input")
self.leave = self.addInPort("leave")
self.blocked = self.addOutPort("blocked")
self.guarded = self.addOutPort("guarded")
self.unguarded = self.addOutPort("unguarded")
[docs] def timeAdvance(self):
if self.state["enter"] is not None or self.state["leave"] is not None:
return 0
return INFINITY
[docs] def outputFnc(self):
res = {}
if self.state["leave"] is not None:
res[self.unguarded] = self.state["leave"]
if self.state["enter"] is not None:
n = self.state["n"] - self.w(self.state["enter"])
if self.state["leave"] is not None:
n += self.u(self.state["leave"])
if n < 0:
res[self.blocked] = self.state["enter"]
else:
res[self.guarded] = self.state["enter"]
return res
[docs] def intTransition(self):
if self.state["leave"] is not None:
self.state["n"] += self.u(self.state["leave"])
self.state["leave"] = None
if self.state["enter"] is not None:
w = self.w(self.state["enter"])
if self.state["n"] - w >= 0:
self.state["n"] -= w
self.state["enter"] = None
return self.state
[docs] def extTransition(self, inputs):
if self.input in inputs:
self.state["enter"] = inputs[self.input]
if self.leave in inputs:
self.state["leave"] = inputs[self.leave]
return self.state
[docs]class Gate(AtomicDEVS):
"""Toggleable access to a section of a model.
Works similar to the :class:`Guard`, but does not make use of
resources. Instead a boolean value indicates if the message
receives access or not.
When this value is ``True``, all items will be outputted on the
`blocked` output, otherwise on the `out` output.
.. image:: ../img/routing/gate.svg
:align: center
Args:
name (str): The name of the block.
blocked (bool): The initial state of the gate.
State:
blocked (bool): Indicates where the items should be outputted.
item (any): The item to be outputted.
Input Ports:
input (any): The items that want to gain access to the critical
section.
block (bool): Indicates the state of the gate.
``True`` == closed; ``False`` == open
Output Ports:
output (any): All items that arrived on `input` if the gate is open.
blocked (any): All items that arrived on `input` if the gate is closed.
"""
def __init__(self, name: str, blocked: bool=True):
super(Gate, self).__init__(name)
self.state = {
"blocked": blocked,
"item": None
}
self.input = self.addInPort("input")
self.block = self.addInPort("block")
self.output = self.addOutPort("output")
self.blocked = self.addOutPort("blocked")
[docs] def timeAdvance(self):
if self.state["item"] is not None:
return 0
return INFINITY
[docs] def outputFnc(self):
if self.state["item"] is not None:
if self.state["blocked"]:
return { self.blocked: self.state["item"] }
return { self.output: self.state["item"] }
return {}
[docs] def intTransition(self):
self.state["item"] = None
return self.state
[docs] def extTransition(self, inputs):
if self.block in inputs:
self.state["blocked"] = inputs[self.block]
if self.input in inputs:
self.state["item"] = inputs[self.input]
return self.state
[docs]class _DT(AtomicDEVS):
"""Superclass for both the Delayer and the Timer.
Warning:
This class should not be used as a block by itself.
Args:
name (str): The name of the block.
"""
def __init__(self, name: str):
super(_DT, self).__init__(name)
self.state = {
"el": 0.0,
"dt": INFINITY,
"item": None,
"other": None
}
self.start = self.addInPort("start")
self.blocked = self.addOutPort("blocked")
self.finished = self.addOutPort("finished")
[docs] def timeAdvance(self):
if self.state["other"] is not None:
return 0.0
if self.state["item"] is not None:
return max(0.0, self.state["dt"] - self.state["el"])
return INFINITY
[docs] def outputFnc(self):
ta = self.state["dt"] - self.state["el"] - self.timeAdvance()
res = {}
if self.state["item"] is not None and ta <= 0:
res[self.finished] = self.state["item"]
elif self.state["other"] is not None:
res[self.blocked] = self.state["other"]
return res
[docs] def intTransition(self):
ta = self.state["dt"] - self.state["el"] - self.timeAdvance()
if self.state["item"] is not None and ta <= 0:
self.state["item"] = self.state["other"]
self.state["el"] = 0.0
self.state["other"] = None
return self.state
[docs] def extTransition(self, inputs):
raise NotImplementedError("The _DT class should not be used as a block by itself!")
[docs]class Delayer(_DT):
"""Holds a message for a certain delay, as is defined by the item.
.. image:: ../img/routing/delayer.svg
:align: center
Args:
name (str): The name of the block.
ft (def): Time delay function that takes the item as
argument and returns a numeric value. When
the returnvalue is less than 0, INFINITY is
assumed. The function musn't change the item.
Defaults to a function that always returns 1.
State:
el (numeric): The elapsed time since an item is being held.
dt (numeric): The time delay to hold an item for. Must be
positive or 0. This is the result of :meth:`ft`.
item (any): The item to hold.
other (any): When an item is already being held, store the
new item that arrives.
Input Ports:
start (any): Start the timer for the item.
Output Ports:
blocked (any): All items that arrive when an item is being held.
finished (any): The item thet was held for time delay `dt`.
"""
def __init__(self, name: str, ft= lambda i: 1):
super(Delayer, self).__init__(name)
self.ft = ft
[docs] def extTransition(self, inputs):
if self.state["item"] is not None:
self.state["el"] += self.elapsed
if self.start in inputs:
val = inputs[self.start]
if self.state["item"] is None:
self.state["item"] = val
self.state["dt"] = self.ft(val)
else:
self.state["other"] = val
return self.state
[docs]class Timer(_DT):
"""Holds a message for a predefined delay.
Basically corresponds to a blocking queue with a capacity of 1.
When there is a requirement for more items. Use a
:class:`pypdevsbbl.generic.queues.Queue` or a :class:`pypdevsbbl.generic.queues.SimpleQueue`
with an infinite `dd` and a non-infinte `dr`.
Note:
`DEVS` are evaluated at each event, not at each time instance, meaning that
there can be multiple events at the same time. If some of them are external,
while others are internal, they will never be displayed in the same *timeframe*.
A timeframe is therefore a collection of events. Each point in time can have
multiple timeframes.
Use this block (with a dt of 0) to sync up events that happen out-of-order w.r.t.
these timeframes.
.. image:: ../img/routing/timer.svg
:align: center
Args:
name (str): The name of the block.
dt (numeric): The initial delay to hold an item for.
State:
el (numeric): The elapsed time since an item is being held.
dt (numeric): The time delay to hold an item for. Must be
positive or 0. Defaults to 0.
item (any): The item to hold.
other (any): When an item is already being held, store the
new item that arrives.
Input Ports:
start (any): Start the timer for the item.
dt (numeric): New time delay to hold an item for. Must be
positive or 0. Any other value is discarded.
Output Ports:
blocked (any): All items that arrive when an item is being held.
finished (any): The item thet was held for time delay `dt`.
"""
def __init__(self, name: str, dt=0.0):
assert 0 <= dt
super(Timer, self).__init__(name)
self.state["dt"] = dt
self.dt = self.addInPort("dt")
[docs] def extTransition(self, inputs):
if self.dt in inputs:
t = inputs[self.dt]
if t >= 0:
self.state["dt"] = t
if self.state["item"] is not None:
self.state["el"] += self.elapsed
if self.start in inputs:
val = inputs[self.start]
if self.state["item"] is None:
self.state["item"] = val
else:
self.state["other"] = val
return self.state
[docs]class Sync(AtomicDEVS):
"""Synchronizes a set of connections.
Note:
This block does not enqueue the arriving messages
and should thus not be used to delay messages that
arrive too fast.
.. image:: ../img/routing/sync.svg
:align: center
Args:
name (str): The name of the block.
n (int): The amount of ports to sync. Must be greater
than 0.
:State: A list of all currently arrived messages.
Output Ports:
input (list): A list of the input ports to choose from. These
ports are named `input-N`, where `N` represents
their identifier.
output (list): A list of the output ports to choose from. These
ports are named `output-N`, where `N` represents
their identifier.
"""
def __init__(self, name, n):
assert n > 0
super(Sync, self).__init__(name)
self.state = []
self.inputs = []
self.outputs = []
for i in range(n):
self.inputs.append(self.addInPort("input-%i" % i))
self.outputs.append(self.addOutPort("output-%i" % i))
self.state.append(None)
[docs] def timeAdvance(self):
if any((x is None for x in self.state)):
return INFINITY
return 0.0
[docs] def outputFnc(self):
if any((x is None for x in self.state)):
return {}
return {self.outputs[i]: self.state[i] for i in range(len(self.state))}
[docs] def intTransition(self):
self.state = [None] * len(self.state)
return self.state
[docs] def extTransition(self, inputs):
for inp in inputs:
self.state[self.inputs.index(inp)] = inputs[inp]
return self.state