Source code for graphid.core.mixin_helpers

from __future__ import absolute_import, division, print_function, unicode_literals
import itertools as it
import networkx as nx
import operator
import numpy as np
import ubelt as ub
import pandas as pd
from graphid import util
from graphid.core import state as const
from graphid.core.state import POSTV, NEGTV, INCMP, UNREV, UNKWN
from graphid.core.state import SAME, DIFF, NULL  # NOQA
from graphid.util import nx_utils as nxu
from graphid.util.nx_utils import e_


class AttrAccess(object):
    """ Contains non-core helper functions """

    def gen_node_attrs(infr, key, nodes=None, default=ub.NoParam):
        """ Forwards to ``util.nx_gen_node_attrs`` using ``infr.graph`` """
        graph = infr.graph
        return util.nx_gen_node_attrs(graph, key, nodes=nodes,
                                      default=default)

    def gen_edge_attrs(infr, key, edges=None, default=ub.NoParam,
                       on_missing=None):
        """ Forwards to ``util.nx_gen_edge_attrs`` using ``infr.graph`` """
        graph = infr.graph
        return util.nx_gen_edge_attrs(graph, key, edges=edges,
                                      default=default,
                                      on_missing=on_missing)

    def gen_node_values(infr, key, nodes, default=ub.NoParam):
        """ Forwards to ``util.nx_gen_node_values`` using ``infr.graph`` """
        graph = infr.graph
        return util.nx_gen_node_values(graph, key, nodes, default=default)

    def gen_edge_values(infr, key, edges=None, default=ub.NoParam,
                        on_missing='error', on_keyerr='default'):
        """ Forwards to ``util.nx_gen_edge_values`` using ``infr.graph`` """
        options = dict(default=default, on_missing=on_missing,
                       on_keyerr=on_keyerr)
        return util.nx_gen_edge_values(infr.graph, key, edges, **options)

    def get_node_attrs(infr, key, nodes=None, default=ub.NoParam):
        """ Dictionary form of :func:`gen_node_attrs` """
        pairs = infr.gen_node_attrs(key, nodes=nodes, default=default)
        return dict(pairs)

    def get_edge_attrs(infr, key, edges=None, default=ub.NoParam,
                       on_missing=None):
        """ Dictionary form of :func:`gen_edge_attrs` """
        pairs = infr.gen_edge_attrs(key, edges=edges, default=default,
                                    on_missing=on_missing)
        return dict(pairs)

    def _get_edges_where(infr, key, op, val, edges=None, default=ub.NoParam,
                         on_missing=None):
        """
        Lazily yields the edges whose ``key`` attribute satisfies
        ``op(attribute, val)``.
        """
        items = infr.gen_edge_attrs(key, edges=edges, default=default,
                                    on_missing=on_missing)
        return (edge for edge, value in items if op(value, val))

    def get_edges_where_eq(infr, key, val, edges=None, default=ub.NoParam,
                           on_missing=None):
        """ Edges whose ``key`` attribute compares equal to ``val`` """
        return infr._get_edges_where(
            key, operator.eq, val,
            edges=edges, default=default, on_missing=on_missing)

    def get_edges_where_ne(infr, key, val, edges=None, default=ub.NoParam,
                           on_missing=None):
        """ Edges whose ``key`` attribute compares unequal to ``val`` """
        return infr._get_edges_where(
            key, operator.ne, val,
            edges=edges, default=default, on_missing=on_missing)

    def set_node_attrs(infr, key, node_to_prop):
        """ Writes ``node_to_prop`` into ``infr.graph`` under ``key`` """
        return nx.set_node_attributes(infr.graph, values=node_to_prop,
                                      name=key)

    def set_edge_attrs(infr, key, edge_to_prop):
        """ Writes ``edge_to_prop`` into ``infr.graph`` under ``key`` """
        return nx.set_edge_attributes(infr.graph, values=edge_to_prop,
                                      name=key)

    def get_edge_attr(infr, edge, key, default=ub.NoParam,
                      on_missing='error'):
        """ Looks up attribute ``key`` for one edge """
        lookup = infr.get_edge_attrs(key, [edge], default=default,
                                     on_missing=on_missing)
        return lookup[edge]

    def set_edge_attr(infr, edge, attr):
        """ Applies every ``name -> value`` item of ``attr`` to one edge """
        for name, value in attr.items():
            infr.set_edge_attrs(name, {edge: value})

    def get_annot_attrs(infr, key, aids):
        """ List of node attribute values for the given annotation nodes """
        node_to_attr = infr.get_node_attrs(key, aids)
        return list(node_to_attr.values())

    def edges(infr, data=False):
        """
        Generates the edges of ``infr.graph``, each passed through ``e_``.
        When ``data`` is truthy, yields ``(edge, attr_dict)`` pairs.
        """
        if not data:
            return (e_(u, v) for u, v in infr.graph.edges())
        return ((e_(u, v), d) for u, v, d in infr.graph.edges(data=True))

    def has_edge(infr, edge):
        """ True if ``infr.graph`` reports having ``edge`` """
        return infr.graph.has_edge(*edge)

    def get_edge_data(infr, edge):
        """ Attribute dictionary ``infr.graph`` stores for ``edge`` """
        return infr.graph.get_edge_data(*edge)

    def get_nonvisual_edge_data(infr, edge, on_missing='filter'):
        """
        Copy of the edge data with the ``infr.visual_edge_attrs`` keys removed.

        For an edge without data, ``on_missing`` selects the result:
        ``'default'`` gives an empty dict, ``'error'`` raises a KeyError and
        anything else (including ``'filter'``) gives None.
        """
        stored = infr.get_edge_data(edge)
        if stored is not None:
            return util.delete_dict_keys(stored.copy(),
                                         infr.visual_edge_attrs)
        if on_missing == 'default':
            return {}
        if on_missing == 'error':
            raise KeyError('graph does not have edge %r ' % (edge,))
        return None

    def get_edge_dataframe(infr, edges=None, all=False):
        """
        Builds a pandas DataFrame with one row per edge, holding its
        non-visual data.

        Args:
            edges (iterable): edges to include; defaults to ``infr.edges()``
            all (bool): if False, the review-id and timestamp columns are
                dropped when present.
        """
        if edges is None:
            edges = infr.edges()

        rows = {}
        for edge in edges:
            row = infr.get_nonvisual_edge_data(edge)
            if row is None:
                # Edge has no data: use an all-None feedback record
                row = {k: None for k in infr.feedback_data_keys}
            rows[edge] = row
        edge_df = pd.DataFrame.from_dict(rows, orient='index')

        leading = ['evidence_decision', 'meta_decision', 'tags', 'user_id']
        neworder = util.partial_order(edge_df.columns, leading)
        # ``reindex_axis`` only exists on older pandas versions
        if hasattr(edge_df, 'reindex_axis'):
            edge_df = edge_df.reindex_axis(neworder, axis=1)
        else:
            edge_df = edge_df.reindex(neworder, axis=1)

        if not all:
            bookkeeping = ['review_id', 'timestamp', 'timestamp_s1',
                           'timestamp_c2', 'timestamp_c1']
            present = [c for c in bookkeeping if c in edge_df.columns]
            edge_df = edge_df.drop(present, axis=1)
        return edge_df

    def get_edge_df_text(infr, edges=None, highlight=True):
        """
        String form of :func:`get_edge_dataframe`, optionally with the
        decision codes colorized.
        """
        df_str = infr.get_edge_dataframe(edges).to_string()
        if highlight:
            code_colors = [
                (SAME, 'blue'),
                (POSTV, 'blue'),
                (DIFF, 'red'),
                (NEGTV, 'red'),
                (INCMP, 'yellow'),
            ]
            for code, color in code_colors:
                df_str = util.highlight_regex(
                    df_str, util.regex_word(code), color=color)
        return df_str
class Convenience(object):
    """
    Shortcut properties and debugging printers. Every method receives the
    inference object as ``infr``.
    """

    @staticmethod
    def e_(u, v):
        """ Exposes the module-level ``e_`` edge helper on the object """
        return e_(u, v)

    @property
    def pos_graph(infr):
        """ The ``POSTV`` entry of ``infr.review_graphs`` """
        return infr.review_graphs[POSTV]

    @property
    def neg_graph(infr):
        """ The ``NEGTV`` entry of ``infr.review_graphs`` """
        return infr.review_graphs[NEGTV]

    @property
    def incomp_graph(infr):
        """ The ``INCMP`` entry of ``infr.review_graphs`` """
        return infr.review_graphs[INCMP]

    @property
    def unreviewed_graph(infr):
        """ The ``UNREV`` entry of ``infr.review_graphs`` """
        return infr.review_graphs[UNREV]

    @property
    def unknown_graph(infr):
        """ The ``UNKWN`` entry of ``infr.review_graphs`` """
        return infr.review_graphs[UNKWN]

    def print_graph_info(infr):
        """ Prints ``util.graph_info`` of the simplified graph """
        print(ub.urepr(util.graph_info(infr.simplify_graph())))

    def print_graph_connections(infr, label='orig_name_label'):
        """
        Prints the edge table inside every group of nodes sharing a ``label``
        value, followed by the edge table between every pair of groups.

        Args:
            label (str): node attribute used to group the nodes.
                Defaults to 'orig_name_label'.
        """
        node_to_label = infr.get_node_attrs(label)
        label_to_nodes = ub.group_items(node_to_label.keys(),
                                        node_to_label.values())
        print('CC info')
        for name, cc in label_to_nodes.items():
            print('\nname = %r' % (name,))
            edges = list(nxu.edges_between(infr.graph, cc))
            print(infr.get_edge_df_text(edges))

        print('CC pair info')
        for (n1, cc1), (n2, cc2) in it.combinations(label_to_nodes.items(), 2):
            if n1 == n2:
                continue
            print('\nname_pair = {}-vs-{}'.format(n1, n2))
            edges = list(nxu.edges_between(infr.graph, cc1, cc2))
            print(infr.get_edge_df_text(edges))

    def print_within_connection_info(infr, edge=None, cc=None, aid=None,
                                     nid=None):
        """
        Prints the edge table among the nodes of one component.

        The component is taken from ``cc``, then overridden by ``nid`` (looked
        up in ``infr.pos_graph._ccs``), then by ``aid`` (via
        ``infr.pos_graph.connected_to``). Passing ``edge`` uses its first
        endpoint as ``aid``.
        """
        if edge is not None:
            # Only the first endpoint is needed to locate the component
            aid, _ = edge
        if nid is not None:
            cc = infr.pos_graph._ccs[nid]
        if aid is not None:
            cc = infr.pos_graph.connected_to(aid)
        edges = list(nxu.edges_between(infr.graph, cc))
        print(infr.get_edge_df_text(edges))

    def pair_connection_info(infr, aid1, aid2):
        """
        Helps debugging when ibs.nids has info that annotmatch/staging do not

        Prints the components containing ``aid1`` and ``aid2`` and the edge
        tables inside, leaving, and between those components. Returns None.

        Note:
            the relevant ibs parts were removed. Perhaps this is not useful
            now or should be moved to the ibeis plugin?

        Example:
            >>> from graphid import demo
            >>> infr = demo.demodata_infr(num_pccs=3, size=4)
            >>> aid1, aid2 = 1, 2
            >>> infr.pair_connection_info(aid1, aid2)
        """
        nid1, nid2 = infr.pos_graph.node_labels(aid1, aid2)
        cc1 = infr.pos_graph.connected_to(aid1)
        cc2 = infr.pos_graph.connected_to(aid2)

        def get_aug_df(edges):
            # Edge table augmented with the positive-graph label of each
            # endpoint (columns nid1 / nid2).
            df = infr.get_edge_dataframe(edges)
            if len(df):
                df.index.names = ('aid1', 'aid2')
                nids = np.array([
                    infr.pos_graph.node_labels(u, v)
                    for u, v in list(df.index)])
                df = df.assign(nid1=nids.T[0], nid2=nids.T[1])
                part = ['nid1', 'nid2', 'evidence_decision', 'tags',
                        'user_id']
                neworder = util.partial_order(df.columns, part)
                # ``reindex_axis`` only exists on older pandas versions
                if hasattr(df, 'reindex_axis'):
                    df = df.reindex_axis(neworder, axis=1)
                else:
                    df = df.reindex(neworder, axis=1)
                todrop = [c for c in ['review_id', 'timestamp']
                          if c in df.columns]
                df = df.drop(todrop, axis=1)
            return df

        def print_df(df, lbl):
            # Print a table under a banner, colorizing the queried ids
            df_str = df.to_string()
            df_str = util.highlight_regex(
                df_str, util.regex_word(str(aid1)), color='blue')
            df_str = util.highlight_regex(
                df_str, util.regex_word(str(aid2)), color='red')
            # Skip the label colors when a label equals one of the aids,
            # which has already been highlighted above.
            if nid1 not in {aid1, aid2}:
                df_str = util.highlight_regex(
                    df_str, util.regex_word(str(nid1)), color='darkblue')
            if nid2 not in {aid1, aid2}:
                df_str = util.highlight_regex(
                    df_str, util.regex_word(str(nid2)), color='darkred')
            print('\n\n=====')
            print(lbl)
            print('=====')
            print(df_str)

        print('================')
        print('Pair Connection Info')
        print('================')

        print('AIDS        aid1, aid2 = %r, %r' % (aid1, aid2))
        if nid1 == nid2:
            print('INFR cc = %r' % (sorted(cc1),))
        else:
            print('INFR cc1 = %r' % (sorted(cc1),))
            print('INFR cc2 = %r' % (sorted(cc2),))

        # Next check indirect relationships
        graph = infr.graph
        if cc1 != cc2:
            edge_df1 = get_aug_df(nxu.edges_between(graph, cc1))
            edge_df2 = get_aug_df(nxu.edges_between(graph, cc2))
            print_df(edge_df1, 'Inside1')
            # Second component gets its own heading (was mislabeled Inside1)
            print_df(edge_df2, 'Inside2')

            out_df1 = get_aug_df(nxu.edges_outgoing(graph, cc1))
            print_df(out_df1, 'Outgoing1')
            out_df2 = get_aug_df(nxu.edges_outgoing(graph, cc2))
            print_df(out_df2, 'Outgoing2')
        else:
            subgraph = infr.pos_graph.subgraph(cc1)
            print('Shortest path between endpoints')
            print(nx.shortest_path(subgraph, aid1, aid2))

        edge_df3 = get_aug_df(nxu.edges_between(graph, cc1, cc2))
        print_df(edge_df3, 'Between')

    def node_tag_hist(infr):
        """
        Histogram of the annotation case tags.

        NOTE(review): reads ``infr.ibs``; confirm that attribute still exists
        now that other ibs-dependent parts were removed.
        """
        tags_list = infr.ibs.get_annot_case_tags(infr.aids)
        tag_hist = util.tag_hist(tags_list)
        return tag_hist

    def edge_tag_hist(infr):
        """ Histogram of the 'tags' attribute over the edges """
        tags_list = list(infr.gen_edge_values('tags', None))
        tag_hist = util.tag_hist(tags_list)
        return tag_hist

    def match_state_df(infr, index):
        """
        Returns the current matching state of a list of edges.

        PERHAPS WE SHOULD DEPRECATE THIS FUNCTION?

        An edge is "comparable" when its 'evidence_decision' is not INCMP
        (missing edges default to UNREV and therefore count as comparable),
        and "same" when both endpoints share a 'name_label'. The result has
        three boolean columns:

            * NEGTV: comparable and not same
            * POSTV: comparable and same
            * INCMP: not comparable

        Note:
            This does NOT use the IBEIS database state, whereas the
            original version of this function did.

        Args:
            index (iterable): edges, or a pandas (aid1, aid2) MultiIndex

        Returns:
            pd.DataFrame: indexed by (aid1, aid2)

        CommandLine:
            python -m graphid.core.mixin_helpers Convenience.match_state_df

        Example:
            >>> # xdoctest: +IGNORE_WANT
            >>> # NOTE(review): the table is illustrative; regenerate it
            >>> from graphid import demo
            >>> infr = demo.demodata_infr(num_pccs=2, p_incomp=.8, size=4)
            >>> index = list(infr.edges())
            >>> print(infr.match_state_df(index))
                       NEGTV  POSTV  INCMP
            aid1 aid2
            1    3     False  False   True
                 4     False  False   True
                 2     False   True  False
            2    3     False  False   True
                 4     False  False   True
            3    4     False   True  False
                 5     False  False   True
            5    8     False  False   True
                 7     False  False   True
                 6     False  False   True
            6    8     False  False   True
                 7     False  False   True
            7    8     False  False   True
        """
        index = util.ensure_multi_index(index, ('aid1', 'aid2'))
        aid_pairs = np.asarray(index.tolist())
        aid_pairs = aid_pairs.reshape(-1, 2)
        u_nids = np.array(list(infr.gen_node_values(
            'name_label', [u for u, v in aid_pairs])))
        v_nids = np.array(list(infr.gen_node_values(
            'name_label', [v for u, v in aid_pairs])))
        is_same = np.equal(u_nids, v_nids)
        edge_states = infr.gen_edge_values('evidence_decision',
                                           edges=aid_pairs, default=UNREV,
                                           on_missing='default')
        # Comparable means NOT reviewed as incomparable (this test used to be
        # inverted). An explicit bool dtype keeps ``~`` valid when empty.
        is_comp = np.array([s != INCMP for s in edge_states], dtype=bool)
        columns = [
            (NEGTV, ~is_same & is_comp),
            (POSTV, is_same & is_comp),
            (INCMP, ~is_comp),
        ]
        # ``DataFrame.from_items`` only exists on older pandas versions
        if hasattr(pd.DataFrame, 'from_items'):
            match_state_df = pd.DataFrame.from_items(columns)
        else:
            match_state_df = pd.DataFrame(ub.odict(columns))
        match_state_df.index = index
        return match_state_df
class DummyEdges(object):
    """
    Helpers that find or add augmenting edges (MST, clique, full graph).
    Every method receives the inference object as ``infr``.
    """

    def ensure_mst(infr, label='name_label', meta_decision=SAME):
        """
        Ensures that all names are connected.

        Args:
            label (str): node attribute to use as the group id to form the
                mst.
            meta_decision (str): meta decision given to the feedback items
                that are added. Defaults to SAME; None is also treated as
                SAME. The items use ``user_id='algo:mst'`` and a confidence
                of guessing.

        Example:
            >>> from graphid import demo
            >>> infr = demo.demodata_infr(num_pccs=3, size=4)
            >>> assert infr.status()['nCCs'] == 3
            >>> infr.clear_edges()
            >>> assert infr.status()['nCCs'] == 12
            >>> infr.ensure_mst()
            >>> assert infr.status()['nCCs'] == 3
        """
        infr.print('ensure_mst', 1)
        if meta_decision is None:
            # Keep the historical behavior for callers that pass None
            meta_decision = SAME
        new_edges = infr.find_mst_edges(label=label)
        # Add new MST edges to original graph
        infr.print('adding %d MST edges' % (len(new_edges)), 2)
        # Honor the requested decision (it used to be hard-coded to SAME)
        infr.add_feedback_from(new_edges, meta_decision=meta_decision,
                               confidence=const.CONFIDENCE.CODE.GUESSING,
                               user_id='algo:mst', verbose=False)

    def ensure_cliques(infr, label='name_label', meta_decision=None):
        """
        Force each name label to be a clique.

        Args:
            label (str): node attribute to use as the group id to form the
                cliques. Defaults to 'name_label'.
            meta_decision (str): if specified adds clique edges as feedback
                items with this decision and ``user_id='algo:clique'``.
                Otherwise the edges are only explicitly added to the graph.

        CommandLine:
            python -m graphid.core.mixin_helpers ensure_cliques

        Example:
            >>> from graphid import demo
            >>> label = 'name_label'
            >>> infr = demo.demodata_infr(num_pccs=3, size=5)
            >>> print(ub.urepr(infr.status()))
            >>> assert infr.status()['nEdges'] < 33
            >>> infr.ensure_cliques()
            >>> print(ub.urepr(infr.status()))
            >>> assert infr.status()['nEdges'] == 31
            >>> assert infr.status()['nUnrevEdges'] == 12
            >>> assert len(list(infr.find_clique_edges(label))) > 0
            >>> infr.ensure_cliques(meta_decision=SAME)
            >>> assert infr.status()['nUnrevEdges'] == 0
            >>> assert len(list(infr.find_clique_edges(label))) == 0
        """
        infr.print('ensure_cliques', 1)
        new_edges = infr.find_clique_edges(label)
        infr.print('ensuring %d clique edges' % (len(new_edges)), 2)
        if meta_decision is None:
            infr.ensure_edges_from(new_edges)
        else:
            # Honor the requested decision (it used to be hard-coded to SAME)
            infr.add_feedback_from(new_edges, meta_decision=meta_decision,
                                   confidence=const.CONFIDENCE.CODE.GUESSING,
                                   user_id='algo:clique', verbose=False)

    def ensure_full(infr):
        """
        Explicitly places all edges, but does not make any feedback items
        """
        infr.print('ensure_full with %d nodes' % (len(infr.graph)), 2)
        new_edges = list(nx.complement(infr.graph).edges())
        infr.ensure_edges_from(new_edges)

    def find_clique_edges(infr, label='name_label'):
        """
        Augmenting edges that would complete each of the specified cliques.
        (based on the group inferred from `label`)

        Args:
            label (str): node attribute to use as the group id to form the
                cliques.

        Returns:
            list: node pairs within a group whose ``infr.edge_decision`` is
                UNREV.
        """
        node_to_label = infr.get_node_attrs(label)
        label_to_nodes = ub.group_items(node_to_label.keys(),
                                        node_to_label.values())
        new_edges = []
        # Only the groups are needed; avoid shadowing the ``label`` argument
        for nodes in label_to_nodes.values():
            for edge in it.combinations(nodes, 2):
                if infr.edge_decision(edge) == UNREV:
                    new_edges.append(edge)
        return new_edges

    def find_mst_edges(infr, label='name_label'):
        """
        Returns edges to augment existing PCCs (by label) in order to ensure
        they are connected with positive edges.

        Pairs that already have a negative or incomparable edge are excluded
        from the candidates. If a group cannot be connected under that
        restriction, a warning is printed and a partial augmentation is used.

        Args:
            label (str): node attribute used to group the nodes.

        Returns:
            list: the augmenting edges

        Example:
            >>> # DISABLE_DOCTEST
            >>> from graphid.core.mixin_helpers import *  # NOQA
            >>> import ibeis
            >>> ibs = ibeis.opendb(defaultdb='PZ_MTEST')
            >>> infr = ibeis.AnnotInference(ibs, 'all', autoinit=True)
            >>> label = 'orig_name_label'
            >>> label = 'name_label'
            >>> infr.find_mst_edges()
            >>> infr.ensure_mst()

        Ignore:
            old_mst_edges = [
                e for e, d in infr.edges(data=True)
                if d.get('user_id', None) == 'algo:mst'
            ]
            infr.graph.remove_edges_from(old_mst_edges)
            infr.pos_graph.remove_edges_from(old_mst_edges)
            infr.neg_graph.remove_edges_from(old_mst_edges)
            infr.incomp_graph.remove_edges_from(old_mst_edges)
        """
        # Find clusters by labels
        node_to_label = infr.get_node_attrs(label)
        label_to_nodes = ub.group_items(node_to_label.keys(),
                                        node_to_label.values())

        # The heuristic needs ``infr.ibs`` and is currently switched off, so
        # everything guarded by this flag is inactive.
        weight_heuristic = False  # infr.ibs is not None
        if weight_heuristic:
            annots = infr.ibs.annots(infr.aids)
            node_to_time = ub.dzip(annots, annots.time)
            node_to_view = ub.dzip(annots, annots.viewpoint_code)
            enabled_heuristics = {
                'view_weight',
                'time_weight',
            }

        def _heuristic_weighting(nodes, avail_uv):
            # Attach a 'weight' to each candidate edge from the viewpoint and
            # time differences of its endpoints.
            avail_uv = np.array(avail_uv)
            weights = np.ones(len(avail_uv))

            if 'view_weight' in enabled_heuristics:
                from graphid.core import _rhomb_dist
                view_edge = [(node_to_view[u], node_to_view[v])
                             for (u, v) in avail_uv]
                view_weight = np.array([
                    _rhomb_dist.VIEW_CODE_DIST[(v1, v2)]
                    for (v1, v2) in view_edge
                ])
                # Assume comparable by default and prefer undefined
                # more than probably not, but less than definitely so.
                view_weight[np.isnan(view_weight)] = 1.5
                # Prefer viewpoint 10x more than time
                weights += 10 * view_weight

            if 'time_weight' in enabled_heuristics:
                # Prefer linking annotations closer in time
                times = list(ub.take(node_to_time, nodes))
                maxtime = util.safe_max(times, fill=1, nans=False)
                mintime = util.safe_min(times, fill=0, nans=False)
                time_denom = maxtime - mintime
                # Try linking by time for lynx data
                time_delta = np.array([
                    abs(node_to_time[u] - node_to_time[v])
                    for u, v in avail_uv
                ])
                time_weight = time_delta / time_denom
                weights += time_weight

            weights = np.array(weights)
            weights[np.isnan(weights)] = 1.0

            avail = [(u, v, {'weight': w})
                     for (u, v), w in zip(avail_uv, weights)]
            return avail

        new_edges = []
        prog = ub.ProgIter(list(label_to_nodes.keys()),
                           desc='finding mst edges',
                           enabled=infr.verbose > 0)
        for nid in prog:
            nodes = set(label_to_nodes[nid])
            if len(nodes) == 1:
                continue
            # We want to make this CC connected
            pos_sub = infr.pos_graph.subgraph(nodes, dynamic=False)
            # Pairs already reviewed as negative or incomparable cannot be
            # proposed as positive edges.
            impossible = set(it.starmap(e_, it.chain(
                nxu.edges_inside(infr.neg_graph, nodes),
                nxu.edges_inside(infr.incomp_graph, nodes),
            )))
            if len(impossible) == 0 and not weight_heuristic:
                # Simple mst augmentation
                aug_edges = list(nxu.k_edge_augmentation(pos_sub, k=1))
            else:
                complement = it.starmap(e_, nxu.complement_edges(pos_sub))
                avail_uv = [
                    (u, v) for u, v in complement
                    if (u, v) not in impossible
                ]
                if weight_heuristic:
                    # Can do heuristic weighting to improve the MST
                    avail = _heuristic_weighting(nodes, avail_uv)
                else:
                    avail = avail_uv
                try:
                    aug_edges = list(nxu.k_edge_augmentation(
                        pos_sub, k=1, avail=avail))
                except nx.NetworkXUnfeasible:
                    print('Warning: MST augmentation is not feasible')
                    print('explicit negative edges might disconnect a PCC')
                    aug_edges = list(nxu.k_edge_augmentation(
                        pos_sub, k=1, avail=avail, partial=True))
            new_edges.extend(aug_edges)
        prog.ensure_newline()

        # Sanity check: augmenting edges must not already be in the graph
        for edge in new_edges:
            assert not infr.graph.has_edge(*edge), (
                'alrady have edge={}'.format(edge))
        return new_edges

    def find_connecting_edges(infr):
        """
        Searches for a small set of edges, which if reviewed as positive
        would ensure that each PCC is k-connected. Note that in some cases
        this is not possible.

        Nodes are grouped by 'name_label' and ``k`` is currently fixed to 1.

        Returns:
            list: the augmenting edges
        """
        label = 'name_label'
        node_to_label = infr.get_node_attrs(label)
        label_to_nodes = ub.group_items(node_to_label.keys(),
                                        node_to_label.values())

        # k = infr.params['redun.pos']
        k = 1
        new_edges = []
        prog = ub.ProgIter(list(label_to_nodes.keys()),
                           desc='finding connecting edges',
                           enabled=infr.verbose > 0)
        for nid in prog:
            nodes = set(label_to_nodes[nid])
            G = infr.pos_graph.subgraph(nodes, dynamic=False)
            # Pass both edge sets through ``e_`` (as find_mst_edges does) so
            # the subtraction below does not depend on endpoint order.
            # A fresh set is built instead of mutating the helper's result.
            impossible = set(it.starmap(e_, it.chain(
                nxu.edges_inside(infr.neg_graph, nodes),
                nxu.edges_inside(infr.incomp_graph, nodes),
            )))
            candidates = set(it.starmap(e_, nx.complement(G).edges()))
            candidates.difference_update(impossible)

            aug_edges = nxu.k_edge_augmentation(G, k=k, avail=candidates)
            new_edges.extend(aug_edges)
        prog.ensure_newline()
        return new_edges
if __name__ == '__main__':
    """
    CommandLine:
        python ~/code/graphid/graphid.core/mixin_helpers.py all
    """
    # Run this module's doctests when it is executed as a script
    from xdoctest import doctest_module
    doctest_module(__file__)