event_queue.py 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. from heapq import heappush, heappop
  2. from typing import *
  3. from collections import Counter
  4. from sccd.util import timer
  5. Timestamp = TypeVar('Timestamp')
  6. Item = TypeVar('Item')
  7. class EventQueue(Generic[Timestamp, Item]):
  8. __slots__ = ["queue", "counters", "removed"]
  9. # We don't define our own event queue item class here
  10. # with __lt__ because tuples are faster to compare.
  11. # Tuples are however immutable, so we "wrap" the 'removed'
  12. # flag in an object:
  13. class RemovedWrapper:
  14. __slots__ = ["removed"]
  15. def __init__(self):
  16. self.removed = False
  17. def __init__(self):
  18. self.queue: List[Tuple[Timestamp, int, RemovedWrapper, Item]] = []
  19. self.counters = Counter() # mapping from timestamp to number of items at timestamp
  20. def __str__(self):
  21. return str(sorted([tup for tup in self.queue if not tup[2].removed]))
  22. def earliest_timestamp(self) -> Optional[Timestamp]:
  23. with timer.Context("event_queue"):
  24. while self.queue and self.queue[0][2].removed:
  25. heappop(self.queue)
  26. try:
  27. return self.queue[0][0]
  28. except IndexError:
  29. return None
  30. def add(self, timestamp: Timestamp, item: Item):
  31. # print("add", item)
  32. with timer.Context("event_queue"):
  33. def_event = (timestamp, self.counters[timestamp], EventQueue.RemovedWrapper(), item)
  34. self.counters[timestamp] += 1
  35. heappush(self.queue, def_event)
  36. return def_event
  37. def remove(self, item: Tuple[Timestamp, int, RemovedWrapper, Item]):
  38. # print("remove", item)
  39. with timer.Context("event_queue"):
  40. item[2].removed = True
  41. # Raises exception if called on empty queue
  42. def pop(self) -> Tuple[Timestamp, Item]:
  43. with timer.Context("event_queue"):
  44. while 1:
  45. timestamp, n, removed, item = heappop(self.queue)
  46. if self.counters[timestamp] == n:
  47. del self.counters[timestamp]
  48. if not removed.removed:
  49. return (timestamp, item)
  50. def is_due(self, timestamp: Optional[Timestamp]) -> bool:
  51. earliest = self.earliest_timestamp()
  52. # print("is_due", earliest, timestamp, earliest is not None and (timestamp is None or earliest <= timestamp))
  53. return earliest is not None and (timestamp is None or earliest <= timestamp)
  54. # Safe to call on empty queue
  55. # Safe to call other methods on the queue while the returned generator exists
  56. def due(self, timestamp: Optional[Timestamp]) -> Generator[Tuple[Timestamp, Item], None, None]:
  57. while self.is_due(timestamp):
  58. yield self.pop()