|
@@ -10,19 +10,26 @@ Item = TypeVar('Item')
|
|
|
class EventQueue(Generic[Timestamp, Item]):
|
|
|
__slots__ = ["queue", "counters", "removed"]
|
|
|
|
|
|
+ # We don't define our own event queue item class here
|
|
|
+ # with __lt__ because tuples are faster to compare.
|
|
|
+ # Tuples are however immutable, so we "wrap" the 'removed'
|
|
|
+ # flag in an object:
|
|
|
+ class RemovedWrapper:
|
|
|
+ __slots__ = ["removed"]
|
|
|
+ def __init__(self):
|
|
|
+ self.removed = False
|
|
|
+
|
|
|
def __init__(self):
|
|
|
- self.queue: List[Tuple[Timestamp, int, Item]] = []
|
|
|
+ self.queue: List[Tuple[Timestamp, int, RemovedWrapper, Item]] = []
|
|
|
self.counters = {} # mapping from timestamp to number of items at timestamp
|
|
|
- self.removed: Set[Item] = set()
|
|
|
|
|
|
def __str__(self):
|
|
|
- return str(sorted([tup for tup in self.queue if tup[2] not in self.removed]))
|
|
|
+ return str(sorted([tup for tup in self.queue if not tup[2].removed]))
|
|
|
|
|
|
def earliest_timestamp(self) -> Optional[Timestamp]:
|
|
|
with timer.Context("event_queue"):
|
|
|
- while self.queue and (self.queue[0][2] in self.removed):
|
|
|
- tup = heappop(self.queue)
|
|
|
- self.removed.remove(tup[2])
|
|
|
+ while self.queue and self.queue[0][2].removed:
|
|
|
+ heappop(self.queue)
|
|
|
try:
|
|
|
return self.queue[0][0]
|
|
|
except IndexError:
|
|
@@ -31,28 +38,24 @@ class EventQueue(Generic[Timestamp, Item]):
|
|
|
def add(self, timestamp: Timestamp, item: Item):
|
|
|
# print("add", item)
|
|
|
with timer.Context("event_queue"):
|
|
|
- self.counters[timestamp] = self.counters.setdefault(timestamp, 0) + 1
|
|
|
- def_event = (timestamp, self.counters[timestamp], item)
|
|
|
+ n = self.counters[timestamp] = self.counters.setdefault(timestamp, 0) + 1
|
|
|
+ def_event = (timestamp, n, EventQueue.RemovedWrapper(), item)
|
|
|
heappush(self.queue, def_event)
|
|
|
+ return def_event
|
|
|
|
|
|
- def remove(self, item: Item):
|
|
|
+ def remove(self, item: Tuple[Timestamp, int, RemovedWrapper, Item]):
|
|
|
# print("remove", item)
|
|
|
with timer.Context("event_queue"):
|
|
|
- self.removed.add(item)
|
|
|
- if len(self.removed) > 100:
|
|
|
- self.queue = [x for x in self.queue if x[2] not in self.removed]
|
|
|
- # print("heapify")
|
|
|
- heapify(self.queue)
|
|
|
- self.removed.clear()
|
|
|
+ item[2].removed = True
|
|
|
|
|
|
# Raises exception if called on empty queue
|
|
|
def pop(self) -> Tuple[Timestamp, Item]:
|
|
|
with timer.Context("event_queue"):
|
|
|
while 1:
|
|
|
- timestamp, n, item = heappop(self.queue)
|
|
|
+ timestamp, n, removed, item = heappop(self.queue)
|
|
|
if self.counters[timestamp] == n:
|
|
|
del self.counters[timestamp]
|
|
|
- if item not in self.removed:
|
|
|
+ if not removed.removed:
|
|
|
return (timestamp, item)
|
|
|
|
|
|
def is_due(self, timestamp: Optional[Timestamp]) -> bool:
|