# Source code for pygfa.serializer.gfa1_serializer

"""
GFA1 serializer for nodes, edges, subgraphs and networkx graphs.

Each element can be serialized either from its Graph Element object
or from a dictionary with the equivalent keys.
"""
import copy
import logging

import networkx as nx

from pygfa.graph_element.parser import field_validator as fv
from pygfa.serializer import utils

class GFA1SerializationError(Exception):
    """Raised when an element cannot be written as a GFA1 line."""
# Module-level logger; the serializers report failures at DEBUG level.
serializer_logger = logging.getLogger(__name__)

# Identifier used in log messages when the caller does not supply one.
DEFAULT_IDENTIFIER = "no identifier given."

# Validator names for the required fields of each GFA1 line type, in the
# order the fields appear on the line (record-type letter excluded).
SEGMENT_FIELDS = [fv.GFA1_NAME, fv.GFA1_SEQUENCE]

LINK_FIELDS = [
    fv.GFA1_NAME,
    fv.GFA1_ORIENTATION,
    fv.GFA1_NAME,
    fv.GFA1_ORIENTATION,
    fv.GFA1_CIGAR]

CONTAINMENT_FIELDS = [
    fv.GFA1_NAME,
    fv.GFA1_ORIENTATION,
    fv.GFA1_NAME,
    fv.GFA1_ORIENTATION,
    fv.GFA1_INT,
    fv.GFA1_CIGAR]

PATH_FIELDS = [
    fv.GFA1_NAME,
    fv.GFA1_NAMES,
    fv.GFA1_CIGARS]

################################################################################
# NODE SERIALIZER
################################################################################
def serialize_node(node_, identifier=DEFAULT_IDENTIFIER):
    """Serialize a node to a GFA1 Segment (``S``) line.

    :param node_: A Graph Element Node (an object exposing ``nid``,
        ``sequence``, ``slen`` and ``opt_fields``) or a dictionary with
        the ``nid``, ``sequence`` and ``slen`` keys. Every other key of
        the dictionary is passed to ``utils._serialize_opt_fields``.
    :param identifier: Label included in the debug log message when the
        node cannot be serialized.
    :return: The tab-separated line, or ``""`` if a required key or
        attribute is missing or the fields do not pass validation.
    """
    identifier = utils._check_identifier(identifier)
    try:
        if isinstance(node_, dict):
            # Work on a copy so the caller's dictionary is left intact;
            # what remains after removing the required keys are the
            # optional fields.
            opt_fields = copy.deepcopy(node_)
            for required_key in ('nid', 'sequence', 'slen'):
                opt_fields.pop(required_key)
            nid = node_['nid']
            sequence = node_['sequence']
            slen = node_['slen']
        else:
            nid = node_.nid
            sequence = node_.sequence
            slen = node_.slen
            opt_fields = node_.opt_fields

        fields = ["S", str(nid), str(sequence)]
        if slen is not None:
            fields.append("LN:i:" + str(slen))
        fields.extend(utils._serialize_opt_fields(opt_fields))

        if not utils._are_fields_defined([nid, sequence]) or \
           not utils._check_fields(fields[1:], SEGMENT_FIELDS):
            raise GFA1SerializationError("Required node elements "
                                         + "missing or invalid.")
        return "\t".join(fields)
    except (KeyError, AttributeError, GFA1SerializationError) as e:
        # Failures are deliberately non-fatal: log and return no line.
        serializer_logger.debug(utils._format_exception(identifier, e))
        return ""
################################################################################ # EDGE SERIALIZER ################################################################################
def serialize_edge(edge_, identifier=DEFAULT_IDENTIFIER):
    """Convert the given edge to a GFA1 Link or Containment line.

    Fragments and Gaps cannot be represented in the GFA1 specification,
    so they are not serialized.

    :param edge_: A Graph Element Edge (object with ``eid``,
        ``distance``, ``variance``, ``is_dovetail`` and ``opt_fields``)
        or a dictionary with the equivalent keys.
    :param identifier: Label included in the debug log message when the
        edge cannot be serialized. It is forwarded to the Link and
        Containment serializers for both dictionaries and objects.
    :return: The serialized line, or ``""`` if the edge is a fragment,
        a gap, an internal (non-dovetail) edge, or lacks a required
        key/attribute.
    """
    identifier = utils._check_identifier(identifier)
    try:
        if isinstance(edge_, dict):
            if edge_['eid'] is None:
                # edge_ is a fragment
                raise GFA1SerializationError("Cannot serialize Fragment "
                                             + "to GFA1.")
            if edge_['distance'] is not None or \
               edge_['variance'] is not None:
                # edge_ is a gap
                raise GFA1SerializationError("Cannot serialize GAP "
                                             + "to GFA1.")
            if 'pos' in edge_:
                # edge_ is a containment
                return _serialize_to_containment(edge_, identifier)
            if edge_['is_dovetail'] is True:
                return _serialize_to_link(edge_, identifier)
        else:
            if edge_.eid is None:
                # edge_ is a fragment
                raise GFA1SerializationError("Cannot serialize Fragment "
                                             + "to GFA1.")
            if edge_.distance is not None or \
               edge_.variance is not None:
                # edge_ is a gap
                raise GFA1SerializationError("Cannot serialize GAP "
                                             + "to GFA1.")
            if 'pos' in edge_.opt_fields:
                # edge_ is a containment
                return _serialize_to_containment(edge_, identifier)
            if edge_.is_dovetail is True:
                return _serialize_to_link(edge_, identifier)

        # Neither a containment nor a dovetail overlap.
        raise GFA1SerializationError("Cannot convert an "
                                     + "internal edge to a Link")
    except (KeyError, AttributeError, GFA1SerializationError) as e:
        # Failures are deliberately non-fatal: log and return no line.
        serializer_logger.debug(utils._format_exception(identifier, e))
        return ""
def _serialize_to_containment(containment_, identifier=DEFAULT_IDENTIFIER):
    """Serialize an edge carrying a ``pos`` field to a GFA1 ``C`` line.

    :param containment_: An edge object (with ``from_node``,
        ``from_orn``, ``to_node``, ``to_orn``, ``alignment``, ``eid``
        and ``opt_fields['pos']``) or a dictionary with the equivalent
        keys, ``pos`` being stored at the top level of the dictionary.
    :param identifier: Label included in the debug log message when the
        containment cannot be serialized.
    :return: The tab-separated line, or ``""`` if a required
        key/attribute is missing or the fields do not pass validation.
    """
    identifier = utils._check_identifier(identifier)
    try:
        if isinstance(containment_, dict):
            # Copy first so the caller's dictionary is left intact; what
            # remains after the removals are the optional fields.
            opt_fields = copy.deepcopy(containment_)
            utils._remove_common_edge_fields(opt_fields)
            opt_fields.pop('pos')
            from_node = containment_['from_node']
            from_orn = containment_['from_orn']
            to_node = containment_['to_node']
            to_orn = containment_['to_orn']
            alignment = containment_['alignment']
            pos = containment_['pos'].value
            eid = containment_['eid']
        else:
            from_node = containment_.from_node
            from_orn = containment_.from_orn
            to_node = containment_.to_node
            to_orn = containment_.to_orn
            alignment = containment_.alignment
            pos = containment_.opt_fields['pos'].value
            eid = containment_.eid
            # 'pos' is a positional field of the C line, so it must not
            # be emitted again among the optional fields.
            opt_fields = copy.deepcopy(containment_.opt_fields)
            opt_fields.pop('pos')

        fields = ["C",
                  str(from_node), str(from_orn),
                  str(to_node), str(to_orn),
                  str(pos)]
        # An alignment that is not a GFA1 CIGAR is written as "*".
        if fv.is_gfa1_cigar(alignment):
            fields.append(str(alignment))
        else:
            fields.append("*")
        if eid not in (None, '*'):
            fields.append("ID:Z:" + str(eid))
        fields.extend(utils._serialize_opt_fields(opt_fields))

        defined_fields = [from_node, from_orn, to_node, to_orn,
                          alignment, pos]
        if not utils._are_fields_defined(defined_fields) or \
           not utils._check_fields(fields[1:], CONTAINMENT_FIELDS):
            raise GFA1SerializationError("Required containment elements "
                                         + "missing or invalid.")
        return "\t".join(fields)
    except (KeyError, AttributeError, GFA1SerializationError) as e:
        # Failures are deliberately non-fatal: log and return no line.
        serializer_logger.debug(utils._format_exception(identifier, e))
        return ""


def _serialize_to_link(link_, identifier=DEFAULT_IDENTIFIER):
    """Serialize a dovetail edge to a GFA1 ``L`` line.

    :param link_: An edge object (with ``from_node``, ``from_orn``,
        ``to_node``, ``to_orn``, ``alignment``, ``eid`` and
        ``opt_fields``) or a dictionary with the equivalent keys.
    :param identifier: Label included in the debug log message when the
        link cannot be serialized.
    :return: The tab-separated line, or ``""`` if a required
        key/attribute is missing or the fields do not pass validation.
    """
    identifier = utils._check_identifier(identifier)
    try:
        if isinstance(link_, dict):
            # Copy first so the caller's dictionary is left intact; what
            # remains after the removal are the optional fields.
            opt_fields = copy.deepcopy(link_)
            utils._remove_common_edge_fields(opt_fields)
            from_node = link_['from_node']
            from_orn = link_['from_orn']
            to_node = link_['to_node']
            to_orn = link_['to_orn']
            alignment = link_['alignment']
            eid = link_['eid']
        else:
            from_node = link_.from_node
            from_orn = link_.from_orn
            to_node = link_.to_node
            to_orn = link_.to_orn
            alignment = link_.alignment
            eid = link_.eid
            opt_fields = link_.opt_fields

        fields = ["L",
                  str(from_node), str(from_orn),
                  str(to_node), str(to_orn)]
        # An alignment that is not a GFA1 CIGAR is written as "*".
        if fv.is_gfa1_cigar(alignment):
            fields.append(str(alignment))
        else:
            fields.append("*")
        if eid not in (None, '*'):
            fields.append("ID:Z:" + str(eid))
        fields.extend(utils._serialize_opt_fields(opt_fields))

        defined_fields = [from_node, from_orn, to_node, to_orn, alignment]
        if not utils._are_fields_defined(defined_fields) or \
           not utils._check_fields(fields[1:], LINK_FIELDS):
            raise GFA1SerializationError("Required link elements "
                                         + "missing or invalid.")
        return "\t".join(fields)
    except (KeyError, AttributeError, GFA1SerializationError) as e:
        # Failures are deliberately non-fatal: log and return no line.
        serializer_logger.debug(utils._format_exception(identifier, e))
        return ""

################################################################################
# SUBGRAPH SERIALIZER
################################################################################
def point_to_node(gfa_, node_id):
    """Check whether the given node_id points to a node in the gfa graph.

    :param gfa_: Object exposing a ``node(node_id)`` lookup method.
    :param node_id: Identifier to look up.
    :return: True if ``gfa_.node(node_id)`` returns anything other than
        None, False otherwise.
    """
    # Identity test: the outcome must not depend on how the returned
    # node object implements ``==`` / ``!=``.
    return gfa_.node(node_id) is not None
def _serialize_subgraph_elements(subgraph_elements, gfa_=None):
    """Serialize the elements belonging to a subgraph.

    Only elements with an orientation are kept. If a gfa graph is
    provided, each element is additionally checked with
    ``point_to_node`` and kept only when it refers to a node (segment),
    since only segments can be part of a GFA1 Path. Without a gfa graph
    no such check is possible and the data is processed as is.

    :param subgraph_elements: Mapping from element id to orientation
        (None when the element has no orientation).
    :param gfa_: The GFA object that contains the subgraph, or None.
    :return: The kept elements, each written as id followed by its
        orientation, joined by commas ("" when nothing is kept).
    """
    elements = []
    for id_, orientation in subgraph_elements.items():
        if orientation is None:
            continue
        if gfa_ is not None and not point_to_node(gfa_, id_):
            continue
        elements.append(str(id_) + str(orientation))
    return ",".join(elements)
def serialize_subgraph(subgraph_, identifier=DEFAULT_IDENTIFIER, gfa_=None):
    """Serialize a Subgraph object or an equivalent dictionary to a
    GFA1 Path (``P``) line.

    :param subgraph_: A Graph Element Subgraph (object with ``sub_id``,
        ``elements`` and ``opt_fields``) or a dictionary with the
        ``sub_id`` and ``elements`` keys. An ``overlaps`` entry (a key
        of the dictionary, or of ``opt_fields`` for an object), when
        present, supplies the overlaps column through its ``value``.
    :param identifier: Label included in the debug log message when the
        subgraph cannot be serialized.
    :param gfa_: Optional GFA object used to keep only the elements
        that are nodes (see ``_serialize_subgraph_elements``).
    :return: The tab-separated line, or ``""`` if a required
        key/attribute is missing or the fields do not pass validation.
    """
    identifier = utils._check_identifier(identifier)
    try:
        if isinstance(subgraph_, dict):
            # Copy first so the caller's dictionary is left intact; what
            # remains after the removals are the optional fields.
            opt_fields = copy.deepcopy(subgraph_)
            opt_fields.pop('sub_id')
            opt_fields.pop('elements')
            sub_id = subgraph_['sub_id']
            elements = subgraph_['elements']
            overlaps_source = subgraph_
        else:
            sub_id = subgraph_.sub_id
            elements = subgraph_.elements
            overlaps_source = subgraph_.opt_fields
            opt_fields = copy.deepcopy(overlaps_source)

        # sub_id is converted with str() like the identifiers of the
        # other serializers, so a non-string id is still joinable.
        fields = ["P",
                  str(sub_id),
                  _serialize_subgraph_elements(elements, gfa_)]
        if 'overlaps' in overlaps_source:
            # Overlaps form a positional column of the P line, so they
            # must not be emitted again among the optional fields.
            opt_fields.pop('overlaps')
            fields.append(",".join(overlaps_source['overlaps'].value))
        else:
            fields.append("*")
        fields.extend(utils._serialize_opt_fields(opt_fields))

        if not utils._are_fields_defined([sub_id, elements]) or \
           not utils._check_fields(fields[1:], PATH_FIELDS):
            raise GFA1SerializationError("Required fields missing or"
                                         + " not valid.")
        return "\t".join(fields)
    except (KeyError, AttributeError, GFA1SerializationError) as e:
        # Failures are deliberately non-fatal: log and return no line.
        serializer_logger.debug(utils._format_exception(identifier, e))
        return ""
################################################################################ # SERIALIZE GRAPH ################################################################################
def serialize_graph(graph, write_header=True):
    """Serialize a networkx.MultiGraph object to GFA1 text.

    Nodes are written first, then edges. Elements whose serialization
    yields an empty string are left out.

    :param graph: A networkx.MultiGraph instance.
    :param write_header: If set to True put a GFA1 header as first line.
    :return: The serialized lines, each terminated by a newline ("" when
        there is nothing to write).
    :raises ValueError: If ``graph`` is not a networkx.MultiGraph.
    """
    if not isinstance(graph, nx.MultiGraph):
        raise ValueError("The object to serialize must be an instance "
                         + "of a networkx.MultiGraph.")

    # Always start from a defined (possibly empty) list of lines so the
    # function also works when no header is requested.
    lines = []
    if write_header:
        lines.append("H\tVN:Z:1.0")

    # nodes(data=True) / edges(keys=True, data=True) are available in
    # both networkx 1.x and 2.x+, unlike nodes_iter / edges_iter and the
    # ``graph.edge`` mapping, which were removed in networkx 2.0.
    for node_id, node in graph.nodes(data=True):
        node_serialize = serialize_node(node, node_id)
        if node_serialize:
            lines.append(node_serialize)

    for _from_node, _to_node, key, edge in graph.edges(keys=True, data=True):
        edge_serialize = serialize_edge(edge, key)
        if edge_serialize:
            lines.append(edge_serialize)

    return "".join(line + "\n" for line in lines)
def serialize_gfa(gfa_):
    """Serialize a GFA object into the text of a GFA1 file.

    The underlying graph is serialized first (header included), then
    one line is added for every subgraph that can be serialized.

    :param gfa_: Object exposing the graph as ``_graph`` and its
        subgraphs through ``subgraphs()``.
    :return: The GFA1 text.
    """
    chunks = [serialize_graph(gfa_._graph, write_header=True)]
    for sub_id, subgraph_ in gfa_.subgraphs().items():
        line = serialize_subgraph(subgraph_, sub_id, gfa_)
        # An empty string means the subgraph could not be serialized.
        if len(line) > 0:
            chunks.append(line + "\n")
    return "".join(chunks)
if __name__ == '__main__': # pragma: no cover pass