Source code for graphid.util.priority_queue

import ubelt as ub
import heapq


def _heappush_max(heap, item):
    """
    Push ``item`` onto a max-heap (``heapq`` has no public max-heap push)
    """
    heap.append(item)
    heapq._siftdown_max(heap, 0, len(heap) - 1)
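
# ``heapq`` only exposes a public min-heap API; the max-heap routines used in
# this module (``_heapify_max``, ``_heappop_max``, ``_siftdown_max``) are
# private CPython helpers.  A minimal sketch of how the pieces fit together,
# assuming those private helpers are available in the running interpreter:
#
#     heap = [5, 3, 1]
#     heapq._heapify_max(heap)    # heap[0] is now the maximum (5)
#     _heappush_max(heap, 9)      # sift the new item up; heap[0] becomes 9
#     heapq._heappop_max(heap)    # -> 9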


class PriorityQueue(ub.NiceRepr):
    """
    Abstracted priority queue for our needs.

    Combines properties of dicts and heaps:
    uses a heap for fast minimum / maximum value search and
    a dict for fast read-only operations.

    References:
        http://code.activestate.com/recipes/522995-priority-dict-a-priority-queue-with-updatable-prio/
        https://stackoverflow.com/questions/33024215/built-in-max-heap-api-in-python

    Example:
        >>> items = dict(a=42, b=29, c=40, d=95, e=10)
        >>> self = PriorityQueue(items)
        >>> print(self)
        >>> assert len(self) == 5
        >>> print(self.pop())
        >>> assert len(self) == 4
        >>> print(self.pop())
        >>> assert len(self) == 3
        >>> print(self.pop())
        >>> print(self.pop())
        >>> print(self.pop())
        >>> assert len(self) == 0

    Example:
        >>> items = dict(a=(1.0, (2, 3)), b=(1.0, (1, 2)), c=(.9, (3, 2)))
        >>> self = PriorityQueue(items)

    Ignore:
        # TODO: can also use sortedcontainers to maintain priority queue
        import sortedcontainers
        queue = sortedcontainers.SortedListWithKey(items, key=lambda x: x[1])
        queue.add(('a', 1))
    """

    def __init__(self, items=None, ascending=True):
        # Use a heap for the priority queue aspect
        self._heap = []
        # Use a dict for very quick read-only operations
        self._dict = {}

        if ascending:
            # Ascending: the smallest priority is popped first (min-heap)
            self._heapify = heapq.heapify
            self._heappush = heapq.heappush
            self._heappop = heapq.heappop
        else:
            # Descending: the largest priority is popped first (max-heap)
            self._heapify = heapq._heapify_max
            self._heappush = _heappush_max
            self._heappop = heapq._heappop_max

        if items is not None:
            self.update(items)

    def _rebuild(self):
        # Worst Case O(N)
        self._heap = [(v, k) for k, v in self._dict.items()]
        self._heapify(self._heap)

    def __len__(self):
        return len(self._dict)

    def __eq__(self, other):
        return self._dict == other._dict

    def __nice__(self):
        return 'size=%r' % (len(self),)

    def __contains__(self, key):
        return key in self._dict

    def __getitem__(self, key):
        # Average case O(1)
        return self._dict[key]

    def get(self, key, default=None):
        return self._dict.get(key, default)

    def __setitem__(self, key, val):
        # Amortized O(1)
        # assert not np.isnan(val), 'no nan in PQ: {}, {}'.format(key, val)
        self._dict[key] = val
        if len(self._heap) > 2 * len(self._dict):
            # When the heap grows larger than 2 * len(self), we rebuild it
            # from scratch to avoid wasting too much memory.
            self._rebuild()
        else:
            # Otherwise, push the new entry onto the heap
            self._heappush(self._heap, (val, key))
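
    # Re-assigning an existing key does not remove its old heap entry; for
    # instance ``pq['x'] = 3`` followed by ``pq['x'] = 1`` leaves both
    # ``(3, 'x')`` and ``(1, 'x')`` on the heap.  The stale pair is skipped
    # lazily by ``peek`` / ``pop``, and the ``2 * len(self._dict)`` check
    # above bounds how many such stale entries can accumulate.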

    def clear(self):
        self._heap.clear()
        self._dict.clear()

    def __delitem__(self, key):
        # Lazy deletion: the matching heap entry is discarded later by
        # ``peek`` / ``pop`` when it reaches the top of the heap.
        del self._dict[key]

    def update(self, items):
        if isinstance(items, dict):
            items = items.items()
        items = list(items)
        if items:
            if len(items) > len(self._dict) / 2:
                # A large batch is cheaper to apply with a full rebuild
                self._dict.update(items)
                self._rebuild()
            else:
                for k, v in items:
                    self[k] = v

    def delete_items(self, key_list):
        for key in key_list:
            try:
                del self[key]
            except KeyError:
                pass

    def peek(self):
        """
        Peek at the next item in the queue
        """
        # Amortized O(1)
        _heap = self._heap
        _dict = self._dict
        val, key = _heap[0]
        # Remove items marked for lazy deletion as they are encountered
        while key not in _dict or _dict[key] != val:
            self._heappop(_heap)
            val, key = _heap[0]
        return key, val

    def peek_many(self, n):
        """
        Return the next ``n`` items without removing them.

        Note that this can be quite inefficient: it pops the items and then
        pushes them back onto the queue.

        Example:
            >>> from graphid import util
            >>> items = list(zip(range(256), range(256)))
            >>> n = 32
            >>> util.shuffle(items)
            >>> self = PriorityQueue(items, ascending=False)
            >>> self.peek_many(56)
        """
        if n == 0:
            return []
        elif n == 1:
            return [self.peek()]
        else:
            items = list(self.pop_many(n))
            self.update(items)
            return items

    def pop_many(self, n):
        """
        Pop and yield up to ``n`` (key, value) pairs in priority order.
        """
        count = 0
        while len(self._dict) > 0 and count < n:
            yield self.pop()
            count += 1

    def pop(self, key=ub.NoParam, default=ub.NoParam):
        """
        Pop the next item off the queue, or pop a specific ``key`` if given.
        """
        # Dictionary pop if key is specified
        if key is not ub.NoParam:
            if default is ub.NoParam:
                return (key, self._dict.pop(key))
            else:
                return (key, self._dict.pop(key, default))
        # Otherwise do a heap pop
        try:
            # Amortized O(1)
            _heap = self._heap
            _dict = self._dict
            val, key = self._heappop(_heap)
            # Remove items marked for lazy deletion as they are encountered
            while key not in _dict or _dict[key] != val:
                val, key = self._heappop(_heap)
        except IndexError:
            if len(_heap) == 0:
                raise IndexError('queue is empty')
            else:
                raise
        del _dict[key]
        return key, val
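

if __name__ == '__main__':
    # A minimal usage sketch, run only when this file is executed directly.
    # It assumes nothing beyond the class above and exercises the ascending
    # and descending orders, a priority update, and the lazy-deletion path.
    queue = PriorityQueue(dict(a=42, b=29, c=40), ascending=True)
    assert queue.pop() == ('b', 29)    # smallest priority comes out first
    queue['c'] = 10                    # leaves a stale (40, 'c') heap entry
    assert queue.pop() == ('c', 10)
    assert queue.pop() == ('a', 42)    # the stale (40, 'c') pair is skipped

    queue = PriorityQueue(dict(a=1, b=3, c=2), ascending=False)
    assert [k for k, v in queue.pop_many(3)] == ['b', 'c', 'a']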