# Copyright 2014 Modelling, Simulation and Design Lab (MSDL) at
# McGill University and the University of Antwerp (http://msdl.cs.mcgill.ca/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
The actual DEVS solvers containing the main DEVS implementation
"""
from collections import defaultdict
from pypdevs.DEVS import *
from pypdevs.util import *
from pypdevs.logger import *
from pypdevs.classicDEVSWrapper import ClassicDEVSWrapper
[docs]class Solver(object):
"""
A unified DEVS solver, containing all necessary functions
"""
[docs] def __init__(self, listeners = {}):
"""
Constructor
"""
self.activities = {}
self.dsdevs_dict = {}
self.listeners = listeners
[docs] def atomicOutputGenerationEventTracing(self, aDEVS, time):
"""
Wrapper for the AtomicDEVS output function, which will save event counts
:param aDEVS: the AtomicDEVS model that generates the output
:param time: the time at which the output must be generated
:returns: dict -- the generated output
"""
retval = Solver.atomicOutputGeneration(self, aDEVS, time)
for port in retval:
port.msg_count += len(retval[port])
return retval
[docs] def atomicOutputGeneration(self, aDEVS, time):
"""
AtomicDEVS function to generate output, invokes the outputFnc function of the model.
:param aDEVS: the AtomicDEVS model that generates the output
:param time: the time at which the output must be generated
:returns: dict -- the generated output
"""
aDEVS.my_output = aDEVS.outputFnc()
# Being here means that this model created output, so it triggered its internal transition
# save this knowledge in the basesimulator for usage in the actual transition step
self.transitioning[aDEVS] |= 1
return aDEVS.my_output
[docs] def massAtomicTransitions(self, trans, clock):
"""
AtomicDEVS function to perform all necessary transitions,
does so on a collection of models for performance.
:param trans: iterable containing all models and their requested transition
:param clock: the time at which the transition must happen
"""
t, age = clock
partialmod = []
for aDEVS in trans:
ttype = trans[aDEVS]
###########
## Memoization and activity tracking code
## Skipped in local simulation
if not self.temporary_irreversible:
# Memo part
if self.memoization and len(aDEVS.memo) >= 2:
found = False
prev = aDEVS.memo.pop()
memo = aDEVS.memo[-1]
if memo.time_last == clock and prev.loadState() == aDEVS.state:
if ttype == 1:
found = True
elif aDEVS.my_input == memo.my_input:
# Inputs should be equal too
if ttype == 3:
found = True
elif aDEVS.elapsed == memo.elapsed and ttype == 2:
found = True
if found:
aDEVS.state = memo.loadState()
aDEVS.time_last = clock
aDEVS.time_next = memo.time_next
# Just add the copy
aDEVS.old_states.append(memo)
if self.do_some_tracing:
# Completely skip all these calls if no tracing, saves us a lot of function calls
if ttype == 1:
self.tracers.tracesInternal(aDEVS)
elif ttype == 2:
self.tracers.tracesExternal(aDEVS)
elif ttype == 3:
self.tracers.tracesConfluent(aDEVS)
aDEVS.my_input = {}
if self.relocation_pending:
# Quit ASAP by throwing an exception
raise QuickStopException()
continue
else:
aDEVS.memo = []
activity_tracking_prevalue = aDEVS.preActivityCalculation()
elif self.activity_tracking:
activity_tracking_prevalue = aDEVS.preActivityCalculation()
###########
# Make a copy of the message before it is passed to the user
if self.msg_copy != 2:
# Prevent a pass statement, which still consumes some time in CPython
if self.msg_copy == 1:
# Using list comprehension inside of dictionary comprehension...
aDEVS.my_input = {key:
[i.copy() for i in aDEVS.my_input[key]]
for key in aDEVS.my_input}
elif self.msg_copy == 0:
# Dictionary comprehension
aDEVS.my_input = {key:
pickle.loads(pickle.dumps(aDEVS.my_input[key],
pickle.HIGHEST_PROTOCOL))
for key in aDEVS.my_input}
# NOTE ttype mappings: (EI)
# 1 -- Internal transition (01)
# 2 -- External transition (10)
# 3 -- Confluent transition (11)
if ttype == 1:
# Internal only
aDEVS.elapsed = None
aDEVS.state = aDEVS.intTransition()
elif ttype == 2:
# External only
aDEVS.elapsed = t - aDEVS.time_last[0]
aDEVS.state = aDEVS.extTransition(aDEVS.my_input)
elif ttype == 3:
# Confluent
aDEVS.elapsed = 0.
aDEVS.state = aDEVS.confTransition(aDEVS.my_input)
else:
raise DEVSException(
"Problem in transitioning dictionary: unknown element %s"
% ttype)
ta = aDEVS.timeAdvance()
aDEVS.time_last = clock
if ta < 0:
raise DEVSException("Negative time advance in atomic model '" + \
aDEVS.getModelFullName() + "' with value " + \
str(ta) + " at time " + str(t))
# Update the time, this is just done in the timeNext, as this will propagate to the basesimulator
aDEVS.time_next = (t + ta, 1 if ta else (age + 1))
# Save the state
if not self.temporary_irreversible:
partialmod.append(aDEVS)
# But only if there are multiple kernels, since otherwise there would be no other kernel to invoke a revertion
# This can save us lots of time for local simulation (however, all other code is written with parallellisation in mind...)
activity = aDEVS.postActivityCalculation(activity_tracking_prevalue)
aDEVS.old_states.append(self.state_saver(aDEVS.time_last,
aDEVS.time_next,
aDEVS.state,
activity,
aDEVS.my_input,
aDEVS.elapsed))
if self.relocation_pending:
# Quit ASAP by throwing an exception
for m in partialmod:
# Roll back these models to before the transitions
m.time_next = m.old_states[-1].time_next
m.time_last = m.old_states[-1].time_last
m.state = m.old_states[-1].loadState()
self.model.scheduler.massReschedule(trans)
self.server.flushQueuedMessages()
raise QuickStopException()
elif self.activity_tracking:
activity = aDEVS.postActivityCalculation(activity_tracking_prevalue)
self.total_activities[aDEVS.model_id] += activity
if self.do_some_tracing:
# Completely skip all these calls if no tracing, saves us a lot of function calls
if ttype == 1:
self.tracers.tracesInternal(aDEVS)
elif ttype == 2:
self.tracers.tracesExternal(aDEVS)
elif ttype == 3:
self.tracers.tracesConfluent(aDEVS)
# Clear the bag
aDEVS.my_input = {}
self.server.flushQueuedMessages()
[docs] def atomicInit(self, aDEVS, time):
"""
AtomicDEVS function to initialise the model
:param aDEVS: the model to initialise
"""
aDEVS.time_last = (time[0] - aDEVS.elapsed, 1)
ta = aDEVS.timeAdvance()
if ta < 0:
raise DEVSException("Negative time advance in atomic model '" + \
aDEVS.getModelFullName() + "' with value " + \
str(ta) + " at initialisation")
aDEVS.time_next = (aDEVS.time_last[0] + ta, 1)
# Save the state
if not self.irreversible:
aDEVS.old_states.append(self.state_saver(aDEVS.time_last,
aDEVS.time_next,
aDEVS.state,
0.0,
{},
0.0))
# All tracing features
self.tracers.tracesInit(aDEVS, time)
[docs] def coupledOutputGenerationClassic(self, time):
"""
CoupledDEVS function to generate the output, calls the atomicDEVS models where necessary. Output is routed too.
:param time: the time at which output should be generated
:returns: the models that should be rescheduled
"""
cDEVS = self.model
imminent = cDEVS.scheduler.getImminent(time)
if not imminent:
# For real time simulation, when a model is interrupted
return self.transitioning
reschedule = set(imminent)
for model in imminent:
model.time_next = (model.time_next[0], model.time_next[1] + 1)
# Return value are the models to reschedule
# self.transitioning are the models that must transition
if len(imminent) > 1:
# Perform all selects
imminent.sort(key=lambda i: i.getModelFullName())
pending = imminent
level = 1
while len(pending) > 1:
# Take the model each time, as we need to make sure that the selectHierarchy is valid everywhere
model = pending[0]
# Make a set first to remove duplicates
colliding = list(set([m.select_hierarchy[level] for m in pending]))
chosen = model.select_hierarchy[level-1].select(
sorted(colliding, key=lambda i:i.getModelFullName()))
pending = [m for m in pending
if m.select_hierarchy[level] == chosen]
level += 1
child = pending[0]
else:
child = imminent[0]
# Recorrect the timeNext of the model that will transition
child.time_next = (child.time_next[0], child.time_next[1] - 1)
outbag = child.my_output = ClassicDEVSWrapper(child).outputFnc()
self.transitioning[child] = 1
for outport in outbag:
for inport, z in outport.routing_outline:
payload = outbag[outport]
if z is not None:
payload = [z(pickle.loads(pickle.dumps(m))) for m in payload]
aDEVS = inport.host_DEVS
aDEVS.my_input[inport] = list(payload)
self.transitioning[aDEVS] = 2
reschedule.add(aDEVS)
# We have now generated the transitioning variable, though we need some small magic to have it work for classic DEVS
self.transitioning = {ClassicDEVSWrapper(m): self.transitioning[m]
for m in self.transitioning}
return reschedule
[docs] def coupledOutputGeneration(self, time):
"""
CoupledDEVS function to generate the output, calls the atomicDEVS models where necessary. Output is routed too.
:param time: the time at which output should be generated
:returns: the models that should be rescheduled
"""
cDEVS = self.model
remotes = {}
for child in cDEVS.scheduler.getImminent(time):
outbag = self.atomicOutputGeneration(child, time)
for outport in outbag:
payload = outbag[outport]
if not hasattr(outport, "routing_outline"):
raise Exception(outport)
for inport, z in outport.routing_outline:
aDEVS = inport.host_DEVS
if z is not None:
payload = [z(pickle.loads(pickle.dumps(m)))
for m in payload]
if aDEVS.model_id in self.model.local_model_ids:
# This setdefault call is responsible for our non-linear runtime in several situations...
aDEVS.my_input.setdefault(inport, []).extend(payload)
self.transitioning[aDEVS] |= 2
else:
remotes.setdefault(aDEVS.model_id,
{}).setdefault(inport.port_id,
[]).extend(payload)
for destination in remotes:
self.send(destination, time, remotes[destination])
return self.transitioning
[docs] def coupledInit(self):
"""
CoupledDEVS function to initialise the model, calls all its _local_ children too.
"""
cDEVS = self.model
time_next = (float('inf'), 1)
# This part isn't fast, but it doesn't matter, since it just inits everything, optimizing here doesn't
# matter as it is only called once AND every element has to be initted.
# Only local models should receive this initialisation from us
for d in self.local:
self.atomicInit(d, (0.0, 0))
time_next = min(time_next, d.time_next)
# NOTE do not immediately assign to the timeNext, as this is used in the GVT algorithm to see whether a node has finished
cDEVS.time_next = time_next
self.model.setScheduler(self.model.scheduler_type)
self.server.flushQueuedMessages()