# Source code for graphid.core.mixin_callbacks

import numpy as np
import ubelt as ub
import pandas as pd
from graphid.core.state import (POSTV, NEGTV, INCMP, NULL)  # NOQA
from graphid import util


class InfrCallbacks(object):
    """
    Methods relating to callbacks that must be registered with the inference
    object for it to work properly.
    """

    def set_ranker(infr, ranker):
        """
        ranker should be an object with a ``predict_rankings(qaids, K)``
        method that, for each query annotation id, returns a list of the
        top K ranked annotation ids.
        """
        infr.ranker = ranker
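
    # A minimal sketch of a compatible ranker. ``TrivialRanker`` is
    # illustrative and not part of graphid; only the ``predict_rankings``
    # signature is assumed, based on how ``refresh_candidate_edges`` below
    # calls ``infr.ranker.predict_rankings(qaids, K=ranks_top)``.
    #
    #     class TrivialRanker:
    #         def __init__(self, aids):
    #             self.aids = list(aids)
    #
    #         def predict_rankings(self, qaids, K=5):
    #             # Return up to K arbitrary other annotation ids per query
    #             return [[a for a in self.aids if a != q][:K]
    #                     for q in qaids]
    #
    #     infr.set_ranker(TrivialRanker(infr.aids))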

    def set_verifier(infr, verifier, task='match_state'):
        """
        verifier should be an object with a ``predict_proba_df(edges)``
        method that accepts a list of annotation pairs and produces the
        3-state match_state probabilities.
        """
        if infr.verifiers is None:
            infr.verifiers = {}
        infr.verifiers[task] = verifier
        infr.verifier = verifier
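
    # A minimal sketch of a compatible verifier. ``UniformVerifier`` is
    # illustrative and not part of graphid; the ``predict_proba_df``
    # signature and the POSTV/NEGTV/INCMP columns are assumed from how
    # ``ensure_task_probs`` and ``ensure_priority_scores`` consume its
    # output.
    #
    #     import pandas as pd
    #     from graphid.core.state import POSTV, NEGTV, INCMP
    #     from graphid import util
    #
    #     class UniformVerifier:
    #         def predict_proba_df(self, edges):
    #             # One 3-state probability row per edge; rows sum to one
    #             index = util.ensure_multi_index(edges, ('aid1', 'aid2'))
    #             return pd.DataFrame(
    #                 {POSTV: 1 / 3, NEGTV: 1 / 3, INCMP: 1 / 3},
    #                 index=index)
    #
    #     infr.set_verifier(UniformVerifier())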

    # def set_edge_attr_predictor(infr, func):
    #     infr.predict_edge_attrs = func

    # def set_node_attr_predictor(infr, func):
    #     infr.predict_node_attrs = func

    # def _default_candidate_edge_search(infr):
    #     raise NotImplementedError

    def refresh_candidate_edges(infr):
        """
        CommandLine:
            python -m graphid.core.mixin_callbacks InfrCallbacks.refresh_candidate_edges

        Example:
            >>> from graphid import demo
            >>> kwargs = dict(num_pccs=40, size=2)
            >>> infr = demo.demodata_infr(**kwargs)
            >>> infr.refresh_candidate_edges()
        """
        infr.print('refresh_candidate_edges', 1)
        infr.assert_consistency_invariant()

        if hasattr(infr, 'dummy_verif'):
            infr.print('Searching for dummy candidates')
            infr.print('dummy vsone params =' + ub.urepr(
                infr.dummy_verif.dummy_params, nl=1, si=True))

        if infr.ranker is None:
            raise Exception(
                'No method available to search for candidate edges')

        ranks_top = infr.params['ranking.ntop']
        qaids = list(infr.aids)
        rankings = infr.ranker.predict_rankings(qaids, K=ranks_top)
        # Convert each query's ranked results into candidate edges
        candidate_edges = [
            infr.e_(aid, v)
            for aid, ranks in zip(qaids, rankings)
            for v in ranks
        ]
        infr.add_candidate_edges(candidate_edges)
        infr.assert_consistency_invariant()
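
# Typical wiring, as a hedged sketch: register a ranker (e.g. the
# illustrative TrivialRanker above), then refresh_candidate_edges pulls the
# top ``ranking.ntop`` results per query into the candidate set. Assigning
# via ``infr.params['ranking.ntop']`` mirrors how this module reads the
# parameter, though the preferred configuration entry point may differ.
#
#     infr.params['ranking.ntop'] = 5
#     infr.set_ranker(TrivialRanker(infr.aids))
#     infr.refresh_candidate_edges()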


class InfrCandidates(object):
    """
    Methods that should be used by callbacks to add new edges to be
    considered as candidates in the priority queue.
    """

    def add_candidate_edges(infr, candidate_edges):
        candidate_edges = list(candidate_edges)
        # Add any edges the graph does not have yet; returns only new ones
        new_edges = infr.ensure_edges_from(candidate_edges)

        if infr.params['redun.enabled']:
            priority_edges = list(infr.filter_edges_flagged_as_redun(
                candidate_edges))
            infr.print('Got {} candidate edges, {} are new, '
                       'and {} are non-redundant'.format(
                           len(candidate_edges), len(new_edges),
                           len(priority_edges)))
        else:
            infr.print('Got {} candidate edges and {} are new'.format(
                len(candidate_edges), len(new_edges)))
            priority_edges = candidate_edges

        if len(priority_edges) > 0:
            priority_edges = list(priority_edges)
            metric, priority = infr.ensure_priority_scores(priority_edges)
            infr.prioritize(metric=metric, edges=priority_edges,
                            scores=priority)

        if hasattr(infr, 'on_new_candidate_edges'):
            # hack callback for demo
            infr.on_new_candidate_edges(infr, new_edges)
        return len(priority_edges)
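
    # Usage, as a hedged sketch: candidate edges are pairs of annotation
    # ids. The method deduplicates them against existing graph edges,
    # optionally filters redundant ones, and returns how many edges were
    # prioritized.
    #
    #     num_prioritized = infr.add_candidate_edges([(1, 2), (3, 4)])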

    def ensure_task_probs(infr, edges):
        """
        Ensures that probabilities are assigned to the edges. This
        guarantees that infr.task_probs contains data for edges.
        (Currently only the primary task is actually ensured)

        CommandLine:
            python -m graphid.core.mixin_callbacks InfrCandidates.ensure_task_probs

        Doctest:
            >>> from graphid import demo
            >>> infr = demo.demodata_infr(num_pccs=6, p_incon=.5, size_std=2)
            >>> edges = list(infr.edges())
            >>> infr.ensure_task_probs(edges)
            >>> assert all([np.isclose(sum(p.values()), 1)
            >>>             for p in infr.task_probs['match_state'].values()])
        """
        if not infr.verifiers:
            raise Exception('Verifiers are needed to predict probabilities')

        # Construct pairwise features on edges in infr
        primary_task = 'match_state'

        match_task = infr.task_probs[primary_task]
        need_flags = [e not in match_task for e in edges]

        if any(need_flags):
            need_edges = list(ub.compress(edges, need_flags))
            infr.print('There are {} edges without probabilities'.format(
                len(need_edges)), 1)

            # Only recompute for the needed edges
            # task_probs = infr._make_task_probs(need_edges)
            task_probs = {
                primary_task: infr.verifier.predict_proba_df(need_edges)
            }
            # Store task probs in internal data structure
            # FIXME: this is slow
            for task, probs in task_probs.items():
                probs_dict = probs.to_dict(orient='index')
                if task not in infr.task_probs:
                    infr.task_probs[task] = probs_dict
                else:
                    infr.task_probs[task].update(probs_dict)

                # Set edge task attribute as well
                infr.set_edge_attrs(task, probs_dict)
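
    # After ``ensure_task_probs(edges)``, ``infr.task_probs`` is a nested
    # dict mapping task -> edge -> state probabilities, since each verifier
    # DataFrame is stored via ``to_dict(orient='index')``. Example shape
    # (the numbers here are made up):
    #
    #     infr.task_probs['match_state'][(1, 2)]
    #     # -> {POSTV: 0.7, NEGTV: 0.2, INCMP: 0.1}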

    def ensure_priority_scores(infr, priority_edges):
        """
        Ensures that priority attributes are assigned to the edges. This
        does not change the state of the queue.

        Doctest:
            >>> from graphid import demo
            >>> infr = demo.demodata_infr(num_pccs=6, p_incon=.5, size_std=2)
            >>> edges = list(infr.edges())
            >>> infr.ensure_priority_scores(edges)
        """
        if infr.verifiers:
            infr.print('Prioritizing {} edges with one-vs-one probs'.format(
                len(priority_edges)), 1)

            infr.ensure_task_probs(priority_edges)

            primary_task = 'match_state'
            match_probs = infr.task_probs[primary_task]
            primary_thresh = infr.task_thresh[primary_task]

            # Read match_probs into a DataFrame
            primary_probs = pd.DataFrame(
                list(ub.take(match_probs, priority_edges)),
                index=util.ensure_multi_index(priority_edges,
                                              ('aid1', 'aid2'))
            )

            # Convert match-state probabilities into priorities
            prob_match = primary_probs[POSTV]

            # Initialize priorities to probability of matching
            default_priority = prob_match.copy()

            # If the edges are currently between the same individual, then
            # prioritize by non-positive probability (because those edges
            # might expose an inconsistency)
            already_pos = [
                infr.pos_graph.node_label(u) == infr.pos_graph.node_label(v)
                for u, v in priority_edges
            ]
            default_priority[already_pos] = 1 - default_priority[already_pos]

            if infr.params['autoreview.enabled']:
                if infr.params['autoreview.prioritize_nonpos']:
                    # Give positives that pass automatic thresholds high
                    # priority
                    _probs = primary_probs[POSTV]
                    flags = _probs > primary_thresh[POSTV]
                    default_priority[flags] = np.maximum(
                        default_priority[flags], _probs[flags]) + 1

                    # Give negatives that pass automatic thresholds high
                    # priority
                    _probs = primary_probs[NEGTV]
                    flags = _probs > primary_thresh[NEGTV]
                    default_priority[flags] = np.maximum(
                        default_priority[flags], _probs[flags]) + 1

                    # Give not-comps that pass automatic thresholds high
                    # priority
                    _probs = primary_probs[INCMP]
                    flags = _probs > primary_thresh[INCMP]
                    default_priority[flags] = np.maximum(
                        default_priority[flags], _probs[flags]) + 1

            infr.set_edge_attrs('prob_match', prob_match.to_dict())
            infr.set_edge_attrs('default_priority',
                                default_priority.to_dict())

            metric = 'default_priority'
            priority = default_priority
        elif infr.cm_list is not None:
            infr.print(
                'Prioritizing {} edges with one-vs-vsmany scores'.format(
                    len(priority_edges)))
            # Not given any deploy classifier, this is the best we can do
            scores = infr._make_lnbnn_scores(priority_edges)
            metric = 'normscore'
            priority = scores
        else:
            infr.print(
                'WARNING: No verifiers to prioritize {} edge(s)'.format(
                    len(priority_edges)))
            metric = 'random'
            priority = np.zeros(len(priority_edges)) + 1e-6

        infr.set_edge_attrs(metric, ub.dzip(priority_edges, priority))
        return metric, priority
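
# The core prioritization rule above, restated as a standalone hedged
# sketch (plain numpy, not graphid API): an edge's base priority is its
# match probability, flipped to ``1 - p`` when both endpoints already share
# the same positive label, so edges likely to expose an inconsistency
# surface first.
#
#     import numpy as np
#     prob_match = np.array([0.9, 0.8])
#     already_pos = np.array([False, True])
#     priority = np.where(already_pos, 1 - prob_match, prob_match)
#     # -> array([0.9, 0.2])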


if __name__ == '__main__':
    """
    CommandLine:
        python -m graphid.core.mixin_callbacks all
    """
    import xdoctest
    xdoctest.doctest_module(__file__)