Source code for cnvlib.reference

"""Supporting functions for the 'reference' command."""
from __future__ import absolute_import, division

import numpy
from Bio._py3k import map, zip

from . import core, metrics, ngfrills, params
from .ngfrills import echo
from .cnarray import CopyNumArray as CNA, row2label


[docs]def bed2probes(bed_fname): """Create neutral-coverage probes from intervals.""" cn_rows = [(chrom, start, end, name, 0, 0, 0, 0) for chrom, start, end, name in ngfrills.parse_regions(bed_fname)] return CNA.from_rows(core.fbase(bed_fname), cn_rows, extra_keys=('gc', 'rmask', 'spread'))
[docs]def combine_probes(filenames, fa_fname, is_male_reference): """Calculate the median coverage of each bin across multiple samples. Input: List of .cnn files, as generated by 'coverage' or 'import-picard'. `fa_fname`: fil columns for GC and RepeatMasker genomic values. Returns: A single CopyNumArray summarizing the coverages of the input samples, including each bin's "average" coverage, "spread" of coverages, and genomic GC content. """ from cnvlib import fix # XXX kwargs = {} # Load coverage from target/antitarget files echo("Loading", filenames[0]) cnarr1 = CNA.read(filenames[0]) if not len(cnarr1): # Just create an empty array with the right columns extra_cols = ['spread'] if 'gc' in cnarr1 or fa_fname: extra_cols.append('gc') if fa_fname: extra_cols.append('rmask') return CNA("reference", extra_cols) # Calculate GC and RepeatMasker content for each probe's genomic region if fa_fname: gc, rmask = get_fasta_stats(cnarr1, fa_fname) kwargs['gc'] = gc kwargs['rmask'] = rmask elif 'gc' in cnarr1: # Reuse .cnn GC values if they're already stored (via import-picard) gc = cnarr1['gc'] kwargs['gc'] = gc else: echo("No FASTA reference genome provided; skipping GC, RM calculations") # Make the sex-chromosome coverages of male and female samples compatible chr_x = core.guess_chr_x(cnarr1) chr_y = ('chrY' if chr_x.startswith('chr') else 'Y') flat_coverage = core.expect_flat_cvg(cnarr1, is_male_reference) def shift_sex_chroms(cnarr): """Shift sample X and Y chromosomes to match the reference gender. Reference values: XY: chrX -1, chrY -1 XX: chrX 0, chrY -1 Plan: chrX: xx sample, xx ref: 0 (from 0) xx sample, xy ref: -= 1 (from -1) xy sample, xx ref: += 1 (from 0) +1 xy sample, xy ref: 0 (from -1) +1 chrY: xx sample, xx ref: = -1 (from -1) xx sample, xy ref: = -1 (from -1) xy sample, xx ref: 0 (from -1) +1 xy sample, xy ref: 0 (from -1) +1 """ is_xx = core.guess_xx(cnarr, chr_x=chr_x) cnarr['coverage'] += flat_coverage if is_xx: # chrX already OK # No chrY; it's all noise, so just match the male cnarr['coverage'][cnarr.chromosome == chr_y] = -1.0 else: # 1/2 #copies of each sex chromosome cnarr['coverage'][(cnarr.chromosome == chr_x) | (cnarr.chromosome == chr_y)] += 1.0 edge_sorter = fix.make_edge_sorter(cnarr1, params.INSERT_SIZE) def bias_correct_coverage(cnarr): """Perform bias corrections on the sample.""" cnarr.center_all() shift_sex_chroms(cnarr) if 'gc' in kwargs: echo("Correcting for GC bias...") cnarr = fix.center_by_window(cnarr, .1, kwargs['gc']) if 'rmask' in kwargs: echo("Correcting for RepeatMasker bias...") cnarr = fix.center_by_window(cnarr, .1, kwargs['rmask']) echo("Correcting for density bias...") cnarr = fix.center_by_window(cnarr, .1, edge_sorter) return cnarr.coverage # Pseudocount of 1 "flat" sample all_coverages = [flat_coverage, bias_correct_coverage(cnarr1)] for fname in filenames[1:]: echo("Loading target", fname) cnarrx = CNA.read(fname) # Bin information should match across all files if not (len(cnarr1) == len(cnarrx) and (cnarr1.chromosome == cnarrx.chromosome).all() and (cnarr1.start == cnarrx.start).all() and (cnarr1.end == cnarrx.end).all() and (cnarr1.gene == cnarrx.gene).all()): raise RuntimeError("%s probes do not match those in %s" % (fname, filenames[0])) all_coverages.append(bias_correct_coverage(cnarrx)) all_coverages = numpy.vstack(all_coverages) echo("Calculating average bin coverages") cvg_centers = numpy.apply_along_axis(metrics.biweight_location, 0, all_coverages) echo("Calculating bin spreads") spreads = numpy.apply_along_axis(metrics.biweight_midvariance, 0, all_coverages) kwargs['spread'] = spreads return CNA.from_columns("reference", chromosome=cnarr1.chromosome, start=cnarr1.start, end=cnarr1.end, gene=cnarr1.gene, coverage=cvg_centers, **kwargs)
[docs]def warn_bad_probes(probes): """Warn about target probes where coverage is poor. Prints a formatted table to stderr. """ bad_probes = probes[mask_bad_probes(probes)] fg_index = (bad_probes['gene'] != 'Background') fg_bad_probes = bad_probes[fg_index] if len(fg_bad_probes) > 0: bad_pct = 100 * len(fg_bad_probes) / sum(probes['gene'] != 'Background') echo("*WARNING*", len(fg_bad_probes), "targets", "(%.4f)" % bad_pct + '%', "failed filters:") gene_cols = max(map(len, fg_bad_probes['gene'])) labels = list(map(row2label, fg_bad_probes)) chrom_cols = max(map(len, labels)) last_gene = None for label, probe in zip(labels, fg_bad_probes): if probe['gene'] == last_gene: gene = ' "' else: gene = probe['gene'] last_gene = gene if 'rmask' in probes: print(" %s %s coverage=%.3f spread=%.3f rmask=%.3f" % (gene.ljust(gene_cols), label.ljust(chrom_cols), probe['coverage'], probe['spread'], probe['rmask'])) else: print(" %s %s coverage=%.3f spread=%.3f" % (gene.ljust(gene_cols), label.ljust(chrom_cols), probe['coverage'], probe['spread'])) # Count the number of BG probes dropped, too (names are all "Background") bg_bad_probes = bad_probes[~fg_index] if len(bg_bad_probes) > 0: bad_pct = 100 * len(bg_bad_probes) / sum(probes['gene'] == 'Background') echo("Antitargets:", len(bg_bad_probes), "(%.4f)" % bad_pct + '%', "failed filters")
[docs]def mask_bad_probes(probes): """Flag the probes with excessively low or inconsistent coverage. Returns a bool array where True indicates probes that failed the checks. """ mask = ((probes['coverage'] < params.MIN_BIN_COVERAGE) | (probes['spread'] > params.MAX_BIN_SPREAD)) if 'rmask' in probes: mask |= (probes['rmask'] > params.MAX_REPEAT_FRACTION) return mask
[docs]def get_fasta_stats(probes, fa_fname): """Calculate GC and RepeatMasker content of each bin in the FASTA genome.""" ngfrills.ensure_fasta_index(fa_fname) fa_coords = zip(probes.chromosome, probes.start, probes.end) echo("Calculating GC and RepeatMasker content in", fa_fname, "...") gc_rm_vals = [calculate_gc_lo(subseq) for subseq in ngfrills.fasta_extract_regions(fa_fname, fa_coords)] gc_vals, rm_vals = zip(*gc_rm_vals) return numpy.asfarray(gc_vals), numpy.asfarray(rm_vals)
[docs]def calculate_gc_lo(subseq): """Calculate the GC and lowercase (RepeatMasked) content of a string.""" cnt_at_lo = subseq.count('a') + subseq.count('t') cnt_at_up = subseq.count('A') + subseq.count('T') cnt_gc_lo = subseq.count('g') + subseq.count('c') cnt_gc_up = subseq.count('G') + subseq.count('C') tot = float(cnt_gc_up + cnt_gc_lo + cnt_at_up + cnt_at_lo) if not tot: return 0.0, 0.0 frac_gc = (cnt_gc_lo + cnt_gc_up) / tot frac_lo = (cnt_at_lo + cnt_gc_lo) / tot return frac_gc, frac_lo
[docs]def reference2regions(reference, coord_only=False): """Extract iterables of target and antitarget regions from a reference CNA. Like loading two BED files with ngfrills.parse_regions. """ cna2rows = (_cna2coords if coord_only else _cna2regions) return map(cna2rows, _ref_split_targets(reference))
def _cna2coords(cnarr): """Extract the coordinate columns from a CopyNumberArray""" return zip(cnarr['chromosome'], cnarr['start'], cnarr['end']) def _cna2regions(cnarr): """Extract the region columns (including genes) from a CopyNumberArray""" return zip(cnarr['chromosome'], cnarr['start'], cnarr['end'], cnarr['gene']) def _ref_split_targets(ref_arr): """Split reference into 2 sub-arrays of targets/antitargets.""" is_bg = (ref_arr.gene == 'Background') targets = ref_arr[~is_bg] antitargets = ref_arr[is_bg] return targets, antitargets