Source code for graphid.demo.dummy_infr

import itertools as it
import networkx as nx
import numpy as np
from graphid import util
from graphid.util import nx_utils as nxu
from graphid.core.state import POSTV, NEGTV, INCMP, UNREV
from graphid.core.state import SAME, DIFF, NULL  # NOQA
import ubelt as ub


def demodata_infr(**kwargs):
    """
    Kwargs:
        num_pccs (int): implicit number of individuals
        ccs (list): explicit list of connected components
        p_incon (float): probability a PCC is inconsistent
        p_incomp (float): probability an edge is incomparable
        n_incon (int): target number of inconsistent components (default 3)
        pcc_size_mean (int): average number of annots per PCC
        pcc_size_std (float): std dev of annots per PCC
        pos_redun (int): desired level of positive redundancy
        infer (bool): whether or not to run inference (default True)
        ignore_pair (bool): if True, ignores all pairwise dummy edge generation
        p_pair_neg (float): default = .4
        p_pair_incmp (float): default = .2
        p_pair_unrev (float): default = 0.0

    CommandLine:
        python -m graphid.demo.dummy_infr demodata_infr:0 --show
        python -m graphid.demo.dummy_infr demodata_infr:1 --show
        python -m utool.util_inspect recursive_parse_kwargs:2 --mod graphid.demo.dummy_infr --func demodata_infr

    Example:
        >>> from graphid import demo
        >>> import networkx as nx
        >>> kwargs = dict(num_pccs=6, p_incon=.5, size_std=2)
        >>> infr = demo.demodata_infr(**kwargs)
        >>> pccs = list(infr.positive_components())
        >>> assert len(pccs) == kwargs['num_pccs']
        >>> nonfull_pccs = [cc for cc in pccs if len(cc) > 1 and nx.is_empty(nx.complement(infr.pos_graph.subgraph(cc)))]
        >>> expected_n_incon = len(nonfull_pccs) * kwargs['p_incon']
        >>> n_incon = len(list(infr.inconsistent_components()))
        >>> print('status = ' + ub.urepr(infr.status(extended=True)))
        >>> # xdoctest: +REQUIRES(--show)
        >>> infr.show(pickable=True, groupby='name_label')
        >>> util.show_if_requested()

    Doctest:
        >>> from graphid import demo
        >>> import networkx as nx
        >>> kwargs = dict(num_pccs=0)
        >>> infr = demo.demodata_infr(**kwargs)
    """
    from graphid.core.annot_inference import AnnotInference
    from graphid.demo import dummy_algos

    def kwalias(*args):
        # Look up the first matching kwarg alias, falling back on a default
        params = args[0:-1]
        default = args[-1]
        for key in params:
            if key in kwargs:
                return kwargs[key]
        return default

    num_pccs = kwalias('num_pccs', 16)
    size_mean = kwalias('pcc_size_mean', 'pcc_size', 'size', 5)
    size_std = kwalias('pcc_size_std', 'size_std', 0)
    # p_pcc_incon = kwargs.get('p_incon', .1)
    p_pcc_incon = kwargs.get('p_incon', 0)
    p_pcc_incomp = kwargs.get('p_incomp', 0)
    pcc_sizes = kwalias('pcc_sizes', None)

    pos_redun = kwalias('pos_redun', [1, 2, 3])
    pos_redun = util.ensure_iterable(pos_redun)

    # maximum number of inconsistent edges per pcc
    max_n_incon = kwargs.get('n_incon', 3)

    rng = np.random.RandomState(0)
    counter = 1

    if pcc_sizes is None:
        pcc_sizes = [int(util.randn(size_mean, size_std, rng=rng, a_min=1))
                     for _ in range(num_pccs)]
    else:
        num_pccs = len(pcc_sizes)

    if 'ccs' in kwargs:
        # Overwrites other options
        pcc_sizes = list(map(len, kwargs['ccs']))
        num_pccs = len(pcc_sizes)
        size_mean = None
        size_std = 0

    new_ccs = []
    pcc_iter = list(enumerate(pcc_sizes))
    pcc_iter = ub.ProgIter(pcc_iter, enabled=num_pccs > 20,
                           desc='make pos-demo')
    for i, size in pcc_iter:
        p = .1
        want_connectivity = rng.choice(pos_redun)
        want_connectivity = min(size - 1, want_connectivity)
        # Create a basic graph of positive edges with the desired connectivity
        g = nxu.random_k_edge_connected_graph(
            size, k=want_connectivity, p=p, rng=rng)
        nx.set_edge_attributes(g, name='evidence_decision', values=POSTV)
        nx.set_edge_attributes(g, name='truth', values=POSTV)
        # nx.set_node_attributes(g, name='orig_name_label', values=i)
        assert nx.is_connected(g)

        # Relabel the graph with non-conflicting names
        if 'ccs' in kwargs:
            g = nx.relabel_nodes(g, dict(enumerate(kwargs['ccs'][i])))
        else:
            # Make sure nodes do not conflict with others
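            # For example, if counter is 6 and this PCC has 3 nodes, then
            # nodes 0, 1, 2 are relabeled to 6, 7, 8 and counter advances
            # to 9 for the next PCC.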
            g = nx.relabel_nodes(g, dict(
                enumerate(range(counter, len(g) + counter + 1))))
            counter += len(g)

        # The probability that any edge is inconsistent is `p_incon`.
        # This is 1 - P(all edges consistent), which means
        # P(edge is consistent) = (1 - p_incon) ** (1 / N)
        complement_edges = util.estarmap(nxu.e_, nxu.complement_edges(g))
        if len(complement_edges) > 0:
            # Compute the per-edge probability of inconsistency needed to
            # achieve the desired probability that the PCC is inconsistent
            p_edge_inconn = 1 - (1 - p_pcc_incon) ** (1 / len(complement_edges))
            p_edge_unrev = .1
            p_edge_notcomp = 1 - (1 - p_pcc_incomp) ** (1 / len(complement_edges))
            probs = np.array([p_edge_inconn, p_edge_unrev, p_edge_notcomp])
            # if the total probability is greater than 1 the parameters
            # are invalid, so we renormalize to "fix" it.
            # if probs.sum() > 1:
            #     warnings.warn('probabilities sum to more than 1')
            #     probs = probs / probs.sum()
            pcumsum = probs.cumsum()
            # Determine which mutually exclusive state each complement edge is in
            # print('pcumsum = %r' % (pcumsum,))
            states = np.searchsorted(pcumsum, rng.rand(len(complement_edges)))

            # Cap the number of inconsistent edges at `max_n_incon`
            incon_idxs = np.where(states == 0)[0]
            if len(incon_idxs) > max_n_incon:
                print('max_n_incon = %r' % (max_n_incon,))
                chosen = rng.choice(incon_idxs, max_n_incon, replace=False)
                states[np.setdiff1d(incon_idxs, chosen)] = len(probs)

            grouped_edges = ub.group_items(complement_edges, states)
            for state, edges in grouped_edges.items():
                truth = POSTV
                if state == 0:
                    # Add in inconsistent edges
                    evidence_decision = NEGTV
                    # TODO: truth could be INCMP or POSTV
                    # new_edges.append((u, v, {'evidence_decision': NEGTV}))
                elif state == 1:
                    evidence_decision = UNREV
                    # TODO: truth could be INCMP or POSTV
                    # new_edges.append((u, v, {'evidence_decision': UNREV}))
                elif state == 2:
                    evidence_decision = INCMP
                    truth = INCMP
                else:
                    continue
                # Add in candidate edges
                attrs = {'evidence_decision': evidence_decision, 'truth': truth}
                for (u, v) in edges:
                    g.add_edge(u, v, **attrs)
        new_ccs.append(g)
        # (list(g.nodes()), new_edges))

    if len(new_ccs) == 0:
        pos_g = nx.Graph()
    else:
        pos_g = nx.union_all(new_ccs)
    assert len(new_ccs) == len(list(nx.connected_components(pos_g)))
    assert num_pccs == len(new_ccs)

    # Add edges between the PCCs
    neg_edges = []

    if not kwalias('ignore_pair', False):
        print('making pairs')

        pair_attrs_lookup = {
            0: {'evidence_decision': NEGTV, 'truth': NEGTV},
            1: {'evidence_decision': INCMP, 'truth': INCMP},
            2: {'evidence_decision': UNREV, 'truth': NEGTV},  # could be incomp or neg
        }

        # These are the probabilities that one edge has this state
        p_pair_neg = kwalias('p_pair_neg', .4)
        p_pair_incmp = kwalias('p_pair_incmp', .2)
        p_pair_unrev = kwalias('p_pair_unrev', 0)

        # p_pair_neg = 1
        cc_combos = ((list(g1.nodes()), list(g2.nodes()))
                     for (g1, g2) in it.combinations(new_ccs, 2))
        valid_cc_combos = [
            (cc1, cc2)
            for cc1, cc2 in cc_combos
            if len(cc1) and len(cc2)
        ]
        for cc1, cc2 in ub.ProgIter(valid_cc_combos, desc='make neg-demo'):
            possible_edges = util.estarmap(nxu.e_, it.product(cc1, cc2))
            # probability that any edge between these PCCs is negative
            n_edges = len(possible_edges)
            p_edge_neg = 1 - (1 - p_pair_neg) ** (1 / n_edges)
            p_edge_incmp = 1 - (1 - p_pair_incmp) ** (1 / n_edges)
            p_edge_unrev = 1 - (1 - p_pair_unrev) ** (1 / n_edges)

            # Create an event space with sizes proportional to the probabilities
            pcumsum = np.cumsum([p_edge_neg, p_edge_incmp, p_edge_unrev])
            # Roll dice for each of the edges to see which state it lands on
            possible_pstate = rng.rand(len(possible_edges))
            states = np.searchsorted(pcumsum, possible_pstate)

            flags = states < len(pcumsum)
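            # For example, with pcumsum = [0.2, 0.5, 0.6] a roll of 0.35
            # falls in bin 1 (np.searchsorted(pcumsum, 0.35) == 1), making
            # that edge INCMP, while a roll of 0.9 lands past the last bin,
            # so its flag is False and no explicit pair edge is added.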
            stateful_states = states.compress(flags)
            stateful_edges = list(ub.compress(possible_edges, flags))
            unique_states, groupxs_list = util.group_indices(stateful_states)
            for state, groupxs in zip(unique_states, groupxs_list):
                # print('state = %r' % (state,))
                # Add in candidate edges
                edges = list(ub.take(stateful_edges, groupxs))
                attrs = pair_attrs_lookup[state]
                for (u, v) in edges:
                    neg_edges.append((u, v, attrs))
        print('Made {} neg_edges between PCCs'.format(len(neg_edges)))
    else:
        print('ignoring pairs')

    G = AnnotInference._graph_cls()
    G.add_nodes_from(pos_g.nodes(data=True))
    G.add_edges_from(pos_g.edges(data=True))
    G.add_edges_from(neg_edges)
    infr = AnnotInference.from_netx(G, infer=kwargs.get('infer', True))
    infr.verbose = 3

    infr.relabel_using_reviews(rectify=False)

    # fontname = 'Ubuntu'
    fontsize = 12
    fontname = 'sans'
    splines = 'spline'
    # splines = 'ortho'
    # splines = 'line'

    infr.set_node_attrs('shape', 'circle')
    infr.graph.graph['ignore_labels'] = True
    infr.graph.graph['dark_background'] = False
    infr.graph.graph['fontname'] = fontname
    infr.graph.graph['fontsize'] = fontsize
    infr.graph.graph['splines'] = splines
    infr.set_node_attrs('width', 29)
    infr.set_node_attrs('height', 29)
    infr.set_node_attrs('fontsize', fontsize)
    infr.set_node_attrs('fontname', fontname)
    infr.set_node_attrs('fixed_size', True)

    # Set synthetic ground-truth attributes for testing
    infr.edge_truth = infr.get_edge_attrs('truth')

    # Make a synthetic verifier
    dummy_verif = dummy_algos.DummyVerif(infr)
    dummy_ranker = dummy_algos.DummyRanker(dummy_verif)
    infr.set_verifier(dummy_verif)
    infr.set_ranker(dummy_ranker)
    infr.dummy_verif = dummy_verif
    infr.demokw = kwargs
    return infr
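

def _check_per_edge_prob(p_group=0.5, n_edges=4):
    """
    Minimal sanity-check sketch (a hypothetical helper, not part of the
    original module) for the per-edge probability conversion used in
    `demodata_infr`: if each of `n_edges` independent edges fires with
    probability ``p_edge = 1 - (1 - p_group) ** (1 / n_edges)``, then the
    probability that at least one of them fires is exactly `p_group`.

    Example:
        >>> _check_per_edge_prob(0.5, 4)
        True
    """
    p_edge = 1 - (1 - p_group) ** (1 / n_edges)
    # P(at least one edge fires) = 1 - P(no edge fires)
    return bool(np.isclose(1 - (1 - p_edge) ** n_edges, p_group))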


if __name__ == '__main__':
    """
    CommandLine:
        python -m graphid.demo.dummy_infr all
    """
    import xdoctest
    xdoctest.doctest_module(__file__)