"""
GFA representation through a networkx MultiGraph.
The dovetail operations are available thanks to
the dovetail_operations.iterator.DovetailIterator class, which considers
only dovetail overlap edges.
:TODO:
* Rewrite pprint method.
"""
import logging
import copy
import re
import networkx as nx
from networkx.classes.function import all_neighbors as nx_all_neighbors
from pygfa.graph_element.parser import header, segment, link, containment, path
from pygfa.graph_element.parser import edge, gap, fragment, group
from pygfa.graph_element import node, edge as ge, subgraph as sg
from pygfa.serializer import gfa1_serializer as gs1, gfa2_serializer as gs2
from pygfa.dovetail_operations.iterator import DovetailIterator
# Module-level logger; GFA.dump uses it to report file-system errors.
GRAPH_LOGGER = logging.getLogger(__name__)
class InvalidSearchParameters(Exception):
    """Raised when an edge search by nodes receives fewer than two values."""
class InvalidElementError(Exception):
    """Raised when a key does not identify any element of the graph."""
class GFAError(Exception):
    """Generic error raised by GFA graph operations.

    Used for an invalid base graph, a duplicated identifier in safe mode
    and references to nodes that are not in the graph.
    """
class Element:
    """Kinds of element a GFA graph object can hold.

    The constants are the values accepted by the ``limit_type``
    parameter of ``GFA.search``.
    """
    NODE, EDGE, SUBGRAPH = range(3)
def _index(obj, other):
"""Given an object O and a list
of objects L check that exist an object O'
in the list such that O == O'.
:return True: If O' exists.
:return: The position of O' in the list.
"""
found = False
index = 0
max_len = len(other)
while not found and index < max_len:
if obj == other[index]:
found = True
else:
index += 1
return found, index
[docs]class GFA(DovetailIterator):
"""GFA uses a networkx MultiGraph as the structure that contains
the elements of the specification.
GFA graphs directly accept only instances coming from the
graph_element package, but can contain any kind of data
indirectly, by accessing the `_graph` attribute.
"""
def __init__(self, base_graph=None):
    """Create a GFA graph.

    If `base_graph` is not `None` its content is used as the initial
    content of the graph.

    A `virtual id` is assigned to edges (graph edges) that don't
    have an id. Their id will be `virtual_#` where `#` is given
    by the internal virtual id counter.

    :param base_graph: `None` or an instance of networkx.MultiGraph.
    :raises GFAError: If `base_graph` is neither `None` nor a
        networkx.MultiGraph.
    """
    if base_graph is not None and not isinstance(base_graph, nx.MultiGraph):
        raise GFAError(
            "{0} cannot be used as base graph, "
            "use networkx.MultiGraph instead.".format(type(base_graph)))
    self._graph = nx.MultiGraph(base_graph)
    self._subgraphs = {}
    self._next_virtual_id = 0
    if base_graph is not None:
        # _find_max_virtual_id reports the greatest virtual id already
        # used by the base graph (0 when there is none). Starting the
        # counter exactly there would hand out an id that is already
        # taken, so step past any id that is in use.
        next_id = self._find_max_virtual_id()
        while "virtual_{0}".format(next_id) in self:
            next_id += 1
        self._next_virtual_id = next_id
def __contains__(self, id_):
try:
if id_ in self._graph.node:
return True
edge_keys = (key for from_node in self._graph.adj \
for to_node in self._graph.adj[from_node] \
for key in self._graph.adj[from_node][to_node])
if id_ in edge_keys:
return True
if id_ in self._subgraphs:
return True
return False
except TypeError:
return False
[docs] def clear(self):
"""Clear all GFA object elements.
Call networkx `clear` method, reset the virtual id counter and
delete all the subgraphs.
"""
self._graph.clear()
self._next_virtual_id = 0
self._subgraphs = {}
def _get_virtual_id(self, increment=True):
"""Return the next virtual id value available.
:param increment: If set to False, the virtual id is not
incremented. Useful mainly in interactive mode.
"""
key = self._next_virtual_id
if increment:
self._next_virtual_id += 1
return key
def _find_max_virtual_id(self):
"""Traverse the graph to find the greatest virtual id value.
"""
# nodes cannot have a virtual_id, so don't search inside them
virtual_rxp = "^virtual_([\d]+)$"
regexp = re.compile(virtual_rxp)
virtual_keys = [0]
for from_, to_, key in self.edges_iter(keys=True):
match = regexp.fullmatch(key)
if match:
virtual_keys.append(int(match.group(1)))
for key, data in self._subgraphs.items():
match = regexp.fullmatch(key)
if match:
virtual_keys.append(int(match.group(1)))
return max(virtual_keys)
[docs] def nodes(self, data=False, with_sequence=False):
"""Return a list of the nodes in the graph.
:param with_sequence: If set return only nodes with
a `sequence` property.
"""
return list(self.nodes_iter(data=data, with_sequence=with_sequence))
[docs] def edges(self, **kwargs):
"""Return all the edges in the graph."""
return self._graph.edges(**kwargs)
[docs] def subgraphs(self, identifier=None):
"""An interface to access to the subgraphs inside
the GFA object.
If `identifier` is `None` all the graph Subgraph objects are
returned.
"""
if identifier is None:
return self._subgraphs
else:
if identifier in self._subgraphs:
return self._subgraphs[identifier]
[docs] def node(self, identifier=None):
"""An interface to access the node method of the netwrokx
graph.
If `identifier` is `None` all the graph nodes are returned.
"""
if identifier is None:
return self._graph.node
else:
if identifier in self._graph.node:
return self._graph.node[identifier]
[docs] def edge(self, identifier=None):
"""GFA edge accessor.
* If `identifier` is `None` all the graph edges are returned.
* If `identifier` is a tuple perform a search by nodes with
the tuple values as nodes id.
* If `identifier` is a single defined value then perform
a search by edge key, where the edge key is the given value.
"""
if identifier is None:
return self._graph.edge
if isinstance(identifier, tuple):
return self._search_edge_by_nodes(identifier)
return self._search_edge_by_key(identifier)
def _search_edge_by_key(self, edge_key):
from_node, to_node = self._get_edge_end_nodes(edge_key)
if (from_node, to_node) != (None, None):
return self._graph.edge[from_node][to_node][edge_key]
return None
def _search_edge_by_nodes(self, nodes):
"""Search for edge and edges providing end nodes.
If given a tuple with from_node and to_node return all the edges
between the two nodes.
If a third element is present in the tuple return the exact edge
between the two nodes with the key specified by the third element.
If no match is found return `None`.
:returns list of dictionary: If `nodes` is a two element tuple.
:returns dictionary: Otherwise.
"""
if len(nodes) < 2:
raise InvalidSearchParameters("At least two values are required.")
from_node = nodes[0]
to_node = nodes[1]
try:
if len(nodes) > 2:
key = nodes[2]
return self._graph.edge[from_node][to_node][key]
return self._graph.edge[from_node][to_node]
except:
return None
def _get_edge_end_nodes(self, edge_key):
"""Given an edge key return a tuple that contains
the end nodes for that edge.
"""
for from_node, to_node, key in self.edges_iter(keys=True):
if key == edge_key:
return from_node, to_node
return None, None
[docs] def get(self, key):
"""Return the element pointed by the specified key."""
if key in self._graph.node:
return self.node(key)
if key in self._subgraphs:
return self._subgraphs[key]
edge_ = self._search_edge_by_key(key)
if not edge_ is None:
return edge_
def as_graph_element(self, key):
    """Given a key of an existing node, edge or subgraph, return
    its equivalent graph element object.

    Subgraphs are returned as a deep copy. Node and edge data are
    converted into `node.Node` and `ge.Edge` objects; the data that
    are not among the fields removed below are passed as
    `opt_fields`.

    :return: The graph element, or `None` if the stored data lack
        one of the required fields or are neither node data
        (no `nid`) nor edge data (no `eid`).
    :raises InvalidElementError: If no element has the given key.
    """
    element = self.get(key)
    if element is None:
        raise InvalidElementError(
            "No graph element has the given key: {0}".format(key))
    # Subgraph objects don't need to be converted
    if sg.is_subgraph(element):
        return copy.deepcopy(element)

    node_fields = ('nid', 'sequence', 'slen')
    edge_fields = ('eid', 'from_node', 'from_orn', 'to_node', 'to_orn',
                   'from_positions', 'to_positions', 'alignment',
                   'variance', 'distance')
    # Work on a copy so the data stored in the graph are not altered.
    remaining = copy.deepcopy(element)
    try:
        if 'nid' in element:
            for field_name in node_fields:
                remaining.pop(field_name)
            return node.Node(
                element['nid'],
                element['sequence'],
                element['slen'],
                opt_fields=remaining)
        if 'eid' in element:
            for field_name in edge_fields:
                remaining.pop(field_name)
            return ge.Edge(
                element['eid'],
                element['from_node'], element['from_orn'],
                element['to_node'], element['to_orn'],
                element['from_positions'],
                element['to_positions'],
                element['alignment'], element['distance'],
                element['variance'],
                opt_fields=remaining,
                is_dovetail=element['is_dovetail'])
    except KeyError:
        return None
    return None
def add_graph_element(self, element):
    """Add a graph element -Node, Edge or Subgraph- object to
    the graph.

    The element is handed to `add_node`, `add_edge` or
    `add_subgraph` according to its type. Objects of any other
    type are ignored.
    """
    adders = (
        (node.Node, self.add_node),
        (ge.Edge, self.add_edge),
        (sg.Subgraph, self.add_subgraph),
    )
    for element_type, add in adders:
        if isinstance(element, element_type):
            add(element)
            return
def add_node(self, new_node, safe=False):
    """Add a graph_element Node to the GFA graph
    using the node id as key.

    Its id, sequence and sequence length are stored as the `nid`,
    `sequence` and `slen` node attributes, and each optional field
    is stored individually as node data.

    :param new_node: A graph_element.Node object or a string
        that can represent a node (such as the Segment line).
    :param safe: If set check if the given identifier has already
        been added to the graph, and in that case raise
        an exception.
    :return: True once the node has been added.
    :raises InvalidNodeError: If `new_node` is not a node and cannot
        be parsed as a Segment line.
    :raises GFAError: If `safe` is set and the id is already used.
    """
    # startswith (rather than new_node[0]) so that an empty string is
    # reported as an invalid node instead of raising IndexError.
    if isinstance(new_node, str) and new_node.startswith("S"):
        if segment.is_segmentv1(new_node):
            segment_line = segment.SegmentV1.from_string(new_node.strip())
        else:
            segment_line = segment.SegmentV2.from_string(new_node.strip())
        new_node = node.Node.from_line(segment_line)
    if not node.is_node(new_node):
        raise node.InvalidNodeError("The object given is not a node.")
    if safe and new_node.nid in self:
        raise GFAError("An element with the same id already exists.")
    self._graph.add_node(
        new_node.nid,
        nid=new_node.nid,
        sequence=new_node.sequence,
        slen=new_node.slen,
        **new_node.opt_fields)
    return True
def remove_node(self, nid):
    """Remove a node with nid as its node id.

    Edges containing nid as end node will be automatically
    deleted.

    :param nid: The id belonging to the node to delete.
    :raise InvalidNodeError: If `nid` doesn't point to any node.
    """
    try:
        self._graph.remove_node(nid)
    except (nx.NetworkXError, KeyError, TypeError) as error:
        # Only lookup failures are translated (missing node, or an
        # unhashable id); any other exception propagates unchanged.
        raise node.InvalidNodeError(
            "{0} doesn't point".format(nid)
            + " to any node in the graph.") from error
[docs] def nodes_iter(self, data=False, with_sequence=False):
"""Return an iterator over nodes in the graph.
:para with_sequence: If set return only nodes with
a sequence property.
"""
if with_sequence is True:
if data is True:
return iter((nid, data_) \
for nid, data_ in self._graph.node.items() \
if 'sequence' in data_)
else:
return iter(nid \
for nid, data_ in self._graph.node.items() \
if 'sequence' in data_)
else:
return self._graph.nodes_iter(data)
[docs] def nbunch_iter(self, nbunch=None):
"""Return an iterator of nodes contained in nbunch that are
also in the graph.
Interface to the networkx method.
"""
return self._graph.nbunch_iter(nbunch=nbunch)
def add_edge(self, new_edge, safe=False):
    """Add a graph_element Edge to the GFA graph using the edge id
    as key.

    If its id is `*` or `None` the edge will be given a
    **virtual_id**; in either case the original edge id is
    preserved as the `eid` edge attribute.
    All edge attributes are stored as networkx edge attributes and
    each optional field is stored individually as edge data.

    :param new_edge: A graph_element Edge object, or a string holding
        a line whose first character is `L`, `C`, `E`, `G` or `F`.
    :param safe: If set, require that the edge id is not already in
        the graph and that both end nodes already are.
    :raises InvalidEdgeError: If `new_edge` is not an edge and cannot
        be parsed as one.
    :raises GFAError: If `safe` is set and its checks fail.
    """
    if isinstance(new_edge, str):
        line_parsers = {
            'L': link.Link,
            'C': containment.Containment,
            'E': edge.Edge,
            'G': gap.Gap,
            'F': fragment.Fragment,
        }
        # Slicing keeps an empty string from raising IndexError: it
        # is reported as an invalid edge like any other bad line.
        parser = line_parsers.get(new_edge[:1])
        if parser is None:
            raise ge.InvalidEdgeError(
                "The string given doesn't represent a GFA line that could"
                + " be represented as an edge,\n"
                + "given: {0}".format(new_edge))
        new_edge = ge.Edge.from_line(parser.from_string(new_edge.strip()))
    if not ge.is_edge(new_edge):
        raise ge.InvalidEdgeError("The object is not a valid edge.")

    key = new_edge.eid
    if new_edge.eid is None or new_edge.eid == '*':
        key = "virtual_{0}".format(self._get_virtual_id())
    if safe:
        if key in self:
            raise GFAError("An element with the same id already exists.")
        if not (new_edge.from_node in self and new_edge.to_node in self):
            raise GFAError("From/To node are not already in the graph.")
    self._graph.add_edge(
        new_edge.from_node, new_edge.to_node, key=key,
        eid=new_edge.eid,
        from_node=new_edge.from_node,
        from_orn=new_edge.from_orn,
        to_node=new_edge.to_node,
        to_orn=new_edge.to_orn,
        from_positions=new_edge.from_positions,
        to_positions=new_edge.to_positions,
        alignment=new_edge.alignment,
        distance=new_edge.distance,
        variance=new_edge.variance,
        is_dovetail=new_edge.is_dovetail,
        from_segment_end=new_edge.from_segment_end,
        to_segment_end=new_edge.to_segment_end,
        **new_edge.opt_fields)
def remove_edge(self, identifier):
    """Remove an edge or all edges identified by an id
    or by a tuple with end nodes, respectively.

    * If `identifier` is a two elements tuple remove all
      the edges between the two nodes.
    * If `identifier` is a tuple with three or more elements remove
      the edge whose key is the third element and whose end nodes
      are the first two elements.
    * If `identifier` is not a tuple, treat it as an edge id.

    :raise InvalidEdgeError: If `identifier` is not in the cases
        described above or doesn't point to an existing edge.
    """
    try:
        if isinstance(identifier, tuple):
            if len(identifier) < 2:
                raise ge.InvalidEdgeError(
                    "At least two end nodes are required to identify"
                    + " an edge, given: {0}".format(identifier))
            if len(identifier) == 2:
                self.remove_edges(identifier[0], identifier[1])
            else:
                self._graph.remove_edge(identifier[0],
                                        identifier[1],
                                        identifier[2])
        else:
            from_node, to_node = self._get_edge_end_nodes(identifier)
            self._graph.remove_edge(from_node, to_node, identifier)
    except (nx.NetworkXError, TypeError) as error:
        # TypeError covers unhashable ids and the case where the two
        # given nodes are not connected (the edge lookup yields None).
        raise ge.InvalidEdgeError(error) from error
[docs] def remove_edges(self, from_node, to_node):
"""Remove all the direct edges between the two nodes given.
Call iteratively remove_edge (remove a not specified edge
from `from_node` and `to_node`) for n-times where n is
the number of edges between the given nodes,
removing all the edges indeed.
"""
num_edges = len(self.edge((from_node, to_node)))
for edge_ in range(0, num_edges):
self._graph.remove_edge(from_node, to_node)
[docs] def edges_iter(self, nbunch=None, data=False, keys=False, default=None):
"""Interface to networx edges iterator."""
return self._graph.edges_iter(nbunch=nbunch, \
data=data, \
keys=keys, \
default=default)
def add_subgraph(self, subgraph, safe=False):
    """Add a Subgraph object to the graph.

    The object is not altered in any way.
    A deepcopy of the object given is attached to the graph.
    A subgraph whose id is `*` is stored under a virtual id.

    :param subgraph: A Subgraph object, or a string holding a line
        whose first character is `P`, `O` or `U`.
    :param safe: If set, raise an exception when the id is already
        used by an element of the graph.
    :raises InvalidSubgraphError: If `subgraph` is not a subgraph and
        cannot be parsed as one.
    :raises GFAError: If `safe` is set and the id is already used.
    """
    if isinstance(subgraph, str):
        line_parsers = {
            "P": path.Path,
            "O": group.OGroup,
            "U": group.UGroup,
        }
        # Slicing keeps an empty string from raising IndexError.
        parser = line_parsers.get(subgraph[:1])
        if parser is None:
            raise sg.InvalidSubgraphError(
                "The string given cannot be represented as a subgraph,\n"
                + "given: {0}".format(subgraph))
        # The line is stripped before parsing, as add_node and
        # add_edge do with theirs.
        subgraph = sg.Subgraph.from_line(parser.from_string(subgraph.strip()))
    if not sg.is_subgraph(subgraph):
        raise sg.InvalidSubgraphError("The object given is not a subgraph.")
    key = subgraph.sub_id
    if key == '*':
        key = "virtual_{0}".format(self._get_virtual_id())
    if safe and key in self:
        raise GFAError("An element with the same id already exists.")
    self._subgraphs[key] = copy.deepcopy(subgraph)
[docs] def remove_subgraph(self, subgraph_id):
"""Remove the Subgraph object identified by the given id.
"""
try:
del(self._subgraphs[subgraph_id])
except:
raise sg.InvalidSubgraphError("The given id doesn't " \
+ " identify any subgraph.")
[docs] def subgraphs_iter(self, data=False):
"""Return an iterator over subgraphs elements
in the GFA graph.
"""
if data is True:
return iter(self._subgraphs.items())
else:
return iter(self._subgraphs)
def get_subgraph(self, sub_key):
    """Return a GFA subgraph from the parent graph.

    Build a new GFA graph with the elements listed in the
    `elements` attribute of the Subgraph object pointed by the id.
    Each element is converted with `as_graph_element` and added to
    the new graph, so virtual ids are computed again there.

    :param sub_key: The id of a subgraph present in the GFA graph.
    :raises InvalidSubgraphError: If the subgraph id doesn't exist.
    """
    if sub_key not in self._subgraphs:
        raise sg.InvalidSubgraphError(
            "There is no subgraph pointed by this key.")
    source_subgraph = self._subgraphs[sub_key]
    extracted = GFA()
    for element_id, _orientation in source_subgraph.elements.items():
        graph_element = self.as_graph_element(element_id)
        extracted.add_graph_element(graph_element)
    return extracted
[docs] def subgraph(self, nbunch, copy=True):
"""Given a bunch of nodes return a graph with
all the given nodes and the edges between them.
The returne object is not a GFA Graph, but a
MultiGraph. To create a new GFA graph, just
use the GFA initializer an give the subgraph to it.
Interface to the networkx subgraph method.
Given a collection of nodes return a subgraph with the nodes
given and all the edges between each pair of nodes.
:param nbunch: The nodes.
:param copy: If set to True return a copy of the subgraph.
"""
subgraph_ = self._graph.subgraph(nbunch)
if copy:
return subgraph_.copy()
return subgraph_
[docs] def dovetails_subgraph(self, nbunch=None, copy=True):
"""Given a collection of nodes return a subgraph with the nodes
given and all the edges between each pair of nodes.
Only dovetails overlaps are considered.
"""
bunch = self.nbunch_iter(nbunch)
# create new graph and copy subgraph into it
H = self._graph.__class__()
# copy node and attribute dictionaries
for n in bunch:
H.node[n] = self._graph.node[n]
# namespace shortcuts for speed
H_adj = H.adj
# filter edges based on is_dovetail property
self_adj = self._graph.adjlist_dict_factory()
for from_node in self._graph.adj:
self_adj[from_node] = self._graph.adjlist_dict_factory()
for to_node in self._graph.adj[from_node]:
self_adj[from_node][to_node] = self._graph.adjlist_dict_factory()
for edge_ in self._graph.adj[from_node][to_node]:
if self._graph.adj[from_node][to_node][edge_]['is_dovetail'] is True:
self_adj[from_node][to_node][edge_] = \
self._graph.adj[from_node][to_node][edge_]
# add nodes and edges (undirected method)
for n in H:
Hnbrs = H.adjlist_dict_factory()
H_adj[n] = Hnbrs
for nbr, edgedict in self_adj[n].items():
if nbr in H_adj:
# add both representations of edge: n-nbr and nbr-n
# they share the same edgedict
ed = edgedict.copy()
Hnbrs[nbr] = ed
H_adj[nbr][n] = ed
H.graph = self._graph.graph
if copy is True:
return H.copy()
return H
def neighbors(self, nid):
    """Return the ids of all the nodes connected to the given node.

    The list is built from networkx `all_neighbors`.

    :param nid: The id of the selected node.
    :raises GFAError: If the node is not in the graph.
    """
    source_data = self.node(nid)
    if source_data is None:
        raise GFAError("The source node is not in the graph.")
    return [neighbor for neighbor in nx_all_neighbors(self._graph, nid)]
def search(self, comparator, limit_type=None):
    """Perform a query applying the comparator on each graph element.

    :param comparator: A callable given the data of an element.
    :param limit_type: One of the `Element` constants to search only
        among nodes, edges or subgraphs. With any other value all
        the three kinds are searched, in that order.
    :return: The list of the keys of the matching elements.
    """
    searchers = (
        (Element.NODE, self.search_on_nodes),
        (Element.EDGE, self.search_on_edges),
        (Element.SUBGRAPH, self.search_on_subgraph),
    )
    for element_type, searcher in searchers:
        if limit_type == element_type:
            return searcher(comparator)
    return [key
            for _, searcher in searchers
            for key in searcher(comparator)]
[docs] def search_on_nodes(self, comparator):
retval = []
for key, data in self.nodes_iter(data=True):
try:
if comparator(data):
retval.append(key)
except KeyError: pass
return retval
[docs] def search_on_edges(self, comparator):
retval = []
for u, v, key, data in self.edges_iter(data=True, keys=True):
try:
if comparator(data):
retval.append(key)
except KeyError: pass
return retval
[docs] def search_on_subgraph(self, comparator):
retval = []
for key, data in self._subgraphs.items():
data = data.as_dict()
try:
if comparator(data):
retval.append(key)
except KeyError: pass
return retval
[docs] def from_string(self, string):
"""Add a GFA string to the graph once it has been
converted.
:TODO:
Maybe this could be used instead of checking for line type
in the add_xxx methods...
"""
lines = re.split("\n", string)
for line_ in lines:
line_ = line_.strip()
if len(line_) < 1:
continue
if line_[0] == 'S':
if segment.is_segmentv1(line_):
self.add_graph_element(\
node.Node.from_line(\
segment.SegmentV1.from_string(line_)))
else:
self.add_graph_element(\
node.Node.from_line(\
segment.SegmentV2.from_string(line_)))
elif line_[0] == 'L':
self.add_graph_element(\
ge.Edge.from_line(\
link.Link.from_string(line_)))
elif line_[0] == 'C':
self.add_graph_element(\
ge.Edge.from_line(\
containment.Containment.from_string(line_)))
elif line_[0] == 'E':
self.add_graph_element(\
ge.Edge.from_line(\
edge.Edge.from_string(line_)))
elif line_[0] == 'G':
self.add_graph_element(\
ge.Edge.from_line(\
gap.Gap.from_string(line_)))
elif line_[0] == 'F':
self.add_graph_element(\
ge.Edge.from_line(\
fragment.Fragment.from_string(line_)))
elif line_[0] == 'P':
self.add_graph_element(\
sg.Subgraph.from_line(\
path.Path.from_string(line_)))
elif line_[0] == 'O':
self.add_graph_element(\
sg.Subgraph.from_line(\
group.OGroup.from_string(line_)))
elif line_[0] == 'U':
self.add_graph_element(\
sg.Subgraph.from_line(\
group.UGroup.from_string(line_)))
# This method has been checked manually
@classmethod
def from_file(cls, filepath): # pragma: no cover
    """Parse the given file and return a GFA object.

    :param filepath: The path of the file to read.
    :return: A new graph, instance of the class the method is called
        on, filled through `from_string` with the file content.
    """
    # cls() so that a subclass gets an instance of itself.
    pygfa_ = cls()
    # The context manager closes the file even if reading fails.
    with open(filepath) as file_handler:
        file_content = file_handler.read()
    pygfa_.from_string(file_content)
    return pygfa_
[docs] def pprint(self): # pragma: no cover
"""A basic pretty print function for nodes and edges.
"""
string = "\nGRAPH:\nNodes: [\n"
for node, datas in self.nodes_iter(data=True):
string += str(node) + "\t: {"
for name, data in datas.items():
string += str(name) + ": " + str(data) + "\t"
string += "}\n"
string += "]\n"
string += "\nEdges: [\n"
for from_node, to_node, key, datas in self.edges_iter( \
keys=True,\
data=True):
string += str(key) + "\t: {"
for name, data in datas.items():
string += str(name) + ": " + str(data) + "\t"
string += "}\n"
string += "]\n"
string += "\nSubgraphs: [\n"
for key, data in self._subgraphs.items():
string += str(key) + "\t: {" + str(data) + "}\n"
string += "]\n"
return string
def dump(self, gfa_version=1, out=None):
    """Serialize the graph as GFA1 or GFA2.

    :param gfa_version: 1 or 2, the GFA version of the output.
    :param out: The path of the file to write. If `None` the
        serialized string is returned instead.
    :return: The serialized graph when `out` is `None`, otherwise
        `None`.
    :raises ValueError: If `gfa_version` is neither 1 nor 2.

    An `EnvironmentError` is not propagated: it is logged through
    `GRAPH_LOGGER` and `None` is returned.
    """
    try:
        if gfa_version not in (1, 2):
            raise ValueError("Invalid GFA output version.")
        serializer = gs1 if gfa_version == 1 else gs2
        serialized = serializer.serialize_gfa(self)
        if out is None:
            return serialized
        with open(out, 'w') as out_file:
            out_file.write(serialized)
    except EnvironmentError as env_error:
        GRAPH_LOGGER.error(repr(env_error))
    return None
def _make_edge_lut(self):
"""Return a lookup table that associate each edge id with a best
match unique id dependent on edge information (from_node, to_node).
All the edges between a pair of nodes will have the same alias,
so a single alias will collect a list of real id.
False positive will also be place inside this lists.
If both from_node and to_node are not specified the id
is placed into a set for further processing.
"""
virtual_rxp = "^virtual_([\d]+)$"
regexp = re.compile(virtual_rxp)
edge_lut = {}
pure_virtuals = []
for from_node, to_node, edge_, data_ in self.edges_iter(keys=True, data=True):
from_data = self.node(from_node)
to_data = self.node(to_node)
match = regexp.fullmatch(edge_)
if match is not None:
from_sequence = ""
to_sequence = ""
if 'sequence' in from_data \
and from_data['sequence'] not in (None, '*'):
from_sequence = from_data['sequence']
if 'sequence' in to_data and \
to_data['sequence'] not in (None, '*'):
to_sequence = to_data['sequence']
if not from_sequence and not to_sequence:
pure_virtuals.append(edge_)
else:
alias = from_sequence + to_sequence
if alias not in edge_lut:
edge_lut[alias] = []
edge_lut[alias].append(edge_)
else:
edge_lut[edge_] = [edge_]
return edge_lut, pure_virtuals
def _make_edge_table(self):
"""Create a table to list each edge id to
its end nodes.
A networkx Multigraph edge is identified by its end nodes and by
a key/id. To access a specific edge one should do:
>>> graph.edge[from_node][to_node][edge_key]
This way, to identify an edge by only its key, a search
operation that could take :math:`O(n)` is required.
This function instead write end nodes and key in the opposite
order, so that a search by key operation could be done
in constant time, but requires :math:`O(n)` space.
"""
edges = {}
for from_node, to_node, key in self.edges_iter(keys=True):
edges[key] = (from_node, to_node)
return edges
def _look_for_edge(self, key, edge_table):
from_node, to_node = edge_table[key]
return self._graph.edge[from_node][to_node][key]
def __eq__(self, other):
    """Tell whether two GFA graphs hold the same elements.

    Nodes are compared by id and data. Edges are compared by data,
    after grouping them by the aliases computed by `_make_edge_lut`,
    so that edges with different virtual ids can still match.
    Subgraphs are compared through their `as_dict` representation.

    The comparison works in both directions: every element of one
    graph must be paired with a distinct element of the other, and
    no element may be left unpaired.

    :return: False also when `other` lacks the attributes of a GFA
        graph.

    :TODO:
        * make a lut for subgraphs (try to think for a way to write
          _make_edge_lut in a reusable way...
    """
    try:
        # Nodes must be defined, so there is no reason to
        # create a LUT
        if len(self.nodes()) != len(other.nodes()):
            return False
        for nid, node_ in self.nodes_iter(data=True):
            if node_ != other.node(nid):
                return False

        self_edge_table = self._make_edge_table()
        other_edge_table = other._make_edge_table()
        self_lut, self_edge_virtuals = self._make_edge_lut()
        other_lut, other_edge_virtuals = other._make_edge_lut()
        for alias, list_ids in self_lut.items():
            # an alias unknown to `other` raises KeyError -> not equal
            candidates = other_lut[alias]
            for id_ in list_ids:
                edge_ = self._look_for_edge(id_, self_edge_table)
                for index, other_id in enumerate(candidates):
                    if edge_ == other._look_for_edge(other_id,
                                                     other_edge_table):
                        # a matched edge cannot be matched again
                        candidates.pop(index)
                        break
                else:
                    return False
        # edges of `other` left unmatched: the graphs are not equal
        if any(other_lut.values()):
            return False

        # NOTE(review): these edges are compared by their virtual
        # key, not by their data -- confirm that this is intended.
        for edge_ in self_edge_virtuals:
            found, index = _index(edge_, other_edge_virtuals)
            if not found:
                return False
            other_edge_virtuals.pop(index)
        if other_edge_virtuals:
            return False

        # I think it's difficult to have lots of subgraphs
        # If I am wrong a subgraphs lut will be made and the comparison
        # should be nearly linear in time
        other_subgraphs = [sub.as_dict() for sub in other.subgraphs().values()]
        for sub in self.subgraphs().values():
            found, index = _index(sub.as_dict(), other_subgraphs)
            if not found:
                return False
            other_subgraphs.pop(index)
        if other_subgraphs:
            return False
    except (AttributeError, KeyError):
        return False
    return True
def __neq__(self, other):
return not self == other
# Nothing to do when the module is run as a script.
if __name__ == '__main__': #pragma: no cover
    pass