Source code for graphid.core.mixin_loops

# -*- coding: utf-8 -*-
import numpy as np
import ubelt as ub
import pandas as pd
import itertools as it
from graphid.core import state as const
from graphid import util
from graphid.core.state import (POSTV, NEGTV, INCMP, NULL)
from graphid.core.refresh import RefreshCriteria


class InfrLoops(object):
    """
    Algorithm control flow loops
    """
    def main_gen(infr, max_loops=None, use_refresh=True):
        """
        The main outer loop.

        This function is designed as an iterator that will execute the graph
        algorithm main loop as automatically as possible, but if user input is
        needed, it will pause and yield the decision it needs help with. Once
        feedback is given for this item, you can continue the main loop by
        calling next. StopIteration is raised once the algorithm is complete.

        Args:
            max_loops (int): maximum number of times to run the outer loop,
                i.e. ranking is run at most this many times.

            use_refresh (bool): allow the refresh criterion to stop the algo

        Notes:
            Different phases of the main loop are implemented as subiterators

        CommandLine:
            python -m graphid.core.mixin_loops InfrLoops.main_gen

        Example:
            >>> from graphid.core.mixin_simulation import UserOracle
            >>> from graphid import demo
            >>> infr = demo.demodata_infr(num_pccs=3, size=5)
            >>> infr.params['manual.n_peek'] = 10
            >>> infr.params['ranking.ntop'] = 1
            >>> infr.oracle = UserOracle(.99, rng=0)
            >>> infr.simulation_mode = False
            >>> infr.reset()
            >>> gen = infr.main_gen()
            >>> while True:
            >>>     try:
            >>>         reviews = next(gen)
            >>>         edge, priority, data = reviews[0]
            >>>         feedback = infr.request_oracle_review(edge)
            >>>         infr.add_feedback(edge, **feedback)
            >>>     except StopIteration:
            >>>         break
        """
        infr.print('Starting main loop', 1)
        infr.print('infr.params = {}'.format(ub.urepr(infr.params)))
        if max_loops is None:
            max_loops = infr.params['algo.max_outer_loops']
            if max_loops is None:
                max_loops = np.inf

        if infr.test_mode:
            print('------------------ {} -------------------'.format(infr.name))

        # Initialize the refresh criteria
        infr.init_refresh()

        # Phase 0.1: Ensure the user sees something immediately
        if infr.params['algo.quickstart']:
            infr.loop_phase = 'quickstart_init'
            # quick startup. Yield a bunch of random edges
            num = infr.params['manual.n_peek']
            user_request = []
            for edge in util.random_combinations(infr.aids, 2, num=num):
                user_request += [infr._make_review_tuple(edge, None)]
            yield user_request

        if infr.params['algo.hardcase']:
            infr.loop_phase = 'hardcase_init'
            # Check previously labeled edges where the groundtruth and the
            # verifier disagree.
            yield from infr.hardcase_review_gen()

        if infr.params['inference.enabled']:
            infr.loop_phase = 'incon_recover_init'
            # First, fix any inconsistencies
            yield from infr.incon_recovery_gen()

        # Phase 0.2: Ensure positive redundancy (this is generally quick)
        # so the user starts seeing real work after one random review is made,
        # unless the graph is already positive redundant.
        if infr.params['redun.enabled'] and infr.params['redun.enforce_pos']:
            infr.loop_phase = 'pos_redun_init'
            # Fix positive redundancy of anything within the loop
            yield from infr.pos_redun_gen()

        if infr.params['ranking.enabled']:
            for count in it.count(0):
                infr.print('Outer loop iter %d ' % (count,))

                # Phase 1: Try to merge PCCs by searching for LNBNN candidates
                infr.loop_phase = 'ranking_{}'.format(count)
                yield from infr.ranked_list_gen(use_refresh)

                terminate = (infr.refresh.num_meaningful == 0)
                if terminate:
                    infr.print('Triggered break criteria', 1, color='red')

                # Phase 2: Ensure positive redundancy.
                infr.loop_phase = 'posredun_{}'.format(count)
                if all(ub.take(infr.params, ['redun.enabled',
                                             'redun.enforce_pos'])):
                    # Fix positive redundancy of anything within the loop
                    yield from infr.pos_redun_gen()

                print('prob_any_remain = %r' % (infr.refresh.prob_any_remain(),))
                print('infr.refresh.num_meaningful = {!r}'.format(
                    infr.refresh.num_meaningful))

                if (count + 1) >= max_loops:
                    infr.print('early stop', 1, color='red')
                    break

                if terminate:
                    infr.print('break triggered')
                    break

        if all(ub.take(infr.params, ['redun.enabled', 'redun.enforce_neg'])):
            # Phase 3: Try to automatically achieve negative redundancy
            # without asking the user to do anything but resolve
            # inconsistency.
            infr.print('Entering phase 3', 1, color='red')
            infr.loop_phase = 'negredun'
            yield from infr.neg_redun_gen()

        infr.print('Terminate', 1, color='red')
        infr.print('Exiting main loop')

        if infr.params['inference.enabled']:
            infr.assert_consistency_invariant()

    def hardcase_review_gen(infr):
        """
        Subiterator for hardcase review

        Re-review non-confident edges that vsone did not classify correctly
        """
        infr.print('==============================', color='white')
        infr.print('--- HARDCASE PRIORITY LOOP ---', color='white')

        verifiers = infr.learn_evaluation_verifiers()
        verif = verifiers['match_state']

        edges_ = list(infr.edges())
        real_ = list(infr.edge_decision_from(edges_))
        flags_ = [r in {POSTV, NEGTV, INCMP} for r in real_]
        real = list(ub.compress(real_, flags_))
        edges = list(ub.compress(edges_, flags_))

        hardness = 1 - verif.easiness(edges, real)

        if True:
            df = pd.DataFrame({'edges': edges, 'real': real})
            df['hardness'] = hardness
            pred = verif.predict(edges)
            df['pred'] = pred.values
            # sort so the hardest cases are listed first
            df = df.sort_values('hardness', ascending=False)
            infr.print('hardness analysis')
            infr.print(str(df))

        infr.print('infr status: ' + ub.urepr(infr.status()))

        # Don't re-review anything that was confidently reviewed
        # CONFIDENCE = const.CONFIDENCE
        # CODE_TO_INT = CONFIDENCE.CODE_TO_INT.copy()
        # CODE_TO_INT[CONFIDENCE.CODE.UNKNOWN] = 0
        # conf = ub.take(CODE_TO_INT, infr.gen_edge_values(
        #     'confidence', edges, on_missing='default',
        #     default=CONFIDENCE.CODE.UNKNOWN))

        # This should only be run with certain params
        assert not infr.params['autoreview.enabled']
        assert not infr.params['redun.enabled']
        assert not infr.params['ranking.enabled']
        assert infr.params['inference.enabled']

        # const.CONFIDENCE.CODE.PRETTY_SURE
        if infr.params['queue.conf.thresh'] is None:
            # != 'pretty_sure':
            infr.print('WARNING: should queue.conf.thresh = "pretty_sure"?')

        # work around add_candidate_edges
        infr.prioritize(metric='hardness', edges=edges, scores=hardness)
        infr.set_edge_attrs('hardness', ub.dzip(edges, hardness))
        yield from infr._inner_priority_gen(use_refresh=False)

    def ranked_list_gen(infr, use_refresh=True):
        """
        Subiterator for phase 1 of the main algorithm

        Calls the underlying ranking algorithm and prioritizes the results
        """
        infr.print('============================', color='white')
        infr.print('--- RANKED LIST LOOP ---', color='white')
        n_prioritized = infr.refresh_candidate_edges()
        if n_prioritized == 0:
            infr.print('RANKING ALGO FOUND NO NEW EDGES')
            return
        if use_refresh:
            infr.refresh.clear()
        yield from infr._inner_priority_gen(use_refresh)

    def incon_recovery_gen(infr):
        """
        Subiterator for recovery mode of the main algorithm

        Iterates until the graph is consistent

        Note:
            Inconsistency recovery is implicitly handled by the main
            algorithm, so other phases do not need to call this explicitly.
            This exists for the case where the only mode we wish to run is
            inconsistency recovery.
        """
        maybe_error_edges = list(infr.maybe_error_edges())
        if len(maybe_error_edges) == 0:
            return  # raise StopIteration()
        infr.print('============================', color='white')
        infr.print('--- INCON RECOVER LOOP ---', color='white')
        infr.queue.clear()
        infr.add_candidate_edges(maybe_error_edges)
        yield from infr._inner_priority_gen(use_refresh=False)

    def pos_redun_gen(infr):
        """
        Subiterator for phase 2 of the main algorithm.

        Searches for decisions that would complete positive redundancy

        CommandLine:
            python -m graphid.core.mixin_loops InfrLoops.pos_redun_gen

        Example:
            >>> from graphid.core.mixin_loops import *
            >>> from graphid import demo
            >>> infr = demo.demodata_infr(num_pccs=3, size=5, pos_redun=1)
            >>> gen = infr.pos_redun_gen()
            >>> feedback = next(gen)
            >>> edge_ = feedback[0][0]
            >>> print(edge_)
            (1, 5)
        """
        infr.print('===========================', color='white')
        infr.print('--- POSITIVE REDUN LOOP ---', color='white')
        # FIXME: should prioritize inconsistencies first
        count = -1

        def serial_gen():
            # use this if threading does bad things
            if True:
                new_edges = list(infr.find_pos_redun_candidate_edges())
                if len(new_edges) > 0:
                    infr.add_candidate_edges(new_edges)
                    yield new_edges
            else:
                for new_edges in ub.chunks(infr.find_pos_redun_candidate_edges(), 100):
                    if len(new_edges) > 0:
                        infr.add_candidate_edges(new_edges)
                        yield new_edges

        def filtered_gen():
            # Buffer one-vs-one scores in the background and present an edge
            # to the user ASAP.
            # if infr.test_mode:
            candgen = serial_gen()
            for new_edges in candgen:
                yield new_edges

        for count in it.count(0):
            infr.print('check pos-redun iter {}'.format(count))
            infr.queue.clear()
            found_any = False
            for new_edges in filtered_gen():
                found_any = True
                gen = infr._inner_priority_gen(use_refresh=False)
                for value in gen:
                    yield value
            print('found_any = {!r}'.format(found_any))
            if not found_any:
                break
            infr.print('not pos-redundant yet.', color='white')

        infr.print(
            'pos-redundancy achieved in {} iterations'.format(count + 1))

    def neg_redun_gen(infr):
        """
        Subiterator for phase 3 of the main algorithm.

        Searches for decisions that would complete negative redundancy
        """
        infr.print('===========================', color='white')
        infr.print('--- NEGATIVE REDUN LOOP ---', color='white')
        infr.queue.clear()

        only_auto = infr.params['redun.neg.only_auto']

        # TODO: outer loop that re-iterates until negative redundancy is
        # accomplished.
        needs_neg_redun = infr.find_neg_redun_candidate_edges()
        chunksize = 500
        for new_edges in ub.chunks(needs_neg_redun, chunksize):
            infr.print('another neg redun chunk')
            # Add chunks a little at a time for faster response time
            infr.add_candidate_edges(new_edges)
            gen = infr._inner_priority_gen(use_refresh=False,
                                           only_auto=only_auto)
            for value in gen:
                yield value

    def _inner_priority_gen(infr, use_refresh=False, only_auto=False):
        """
        Helper function that implements the general inner priority loop.

        Executes reviews until the queue is empty or needs refresh

        Args:
            use_refresh (bool): if True enables the refresh criteria.
                (set to True in Phase 1)

            only_auto (bool): if True, then the user won't be prompted with
                reviews unless the graph is inconsistent.
                (set to True in Phase 3)

        Notes:
            The caller is responsible for populating the priority queue.
            This will iterate until the queue is empty or the refresh
            criterion is triggered.
        """
        if infr.refresh:
            infr.refresh.enabled = use_refresh
        infr.print('Start inner loop with {} items in the queue'.format(
            len(infr.queue)))
        for count in it.count(0):
            if infr.is_recovering():
                infr.print('Still recovering after %d iterations' % (count,),
                           3, color='turquoise')
            else:
                # Do not check for refresh if we are recovering
                if use_refresh and infr.refresh.check():
                    infr.print('Triggered refresh criteria after %d iterations' % (count,),
                               1, color='yellow')
                    break
            # If the queue is empty break
            if len(infr.queue) == 0:
                infr.print('No more edges after %d iterations, need refresh' % (count,),
                           1, color='yellow')
                break
            # Try to automatically do the next review.
            edge, priority = infr.peek()
            infr.print('next_review. edge={}'.format(edge), 100)
            inconsistent = infr.is_recovering(edge)
            feedback = None
            if infr.params['autoreview.enabled'] and not inconsistent:
                # Try to autoreview if we aren't in an inconsistent state
                feedback = infr.try_auto_review(edge)
            if feedback is not None:
                # Add feedback from the automated method
                infr.add_feedback(edge, priority=priority, **feedback)
            else:
                # We can't automatically review, ask for help
                if only_auto and not inconsistent:
                    # We are in auto-only mode, skip manual review
                    # unless there is an inconsistency
                    infr.skip(edge)
                else:
                    if infr.simulation_mode:
                        # Use oracle feedback
                        feedback = infr.request_oracle_review(edge)
                        infr.add_feedback(edge, priority=priority, **feedback)
                    else:
                        # Yield to the user if we need to pause
                        user_request = infr.emit_manual_review(edge, priority)
                        yield user_request

        if infr.metrics_list:
            infr._print_previous_loop_statistics(count)

    def init_refresh(infr):
        refresh_params = infr.subparams('refresh')
        infr.refresh = RefreshCriteria(**refresh_params)

    def start_id_review(infr, max_loops=None, use_refresh=None):
        assert infr._gen is None, 'algo already running'
        # Create the main generator; the caller drives it (e.g. via
        # continue_review)
        infr._gen = infr.main_gen(max_loops=max_loops,
                                  use_refresh=use_refresh)
        # return infr._gen

    def main_loop(infr, max_loops=None, use_refresh=True):
        """
        DEPRECATED

        Use ``list(infr.main_gen())`` instead, or
        ``assert not any(infr.main_gen())``. Maybe this is fine.
        """
        infr.start_id_review(max_loops=max_loops, use_refresh=use_refresh)
        # To automatically run through the loop just exhaust the generator
        try:
            result = next(infr._gen)
            assert result is None, 'need user interaction. cannot auto loop'
        except StopIteration:
            pass
        infr._gen = None
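

# Illustrative sketch, not part of the graphid API: the DEPRECATED note in
# ``main_loop`` suggests exhausting ``main_gen`` directly when no manual input
# is expected. The helper below is hypothetical and reuses the demo/oracle
# setup from the ``main_gen`` doctest; in simulation mode the inner loop
# applies oracle feedback instead of yielding, so exhausting the generator
# should run all phases to completion (whether it does depends on the
# parameter defaults).
def _example_exhaust_main_gen():
    """Hypothetical usage sketch; mirrors the main_gen doctest setup."""
    from graphid.core.mixin_simulation import UserOracle
    from graphid import demo
    infr = demo.demodata_infr(num_pccs=3, size=5)
    infr.oracle = UserOracle(.99, rng=0)
    infr.simulation_mode = True   # oracle feedback is applied automatically
    infr.reset()
    # Exhaust the generator; nothing should be yielded for manual review
    assert not any(infr.main_gen())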


class InfrReviewers(object):

    def try_auto_review(infr, edge):
        """
        Attempt to automatically review an edge using verifier probabilities.
        Returns a review dict if exactly one decision passes its threshold
        (and is not confounded by a photobomb), otherwise None.
        """
        review = {
            'user_id': 'algo:auto_clf',
            'confidence': const.CONFIDENCE.CODE.PRETTY_SURE,
            'evidence_decision': None,
            'meta_decision': NULL,
            'timestamp_s1': None,
            'timestamp_c1': None,
            'timestamp_c2': None,
            'tags': [],
        }
        if infr.is_recovering():
            # Do not autoreview if we are in an inconsistent state
            infr.print('Must manually review inconsistent edge', 3)
            return None
        # Determine if anything passes the match threshold
        primary_task = 'match_state'

        try:
            decision_probs = infr.task_probs[primary_task][edge]
        except KeyError:
            if infr.verifiers is None:
                return None
            if infr.verifiers.get(primary_task, None) is None:
                return None
            # Compute probs if they haven't been done yet
            infr.ensure_priority_scores([edge])
            try:
                decision_probs = infr.task_probs[primary_task][edge]
            except KeyError:
                return None

        primary_thresh = infr.task_thresh[primary_task]
        decision_flags = {k: decision_probs[k] > thresh
                          for k, thresh in primary_thresh.items()}
        hasone = sum(decision_flags.values()) == 1
        auto_flag = False
        if hasone:
            try:
                # Check to see if it might be confounded by a photobomb
                pb_probs = infr.task_probs['photobomb_state'][edge]
                # pb_probs = infr.task_probs['photobomb_state'].loc[edge]
                # pb_probs = data['task_probs']['photobomb_state']
                pb_thresh = infr.task_thresh['photobomb_state']['pb']
                confounded = pb_probs['pb'] > pb_thresh
            except KeyError:
                print('Warning: confounding task probs not set (i.e. photobombs)')
                confounded = False
            if not confounded:
                # decision = decision_flags.argmax()
                evidence_decision = ub.argmax(decision_probs)
                review['evidence_decision'] = evidence_decision
                # truth = infr.match_state_gt(edge)
                truth = infr.dummy_verif._get_truth(edge)
                if review['evidence_decision'] != truth:
                    infr.print(
                        'AUTOMATIC ERROR edge={}, truth={}, decision={}, '
                        'probs={}'.format(edge, truth,
                                          review['evidence_decision'],
                                          decision_probs),
                        2, color='darkred')
                auto_flag = True
        if auto_flag and infr.verbose > 1:
            infr.print('Automatic review success')

        if auto_flag:
            return review
        else:
            return None

    def request_oracle_review(infr, edge, **kw):
        truth = infr.dummy_verif._get_truth(edge)
        # truth = infr.match_state_gt(edge)
        feedback = infr.oracle.review(edge, truth, infr, **kw)
        return feedback

    def _make_review_tuple(infr, edge, priority=None):
        """ Makes a tuple to be sent back to the user """
        edge_data = infr.get_nonvisual_edge_data(
            edge, on_missing='default')
        # Extra information
        edge_data['nid_edge'] = infr.pos_graph.node_labels(*edge)
        if infr.queue is None:
            edge_data['queue_len'] = 0
        else:
            edge_data['queue_len'] = len(infr.queue)
        edge_data['n_ccs'] = (
            len(infr.pos_graph.connected_to(edge[0])),
            len(infr.pos_graph.connected_to(edge[1]))
        )
        return (edge, priority, edge_data)

    def emit_manual_review(infr, edge, priority=None):
        """
        Emits a signal containing edges that need review. The callback should
        present them to a user, get feedback, and then call on_accept.
        """
        infr.print('emit_manual_review', 100)
        # Emit a list of reviews that can be considered.
        # The first is the most important
        user_request = []
        user_request += [infr._make_review_tuple(edge, priority)]
        try:
            for edge_, priority in infr.peek_many(infr.params['manual.n_peek']):
                if edge == edge_:
                    continue
                user_request += [infr._make_review_tuple(edge_, priority)]
        except TypeError:
            pass

        # If registered, send the request via a callback.
        request_review = infr.callbacks.get('request_review', None)
        if request_review is not None:
            # Send these reviews to a user
            request_review(user_request)
        # Otherwise the current process must handle the request by return value
        return user_request

    def skip(infr, edge):
        infr.print('skipping edge={}'.format(edge), 100)
        try:
            del infr.queue[edge]
        except Exception:
            pass

    def accept(infr, feedback):
        """
        Called when the user has completed feedback from qt or web
        """
        annot1_state = feedback.pop('annot1_state', None)
        annot2_state = feedback.pop('annot2_state', None)
        if annot1_state:
            infr.add_node_feedback(**annot1_state)
        if annot2_state:
            infr.add_node_feedback(**annot2_state)
        infr.add_feedback(**feedback)

    def continue_review(infr):
        infr.print('continue_review', 10)
        if infr._gen is None:
            return None
        try:
            user_request = next(infr._gen)
        except StopIteration:
            review_finished = infr.callbacks.get('review_finished', None)
            if review_finished is not None:
                review_finished()
            infr._gen = None
            user_request = None
        return user_request
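

# Illustrative sketch, not part of the graphid API: one way a front end could
# wire the 'request_review' and 'review_finished' hooks that
# ``emit_manual_review`` and ``continue_review`` look up in ``infr.callbacks``
# (assumed here to be a plain dict), feeding decisions back with
# ``add_feedback`` as in the ``main_gen`` doctest. The helper name and the
# ``ask_user`` callable are hypothetical.
def _example_callback_driven_review(infr, ask_user):
    """Hypothetical usage sketch for an interactive review session."""
    def request_review(user_request):
        # Called by emit_manual_review with a list of (edge, priority, data)
        infr.print('UI was asked to review {} edges'.format(len(user_request)))

    def review_finished():
        # Called by continue_review when the main generator is exhausted
        infr.print('review session finished')

    infr.callbacks['request_review'] = request_review
    infr.callbacks['review_finished'] = review_finished

    infr.start_id_review()
    user_request = infr.continue_review()
    while user_request is not None:
        # The first tuple is the most important edge to review
        edge, priority, data = user_request[0]
        feedback = ask_user(edge, priority, data)
        infr.add_feedback(edge, priority=priority, **feedback)
        user_request = infr.continue_review()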


if __name__ == '__main__':
    """
    CommandLine:
        python ~/code/graphid/graphid.core/mixin_loops.py all
    """
    import xdoctest
    xdoctest.doctest_module(__file__)