Source code for graphid.util.name_rectifier

import ubelt as ub
import numpy as np
import scipy.optimize


def demodata_oldnames(n_incon_groups=10, n_con_groups=2, n_per_con=5,
                      n_per_incon=5, con_sep=4, n_empty_groups=0):
    """
    Build deterministic demo input for :func:`find_consistent_labeling`.

    The random generator is always seeded with 42, so the same arguments
    always produce the same output.

    Args:
        n_incon_groups (int): number of "inconsistent" groups. These draw
            their labels from one shared pool, so different groups may
            contain the same label.
        n_con_groups (int): number of "consistent" groups. Each draws from
            its own private range of labels, so it never shares a label
            with any other group.
        n_per_con (int): maximum number of items in a consistent group
            (each group has between 1 and ``n_per_con`` items).
        n_per_incon (int): maximum number of items in an inconsistent
            group (between ``min(2, n_per_incon)`` and ``n_per_incon``).
        con_sep (int): maximum number of distinct labels available to a
            single consistent group.
        n_empty_groups (int): number of empty groups appended at the end.

    Returns:
        list[list[int]]: the inconsistent groups, followed by the
            consistent groups, followed by the empty groups.

    Example:
        >>> grouped_oldnames = demodata_oldnames(25, 15, 5, n_per_incon=5)
        >>> assert len(grouped_oldnames) == 40
        >>> assert demodata_oldnames(0, 0, 0, n_per_incon=1) == []
    """
    rng = np.random.RandomState(42)
    # NOTE: the value of this draw is intentionally discarded. The call is
    # kept because it advances the random stream; removing it would change
    # every dataset this function generates.
    rng.randint(1, con_sep + 1)

    n_incon_labels = rng.randint(0, n_incon_groups + 1)
    incon_labels = list(range(n_incon_labels))

    # Build up inconsistent groups that may share labels with other groups
    n_per_incon_list = [
        rng.randint(min(2, n_per_incon), n_per_incon + 1)
        for _ in range(n_incon_groups)
    ]
    if incon_labels:
        incon_groups = [
            rng.choice(incon_labels, n, replace=True).tolist()
            for n in n_per_incon_list
        ]
    else:
        # The label pool can randomly come out empty. Sampling a non-zero
        # number of items from an empty pool raises a ValueError, so in that
        # case the groups simply contain no old names.
        incon_groups = [[] for _ in n_per_incon_list]

    # Build up consistent groups that may have multiple labels, but do not
    # share labels with any other group
    con_groups = []
    offset = n_incon_labels + 1
    for _ in range(n_con_groups):
        this_n_per = rng.randint(1, n_per_con + 1)
        this_n_avail = rng.randint(1, con_sep + 1)
        this_avail_labels = list(range(offset, offset + this_n_avail))
        this_labels = rng.choice(this_avail_labels, this_n_per, replace=True)
        con_groups.append(this_labels.tolist())
        # Move past the labels reserved for this group
        offset += this_n_avail

    empty_groups = [[] for _ in range(n_empty_groups)]

    grouped_oldnames = incon_groups + con_groups + empty_groups
    return grouped_oldnames
def simple_munkres(part_oldnames):
    """
    Defines a munkres problem to solve name rectification.

    Args:
        part_oldnames (list[list]): one list of (hashable) old names per
            new group of annotations.

    Returns:
        list: one entry per input group, in order. Each entry is either an
            old name that occurs in that group, or None when the group
            could not be given one of its own old names. No old name is
            returned for more than one group.

    Notes:
        We create a matrix where each row represents a group of annotations
        in the same PCC and each column represents an original name. If
        there are more PCCs than original names the columns are padded with
        extra values. The matrix is first initialized to a large negative
        value representing impossible assignments. Then for each column
        representing a padded name, we set its value to $1$ indicating
        that each new name could be assigned to a padded name for some
        small profit. Finally, let $f_{rc}$ be the number of annotations
        in row $r$ with an original name of $c$. Each matrix value
        $(r, c)$ is set to $f_{rc} + 1$ if $f_{rc} > 0$, to represent how
        much each name ``wants'' to be labeled with a particular original
        name, and the extra one ensures that these original names are
        always preferred over padded names.

        The "impossible" value is finite, so the solver may still select
        an impossible cell (it has to when no fully valid matching exists).
        Such selections are reported as None instead of as an old name the
        group never contained.

    Example:
        >>> part_oldnames = [['a', 'b'], ['b', 'c'], ['c', 'a', 'a']]
        >>> new_names = simple_munkres(part_oldnames)
        >>> print(new_names)
        ['b', 'c', 'a']

    Example:
        >>> part_oldnames = [[], ['a', 'a'], [],
        >>>                  ['a', 'a', 'a', 'a', 'a', 'a', 'a', 'b'], ['a']]
        >>> new_names = simple_munkres(part_oldnames)
        >>> print(new_names)
        [None, 'a', None, 'b', None]

    Example:
        >>> part_oldnames = [[], ['b'], ['a', 'b', 'c'], ['b', 'c'], ['c', 'e', 'e']]
        >>> new_names = simple_munkres(part_oldnames)
        >>> print(new_names)
        [None, 'b', 'a', 'c', 'e']

        Profit Matrix
            b   a   c   e  _0
        0 -10 -10 -10 -10   1
        1   2 -10 -10 -10   1
        2   2   2   2 -10   1
        3   2 -10   2 -10   1
        4 -10 -10   2   3   1

    Example:
        >>> # A dominant group keeps 'a'; the other group never contained
        >>> # 'b', so it is left unassigned rather than renamed to 'b'.
        >>> part_oldnames = [['a'] * 100 + ['b'], ['a']]
        >>> print(simple_munkres(part_oldnames))
        ['a', None]
    """
    num_new_names = len(part_oldnames)
    if num_new_names == 0:
        # Nothing to assign (and an empty matrix has no max / min).
        return []

    # Unique old names, in order of first appearance
    unique_old_names = list(dict.fromkeys(
        name for names in part_oldnames for name in names))
    num_old_names = len(unique_old_names)

    # Create padded dummy values.  This accounts for the case where it is
    # impossible to uniquely map to the old db
    num_pad = max(num_new_names - num_old_names, 0)
    total = num_old_names + num_pad
    shape = (total, total)

    # Count how often each old name (column) occurs in each group (row).
    # Rows past ``num_new_names`` are dummies that only make the matrix
    # square; their counts stay zero.
    oldname_to_col = {name: colx for colx, name in enumerate(unique_old_names)}
    freq_matrix = np.zeros(shape, dtype=int)
    for rowx, names in enumerate(part_oldnames):
        for name in names:
            freq_matrix[rowx, oldname_to_col[name]] += 1

    # Rows are new-names and cols are old-names.
    # A valid assignment has profit 1 + freq, which incentivizes using a
    # previously used name. Anything else is marked invalid with a large
    # negative profit.
    invalid_profit = -2 * total
    profit_matrix = np.where(freq_matrix > 0, freq_matrix + 1, invalid_profit)

    # Set a much smaller profit for using an extra name
    # This allows the solution to always exist
    profit_matrix[:, num_old_names:total] = 1

    # Convert to minimization problem
    big_value = profit_matrix.max() - profit_matrix.min()
    cost_matrix = big_value - profit_matrix

    # Use scipy implementation of munkres algorithm.
    row_idxs, col_idxs = scipy.optimize.linear_sum_assignment(cost_matrix)
    row_to_col = dict(zip(row_idxs.tolist(), col_idxs.tolist()))

    # Each row (new-name) has now been assigned a column (old-name).
    # Map this back to the input-space, using None for padded columns and
    # for invalid cells (old names the group does not actually contain).
    assignment_ = []
    for rowx in range(num_new_names):
        colx = row_to_col[rowx]
        if colx < num_old_names and freq_matrix[rowx, colx] > 0:
            assignment_.append(unique_old_names[colx])
        else:
            assignment_.append(None)
    return assignment_
def find_consistent_labeling(grouped_oldnames, extra_prefix='_extra_name',
                             verbose=False):
    """
    Solves a maximum bipartite matching problem to find a consistent name
    assignment that minimizes the number of annotations with different
    names.

    Every group of annotations must be assigned a single name, and no name
    may be given to more than one group. Groups that do not share any old
    name with another group are resolved directly. The remaining groups are
    split into disjoint sets of groups connected through shared old names,
    and each set is solved independently with :func:`simple_munkres`, which
    keeps the individual assignment problems small.

    Args:
        grouped_oldnames (list): A group of old names where the grouping is
            based on new names. For instance:

                Given:
                    aids      = [1, 2, 3, 4, 5]
                    old_names = [0, 1, 1, 1, 0]
                    new_names = [0, 0, 1, 1, 0]

                The grouping is
                    [[0, 1, 0], [1, 1]]

                This lets us keep the old names in a split case and
                re-use existing names and make minimal changes to
                current annotation names while still being consistent
                with the new and improved grouping.

                The output will be:
                    [0, 1]

                Meaning that all annots in the first group are assigned the
                name 0 and all annots in the second group are assigned the
                name 1.

        extra_prefix (str | None): prefix used to generate fresh names
            (``<prefix>0``, ``<prefix>1``, ...) for groups that could not
            be given an old name. Generated names that already occur among
            the old names are skipped. If None, such groups are left as
            None in the output.

        verbose (bool): if truthy, print progress and summary statistics.

    Returns:
        list: the name assigned to each group, in the same order as
            ``grouped_oldnames``.

    References:
        http://stackoverflow.com/questions/1398822/assignment-problem-numpy

    Example:
        >>> grouped_oldnames = demodata_oldnames(25, 15, 5, n_per_incon=5)
        >>> new_names = find_consistent_labeling(grouped_oldnames, verbose=1)
        >>> grouped_oldnames = demodata_oldnames(0, 15, 5, n_per_incon=1)
        >>> new_names = find_consistent_labeling(grouped_oldnames, verbose=1)
        >>> grouped_oldnames = demodata_oldnames(0, 0, 0, n_per_incon=1)
        >>> new_names = find_consistent_labeling(grouped_oldnames, verbose=1)

    Example:
        >>> # xdoctest: +REQUIRES(module:timerit)
        >>> import timerit
        >>> ydata = []
        >>> xdata = list(range(10, 150, 50))
        >>> for x in xdata:
        >>>     print('x = %r' % (x,))
        >>>     grouped_oldnames = demodata_oldnames(x, 15, 5, n_per_incon=5)
        >>>     t = timerit.Timerit(3, verbose=1)
        >>>     for timer in t:
        >>>         with timer:
        >>>             new_names = find_consistent_labeling(grouped_oldnames)
        >>>     ydata.append(t.min())
        >>> # xdoc: +REQUIRES(--show)
        >>> import plottool_ibeis as pt
        >>> pt.qtensure()
        >>> pt.multi_plot(xdata, [ydata])
        >>> util.show_if_requested()

    Example:
        >>> grouped_oldnames = [['a', 'b', 'c'], ['b', 'c'], ['c', 'e', 'e']]
        >>> new_names = find_consistent_labeling(grouped_oldnames, verbose=1)
        >>> print(new_names)
        ['a', 'b', 'e']

    Example:
        >>> grouped_oldnames = [['a', 'b'], ['a', 'a', 'b'], ['a']]
        >>> new_names = find_consistent_labeling(grouped_oldnames)
        >>> print(new_names)
        ['b', 'a', '_extra_name0']

    Example:
        >>> grouped_oldnames = [['a', 'b'], ['e'], ['a', 'a', 'b'], [], ['a'], ['d']]
        >>> new_names = find_consistent_labeling(grouped_oldnames)
        >>> print(new_names)
        ['b', 'e', 'a', '_extra_name0', '_extra_name1', 'd']

    Example:
        >>> grouped_oldnames = [[], ['a', 'a'], [],
        >>>                     ['a', 'a', 'a', 'a', 'a', 'a', 'a', 'b'], ['a']]
        >>> new_names = find_consistent_labeling(grouped_oldnames)
        >>> print(new_names)
        ['_extra_name0', 'a', '_extra_name1', 'b', '_extra_name2']

    Example:
        >>> # Generated names never collide with existing old names
        >>> grouped_oldnames = [['_extra_name0'], []]
        >>> new_names = find_consistent_labeling(grouped_oldnames)
        >>> print(new_names)
        ['_extra_name0', '_extra_name1']
    """
    unique_old_names = list(ub.unique(ub.flatten(grouped_oldnames)))
    n_old_names = len(unique_old_names)
    n_new_names = len(grouped_oldnames)

    # Initialize assignment to all Nones
    assignment = [None for _ in range(n_new_names)]

    if verbose:
        print('finding maximally consistent labeling')
        print('n_old_names = %r' % (n_old_names,))
        print('n_new_names = %r' % (n_new_names,))

    # For each old_name, determine how many new_names use it.
    oldname_sets = list(map(set, grouped_oldnames))
    oldname_usage = ub.dict_hist(ub.flatten(oldname_sets))

    # Any name used more than once is a conflict and must be resolved
    conflict_oldnames = {k for k, v in oldname_usage.items() if v > 1}

    # Partition into trivial and non-trivial cases
    nontrivial_oldnames = []
    nontrivial_new_idxs = []
    trivial_oldnames = []
    trivial_new_idxs = []
    for new_idx, group in enumerate(grouped_oldnames):
        if oldname_sets[new_idx].isdisjoint(conflict_oldnames):
            trivial_oldnames.append(group)
            trivial_new_idxs.append(new_idx)
        else:
            nontrivial_oldnames.append(group)
            nontrivial_new_idxs.append(new_idx)

    # Rectify trivial cases
    # Any new-name that does not share any of its old-names with other
    # new-names can be resolved trivially
    n_trivial_unchanged = 0
    n_trivial_ignored = 0
    n_trivial_merges = 0
    for group, new_idx in zip(trivial_oldnames, trivial_new_idxs):
        if len(group) == 0:
            # new-names that use no old-names can be ignored
            n_trivial_ignored += 1
            continue
        name_hist = ub.dict_hist(group)
        if len(name_hist) > 1:
            # new-names that use more than one old-name are simple merges
            n_trivial_merges += 1
        else:
            n_trivial_unchanged += 1
        # Keep the most frequent old name; ties go to the smallest name
        top_freq = max(name_hist.values())
        assignment[new_idx] = min(
            name for name, freq in name_hist.items() if freq == top_freq)

    if verbose:
        n_trivial = len(trivial_oldnames)
        n_nontrivial = len(nontrivial_oldnames)
        print('rectify %d trivial groups' % (n_trivial,))
        print('  * n_trivial_unchanged = %r' % (n_trivial_unchanged,))
        print('  * n_trivial_merges    = %r' % (n_trivial_merges,))
        print('  * n_trivial_ignored   = %r' % (n_trivial_ignored,))
        print('rectify %d non-trivial groups' % (n_nontrivial,))

    # Partition nontrivial_oldnames into smaller disjoint sets.
    # Two groups belong to the same set when they are connected through
    # shared old names. Linking every group to the first group seen with
    # each of its names yields the same connected components as comparing
    # all pairs of groups, without the quadratic number of intersections.
    import networkx as nx
    graph = nx.Graph()
    graph.add_nodes_from(range(len(nontrivial_oldnames)))
    first_group_with_name = {}
    for node, group in enumerate(nontrivial_oldnames):
        for name in set(group):
            anchor = first_group_with_name.setdefault(name, node)
            if anchor != node:
                graph.add_edge(anchor, node)
    # Sort each component so the row order handed to munkres does not
    # depend on set iteration order.
    nontrivial_partition = [
        sorted(component) for component in nx.connected_components(graph)
    ]
    if verbose:
        print('  * partitioned non-trivial into %d subgroups' %
              (len(nontrivial_partition)))
        from graphid import util
        part_size_stats = util.stats_dict(map(len, nontrivial_partition))
        stats_str = ub.urepr(part_size_stats, precision=2, strkeys=True)
        print('  * partition size stats = %s' % (stats_str,))

    # Rectify nontrivial cases
    for part_idxs in ub.ProgIter(nontrivial_partition, desc='rectify parts',
                                 enabled=verbose):
        part_oldnames = [nontrivial_oldnames[i] for i in part_idxs]
        part_newidxs = [nontrivial_new_idxs[i] for i in part_idxs]
        # Rectify this part
        assignment_ = simple_munkres(part_oldnames)
        for new_idx, new_name in zip(part_newidxs, assignment_):
            assignment[new_idx] = new_name

    # Any unassigned name is now given a new unique label with a prefix
    if extra_prefix is not None:
        # A generated name must not duplicate an existing old name (e.g.
        # when the input already contains names generated by an earlier
        # run), otherwise two groups could end up with the same label.
        taken_names = set(unique_old_names)
        num_extra = 0
        for idx, val in enumerate(assignment):
            if val is None:
                new_name = '%s%d' % (extra_prefix, num_extra,)
                while new_name in taken_names:
                    num_extra += 1
                    new_name = '%s%d' % (extra_prefix, num_extra,)
                assignment[idx] = new_name
                num_extra += 1
    return assignment
if __name__ == '__main__':
    # Run every doctest in this module.
    #
    # CommandLine:
    #     python -m graphid.util.name_rectifier all
    import xdoctest
    xdoctest.doctest_module(__file__)