Source code for graphid.util.nx_utils

import numpy as np
import networkx as nx
import itertools as it
import ubelt as ub
import pandas as pd
import functools
import collections


def _dz(a, b):
    """
    Zip two sequences into a dictionary using ``ub.dzip``.

    Numpy arrays are converted with ``tolist``; every other iterable is
    materialized with ``list`` before being handed to ``ub.dzip``.

    Args:
        a (Iterable | np.ndarray): items passed as the first argument (keys)
        b (Iterable | np.ndarray): items passed as the second argument (values)

    Returns:
        dict: whatever ``ub.dzip`` builds from the two lists
    """
    def _as_list(items):
        # ndarray.tolist() is used instead of list() for numpy input
        if isinstance(items, np.ndarray):
            return items.tolist()
        return list(items)

    keys = _as_list(a)
    vals = _as_list(b)
    return ub.dzip(keys, vals)
def nx_source_nodes(graph):
    """
    Generate the nodes of ``graph`` whose in-degree is zero.

    Args:
        graph: object exposing ``nodes()`` and ``in_degree(node)``

    Yields:
        nodes with no incoming edges, in ``graph.nodes()`` order
    """
    yield from (
        candidate
        for candidate in graph.nodes()
        if graph.in_degree(candidate) == 0
    )
def nx_sink_nodes(graph):
    """
    Generate the nodes of ``graph`` whose out-degree is zero.

    Args:
        graph: object exposing ``nodes()`` and ``out_degree(node)``

    Yields:
        nodes with no outgoing edges, in ``graph.nodes()`` order
    """
    yield from (
        candidate
        for candidate in graph.nodes()
        if graph.out_degree(candidate) == 0
    )
def take_column(list_, colx):
    r"""
    Select one item (or several items) out of every row of ``list_``.

    Each row is indexed with ``colx``. When ``colx`` is a list, every row is
    indexed once per entry and a list of the selected items is produced for
    that row.

    Args:
        list_ (list): list of indexable rows (lists, dicts, ...)
        colx (int or list): index / key, or list of indexes / keys, to take
            from each row

    Returns:
        list: list of selected items

    Example0:
        >>> list_ = [['a', 'b'], ['c', 'd']]
        >>> colx = 0
        >>> result = take_column(list_, colx)
        >>> result = ub.urepr(result, nl=False)
        >>> print(result)
        ['a', 'c']

    Example1:
        >>> list_ = [['a', 'b'], ['c', 'd']]
        >>> colx = [1, 0]
        >>> result = take_column(list_, colx)
        >>> result = ub.urepr(result, nl=False)
        >>> print(result)
        [['b', 'a'], ['d', 'c']]

    Example2:
        >>> list_ = [{'spam': 'EGGS', 'ham': 'SPAM'}, {'spam': 'JAM', 'ham': 'PRAM'}]
        >>> # colx can be a key or list of keys as well
        >>> colx = ['spam']
        >>> result = take_column(list_, colx)
        >>> result = ub.urepr(result, nl=False)
        >>> print(result)
        [['EGGS'], ['JAM']]
    """
    if isinstance(colx, list):
        # multi select: one sub-list per row
        return [[row[key] for key in colx] for row in list_]
    return [row[colx] for row in list_]
def dict_take_column(list_of_dicts_, colkey, default=None):
    """
    Look up ``colkey`` in every dictionary of a list.

    Args:
        list_of_dicts_ (list): list of dictionaries
        colkey: key to fetch from each dictionary
        default: value used for dictionaries that lack ``colkey``

    Returns:
        list: one value per input dictionary
    """
    values = []
    for row in list_of_dicts_:
        values.append(row.get(colkey, default))
    return values
def itake_column(list_, colx):
    """
    Lazy counterpart of ``take_column``: returns a generator over the rows.

    Args:
        list_ (Iterable): indexable rows
        colx (int or list): index / key, or list of indexes / keys

    Returns:
        generator: the selected item of each row, or a list of selected
            items per row when ``colx`` is a list
    """
    if not isinstance(colx, list):
        return (row[colx] for row in list_)
    # multi select
    return ([row[key] for key in colx] for row in list_)
def list_roll(list_, n):
    """
    Like numpy.roll for python lists

    Items shifted past the end wrap around to the front. The shift is
    reduced modulo the length of the list, so shifts larger than the list
    (in either direction) wrap around exactly as ``numpy.roll`` does.

    Args:
        list_ (list): sequence to rotate (it is not modified)
        n (int): number of places to shift to the right; negative values
            shift to the left

    Returns:
        list: a new, rotated sequence

    References:
        http://stackoverflow.com/questions/9457832/python-list-rotation

    Example:
        >>> list_ = [1, 2, 3, 4, 5]
        >>> n = 2
        >>> result = list_roll(list_, n)
        >>> print(result)
        [4, 5, 1, 2, 3]
        >>> print(list_roll(list_, 7))
        [4, 5, 1, 2, 3]

    Ignore:
        np.roll(list_, n)
    """
    if not list_:
        # nothing to rotate; also avoids a modulo by zero
        return list_[:]
    # Normalize the shift into [0, len): without this, slicing silently
    # ignores any shift whose magnitude exceeds the length of the list.
    shift = n % len(list_)
    return list_[-shift:] + list_[:-shift]
def diag_product(s1, s2):
    """
    Generate the cartesian product of two iterables, diagonal pairs first.

    The two inputs are zipped together, then the longer one (``s2`` on a
    tie) is rotated right by one place and the zip is repeated, once per
    item of the longer input.

    Args:
        s1 (Iterable): first collection
        s2 (Iterable): second collection

    Yields:
        tuple: pairs ``(a, b)`` with ``a`` from ``s1`` and ``b`` from ``s2``
    """
    left = list(s1)
    right = list(s2)
    rotate_left_side = len(left) > len(right)
    n_rounds = len(left) if rotate_left_side else len(right)
    for _ in range(n_rounds):
        yield from zip(left, right)
        # rotate the longer side right by a single place
        if rotate_left_side:
            left = left[-1:] + left[:-1]
        else:
            right = right[-1:] + right[:-1]
def e_(u, v):
    """
    Put the endpoints of an edge into a canonical order.

    Args:
        u: first endpoint
        v: second endpoint

    Returns:
        tuple: ``(u, v)`` when ``u < v``, otherwise ``(v, u)``
    """
    if u < v:
        return (u, v)
    return (v, u)
def edges_inside(graph, nodes):
    """
    Finds edges within a set of nodes

    Running time is O(len(nodes) ** 2)

    Args:
        graph (nx.Graph): an undirected graph
        nodes (set): a set of nodes. Any iterable of nodes is accepted; it
            is copied into a set and the input is never modified.

    Returns:
        set: edges (normalized with ``e_``) whose endpoints are both in
            ``nodes``
    """
    # Work on a private set so list / tuple input is supported (matching
    # `edges_outgoing`) and the argument is left untouched.
    remaining = set(nodes)
    graph_adj = graph.adj
    found = set()
    for u in list(remaining):
        for v in remaining.intersection(graph_adj[u]):
            found.add(e_(u, v))
        # `u` is finished: dropping it means each pair is inspected once
        remaining.remove(u)
    return found
def edges_outgoing(graph, nodes):
    """
    Finds edges leaving a set of nodes.

    Average running time is O(len(nodes) * ave_degree(nodes))
    Worst case running time is O(G.number_of_edges()).

    Args:
        graph (nx.Graph): a graph
        nodes (set): set of nodes (other iterables are converted to a set)

    Returns:
        set: edges (normalized with ``e_``) with exactly one endpoint in
            ``nodes``

    Example:
        >>> G = demodata_bridge()
        >>> nodes = {1, 2, 3, 4}
        >>> outgoing = edges_outgoing(G, nodes)
        >>> assert outgoing == {(3, 5), (4, 8)}
    """
    inside = nodes if isinstance(nodes, set) else set(nodes)
    leaving = set()
    for u in inside:
        for v in graph.adj[u]:
            if v not in inside:
                leaving.add(e_(u, v))
    return leaving
def edges_cross(graph, nodes1, nodes2):
    """
    Finds edges between two sets of disjoint nodes.

    Running time is O(len(nodes1) * len(nodes2))

    Args:
        graph (nx.Graph): an undirected graph
        nodes1 (set): set of nodes disjoint from `nodes2`
        nodes2 (set): set of nodes disjoint from `nodes1`.

    Returns:
        set: edges (normalized with ``e_``) joining a node of ``nodes1`` to
            a node of ``nodes2``
    """
    crossing = set()
    for u in nodes1:
        # neighbors of u that belong to the second group
        for v in nodes2.intersection(graph.adj[u]):
            crossing.add(e_(u, v))
    return crossing
def edges_between(graph, nodes1, nodes2=None, assume_disjoint=False,
                  assume_dense=True):
    r"""
    Get edges between two components or within a single component

    Args:
        graph (nx.Graph): the graph
        nodes1 (set): list of nodes
        nodes2 (set): if None it is equivalent to nodes2=nodes1 (default=None)
        assume_disjoint (bool): skips expensive check to ensure edges
            aren't returned twice (default=False)
        assume_dense (bool): if True candidate pairs are enumerated with
            ``_edges_between_dense``, otherwise ``_edges_between_sparse``
            is used (default=True)

    Yields:
        tuple: edges ``(u, v)``; for undirected graphs each edge is
            normalized with ``e_``

    Example:
        >>> edges = [
        >>>     (1, 2), (2, 3), (3, 4), (4, 1), (4, 3),  # cc 1234
        >>>     (1, 5), (7, 2), (5, 1),  # cc 567 / 5678
        >>>     (7, 5), (5, 6), (8, 7),
        >>> ]
        >>> digraph = nx.DiGraph(edges)
        >>> graph = nx.Graph(edges)
        >>> nodes1 = [1, 2, 3, 4]
        >>> nodes2 = [5, 6, 7]
        >>> n2 = sorted(edges_between(graph, nodes1, nodes2))
        >>> n4 = sorted(edges_between(graph, nodes1))
        >>> n5 = sorted(edges_between(graph, nodes1, nodes1))
        >>> n1 = sorted(edges_between(digraph, nodes1, nodes2))
        >>> n3 = sorted(edges_between(digraph, nodes1))
        >>> print('n2 == %r' % (n2,))
        >>> print('n4 == %r' % (n4,))
        >>> print('n5 == %r' % (n5,))
        >>> print('n1 == %r' % (n1,))
        >>> print('n3 == %r' % (n3,))
        >>> assert n2 == ([(1, 5), (2, 7)]), '2'
        >>> assert n4 == ([(1, 2), (1, 4), (2, 3), (3, 4)]), '4'
        >>> assert n5 == ([(1, 2), (1, 4), (2, 3), (3, 4)]), '5'
        >>> assert n1 == ([(1, 5), (5, 1), (7, 2)]), '1'
        >>> assert n3 == ([(1, 2), (2, 3), (3, 4), (4, 1), (4, 3)]), '3'
        >>> n6 = sorted(edges_between(digraph, nodes1 + [6], nodes2 + [1, 2], assume_dense=False))
        >>> print('n6 = %r' % (n6,))
        >>> n6 = sorted(edges_between(digraph, nodes1 + [6], nodes2 + [1, 2], assume_dense=True))
        >>> print('n6 = %r' % (n6,))
        >>> assert n6 == ([(1, 2), (1, 5), (2, 3), (4, 1), (5, 1), (5, 6), (7, 2)]), '6'
    """
    finder = _edges_between_dense if assume_dense else _edges_between_sparse
    found = finder(graph, nodes1, nodes2, assume_disjoint)
    keep_direction = graph.is_directed()
    for u, v in found:
        # direction only carries meaning for directed graphs
        yield (u, v) if keep_direction else e_(u, v)
def _edges_between_dense(graph, nodes1, nodes2=None, assume_disjoint=False):
    """
    The dense method is where we enumerate all possible edges and just take
    the ones that exist (faster for very dense graphs)

    Args:
        graph: object exposing ``is_directed()`` and ``has_edge(u, v)``
        nodes1 (Iterable): first group of nodes
        nodes2 (Iterable): second group; ``None`` (or the same object as
            ``nodes1``) means only pairs inside ``nodes1`` are considered
        assume_disjoint (bool): if True the groups are assumed not to
            overlap and no de-duplication of candidate pairs is done

    Yields:
        tuple: existing edges; for directed graphs both orientations of
            each candidate pair are tested
    """
    if nodes2 is None or nodes2 is nodes1:
        # Case where we are looking at internal nodes only
        candidates = it.combinations(nodes1, 2)
    elif assume_disjoint:
        # We assume len(isect(nodes1, nodes2)) == 0
        candidates = it.product(nodes1, nodes2)
    else:
        # make sure a single edge is not returned twice
        # in the case where len(isect(nodes1, nodes2)) > 0
        set1 = nodes1 if isinstance(nodes1, set) else set(nodes1)
        set2 = nodes2 if isinstance(nodes2, set) else set(nodes2)
        shared = set1.intersection(set2)
        only1 = set1 - shared
        only2 = set2 - shared
        candidates = it.chain(
            it.product(only1, only2),
            it.product(only1, shared),
            it.product(only2, shared),
            it.combinations(shared, 2),
        )

    directed = graph.is_directed()
    for n1, n2 in candidates:
        if graph.has_edge(n1, n2):
            yield n1, n2
        # the reverse orientation is a distinct edge only when directed
        if directed and graph.has_edge(n2, n1):
            yield n2, n1
def _edges_inside_lower(graph, both_adj):
    """
    finds lower triangular edges inside the nodes

    Args:
        graph: unused
        both_adj (dict): maps each node to the set of its neighbors

    Yields:
        tuple: ``(u, v)`` where ``v`` is a neighbor of ``u`` that appeared
            earlier than ``u`` in the iteration order of ``both_adj``
    """
    visited = set()
    for u, neighbs in both_adj.items():
        earlier_neighbs = neighbs.intersection(visited)
        yield from ((u, v) for v in earlier_neighbs)
        visited.add(u)
def _edges_inside_upper(graph, both_adj):
    """
    finds upper triangular edges inside the nodes

    Args:
        graph: unused
        both_adj (dict): maps each node to the set of its neighbors

    Yields:
        tuple: ``(u, v)`` where ``v`` is a neighbor of ``u`` that has not
            yet been fully processed (``u`` itself included, so self loops
            are reported)
    """
    pending = set(both_adj.keys())
    for u, neighbs in both_adj.items():
        pending_neighbs = neighbs.intersection(pending)
        yield from ((u, v) for v in pending_neighbs)
        pending.remove(u)
def _edges_between_disjoint(graph, only1_adj, only2):
    """
    finds edges between disjoint nodes

    Args:
        graph: unused
        only1_adj (dict): maps each node of the first group to the set of
            its neighbors
        only2 (set): nodes of the second group

    Yields:
        tuple: ``(u, v)`` with ``u`` a key of ``only1_adj`` and ``v`` one of
            its neighbors that is also in ``only2``
    """
    for u, neighbs in only1_adj.items():
        # keep only the neighbors of u that fall in the second group
        yield from ((u, v) for v in neighbs.intersection(only2))
def _edges_between_sparse(graph, nodes1, nodes2=None, assume_disjoint=False):
    """
    In this version we check the intersection of existing edges and the
    edges in the second set (faster for sparse graphs)

    Args:
        graph (nx.Graph): graph exposing ``adj`` and ``is_directed()``
        nodes1 (Iterable): first group of nodes
        nodes2 (Iterable): second group; ``None`` (or the same object as
            ``nodes1``) means only edges inside ``nodes1`` are searched
        assume_disjoint (bool): if True the two groups are assumed not to
            overlap, which skips the split into shared / exclusive nodes

    Yields:
        tuple: edges ``(u, v)`` where ``v`` is taken from ``graph.adj[u]``
    """
    # Notes:
    # 1 = nodes only in `nodes1`
    # 2 = nodes only in `nodes2`
    # B = nodes in both `nodes1` and `nodes2`

    # Test for special cases
    if nodes2 is None or nodes2 is nodes1:
        # Case where we just are finding internal edges
        both = set(nodes1)
        both_adj = {u: set(graph.adj[u]) for u in both}
        if graph.is_directed():
            edge_sets = (
                _edges_inside_upper(graph, both_adj),  # B-to-B (u)
                _edges_inside_lower(graph, both_adj),  # B-to-B (l)
            )
        else:
            edge_sets = (
                _edges_inside_upper(graph, both_adj),  # B-to-B (u)
            )
    elif assume_disjoint:
        # Case where we find edges between disjoint sets
        if not isinstance(nodes1, set):
            nodes1 = set(nodes1)
        if not isinstance(nodes2, set):
            nodes2 = set(nodes2)
        only1 = nodes1
        only2 = nodes2
        # The helper iterates over an adjacency mapping (node -> neighbor
        # set), so it must receive `only1_adj` / `only2_adj`; handing it the
        # bare node sets fails on `.items()`.
        only1_adj = {u: set(graph.adj[u]) for u in only1}
        if graph.is_directed():
            only2_adj = {u: set(graph.adj[u]) for u in only2}
            edge_sets = (
                _edges_between_disjoint(graph, only1_adj, only2),  # 1-to-2
                _edges_between_disjoint(graph, only2_adj, only1),  # 2-to-1
            )
        else:
            edge_sets = (
                _edges_between_disjoint(graph, only1_adj, only2),  # 1-to-2
            )
    else:
        # Full general case (nodes2 is never None here; that case is
        # handled by the first branch)
        if not isinstance(nodes1, set):
            nodes1 = set(nodes1)
        if not isinstance(nodes2, set):
            nodes2 = set(nodes2)
        both = nodes1.intersection(nodes2)
        only1 = nodes1 - both
        only2 = nodes2 - both

        # Precompute all calls to set(graph.adj[u]) to avoid duplicate calls
        only1_adj = {u: set(graph.adj[u]) for u in only1}
        only2_adj = {u: set(graph.adj[u]) for u in only2}
        both_adj = {u: set(graph.adj[u]) for u in both}
        if graph.is_directed():
            edge_sets = (
                _edges_between_disjoint(graph, only1_adj, only2),  # 1-to-2
                _edges_between_disjoint(graph, only1_adj, both),  # 1-to-B
                _edges_inside_upper(graph, both_adj),  # B-to-B (u)
                _edges_inside_lower(graph, both_adj),  # B-to-B (l)
                _edges_between_disjoint(graph, both_adj, only1),  # B-to-1
                _edges_between_disjoint(graph, both_adj, only2),  # B-to-2
                _edges_between_disjoint(graph, only2_adj, both),  # 2-to-B
                _edges_between_disjoint(graph, only2_adj, only1),  # 2-to-1
            )
        else:
            edge_sets = (
                _edges_between_disjoint(graph, only1_adj, only2),  # 1-to-2
                _edges_between_disjoint(graph, only1_adj, both),  # 1-to-B
                _edges_inside_upper(graph, both_adj),  # B-to-B (u)
                _edges_between_disjoint(graph, only2_adj, both),  # 2-to-B
            )

    for u, v in it.chain.from_iterable(edge_sets):
        yield u, v
def group_name_edges(g, node_to_label):
    """
    Group the edges of a graph by the labels of their endpoints.

    Args:
        g: graph exposing ``edges()`` that yields ``(u, v)`` pairs
        node_to_label (dict): maps every node to its label

    Returns:
        dict: maps each label pair (normalized with ``e_``) to the set of
            node edges (also normalized with ``e_``) joining those labels
    """
    label_pair_to_edges = ub.ddict(set)
    for u, v in g.edges():
        label_u = node_to_label[u]
        label_v = node_to_label[v]
        label_pair = e_(label_u, label_v)
        label_pair_to_edges[label_pair].add(e_(u, v))
    return label_pair_to_edges
def ensure_multi_index(index, names):
    """
    Coerce a collection of tuples into a :class:`pandas.MultiIndex`.

    Args:
        index (pd.Index | Sequence[tuple]): an existing pandas index, which
            is returned unchanged, or a sequence of equal-length tuples
        names (Sequence[str] | None): level names for a newly built index.
            ``None`` falls back to ``('aid1', 'aid2')``.

    Returns:
        pd.Index: ``index`` itself if it already was a pandas index,
            otherwise a new ``pd.MultiIndex`` with the requested names

    Example:
        >>> idx = ensure_multi_index([(1, 2), (3, 4)], ('u', 'v'))
        >>> assert list(idx.names) == ['u', 'v']
    """
    import pandas as pd
    # pd.MultiIndex is a subclass of pd.Index, so one check covers both
    if isinstance(index, pd.Index):
        return index
    if names is None:
        # historical default level names
        names = ('aid1', 'aid2')
    if len(index) == 0:
        # `from_tuples` cannot infer the number of levels from an empty
        # input, so build the empty levels / codes explicitly
        levels = [[] for _ in names]
        codes = [[] for _ in names]
        return pd.MultiIndex(levels=levels, codes=codes, names=names)
    return pd.MultiIndex.from_tuples(index, names=names)
def demodata_bridge():
    """
    Build a small undirected demo graph made of cycles joined by bridges.

    Returns:
        nx.Graph: graph whose edges are the consecutive node pairs of the
            hard-coded paths below
    """
    # define 2-connected compoments and bridges
    cc2 = [(1, 2, 4, 3, 1, 4), (8, 9, 10, 8), (11, 12, 13, 11)]
    bridges = [(4, 8), (3, 5), (20, 21), (22, 23, 24)]
    # every pair of consecutive nodes along a path becomes an edge
    edges = [
        (u, v)
        for path in cc2 + bridges
        for u, v in zip(path, path[1:])
    ]
    G = nx.Graph(edges)
    return G
def demodata_tarjan_bridge():
    """
    Build an undirected demo graph made of cycles joined by bridges.

    Returns:
        nx.Graph: graph whose edges are the consecutive node pairs of the
            hard-coded paths below

    Example:
        >>> from graphid import util
        >>> G = demodata_tarjan_bridge()
        >>> # xdoc: +REQUIRES(--show)
        >>> util.show_nx(G)
        >>> util.show_if_requested()
    """
    # define 2-connected compoments and bridges
    cc2 = [(1, 2, 4, 3, 1, 4), (5, 6, 7, 5), (8, 9, 10, 8),
           (17, 18, 16, 15, 17), (11, 12, 14, 13, 11, 14)]
    bridges = [(4, 8), (3, 5), (3, 17)]
    # every pair of consecutive nodes along a path becomes an edge
    edges = [
        (u, v)
        for path in cc2 + bridges
        for u, v in zip(path, path[1:])
    ]
    G = nx.Graph(edges)
    return G
# def is_tri_edge_connected(G): # """ # Yet another Simple Algorithm for Triconnectivity # http://www.sciencedirect.com/science/article/pii/S1570866708000415 # """ # pass
def is_k_edge_connected(G, k):
    """
    Thin wrapper around ``nx.is_k_edge_connected``.

    Args:
        G (nx.Graph): graph to test
        k (int): connectivity to test for

    Returns:
        the result of ``nx.is_k_edge_connected(G, k)``
    """
    answer = nx.is_k_edge_connected(G, k)
    return answer
def complement_edges(G):
    """
    Iterate over the edges reported by networkx's ``complement_edges``.

    Args:
        G (nx.Graph): input graph

    Returns:
        Iterator[tuple]: each reported edge normalized with ``e_``
    """
    from networkx.algorithms.connectivity.edge_augmentation import (
        complement_edges as _nx_complement_edges)
    raw_edges = _nx_complement_edges(G)
    return it.starmap(e_, raw_edges)
def k_edge_augmentation(G, k, avail=None, partial=False):
    """
    Wrapper around ``nx.k_edge_augmentation`` that normalizes edge order.

    Args:
        G (nx.Graph): input graph
        k (int): desired edge connectivity
        avail: forwarded to ``nx.k_edge_augmentation``
        partial (bool): forwarded to ``nx.k_edge_augmentation``

    Returns:
        Iterator[tuple]: the augmenting edges, each normalized with ``e_``
    """
    aug_edges = nx.k_edge_augmentation(G, k, avail=avail, partial=partial)
    return it.starmap(e_, aug_edges)
def is_complete(G, self_loops=False):
    """
    Check whether a (non-multi) graph has the edge count of a complete graph.

    Only the number of edges is compared against the number a complete
    graph on the same nodes would have.

    Args:
        G: graph exposing ``is_multigraph``, ``is_directed``,
            ``number_of_edges`` and ``number_of_nodes``
        self_loops (bool): if True one self loop per node is also required

    Returns:
        bool: True if the edge count equals the required count

    Raises:
        AssertionError: if ``G`` is a multigraph
    """
    assert not G.is_multigraph()
    n_edges = G.number_of_edges()
    n_nodes = G.number_of_nodes()
    n_ordered_pairs = n_nodes * (n_nodes - 1)
    # undirected graphs count each unordered pair once
    n_need = n_ordered_pairs if G.is_directed() else n_ordered_pairs // 2
    if self_loops:
        n_need += n_nodes
    return n_edges == n_need
def random_k_edge_connected_graph(size, k, p=.1, rng=None):
    """
    Super hacky way of getting a random k-connected graph

    Random graphs are drawn until one has edge connectivity ``k``, or one
    has lower connectivity and is then augmented up to ``k``.

    Args:
        size (int): number of nodes
        k (int): desired edge connectivity
        p (float): initial edge probability; halved whenever a draw turns
            out to be more connected than requested
        rng: object with a ``randint`` method used to seed every draw, or
            None for unseeded draws

    Returns:
        nx.Graph: the generated graph

    Example:
        >>> from graphid import util
        >>> size, k, p = 25, 3, .1
        >>> rng = util.ensure_rng(0)
        >>> gs = []
        >>> for x in range(4):
        >>>     G = random_k_edge_connected_graph(size, k, p, rng)
        >>>     gs.append(G)
        >>> # xdoc: +REQUIRES(--show)
        >>> pnum_ = util.PlotNums(nRows=2, nSubplots=len(gs))
        >>> fnum = 1
        >>> for g in gs:
        >>>     util.show_nx(g, fnum=fnum, pnum=pnum_())
    """
    while True:
        seed = None if rng is None else rng.randint((2 ** 31 - 1))
        # Randomly generate a graph
        g = nx.fast_gnp_random_graph(size, p, seed=seed)
        conn = nx.edge_connectivity(g)
        if conn == k:
            # It has exactly the desired connectivity: we are done
            return g
        if conn < k:
            # Too sparse: add a small set of edges to get there
            aug_edges = list(k_edge_augmentation(g, k))
            g.add_edges_from(aug_edges)
            return g
        # Too dense: regenerate the graph with fewer edges
        p = p / 2
def edge_df(graph, edges, ignore=None):
    """
    Tabulate the attribute dictionaries of the given edges.

    Args:
        graph: graph exposing ``get_edge_data(u, v)``
        edges (Iterable[tuple]): edges to look up
        ignore (list | None): attribute columns to drop from the result

    Returns:
        pd.DataFrame: one row per edge and one column per attribute; for a
            non-empty frame the index levels are named ``u`` and ``v``
            whenever pandas accepts those names
    """
    records = {}
    for edge in edges:
        records[edge] = graph.get_edge_data(*edge)
    df = pd.DataFrame.from_dict(records, orient='index')

    if not len(df):
        return df

    if ignore:
        # only drop the columns that actually exist
        unwanted = df.columns.intersection(ignore)
        df = df.drop(unwanted, axis=1)
    try:
        df.index.names = ('u', 'v')
    except Exception:
        # best effort: leave the index names alone if they cannot be set
        pass
    return df
def nx_delete_node_attr(graph, name, nodes=None):
    """
    Removes node attributes

    Args:
        graph (nx.Graph): graph whose node attribute dicts are modified
            in place
        name (str | list): attribute name, or list of attribute names
        nodes (Iterable | None): nodes to process; defaults to every node
            of the graph

    Returns:
        int: number of attribute entries that were deleted

    Doctest:
        >>> G = nx.karate_club_graph()
        >>> nx.set_node_attributes(G, name='foo', values='bar')
        >>> datas = nx.get_node_attributes(G, 'club')
        >>> assert len(nx.get_node_attributes(G, 'club')) == 34
        >>> assert len(nx.get_node_attributes(G, 'foo')) == 34
        >>> nx_delete_node_attr(G, ['club', 'foo'], nodes=[1, 2])
        >>> assert len(nx.get_node_attributes(G, 'club')) == 32
        >>> assert len(nx.get_node_attributes(G, 'foo')) == 32
        >>> nx_delete_node_attr(G, ['club'])
        >>> assert len(nx.get_node_attributes(G, 'club')) == 0
        >>> assert len(nx.get_node_attributes(G, 'foo')) == 32
    """
    if nodes is None:
        nodes = list(graph.nodes())
    attr_names = name if isinstance(name, list) else [name]
    node_dict = graph.nodes
    n_removed = 0
    for node in nodes:
        for attr in attr_names:
            try:
                del node_dict[node][attr]
            except KeyError:
                # the node or the attribute does not exist
                continue
            n_removed += 1
    return n_removed
def nx_delete_edge_attr(graph, name, edges=None):
    """
    Removes an attributes from specific edges in the graph

    Args:
        graph (nx.Graph): graph whose edge attribute dicts are modified
            in place
        name (str | list | tuple): attribute name, or names, to delete
        edges (Iterable | None): edges to process, given as ``(u, v)``
            pairs, or ``(u, v, k)`` triples for multigraphs; defaults to
            every edge of the graph

    Returns:
        int: number of attribute entries that were deleted

    Doctest:
        >>> G = nx.karate_club_graph()
        >>> nx.set_edge_attributes(G, name='spam', values='eggs')
        >>> nx.set_edge_attributes(G, name='foo', values='bar')
        >>> assert len(nx.get_edge_attributes(G, 'spam')) == 78
        >>> assert len(nx.get_edge_attributes(G, 'foo')) == 78
        >>> nx_delete_edge_attr(G, ['spam', 'foo'], edges=[(1, 2)])
        >>> assert len(nx.get_edge_attributes(G, 'spam')) == 77
        >>> assert len(nx.get_edge_attributes(G, 'foo')) == 77
        >>> nx_delete_edge_attr(G, ['spam'])
        >>> assert len(nx.get_edge_attributes(G, 'spam')) == 0
        >>> assert len(nx.get_edge_attributes(G, 'foo')) == 77

    Doctest:
        >>> G = nx.MultiGraph()
        >>> G.add_edges_from([(1, 2), (2, 3), (3, 4), (4, 5), (4, 5), (1, 2)])
        >>> nx.set_edge_attributes(G, name='spam', values='eggs')
        >>> nx.set_edge_attributes(G, name='foo', values='bar')
        >>> assert len(nx.get_edge_attributes(G, 'spam')) == 6
        >>> assert len(nx.get_edge_attributes(G, 'foo')) == 6
        >>> nx_delete_edge_attr(G, ['spam', 'foo'], edges=[(1, 2, 0)])
        >>> assert len(nx.get_edge_attributes(G, 'spam')) == 5
        >>> assert len(nx.get_edge_attributes(G, 'foo')) == 5
        >>> nx_delete_edge_attr(G, ['spam'])
        >>> assert len(nx.get_edge_attributes(G, 'spam')) == 0
        >>> assert len(nx.get_edge_attributes(G, 'foo')) == 5
    """
    attr_names = name if isinstance(name, (list, tuple)) else [name]
    is_multi = graph.is_multigraph()
    if edges is None:
        edges = graph.edges(keys=True) if is_multi else graph.edges()

    n_removed = 0
    if is_multi:
        for u, v, k in edges:
            for attr in attr_names:
                try:
                    del graph[u][v][k][attr]
                except KeyError:
                    # the edge or the attribute does not exist
                    continue
                n_removed += 1
    else:
        for u, v in edges:
            for attr in attr_names:
                try:
                    del graph[u][v][attr]
                except KeyError:
                    # the edge or the attribute does not exist
                    continue
                n_removed += 1
    return n_removed
def nx_gen_node_values(G, key, nodes, default=ub.NoParam):
    """
    Generates attributes values of specific nodes

    Args:
        G (nx.Graph): graph exposing the ``nodes`` attribute mapping
        key: attribute name to read
        nodes (Iterable): nodes whose attribute is wanted
        default: value used when a node lacks ``key``; when left as
            ``ub.NoParam`` a missing key raises ``KeyError`` on iteration

    Returns:
        generator: the attribute value of each requested node
    """
    node_dict = G.nodes
    if default is not ub.NoParam:
        return (node_dict[n].get(key, default) for n in nodes)
    return (node_dict[n][key] for n in nodes)
def nx_gen_node_attrs(G, key, nodes=None, default=ub.NoParam,
                      on_missing='error', on_keyerr='default'):
    """
    Improved generator version of nx.get_node_attributes

    Args:
        G (nx.Graph): the graph
        key: attribute name to read from each node
        nodes (Iterable | None): nodes to visit; defaults to ``G.nodes()``
        default: value used by the 'default' key strategy
        on_missing (str): Strategy for handling nodes missing from G.
            Can be {'error', 'default', 'filter'}.  defaults to 'error'.
        on_keyerr (str): Strategy for handling keys missing from node dicts.
            Can be {'error', 'default', 'filter'}.  defaults to 'default'
            if default is specified, otherwise defaults to 'error'.

    Returns:
        generator: ``(node, value)`` pairs

    Raises:
        KeyError: immediately if ``on_missing`` or ``on_keyerr`` is not a
            known strategy; lazily (while iterating) under the 'error'
            strategies

    Notes:
        strategies are:
            error - raises an error if key or node does not exist
            default - returns node, but uses value specified by default
            filter - skips the node

    Example:
        >>> # ENABLE_DOCTEST
        >>> from graphid import util
        >>> G = nx.Graph([(1, 2), (2, 3)])
        >>> nx.set_node_attributes(G, name='part', values={1: 'bar', 3: 'baz'})
        >>> nodes = [1, 2, 3, 4]
        >>> #
        >>> assert len(list(nx_gen_node_attrs(G, 'part', default=None, on_missing='error', on_keyerr='default'))) == 3
        >>> assert len(list(nx_gen_node_attrs(G, 'part', default=None, on_missing='error', on_keyerr='filter'))) == 2
        >>> assert_raises(KeyError, list, nx_gen_node_attrs(G, 'part', on_missing='error', on_keyerr='error'))
        >>> #
        >>> assert len(list(nx_gen_node_attrs(G, 'part', nodes, default=None, on_missing='filter', on_keyerr='default'))) == 3
        >>> assert len(list(nx_gen_node_attrs(G, 'part', nodes, default=None, on_missing='filter', on_keyerr='filter'))) == 2
        >>> assert_raises(KeyError, list, nx_gen_node_attrs(G, 'part', nodes, on_missing='filter', on_keyerr='error'))
        >>> #
        >>> assert len(list(nx_gen_node_attrs(G, 'part', nodes, default=None, on_missing='default', on_keyerr='default'))) == 4
        >>> assert len(list(nx_gen_node_attrs(G, 'part', nodes, default=None, on_missing='default', on_keyerr='filter'))) == 2
        >>> assert_raises(KeyError, list, nx_gen_node_attrs(G, 'part', nodes, on_missing='default', on_keyerr='error'))

    Example:
        >>> # DISABLE_DOCTEST
        >>> # ALL CASES
        >>> from graphid import util
        >>> G = nx.Graph([(1, 2), (2, 3)])
        >>> nx.set_node_attributes(G, name='full', values={1: 'A', 2: 'B', 3: 'C'})
        >>> nx.set_node_attributes(G, name='part', values={1: 'bar', 3: 'baz'})
        >>> nodes = [1, 2, 3, 4]
        >>> attrs = dict(nx_gen_node_attrs(G, 'full'))
        >>> input_grid = {
        >>>     'nodes': [None, (1, 2, 3, 4)],
        >>>     'key': ['part', 'full'],
        >>>     'default': [ub.NoParam, None],
        >>> }
        >>> inputs = util.all_dict_combinations(input_grid)
        >>> kw_grid = {
        >>>     'on_missing': ['error', 'default', 'filter'],
        >>>     'on_keyerr': ['error', 'default', 'filter'],
        >>> }
        >>> kws = util.all_dict_combinations(kw_grid)
        >>> for in_ in inputs:
        >>>     for kw in kws:
        >>>         kw2 = ub.dict_union(kw, in_)
        >>>         #print(kw2)
        >>>         on_missing = kw['on_missing']
        >>>         on_keyerr = kw['on_keyerr']
        >>>         if on_keyerr == 'default' and in_['default'] is ub.NoParam:
        >>>             on_keyerr = 'error'
        >>>         will_miss = False
        >>>         will_keyerr = False
        >>>         if on_missing == 'error':
        >>>             if in_['key'] == 'part' and in_['nodes'] is not None:
        >>>                 will_miss = True
        >>>             if in_['key'] == 'full' and in_['nodes'] is not None:
        >>>                 will_miss = True
        >>>         if on_keyerr == 'error':
        >>>             if in_['key'] == 'part':
        >>>                 will_keyerr = True
        >>>             if on_missing == 'default':
        >>>                 if in_['key'] == 'full' and in_['nodes'] is not None:
        >>>                     will_keyerr = True
        >>>         want_error = will_miss or will_keyerr
        >>>         gen = nx_gen_node_attrs(G, **kw2)
        >>>         try:
        >>>             attrs = list(gen)
        >>>         except KeyError:
        >>>             if not want_error:
        >>>                 raise AssertionError('should not have errored')
        >>>         else:
        >>>             if want_error:
        >>>                 raise AssertionError('should have errored')
    """
    if on_missing is None:
        on_missing = 'error'
    # without an explicit default the 'default' key strategy cannot work
    if on_keyerr == 'default' and default is ub.NoParam:
        on_keyerr = 'error'
    if nodes is None:
        nodes = G.nodes()

    # Stage 1: pair every node with its attribute dictionary
    node_dict = G.nodes
    if on_missing == 'error':
        pairs = ((n, node_dict[n]) for n in nodes)
    elif on_missing == 'filter':
        pairs = ((n, node_dict[n]) for n in nodes if n in G)
    elif on_missing == 'default':
        pairs = ((n, node_dict.get(n, {})) for n in nodes)
    else:
        raise KeyError('on_missing={} must be error, filter or default'.format(
            on_missing))

    # Stage 2: pull the desired value out of each dictionary
    if on_keyerr == 'error':
        return ((n, d[key]) for n, d in pairs)
    if on_keyerr == 'filter':
        return ((n, d[key]) for n, d in pairs if key in d)
    if on_keyerr == 'default':
        return ((n, d.get(key, default)) for n, d in pairs)
    raise KeyError('on_keyerr={} must be error filter or default'.format(on_keyerr))
def graph_info(graph, ignore=None, stats=False, verbose=False):
    """
    Summarize the structure and attribute usage of a graph.

    Args:
        graph (nx.Graph): graph to describe
        ignore (list | None): attribute names to leave out of the attribute
            summaries
        stats (bool): if True the attribute summaries come from
            ``DataFrame.describe``; otherwise they count how many
            nodes / edges carry each attribute key
        verbose (bool): if True the summary is also printed

    Returns:
        dict: ordered dictionary with the keys ``directed``, ``multi``,
            ``num_nodes``, ``num_edges``, ``edge_attr_hist``,
            ``node_attr_hist``, ``node_type_hist``, ``graph_attrs`` and
            ``graph_name``
    """
    from graphid import util
    import pandas as pd

    node_dict = graph.nodes
    node_attrs = list(node_dict.values())
    edge_attrs = list(take_column(graph.edges(data=True), 2))

    if stats:
        node_frame = pd.DataFrame(node_attrs)
        # named `edge_frame` so the module-level `edge_df` is not shadowed
        edge_frame = pd.DataFrame(edge_attrs)
        if ignore is not None:
            util.delete_dict_keys(node_frame, ignore)
            util.delete_dict_keys(edge_frame, ignore)
        # Not really histograms anymore
        try:
            node_attr_hist = node_frame.describe().to_dict()
        except ValueError:
            # fall back to an empty summary, exactly like the edge case
            # below; without the assignment the name is unbound later on
            node_attr_hist = {}
        try:
            edge_attr_hist = edge_frame.describe().to_dict()
        except ValueError:
            edge_attr_hist = {}
        key_order = ['count', 'mean', 'std', 'min', '25%', '50%', '75%', 'max']
        node_attr_hist = ub.map_dict_vals(
            lambda x: util.order_dict_by(x, key_order), node_attr_hist)
        edge_attr_hist = ub.map_dict_vals(
            lambda x: util.order_dict_by(x, key_order), edge_attr_hist)
    else:
        node_attr_hist = ub.dict_hist(
            ub.flatten([attr.keys() for attr in node_attrs]))
        edge_attr_hist = ub.dict_hist(
            ub.flatten([attr.keys() for attr in edge_attrs]))
        if ignore is not None:
            util.delete_dict_keys(edge_attr_hist, ignore)
            util.delete_dict_keys(node_attr_hist, ignore)

    node_type_hist = ub.dict_hist(list(map(type, graph.nodes())))
    info_dict = ub.odict([
        ('directed', graph.is_directed()),
        ('multi', graph.is_multigraph()),
        ('num_nodes', len(graph)),
        ('num_edges', len(list(graph.edges()))),
        ('edge_attr_hist', util.sort_dict(edge_attr_hist)),
        ('node_attr_hist', util.sort_dict(node_attr_hist)),
        ('node_type_hist', util.sort_dict(node_type_hist)),
        ('graph_attrs', graph.graph),
        ('graph_name', graph.name),
    ])
    if verbose:
        print(ub.urepr(info_dict))
    return info_dict
def assert_raises(ex_type, func, *args, **kwargs):
    """
    Checks that a function raises an error when given specific arguments.

    The check is done with explicit ``raise`` statements rather than
    ``assert``, so it is still enforced when Python runs with ``-O``.

    Args:
        ex_type (Exception): exception type
        func (callable): live python function
        *args: positional arguments forwarded to ``func``
        **kwargs: keyword arguments forwarded to ``func``

    Returns:
        bool: True if ``func`` raised an instance of ``ex_type``

    Raises:
        AssertionError: if ``func`` raised a different exception type, or
            did not raise at all

    Example:
        >>> ex_type = AssertionError
        >>> func = len
        >>> assert_raises(ex_type, assert_raises, ex_type, func, [])
        >>> assert_raises(ValueError, [].index, 0)
    """
    try:
        func(*args, **kwargs)
    except Exception as ex:
        if not isinstance(ex, ex_type):
            raise AssertionError(
                'Raised %r but type should have been %r' % (ex, ex_type))
        return True
    raise AssertionError('No error was raised')
def bfs_conditional(G, source, reverse=False, keys=True, data=False,
                    yield_nodes=True, yield_if=None,
                    continue_if=None, visited_nodes=None,
                    yield_source=False):
    """
    Produce edges in a breadth-first-search starting at source, but only
    return nodes that satisfy a condition, and only iterate past a node if
    it satisfies a different condition.

    conditions are callables that take (G, child, edge) and return true or
    false

    Args:
        G (nx.Graph): graph to search
        source: node the search starts from
        reverse (bool): if True and ``G`` has a ``reverse`` method, the
            search runs on ``G.reverse()``
        keys (bool): passed to ``G.edges`` only when ``G`` is not an
            instance of ``nx.Graph``
        data (bool): passed to ``G.edges``
        yield_nodes (bool): if True newly reached nodes are yielded,
            otherwise edges are yielded
        yield_if (callable | None): filter deciding what is yielded
        continue_if (callable | None): filter deciding whether the search
            expands past a newly reached node
        visited_nodes (Iterable | None): nodes treated as already visited
            (copied, never modified)
        yield_source (bool): if True (together with ``yield_nodes``) the
            source itself is yielded first

    Example:
        >>> import networkx as nx
        >>> G = nx.Graph()
        >>> G.add_edges_from([(1, 2), (1, 3), (2, 3), (2, 4)])
        >>> continue_if = lambda G, child, edge: True
        >>> result = list(bfs_conditional(G, 1, yield_nodes=False))
        >>> print(result)
        [(1, 2), (1, 3), (2, 1), (2, 3), (2, 4), (3, 1), (3, 2), (4, 2)]

    Example:
        >>> import networkx as nx
        >>> G = nx.Graph()
        >>> continue_if = lambda G, child, edge: (child % 2 == 0)
        >>> yield_if = lambda G, child, edge: (child % 2 == 1)
        >>> G.add_edges_from([(0, 1), (1, 3), (3, 5), (5, 10),
        >>>                   (4, 3), (3, 6),
        >>>                   (0, 2), (2, 4), (4, 6), (6, 10)])
        >>> result = list(bfs_conditional(G, 0, continue_if=continue_if,
        >>>                               yield_if=yield_if))
        >>> print(result)
        [1, 3, 5]
    """
    if reverse and hasattr(G, 'reverse'):
        G = G.reverse()
    # NOTE(review): the networkx graph classes appear to all derive from
    # nx.Graph, so `keys` presumably never reaches multigraphs here --
    # confirm whether that is intended.
    if isinstance(G, nx.Graph):
        neighbors = functools.partial(G.edges, data=data)
    else:
        neighbors = functools.partial(G.edges, keys=keys, data=data)

    def _edge_iter(node):
        # edges incident to `node`; lists are turned into one-shot iterators
        incident = neighbors(node)
        if isinstance(incident, list):
            incident = iter(incident)
        return incident

    frontier = collections.deque()
    visited = set() if visited_nodes is None else set(visited_nodes)

    if source not in visited:
        if yield_nodes and yield_source:
            yield source
        visited.add(source)
        frontier.append((source, _edge_iter(source)))

    while frontier:
        _parent, edges = frontier.popleft()
        for edge in edges:
            child = edge[1]
            is_new = child not in visited
            if yield_nodes:
                # nodes are reported at most once, on first discovery
                if is_new and (yield_if is None or yield_if(G, child, edge)):
                    yield child
            elif yield_if is None or yield_if(G, child, edge):
                yield edge
            if is_new:
                visited.add(child)
                # Add new children to queue if the condition is satisfied
                if continue_if is None or continue_if(G, child, edge):
                    frontier.append((child, _edge_iter(child)))
def nx_gen_edge_attrs(G, key, edges=None, default=ub.NoParam,
                      on_missing='error', on_keyerr='default'):
    """
    Improved generator version of nx.get_edge_attributes

    Args:
        G (nx.Graph): the graph
        key: attribute name to read from each edge
        edges (Iterable | None): ``(u, v)`` pairs to visit; defaults to
            ``G.edges()``
        default: value used by the 'default' key strategy
        on_missing (str): Strategy for handling nodes missing from G.
            Can be {'error', 'default', 'filter'}.  defaults to 'error'.
            if on_missing is not error, then we allow any edge even if the
            endpoints are not in the graph.
        on_keyerr (str): Strategy for handling keys missing from node dicts.
            Can be {'error', 'default', 'filter'}.  defaults to 'default'
            if default is specified, otherwise defaults to 'error'.

    Returns:
        generator: ``((u, v), value)`` pairs

    Raises:
        NotImplementedError: if ``edges`` is None and ``G`` is a multigraph
        KeyError: immediately if ``on_missing`` or ``on_keyerr`` is not a
            known strategy; lazily (while iterating) under the 'error'
            strategies

    CommandLine:
        python -m graphid.util.nx_utils nx_gen_edge_attrs

    Example:
        >>> from graphid import util
        >>> from functools import partial
        >>> G = nx.Graph([(1, 2), (2, 3), (3, 4)])
        >>> nx.set_edge_attributes(G, name='part', values={(1, 2): 'bar', (2, 3): 'baz'})
        >>> edges = [(1, 2), (2, 3), (3, 4), (4, 5)]
        >>> func = partial(nx_gen_edge_attrs, G, 'part', default=None)
        >>> #
        >>> assert len(list(func(on_missing='error', on_keyerr='default'))) == 3
        >>> assert len(list(func(on_missing='error', on_keyerr='filter'))) == 2
        >>> util.assert_raises(KeyError, list, func(on_missing='error', on_keyerr='error'))
        >>> #
        >>> assert len(list(func(edges, on_missing='filter', on_keyerr='default'))) == 3
        >>> assert len(list(func(edges, on_missing='filter', on_keyerr='filter'))) == 2
        >>> util.assert_raises(KeyError, list, func(edges, on_missing='filter', on_keyerr='error'))
        >>> #
        >>> assert len(list(func(edges, on_missing='default', on_keyerr='default'))) == 4
        >>> assert len(list(func(edges, on_missing='default', on_keyerr='filter'))) == 2
        >>> util.assert_raises(KeyError, list, func(edges, on_missing='default', on_keyerr='error'))
    """
    if on_missing is None:
        on_missing = 'error'
    # without an explicit default the 'default' key strategy cannot work
    if on_keyerr == 'default' and default is ub.NoParam:
        on_keyerr = 'error'

    if edges is None:
        if G.is_multigraph():
            raise NotImplementedError('')
        edges = G.edges()

    # Stage 1: pair every edge with its attribute dictionary
    if on_missing == 'error':
        pairs = (((u, v), G.adj[u][v]) for u, v in edges)
    elif on_missing == 'filter':
        pairs = (((u, v), G.adj[u][v]) for u, v in edges if G.has_edge(u, v))
    elif on_missing == 'default':
        pairs = (((u, v), G.adj[u][v])
                 if G.has_edge(u, v) else ((u, v), {})
                 for u, v in edges)
    else:
        raise KeyError('on_missing={}'.format(on_missing))

    # Stage 2: pull the desired value out of each dictionary
    if on_keyerr == 'error':
        return ((e, d[key]) for e, d in pairs)
    if on_keyerr == 'filter':
        return ((e, d[key]) for e, d in pairs if key in d)
    if on_keyerr == 'default':
        return ((e, d.get(key, default)) for e, d in pairs)
    raise KeyError('on_keyerr={}'.format(on_keyerr))
def nx_gen_edge_values(G, key, edges=None, default=ub.NoParam,
                       on_missing='error', on_keyerr='default'):
    """
    Generates attributes values of specific edges

    Args:
        G (nx.Graph): the graph
        key: attribute name to read from each edge
        edges (Iterable | None): ``(u, v)`` pairs to visit; defaults to
            ``G.edges()``
        default: value used by the 'default' key strategy
        on_missing (str): Strategy for handling nodes missing from G.
            Can be {'error', 'default'}.  defaults to 'error'.
        on_keyerr (str): Strategy for handling keys missing from node dicts.
            Can be {'error', 'default'}.  defaults to 'default' if default
            is specified, otherwise defaults to 'error'.

    Returns:
        generator: the attribute value of each requested edge

    Raises:
        KeyError: immediately if ``on_missing`` or ``on_keyerr`` is not a
            supported strategy; lazily (while iterating) under the 'error'
            strategies
    """
    if edges is None:
        edges = G.edges()
    if on_missing is None:
        on_missing = 'error'
    if on_keyerr is None:
        on_keyerr = 'default'
    # without an explicit default the 'default' key strategy cannot work
    if on_keyerr == 'default' and default is ub.NoParam:
        on_keyerr = 'error'

    # Stage 1: the attribute dictionary of every edge
    if on_missing == 'error':
        dicts = (G.adj[u][v] for u, v in edges)
    elif on_missing == 'default':
        dicts = (G.adj[u][v] if G.has_edge(u, v) else {}
                 for u, v in edges)
    else:
        raise KeyError('on_missing={} must be error, filter or default'.format(
            on_missing))

    # Stage 2: pull the desired value out of each dictionary
    if on_keyerr == 'error':
        return (d[key] for d in dicts)
    if on_keyerr == 'default':
        return (d.get(key, default) for d in dicts)
    raise KeyError('on_keyerr={} must be error or default'.format(on_keyerr))
# if default is ub.NoParam:
def nx_edges(graph, keys=False, data=False):
    """
    Fetch the edges of a graph with one signature for all graph kinds.

    Args:
        graph: graph exposing ``is_multigraph()`` and ``edges(...)``
        keys (bool): forwarded to ``graph.edges`` for multigraphs only
        data (bool): forwarded to ``graph.edges``

    Returns:
        whatever ``graph.edges`` returns for the forwarded arguments
    """
    if not graph.is_multigraph():
        # `keys` is not forwarded for non-multigraphs
        return graph.edges(data=data)
    return graph.edges(keys=keys, data=data)
def nx_delete_None_edge_attr(graph, edges=None):
    """
    Delete every edge attribute whose value is None.

    Args:
        graph (nx.Graph): graph whose edge attribute dicts are modified
            in place
        edges (Iterable | None): edges to clean, given as ``(u, v)`` pairs,
            or ``(u, v, k)`` triples for multigraphs; defaults to every
            edge of the graph

    Returns:
        int: number of attribute entries that were deleted
    """
    if graph.is_multigraph():
        if edges is None:
            edges = list(graph.edges(keys=True))
        attr_dicts = (graph[u][v][k] for u, v, k in edges)
    else:
        if edges is None:
            edges = list(graph.edges())
        # Honor the requested `edges` rather than always walking the whole
        # graph, matching what the multigraph branch does.
        attr_dicts = (graph[u][v] for u, v in edges)

    removed = 0
    for data in attr_dicts:
        # collect the keys first so the dict is not resized while iterated
        none_keys = [key for key, value in data.items() if value is None]
        for key in none_keys:
            del data[key]
            removed += 1
    return removed
def nx_delete_None_node_attr(graph, nodes=None):
    """
    Delete every node attribute whose value is None.

    Args:
        graph (nx.Graph): graph whose node attribute dicts are modified
            in place
        nodes (Iterable | None): nodes to clean; defaults to every node of
            the graph

    Returns:
        int: number of attribute entries that were deleted
    """
    if nodes is None:
        nodes = list(graph.nodes())
    node_dict = graph.nodes
    removed = 0
    # Honor the requested `nodes` rather than always walking the whole graph
    for node in nodes:
        data = node_dict[node]
        # collect the keys first so the dict is not resized while iterated
        none_keys = [key for key, value in data.items() if value is None]
        for key in none_keys:
            del data[key]
            removed += 1
    return removed
def nx_node_dict(G):
    """
    Return the node attribute mapping of a graph (prints a warning).

    ``G.node`` is returned when the networkx version string starts with
    ``'1'``, otherwise ``G.nodes``.

    Args:
        G (nx.Graph): the graph

    Returns:
        the ``node`` or ``nodes`` attribute of ``G``
    """
    print('Warning: use G.nodes instead')
    is_legacy = nx.__version__.startswith('1')
    return G.node if is_legacy else G.nodes
# Script entry point: only runs when this file is executed directly.
if __name__ == '__main__':
    """
    CommandLine:
        python ~/code/graphid/graphid/util/nx_utils.py all
    """
    import xdoctest
    # hand this file to xdoctest's module-level doctest runner
    xdoctest.doctest_module(__file__)