Source code for cnvlib.cnarray

"""Definitions for the core data structure, a copy number array."""
from __future__ import print_function

import sys
from itertools import groupby

import numpy
from Bio.File import as_handle
from Bio._py3k import basestring, map, zip

from . import core, metrics, ngfrills, smoothing


[docs]class CopyNumArray(object): """An array of genomic intervals, treated like aCGH probes.""" # http://docs.scipy.org/doc/numpy/user/basics.rec.html _dtype = (('chromosome', 'a30'), ('start', 'i4'), ('end', 'i4'), ('gene', 'a30'), ('coverage', 'f4')) _dtype_gc = ('gc', 'f4') _dtype_rmask = ('rmask', 'f4') _dtype_spread = ('spread', 'f4') _dtype_weight = ('weight', 'f4') _dtype_probes = ('probes', 'i4') def __init__(self, sample_id, chromosomes, starts, ends, genes, coverages, gc=None, rmask=None, spread=None, weight=None, probes=None): dtype = list(self._dtype) if all(x is None for x in (gc, rmask, spread, weight, probes)): self._xtra = () table = list(zip(chromosomes, starts, ends, genes, coverages)) else: # XXX There Must Be a Better Way -- use **kwargs? xtra_names = [] xtra_cols = [] if gc is not None: xtra_names.append('gc') xtra_cols.append(gc) dtype.append(self._dtype_gc) if rmask is not None: xtra_names.append('rmask') xtra_cols.append(rmask) dtype.append(self._dtype_rmask) if spread is not None: xtra_names.append('spread') xtra_cols.append(spread) dtype.append(self._dtype_spread) if weight is not None: xtra_names.append('weight') xtra_cols.append(weight) dtype.append(self._dtype_weight) if probes is not None: xtra_names.append('probes') xtra_cols.append(probes) dtype.append(self._dtype_probes) self._xtra = tuple(xtra_names) table = list(zip(chromosomes, starts, ends, genes, coverages, *xtra_cols)) self.data = numpy.asarray(table, dtype) self.sample_id = sample_id @classmethod
[docs] def from_rows(cls, sample_id, row_data, extra_keys=()): dtype = list(cls._dtype) if extra_keys: blank_kwargs = {k: [] for k in extra_keys} newps = cls(sample_id, [], [], [], [], [], **blank_kwargs) if 'gc' in extra_keys: dtype.append(cls._dtype_gc) if 'rmask' in extra_keys: dtype.append(cls._dtype_rmask) if 'spread' in extra_keys: dtype.append(cls._dtype_spread) if 'weight' in extra_keys: dtype.append(cls._dtype_weight) if 'probes' in extra_keys: dtype.append(cls._dtype_probes) else: newps = cls(sample_id, [], [], [], [], []) newps.data = numpy.asarray(row_data, dtype=dtype) return newps # Container-like behavior
def __eq__(self, other): return (isinstance(other, self.__class__) and (self.data == other.data).all()) def __len__(self): return len(self.data) def __contains__(self, key): return (key in self.data.dtype.fields) def __getitem__(self, index): return self.data[index] def __setitem__(self, index, value): self.data[index] = value def __delitem__(self, index): return NotImplemented def __iter__(self): return iter(self.data) def __next__(self): return next(self.data) @property def coverage(self): return self.data['coverage'] @property def chromosome(self): return self.data['chromosome'] @property def start(self): return self.data['start'] @property def end(self): return self.data['end'] @property def gene(self): return self.data['gene']
[docs] def labels(self): for row in self.data: yield row2label(row)
[docs] def by_bin(self, bins): """Group rows by another CopyNumArray; trim row start/end to bin edges. Returns an iterable of (bin, CopyNumArray of overlapping cnarray rows)) If a probe overlaps with a bin boundary, the probe start or end position is replaced with the bin boundary position. Probes outside any segments are skipped. This is appropriate for most other comparisons between CopyNumArray objects. """ cn_by_chrom = dict(self.by_chromosome()) for chrom, bin_rows in bins.by_chromosome(): cn_rows = cn_by_chrom.get(chrom) if not cn_rows: continue # Traverse rows and bins together, matching up start/end points # cn_rows = iter(cn_rows) # curr_row = next(cn_rows) # row_start, row_end = curr_row['start'], curr_row['end'] for bin_row in bin_rows: # Iterate over rows until we're clear of the bin # while True: # if row_start < bin_row['start']: # # Skip a gap in bin1 # row_start = bin_row['start'] # elif row_start > bin_row['start']: # # Skip a gap in bin2 # bin_row['start'] = row_start # # Row start points match now: emit up to the next endpoint # bin_row['end'] = min(bin_row['end'], row_end) # selected_rows.append(bin_row) # if row_end > bin_row['end']: # # Trim & hold bin2; take next bin1 # row_start = bin_row['end'] # break # elif row_end < bin_row['end']: # try: # # Trim & hold bin1; take next bin2 # bin_row['start'] = row_end # curr_row = next(bin_rows) # row_start, row_end = curr_row['start'], curr_row['end'] # continue # except StopIteration: # # At the 3' telomere # break # elif row_end == bin_row['end']: # try: # # Take next of each of bin1 and bin2 # curr_row = next(bin_rows) # row_start, row_end = curr_row['start'], curr_row['end'] # except StopIteration: # # At the 3' telomere # pass # break # ENH - use a faster 1-pass implementation binned_rows = cn_rows.in_range(chrom, bin_row['start'], bin_row['end'], trim=True) yield bin_row, self.to_rows(binned_rows)
[docs] def by_chromosome(self): """Iterate over probes grouped by chromosome name.""" for chrom, rows in groupby(self.data, lambda row: str(row['chromosome'])): yield chrom, self.to_rows(list(rows))
[docs] def by_gene(self, ignore=('-', 'CGH')): """Iterate over probes grouped by gene name. Emits pairs of (gene name, CNA of rows with same name) Groups each series of intergenic bins as a 'Background' gene; any 'Background' bins within a gene are grouped with that gene. Bins with names in `ignore` are treated as 'Background' bins, but retain their name. """ curr_chrom = None curr_bg_rows = [] curr_gene_name = None curr_gene_rows = [] for row in self.data: gene = str(row['gene']) if gene == 'Background' or gene in ignore: # This background *may* be in an intergenic region if curr_chrom != row['chromosome']: # New chromosome (not intergenic): emit the BG & reset if curr_bg_rows: yield 'Background', self.to_rows(curr_bg_rows) curr_bg_rows = [] # Include this row in the current background curr_chrom = row['chromosome'] curr_bg_rows.append(row) elif gene == curr_gene_name: # Continue the current gene # Any "background" was intronic; include in the current gene if curr_bg_rows: curr_gene_rows.extend(curr_bg_rows) curr_bg_rows = [] # Add this row to the current gene curr_gene_rows.append(row) else: # New gene # Emit the last gene, if any if curr_gene_rows: yield curr_gene_name, self.to_rows(curr_gene_rows) # Emit the subsequent background, if any if curr_bg_rows: yield 'Background', self.to_rows(curr_bg_rows) # Reset BG trackers curr_bg_rows = [] # Start tracking the new gene curr_gene_rows = [row] curr_chrom = row['chromosome'] curr_gene_name = gene # Remainder if curr_gene_rows: yield curr_gene_name, self.to_rows(curr_gene_rows) if curr_bg_rows: yield 'Background', self.to_rows(curr_bg_rows)
[docs] def by_segment(self, segments): """Group cnarray rows by the segments that row midpoints land in. Returns an iterable of segments and rows grouped by overlap with each segment. Note that segments don't necessarily cover all probes (some near telo/centromeres may have been dropped as outliers during segmentation). These probes are grouped with the nearest segment, so the endpoint of the first/last probe may not match the corresponding segment endpoint. This is appropriate if the segments were obtained from this probe array. """ curr_probes = [] segments = iter(segments) curr_segment = next(segments) next_segment = None for row in self.data: probe_midpoint = 0.5 * (row['start'] + row['end']) if (row['chromosome'] == curr_segment['chromosome'] and curr_segment['start'] <= probe_midpoint <= curr_segment['end']): # Probe is within the current segment curr_probes.append(row) elif row['chromosome'] != curr_segment['chromosome']: # Probe should be at the start of the next chromosome. # Find the matching segment. if next_segment is None: next_segment = next(segments) # Skip any extra segments on the chromosome after the current # probes (e.g. probes are targets, trailing segments are based # on antitargets alone) while row['chromosome'] != next_segment['chromosome']: try: next_segment = next(segments) except StopIteration: raise ValueError("Segments are missing chromosome %r" % row['chromosome']) # Emit the current (completed) group yield curr_segment, self.to_rows(curr_probes) # Begin a new group of probes curr_segment, next_segment = next_segment, None curr_probes = [row] elif row['start'] < curr_segment['start']: # Probe is near the start of the current chromosome, but we've # already seen another outlier here (rare/nonexistent case). # Group with the current (upcoming) segment. curr_probes.append(row) elif row['end'] > curr_segment['end']: # Probe is at the end of an accessible region (e.g. p or q arm) # on the current chromosome. # Group this probe with whichever of the current or next # segments is closer. if next_segment is None: next_segment = next(segments) if (next_segment['chromosome'] != row['chromosome'] or (next_segment['start'] - probe_midpoint) > (probe_midpoint - curr_segment['end'])): # The current segment is closer than the next. Group here. curr_probes.append(row) else: # The next segment is closer. Emit the current group # Begin a new group of probes yield curr_segment, self.to_rows(curr_probes) # Reset/update trackers for the next group of probes curr_segment, next_segment = next_segment, None curr_probes = [row] else: raise ValueError("Mismatch between probes and segments\n" + "Probe: %s\nSegment: %s" % (row2label(row), row2label(curr_segment))) # Emit the remaining probes yield curr_segment, self.to_rows(curr_probes)
[docs] def center_all(self, mode=False): """Recenter coverage values to the autosomes' average (in-place).""" chr_x = core.guess_chr_x(self) chr_y = ('chrY' if chr_x.startswith('chr') else 'Y') mask_autosome = ((self.chromosome != chr_x) & (self.chromosome != chr_y)) mid = numpy.median(self.coverage[mask_autosome]) mask_cvg = (mask_autosome & (self.coverage >= mid - 1.1) & (self.coverage <= mid + 1.1)) if mode and sum(mask_cvg) > 210: # Estimate the mode from a smoothed histogram x = self.coverage[mask_cvg] w = self['weight'][mask_cvg] if 'weight' in self else None resn = int(round(numpy.sqrt(len(x)))) x_vals, x_edges = numpy.histogram(x, bins=8*resn, weights=w) xs = smoothing.smoothed(x_vals, resn) # DBG: Check the fit # ngfrills.echo("Centering: median", mid, # ", mode", x_edges[numpy.argmax(xs)], # ", resolution", x_edges[2] - x_edges[1]) # from matplotlib import pyplot # _fig, ax = pyplot.subplots() # ax.plot(x_vals, c='k', alpha=.5) # ax.plot(xs, c='r', lw=2) # median_idx = x_edges.searchsorted(mid) # ax.axvspan(median_idx - 1, median_idx, color='teal', alpha=.3) # ax.vlines(numpy.argmax(xs), *ax.get_ylim(), colors='salmon') # pyplot.show() # -- mid = x_edges[numpy.argmax(xs)] self.data['coverage'] -= mid
[docs] def copy(self): """Create an independent copy of this object.""" return self.to_rows(numpy.copy(self.data))
[docs] def drop_extra_columns(self): """Remove any optional columns from this CopyNumArray. Returns a new copy with only the core columns retained: log2 value, chromosome, start, end, bin name. """ rows = [tuple(row)[:5] for row in self.data] return self.__class__.from_rows(self.sample_id, rows)
[docs] def extend(self, other): """Combine this array's data with another CopyNumArray (in-place). Any optional columns must match between both arrays. """ assert isinstance(other, CopyNumArray), ( "Argument (type %s) is not a CopyNumArray instance" % type(other)) self.data = numpy.concatenate((self.data, other.data))
[docs] def in_range(self, chrom, start=0, end=None, trim=False): """Get the CopyNumArray portion within the given genomic range. If trim=True, include bins straddling the range boundaries, and trim the bins endpoints to the boundaries. """ rows = self.data[self.chromosome == chrom] if not len(rows): raise ValueError("Chromosome %s is not in this probe set" % chrom) if start: if trim: # Include all rows overlapping the start point rows = rows[rows['end'].searchsorted(start, 'right'):] # Update 5' endpoints to the boundary rows['start'][rows['start'] < start] = start else: # Only rows entirely after the start point rows = rows[rows['start'].searchsorted(start):] if end: if trim: rows = rows[:rows['start'].searchsorted(end)] # Update 3' endpoints to the boundary rows['end'][rows['end'] > end] = end else: rows = rows[:rows['end'].searchsorted(end, 'right')] return self.to_rows(rows)
[docs] def select(self, selector=None, **kwargs): """Take a subset of rows where the given condition is true. Arguments can be a function (lambda expression) returning a bool, which will be used to select True rows, and/or keyword arguments like gene="Background" or chromosome="chr7", which will select rows where the keyed field equals the specified value. """ outrows = self.data if selector is not None: assert callable(selector) condition = numpy.apply_along_axis(selector, 0, outrows) outrows = numpy.extract(condition, outrows) for key, val in kwargs.items(): assert key in self outrows = outrows[outrows[key] == val] return self.to_rows(outrows)
[docs] def shuffle(self): """Randomize the order of bins in this array (in-place).""" numpy.random.seed(0xA5EED) # For reproducible results order = numpy.arange(len(self.data)) numpy.random.shuffle(order) self.data = self.data[order] return order
[docs] def sort(self, key=None): """Sort the bins in this array (in-place). Optional argument 'key' is one of: - a function that computes a sorting key from a CopyNumArray row - a string identifier for an existing data column - a list/array/iterable of precomputed keys equal in length to the number of rows in this CopyNumArray. By default, bins are sorted by chromosomal coordinates. """ if key is None: # Sort by chrom, then by start position chrom_keys = list(map(core.sorter_chrom, self.data['chromosome'])) order = numpy.lexsort((self.data['start'], chrom_keys)) else: # Sort by the given key, using a stable sort algorithm if isinstance(key, basestring): keys = self.data[key] elif callable(key): keys = list(map(key, self.data)) else: if not len(key) == len(self): raise ValueError("Sort key, as an array, must have the " "same length as the CopyNumArray to sort " "(%d vs. %d)." % (len(key), len(self))) keys = key order = numpy.argsort(keys, kind='mergesort') self.data = self.data.take(order)
[docs] def squash_genes(self, ignore=('-', 'CGH'), squash_background=False, summary_stat=metrics.biweight_location): """Combine consecutive bins with the same targeted gene name. The `ignore` parameter lists bin names that not be counted as genes to be output. Parameter `summary_stat` is a function that summarizes an array of coverage values to produce the "squashed" gene's coverage value. By default this is the biweight location, but you might want median, mean, max, min or something else in some cases. Optional columns, if present, are dropped. """ def squash_rows(name, rows): """Combine multiple rows (for the same gene) into one row.""" chrom = core.check_unique(rows['chromosome'], 'chromosome') start = rows[0]['start'] end = rows[-1]['end'] cvg = summary_stat(rows['coverage']) outrow = [chrom, start, end, name, cvg] # Handle extra fields # ENH - no coverage stat; do weighted average as appropriate for xfield in ('gc', 'rmask', 'spread', 'weight'): if xfield in self: outrow.append(summary_stat(rows[xfield])) if 'probes' in self: outrow.append(sum(rows['probes'])) return tuple(outrow) outrows = [] for name, subarr in self.by_gene(ignore): if name == 'Background' and not squash_background: outrows.extend(subarr) else: outrows.append(squash_rows(name, subarr)) return self.to_rows(outrows)
[docs] def to_rows(self, rows): """Like from_rows, reusing this instance's metadata.""" return self.__class__.from_rows(self.sample_id, rows, self._xtra) # I/O
@classmethod
[docs] def read(cls, infile, sample_id=None): """Parse a tabular table of coverage data from a handle or filename. """ if sample_id is None: if isinstance(infile, basestring): sample_id = core.fbase(infile) else: sample_id = '<unknown>' with as_handle(infile) as handle: rows = _parse_lines(handle) try: xtra = next(rows) row_data = [next(rows)] row_data.extend(rows) except StopIteration: # Don't crash on empty files return cls(sample_id, [], [], [], [], []) return cls.from_rows(sample_id, row_data, xtra)
[docs] def write(self, outfile=sys.stdout): """Write coverage data to a file or handle in tabular format. This is similar to BED or BedGraph format, but with extra columns. To combine multiple samples in one file and/or convert to another format, see the 'export' subcommand. """ colnames = ['chromosome', 'start', 'end', 'gene', 'log2'] colnames.extend(self._xtra) rows = (list(map(str, row)) for row in self.data) with ngfrills.safe_write(outfile) as handle: header = '\t'.join(colnames) + '\n' handle.write(header) handle.writelines('\t'.join(row) + '\n' for row in rows)
def _parse_lines(handle): """Parse CopyNumArray rows from a text stream. Columns: label (skipped), coverage, chromosome, start, end, gene name [, extra] """ lines = iter(handle) # Handle the header header = next(lines).rstrip().split('\t') if header[4] == 'log2': # New format, BED-like xtra = tuple(header[5:]) if not xtra: @ngfrills.report_bad_line def parse_line(line): fields = line.rstrip().split('\t') chrom, start, end, gene, coverage = fields[:5] return chrom, int(start), int(end), gene, float(coverage) else: @ngfrills.report_bad_line def parse_line(line): fields = line.rstrip().split('\t') chrom, start, end, gene, coverage = fields[:5] outrow = [chrom, int(start), int(end), gene, float(coverage)] # Parse extra fields as numbers (common type: float) rest = list(map(float, fields[5:])) core.assert_equal("Number of extra columns parsed doesn't match " "extra fields given", **{"extra columns": len(rest), "extra fields": len(xtra)}) return tuple(outrow + rest) elif header[:2] == ['probe', 'log2']: # Old format, Nexus "basic" with initial "probe" field ngfrills.echo("Parsing a file in the old format") xtra = tuple(header[6:]) if not xtra: @ngfrills.report_bad_line def parse_line(line): fields = line.rstrip().split('\t') coverage, chrom, start, end, gene = fields[1:6] return chrom, int(start), int(end), gene, float(coverage) else: @ngfrills.report_bad_line def parse_line(line): fields = line.rstrip().split('\t') coverage, chrom, start, end, gene = fields[1:6] outrow = [chrom, int(start), int(end), gene, float(coverage)] # Parse extra fields as numbers (common type: float) rest = list(map(float, fields[6:])) core.assert_equal("Number of extra columns parsed doesn't match " "extra fields given", **{"extra columns": len(rest), "extra fields": len(xtra)}) return tuple(outrow + rest) else: raise ValueError("Unrecognized header: %s" % header) yield xtra for line in lines: yield parse_line(line)
[docs]def row2label(row): return "%s:%d-%d" % (row['chromosome'], row['start'], row['end'])