import numpy as np
import networkx as nx
import ubelt as ub # NOQA
from graphid import util
from graphid.core import state as const
from graphid.util import nx_utils as nxu
from graphid.core.state import (POSTV, NEGTV)
from graphid.core.state import (SAME, DIFF, NULL) # NOQA
class Priority(object):
    """
    Handles prioritization of edges for review.

    Every method takes the inference object ``infr`` as its first argument
    and relies on attributes and methods that are not defined in this class
    (e.g. ``queue``, ``params``, ``graph``, ``assert_edge``), so this class is
    presumably meant to be mixed into a larger inference class.

    Priorities are stored negated inside ``infr.queue`` (see ``_push``) and
    negated again when read back (see ``_pop`` / ``_peek_many``), so callers
    of the public API always see the original, un-negated priority.

    Example:
        >>> from graphid.core.mixin_priority import *  # NOQA
        >>> from graphid import demo
        >>> infr = demo.demodata_infr(num_pccs=20)
    """

    def remaining_reviews(infr):
        """
        Return the number of edges currently held in the priority queue.

        The queue must have been created (e.g. via ``prioritize``) first.
        """
        assert infr.queue is not None
        return len(infr.queue)

    def _pop(infr, *args):
        """ Wraps queue so ordering is deterministic """
        edge, (neg_priority, _tiebreaker) = infr.queue.pop(*args)
        # Undo the negation applied by `_push`
        return (edge, -neg_priority)

    def _push(infr, edge, priority):
        """ Wraps queue so ordering is deterministic """
        # The edge itself is stored next to the priority to break ties
        tiebreaker = edge
        infr.assert_edge(edge)
        # tiebreaker = (chaotic(chaotic(u) + chaotic(v)), u, v)
        infr.queue[edge] = (-priority, tiebreaker)

    def _peek_many(infr, n):
        """ Wraps queue so ordering is deterministic """
        return [(edge, -neg_priority)
                for (edge, (neg_priority, _)) in infr.queue.peek_many(n)]

    def _remove_edge_priority(infr, edges):
        """
        Delete the given edges from the queue (edges not queued are ignored).

        Does nothing when no queue exists.
        """
        if infr.queue is None:
            return
        edges_ = [edge for edge in edges if edge in infr.queue]
        if len(edges_) > 0:
            infr.print('removed priority from {} edges'.format(len(edges_)), 5)
            infr.queue.delete_items(edges_)

    def _reinstate_edge_priority(infr, edges):
        """
        Push the given edges that are not already queued back onto the queue,
        using their ``prob_match`` value (default ``1e-9``) as priority.

        Does nothing when no queue exists.
        """
        if infr.queue is None:
            return
        edges_ = [edge for edge in edges if edge not in infr.queue]
        if len(edges_) > 0:
            # TODO: use whatever the current metric is
            metric = 'prob_match'
            infr.print('reprioritize {} edges'.format(len(edges_)), 5)
            priorities = infr.gen_edge_values(metric, edges_, default=1e-9)
            for edge, priority in zip(edges_, priorities):
                infr._push(edge, priority)

    def _increase_priority(infr, edges, amount=10):
        """
        (Re)queue ``edges`` with priority ``prob_match + amount``.

        Does nothing when no queue exists.
        """
        if infr.queue is None:
            return
        # Materialize first so that any iterable (not only sized containers)
        # can be counted and then traversed.
        edges = list(edges)
        infr.print('increase priority of {} edges'.format(len(edges)), 5)
        metric = 'prob_match'
        priorities = infr.gen_edge_values(metric, edges, default=1e-9)
        for edge, base in zip(edges, priorities):
            infr.push(edge, base + amount)

    def remove_internal_priority(infr, cc):
        """
        Remove from the queue every edge of ``infr.graph`` inside ``cc``.
        """
        if infr.queue is not None:
            infr._remove_edge_priority(nxu.edges_inside(infr.graph, cc))

    # def remove_external_priority(infr, cc):
    #     if infr.queue is not None:
    #         infr._remove_edge_priority(nxu.edges_outgoing(infr.graph, cc))

    # def remove_between_priority(infr, cc1, cc2):
    #     if infr.queue is not None:
    #         infr._remove_edge_priority(nxu.edges_cross(infr.graph, cc1, cc2))

    # def reinstate_between_priority(infr, cc1, cc2):
    #     if infr.queue is not None:
    #         # Reinstate the appropriate edges into the queue
    #         edges = nxu.edges_cross(infr.unreviewed_graph, cc1, cc2)
    #         infr._reinstate_edge_priority(edges)

    def reinstate_internal_priority(infr, cc):
        """
        Re-queue every edge of ``infr.unreviewed_graph`` inside ``cc``.
        """
        if infr.queue is not None:
            # Reinstate the appropriate edges into the queue
            edges = nxu.edges_inside(infr.unreviewed_graph, cc)
            infr._reinstate_edge_priority(edges)

    # def reinstate_external_priority(infr, cc):
    #     if infr.queue is not None:
    #         # Reinstate the appropriate edges into the queue
    #         edges = nxu.edges_outgoing(infr.unreviewed_graph, cc)
    #         infr._reinstate_edge_priority(edges)

    def prioritize(infr, metric=None, edges=None, scores=None,
                   force_inconsistent=True, reset=False):
        """
        Adds edges to the priority queue.

        Note that these edges must already exist in the `infr.unreviewed_graph`
        as unreviewed edges. By default the `prob_match` edge attribute is used
        to sort edges. If you have registered a verification algorithm, then
        these scores are computed using `infr.ensure_priority_scores(edges)`.
        However, you can have all this done for you by simply calling
        `infr.add_candidate_edges(edges)` or `infr.refresh_candidate_edges()`.

        Args:
            metric (str): edge attribute used as priority when ``scores`` is
                not given. Defaults to ``'prob_match'``.
            edges (iterable): edges to queue. Defaults to the unreviewed
                edges that are not flagged as redundant.
            scores (iterable): explicit priorities aligned with ``edges``.
                The input is never modified.
            force_inconsistent (bool): if True (and inference is enabled),
                the ``maybe_error`` edges are queued even when they are not
                in ``edges``.
            reset (bool): if True, start from a fresh queue.

        Returns:
            int: number of edges that were not in the queue before the call.

        Raises:
            ValueError: if ``scores`` is given without ``edges``.

        Example:
            >>> from graphid.core.mixin_priority import *  # NOQA
            >>> from graphid import demo
            >>> infr = demo.demodata_infr(num_pccs=7, size=5)
            >>> infr.ensure_cliques(meta_decision=SAME)
            >>> # Add a negative edge inside a PCC
            >>> ccs = list(infr.positive_components())
            >>> edge1 = tuple(list(ccs[0])[0:2])
            >>> edge2 = tuple(list(ccs[1])[0:2])
            >>> infr.add_feedback(edge1, NEGTV)
            >>> infr.add_feedback(edge2, NEGTV)
            >>> num_new = infr.prioritize(reset=True)
            >>> order = infr._peek_many(np.inf)
            >>> scores = util.take_column(order, 1)
            >>> assert scores[0] > 10
            >>> assert len(scores) == num_new, 'should prioritize two hypotheis edges'
            >>> unrev_edges = set(infr.unreviewed_graph.edges())
            >>> err_edges = set(ub.flatten(infr.nid_to_errors.values()))
            >>> edges = set(list(unrev_edges - err_edges)[0:2])
            >>> edges.update(list(err_edges)[0:2])
            >>> num_new = infr.prioritize(edges=edges, reset=True)
            >>> order2 = infr._peek_many(np.inf)
            >>> scores2 = np.array(util.take_column(order2, 1))
            >>> assert np.all(scores2[0:2] > 10)
            >>> assert np.all(scores2[2:] < 10)
        """
        if reset or infr.queue is None:
            infr.queue = util.PriorityQueue()
        low = 1e-9
        if metric is None:
            metric = 'prob_match'

        # If edges are not explicitly specified get unreviewed and error edges
        # that are not redundant
        if edges is None:
            if scores is not None:
                raise ValueError('must provide edges with scores')
            unrev_edges = infr.unreviewed_graph.edges()
            edges = set(infr.filter_edges_flagged_as_redun(unrev_edges))

        # Fix an (arbitrary) order once; this also accepts any iterable.
        edges = list(edges)
        infr.print('ensuring {} edge(s) get priority'.format(
            len(edges)), 5)

        inference_enabled = infr.params['inference.enabled']
        # Needed both for forcing inconsistent edges into the queue and for
        # the priority boost below, so compute it whenever inference is on
        # (previously `force_inconsistent=False` left the name undefined).
        maybe_error_edges = set()
        if inference_enabled:
            maybe_error_edges = set(infr.maybe_error_edges())

        if inference_enabled and force_inconsistent:
            # Ensure that maybe_error edges are always prioritized
            extra_edges = list(maybe_error_edges.difference(edges))
            infr.print('ensuring {} inconsistent edge(s) get priority'.format(
                len(extra_edges)), 5)
            if scores is not None:
                pgen = list(infr.gen_edge_values(metric, extra_edges,
                                                 default=low))
                extra_scores = np.array(pgen)
                extra_scores[np.isnan(extra_scores)] = low
                scores = util.aslist(scores) + util.aslist(extra_scores)
            edges = edges + extra_edges

        if scores is None:
            pgen = infr.gen_edge_values(metric, edges, default=low)
            priorities = np.array(list(pgen))
        else:
            # Copy so the caller's scores are never modified in place
            priorities = np.array(scores)

        # Ensure priorities do not have nan values
        nan_flags = np.isnan(priorities)
        if np.any(nan_flags):
            priorities[nan_flags] = low

        if inference_enabled:
            # Increase priority of any flagged maybe_error edges
            err_flags = np.array([e in maybe_error_edges for e in edges],
                                 dtype=bool)
            priorities[err_flags] += 10

        # Push new items into the priority queue
        num_new = 0
        for edge, priority in zip(edges, priorities):
            if edge not in infr.queue:
                num_new += 1
            infr._push(edge, priority)

        infr.print('added %d edges to the queue' % (num_new,), 1)
        return num_new

    def push(infr, edge, priority=None):
        """
        Push an edge back onto the queue

        Args:
            edge (tuple): the edge to queue
            priority (float | str | None): a numeric priority, or the name of
                an edge attribute to read the priority from (missing values
                default to ``1e-9``). ``None`` means ``'prob_match'``.
        """
        if priority is None:
            priority = 'prob_match'
        if isinstance(priority, str):
            priority = infr.get_edge_attr(edge, priority, default=1e-9)
        # Ties are broken deterministically inside `_push`
        infr._push(edge, priority)

    def pop(infr):
        """
        Main interface to the priority queue used by the algorithm loops.
        Pops the highest priority edge from the queue.

        Edges that the enabled filters (redundancy, confidence threshold,
        ``fix_mode_*`` flags) deem unnecessary are discarded and the next
        edge is tried.

        Returns:
            tuple: ``(edge, priority)``

        Raises:
            StopIteration: when the queue runs out of edges.
        """
        # The outer loop simulates recursive calls without using the stack
        while True:
            try:
                edge, priority = infr._pop()
            except IndexError:
                raise StopIteration('no more to review!')

            if infr.params['redun.enabled']:
                u, v = edge
                nid1, nid2 = infr.node_labels(u, v)
                pos_graph = infr.pos_graph
                # NOTE(review): bare lookup kept from the original code; its
                # only possible effect is raising if `nid1` is unknown to
                # `pos_graph` - presumably a sanity check, confirm.
                pos_graph[nid1]
                if nid1 == nid2 and nid1 not in infr.nid_to_errors:
                    # skip edges that increase local connectivity beyond
                    # redundancy thresholds.
                    k_pos = infr.params['redun.pos']
                    # Much faster to compute local connectivity on subgraph
                    cc = pos_graph.component(nid1)
                    pos_subgraph = pos_graph.subgraph(cc)
                    pos_conn = nx.connectivity.local_edge_connectivity(
                        pos_subgraph, u, v, cutoff=k_pos)
                    if pos_conn >= k_pos:
                        continue  # Loop instead of recursion

            thresh_code = infr.params['queue.conf.thresh']
            if thresh_code is not None:
                # Ignore reviews that would re-enforce a relationship that
                # already has high confidence.
                thresh = const.CONFIDENCE.CODE_TO_INT[thresh_code]
                # FIXME: at the time of writing a hard coded priority of 10
                # or more means that this is part of an inconsistency
                if priority < 10:
                    u, v = edge
                    nid1, nid2 = infr.node_labels(u, v)
                    # Skip this edge and try the next one. (A bare
                    # `infr.pop()` here used to throw away the *next* edge
                    # and still return this one.)
                    if nid1 == nid2:
                        if infr.confidently_connected(u, v, thresh):
                            continue
                    elif infr.confidently_separated(u, v, thresh):
                        continue

            if getattr(infr, 'fix_mode_split', False):
                # only checking edges within a name
                nid1, nid2 = infr.pos_graph.node_labels(*edge)
                if nid1 != nid2:
                    continue  # Loop instead of recursion

            if getattr(infr, 'fix_mode_merge', False):
                # only checking edges between names
                nid1, nid2 = infr.pos_graph.node_labels(*edge)
                if nid1 == nid2:
                    continue  # Loop instead of recursion

            if getattr(infr, 'fix_mode_predict', False):
                # No longer needed.
                pred = infr.get_edge_data(edge).get('pred', None)
                # only report cases where the prediction differs
                # FIXME: at the time of writing a hard coded priority of 10
                # or more means that this is part of an inconsistency
                if priority < 10:
                    nid1, nid2 = infr.node_labels(*edge)
                    if nid1 == nid2:
                        u, v = edge
                        # Don't re-review confident CCs
                        thresh = const.CONFIDENCE.CODE_TO_INT['pretty_sure']
                        if infr.confidently_connected(u, v, thresh):
                            continue  # Loop instead of recursion
                    if pred == POSTV and nid1 == nid2:
                        continue  # Loop instead of recursion
                    if pred == NEGTV and nid1 != nid2:
                        continue  # Loop instead of recursion
                else:
                    print('in error recover mode')

            infr.assert_edge(edge)
            return edge, priority

    def peek(infr):
        """
        Return the ``(edge, priority)`` that ``pop`` would return, leaving it
        in the queue.

        Raises:
            IndexError: if no edge is available.
        """
        return infr.peek_many(n=1)[0]

    def peek_many(infr, n):
        """
        Peeks at the top n edges in the queue.

        Edges that ``pop`` decides to skip are permanently dropped from the
        queue as a side effect; the returned edges are pushed back.

        Returns:
            list: up to ``n`` ``(edge, priority)`` tuples

        Example:
            >>> # ENABLE_DOCTEST
            >>> from graphid.core.mixin_priority import *  # NOQA
            >>> from graphid import demo
            >>> infr = demo.demodata_infr(num_pccs=7, size=5)
            >>> infr.refresh_candidate_edges()
            >>> infr.peek_many(50)
        """
        # Do pops that may invalidate pos redun edges internal to PCCs
        items = []
        # Pop the top n edges off the queue
        while len(infr.queue) > 0 and len(items) < n:
            try:
                items.append(infr.pop())
            except StopIteration:
                # Every remaining edge was skipped by `pop`. Stop here so
                # the edges already popped are still pushed back below.
                break
        # Push them back because we are just peeking
        # (although we may have invalidated things based on local connectivity)
        for edge, priority in items:
            infr.push(edge, priority)
        return items

    def confidently_connected(infr, u, v, thresh=2):
        """
        Checks if u and v are connected by edges above a confidence threshold

        Searches ``infr.graph`` from ``u``, following only edges whose
        decision is POSTV and whose confidence is at least ``thresh``.

        Returns:
            bool: True if ``v`` is reached.
        """
        def satisfied(G, child, edge):
            decision = infr.edge_decision(edge)
            if decision != POSTV:
                return False
            data = G.get_edge_data(*edge)
            conf = data.get('confidence', 'unspecified')
            conf_int = const.CONFIDENCE.CODE_TO_INT[conf]
            # Codes that map to None are treated as the lowest confidence
            conf_int = 0 if conf_int is None else conf_int
            return conf_int >= thresh

        for node in util.bfs_conditional(infr.graph, u, yield_if=satisfied,
                                         continue_if=satisfied):
            if node == v:
                return True
        return False

    def confidently_separated(infr, u, v, thresh=2):
        """
        Checks if v can be reached from u by a search over ``infr.graph``
        that is allowed to cross a negative edge only once, and only if that
        edge has a confidence of at least ``thresh``.

        NOTE(review): as written, ``can_cross`` never permits crossing a
        POSTV edge (only a NEGTV edge taken while ``n_negs == 0`` returns a
        flag), so the search cannot continue past the first negative edge.
        The doctest below pins this behavior; confirm whether positive edges
        were meant to be traversable.

        Returns:
            bool: True if ``v`` is reached.

        Example:
            >>> from graphid.core.mixin_priority import *  # NOQA
            >>> from graphid import demo
            >>> infr = demo.demodata_infr(ccs=[(1, 2), (3, 4), (5, 6), (7, 8)])
            >>> infr.add_feedback((1, 5), NEGTV)
            >>> infr.add_feedback((5, 8), NEGTV)
            >>> infr.add_feedback((6, 3), NEGTV)
            >>> u, v = (1, 4)
            >>> thresh = 0
            >>> assert not infr.confidently_separated(u, v, thresh)
            >>> infr.add_feedback((2, 3), NEGTV)
            >>> assert not infr.confidently_separated(u, v, thresh)
        """

        def can_cross(G, edge, n_negs):
            """
            DFS state condition

            Args:
                edge (tuple): the edge we are trying to cross
                n_negs (int): the number of negative edges crossed so far

            Returns:
                flag, new_state -
                   flag (bool): True if the edge can be crossed
                   new_state: new state for future decisions in this path.
            """
            decision = infr.edge_decision(edge)
            # only cross positive or negative edges
            if decision in {POSTV, NEGTV}:
                # only cross a negative edge once
                willcross = (decision == NEGTV)
                if willcross and n_negs == 0:
                    data = G.get_edge_data(*edge)
                    # only cross edges above a threshold
                    conf = data.get('confidence', 'unspecified')
                    conf_int = const.CONFIDENCE.CODE_TO_INT[conf]
                    conf_int = 0 if conf_int is None else conf_int
                    flag = conf_int >= thresh
                    num = n_negs + willcross
                    return flag, num
            return False, n_negs

        # need to do DFS check for this. Make DFS only allowed to
        # cross a negative edge once.
        # def dfs_cond_rec(G, parent, state, visited=None):
        #     if visited is None:
        #         visited = set()
        #     visited.add(parent)
        #     for child in G.neighbors(parent):
        #         if child not in visited:
        #             edge = (parent, child)
        #             flag, new_state = can_cross(G, edge, state)
        #             if flag:
        #                 yield child
        #                 for _ in dfs_cond_rec(G, child, new_state, visited):
        #                     yield _

        # need to do DFS check for this. Make DFS only allowed to
        # cross a negative edge once.
        def dfs_cond_stack(G, source, state):
            # stack based version
            visited = {source}
            stack = [(source, iter(G[source]), state)]
            while stack:
                parent, children, state = stack[-1]
                try:
                    child = next(children)
                    if child not in visited:
                        edge = (parent, child)
                        flag, new_state = can_cross(G, edge, state)
                        if flag:
                            yield child
                            visited.add(child)
                            stack.append((child, iter(G[child]), new_state))
                except StopIteration:
                    # This parent has no more neighbors to try
                    stack.pop()

        for node in dfs_cond_stack(infr.graph, u, 0):
            if node == v:
                return True
        return False

    def generate_reviews(infr, pos_redun=None, neg_redun=None,
                         data=False):
        """
        Dynamic generator that yields high priority reviews

        Args:
            pos_redun: if not None, stored in ``infr.params['redun.pos']``
            neg_redun: if not None, stored in ``infr.params['redun.neg']``
            data (bool): if True yield ``(edge, priority)``, otherwise
                yield only ``edge``.

        Returns:
            generator: see ``_generate_reviews``
        """
        if pos_redun is not None:
            infr.params['redun.pos'] = pos_redun
        if neg_redun is not None:
            infr.params['redun.neg'] = neg_redun
        infr.prioritize()
        return infr._generate_reviews(data=data)

    def _generate_reviews(infr, data=False):
        """
        Yield edges from ``pop`` until it signals that nothing is left.

        Args:
            data (bool): if True yield ``(edge, priority)``, otherwise
                yield only ``edge``.
        """
        while True:
            try:
                edge, priority = infr.pop()
            except StopIteration:
                # `pop` uses StopIteration to report an exhausted queue
                return
            if data:
                yield edge, priority
            else:
                yield edge
if __name__ == '__main__':
    # CommandLine:
    #     python ~/code/graphid/graphid.core/mixin_priority.py all
    import xdoctest

    # Run the doctests embedded in this module
    xdoctest.doctest_module(__file__)