Source code for cnvlib.rary

"""An array of genomic regions or features."""
from __future__ import absolute_import, division, print_function

import logging
import sys

import pandas as pd
from Bio.File import as_handle

from . import core, gary, ngfrills

[docs]class RegionArray(gary.GenomicArray): """An array of genomic intervals.""" _required_columns = ("chromosome", "start", "end", # "name", "strand", ) _required_dtypes = ("string", "int", "int") def __init__(self, data_table, meta_dict=None): gary.GenomicArray.__init__(self, data_table, meta_dict) @classmethod
[docs] def read(cls, fname, sample_id=None, fmt=None): """Read regions in any of the expected file formats. Iterates over tuples of the tabular contents. Header lines are skipped. Start and end coordinates are base-0, half-open. """ if sample_id is None: if isinstance(fname, basestring): sample_id = core.fbase(fname) elif fmt is None: raise ValueError("To read regions from a stream, the file " "format must be specified with the `fmt` " "argument.") else: sample_id = '<unknown>' if not fmt: fmt = ngfrills.sniff_region_format(fname) if fmt is None: return cls([]) if fmt == 'bed':"Detected file format: BED") elif fmt == 'interval':"Detected file format: interval list") parser = {'text': _parse_text_coords, 'interval': _parse_interval_list, 'bed': _parse_bed, }[fmt] table = parser(fname) return cls(table, {"sample_id": sample_id})
[docs] def write(self, outfile=sys.stdout, fmt="bed", verbose=True): assert fmt in ("text", "interval") or fmt.startswith("bed") if fmt == "text": cp = self.copy() cp['start'] += 1 table = cp.labels() else: table = if fmt == "interval": table["start"] += 1 if "name" not in table: table["name"] = '-' if "strand" not in table: table["strand"] = "+" table = table.loc[:, ["chromosome", "start", "end", "strand", "name"]] elif fmt == "bed4": if "name" not in table: table["name"] = '-' table = table.loc[:, ["chromosome", "start", "end", "name"]] elif fmt == "bed3": table = table.loc[:, ["chromosome", "start", "end"]] # Default: bed-like, keep all trailing columns with ngfrills.safe_write(outfile, False) as outfile: table.to_csv(outfile, sep='\t', header=False, index=False) if verbose: # Log the output path, if possible if isinstance(outfile, basestring): outfname = outfile elif hasattr(outfile, 'name') and outfile not in (sys.stdout, sys.stderr): outfname = else: # Probably stdout or stderr used in a pipeline -- don't pollute return"Wrote %s with %d regions", outfname, len(table))
def _parse_text_coords(infile): """Parse text coordinates: chrom:start-end Or sometimes: chrom:start-end:name Text coordinates are assumed to be counting from 1. """ @ngfrills.report_bad_line def _parse_line(line): fields = line.split(':') if len(fields) == 3: chrom, start_end, name = fields elif len(fields) == 2: chrom, start_end = fields name = '-' else: raise ValueError("Bad line: %r" % line) start, end = start_end.split('-') return chrom, int(start) - 1, int(end), name.rstrip() with as_handle(infile, 'rU') as handle: rows = [_parse_line(line) for line in handle] return pd.DataFrame.from_records(rows, columns=["chromosome", "start", "end", "name"]) def _parse_interval_list(infile): """Parse a Picard-compatible interval list. Expected tabular columns: chromosome, start position, end position, strand, region name Counting is from 1. """ table = pd.read_table(infile, comment='@', # Skip the SAM header names=["chromosome", "start", "end", "strand", "name", ]) table["name"].fillna('-', inplace=True) table["start"] -= 1 return table def _parse_bed(infile): """Parse a BED file. A BED file has these columns: chromosome, start position, end position, [name, strand, other stuff...] Counting is from 0. Sets of regions are separated by "track" lines. This function stops reading after encountering a track line other than the first one in the file. """ # ENH: just pd.read_table, skip 'track' @ngfrills.report_bad_line def _parse_line(line): fields = line.split('\t', 6) chrom, start, end = fields[:3] name = (fields[3].rstrip() if len(fields) >= 4 else '-') strand = (fields[5].rstrip() if len(fields) >= 6 else '.') return chrom, int(start), int(end), name, strand def track2track(handle): firstline = next(handle) if firstline.startswith("track"): pass else: yield firstline for line in handle: if line.startswith('track'): raise StopIteration yield line with as_handle(infile, 'rU') as handle: rows = map(_parse_line, track2track(handle)) return pd.DataFrame.from_records(rows, columns=["chromosome", "start", "end", "name", "strand"])