Source code for pypdevs.messageScheduler
# Copyright 2014 Modelling, Simulation and Design Lab (MSDL) at
# McGill University and the University of Antwerp (http://msdl.cs.mcgill.ca/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Scheduler for external input messages
"""
from heapq import heappop, heappush, heapify
from pypdevs.logger import *
[docs]class MessageScheduler(object):
"""
An efficient implementation of a message scheduler for the inputQueue,
it supports very fast invalidations (O(1)) and fast retrievals of first
element (O(log(n) in average case)
"""
[docs] def __init__(self):
"""
Constructor.
"""
# List of processed messages
self.processed = []
# Heap of the to be processed messages
self.heap = []
# All invalidated messages, simply adding a message's UUID will invalidate
# the message. The counter that it keeps is for multiple invalidations
self.invalids = set()
def __getstate__(self):
"""
For pickling
"""
retdict = {}
unpicklable = frozenset(["instancemethod", "lock", "_Event"])
for i in dir(self):
if getattr(self, i).__class__.__name__ in unpicklable:
# unpicklable, so don't copy it
continue
elif str(i).startswith("__"):
continue
else:
retdict[str(i)] = getattr(self, i)
return retdict
[docs] def insert(self, extraction, model_list):
"""
Insert several messages that were created elsewhere and merge them in.
:param extraction: the output of the extract method on the other message scheduler
:param model_list: models that are inserted and for which extraction happened
"""
msgs, invalids = extraction
# A simple update suffices, as these messages have a unique ID
self.invalids |= invalids
for msg in msgs:
moddata = {}
for entry in msg.content:
inport = model_list[entry[0]].ports[entry[1]]
moddata[inport] = msg.content[entry]
# Overwrite the original message
msg.content = moddata
self.schedule(msg)
[docs] def schedule(self, msg):
"""
Schedule a message for processing
:param msg: the message to schedule
"""
try:
self.invalids.remove(msg.uuid)
except KeyError:
heappush(self.heap, msg)
[docs] def massUnschedule(self, uuids):
"""
Unschedule several messages, this way it will no longer be processed.
:param uuids: iterable of UUIDs that need to be removed
"""
self.invalids = self.invalids.union(uuids)
[docs] def readFirst(self):
"""
Returns the first (valid) message. Not necessarily O(1), as it could be
the case that a lot of invalid messages are still to be deleted.
"""
self.cleanFirst()
return self.heap[0]
[docs] def removeFirst(self):
"""
Notify that the first (valid) message is processed.
:returns: msg -- the next first message that is valid
"""
self.cleanFirst()
self.processed.append(heappop(self.heap))
[docs] def purgeFirst(self):
"""
Notify that the first (valid) message must be removed
:returns: msg -- the next first message that is valid
"""
self.cleanFirst()
heappop(self.heap)
[docs] def cleanFirst(self):
"""
Clean all invalid messages at the front of the list. Method MUST be called
before any accesses should happen to the first element, otherwise this
first element might be a message that was just invalidated
"""
try:
while 1:
self.invalids.remove(self.heap[0].uuid)
# If it got removed, it means that the message was indeed invalidated, so we can simply pop it
heappop(self.heap)
except (KeyError, IndexError):
# Seems that the UUID was not invalidated, so we are done
# OR
# Reached the end of the heap and all were invalid
pass
[docs] def revert(self, time):
"""
Revert the inputqueue to the specified time, will also clean up the list of processed elements
:param time: time to which revertion should happen
"""
try:
i = 0
while self.processed[i].timestamp < time:
i += 1
for msg in self.processed[i:]:
# All processed messages were valid, so no need for the more expensive check
# Should an invalidation for a processed message have just arrived, it will
# be processed AFTER this revertion, thus using the normal unschedule() function
heappush(self.heap, msg)
self.processed = self.processed[:i]
except IndexError:
# All elements are smaller
pass
[docs] def cleanup(self, time):
"""
Clean up the processed list, also removes all invalid elements
:param time: time up to which cleanups are allowed to happen
"""
# We can be absolutely certain that ONLY elements from the processed list should be deleted
self.processed = [i for i in self.processed if i.timestamp >= time]
# Clean up the dictionary too, as otherwise it will start to contain a massive amount of entries, consuming both memory and increasing the amortized worst case