Source code for cnvlib.reference

"""Supporting functions for the 'reference' command."""
from __future__ import absolute_import, division, print_function
from builtins import map, zip

import logging

import numpy as np
import pyfaidx

from . import core, fix, descriptives, params, tabio
from .cnary import CopyNumArray as CNA
from .genome import GenomicArray as GA


[docs]def do_reference(target_fnames, antitarget_fnames=None, fa_fname=None, male_reference=False, female_samples=None, do_gc=True, do_edge=True, do_rmask=True): """Compile a coverage reference from the given files (normal samples).""" if antitarget_fnames: core.assert_equal("Unequal number of target and antitarget files given", targets=len(target_fnames), antitargets=len(antitarget_fnames)) if not fa_fname: logging.info("No FASTA reference genome provided; " "skipping GC, RM calculations") # Calculate & save probe centers ref_probes = combine_probes(target_fnames, fa_fname, male_reference, female_samples, True, do_gc, do_edge, False) if antitarget_fnames: ref_probes.add(combine_probes(antitarget_fnames, fa_fname, male_reference, female_samples, False, do_gc, False, do_rmask)) ref_probes.center_all(skip_low=True) ref_probes.sort_columns() warn_bad_probes(ref_probes) return ref_probes
[docs]def do_reference_flat(targets, antitargets=None, fa_fname=None, male_reference=False): """Compile a neutral-coverage reference from the given intervals. Combines the intervals, shifts chrX values if requested, and calculates GC and RepeatMasker content from the genome FASTA sequence. """ ref_probes = bed2probes(targets) if antitargets: ref_probes.add(bed2probes(antitargets)) # Set sex chromosomes by "reference" sex ref_probes['log2'] = ref_probes.expect_flat_log2(male_reference) ref_probes['depth'] = np.exp2(ref_probes['log2']) # Shim # Calculate GC and RepeatMasker content for each probe's genomic region if fa_fname: gc, rmask = get_fasta_stats(ref_probes, fa_fname) ref_probes['gc'] = gc ref_probes['rmask'] = rmask # warn_bad_probes(ref_probes) else: logging.info("No FASTA reference genome provided; " "skipping GC, RM calculations") ref_probes.sort_columns() return ref_probes
[docs]def bed2probes(bed_fname): """Create neutral-coverage probes from intervals.""" regions = tabio.read_auto(bed_fname) table = regions.data.loc[:, ("chromosome", "start", "end")] table["gene"] = (regions.data["gene"] if "gene" in regions.data else '-') table["log2"] = 0.0 table["spread"] = 0.0 return CNA(table, {"sample_id": core.fbase(bed_fname)})
[docs]def combine_probes(filenames, fa_fname, is_male_reference, is_female_sample, skip_low, fix_gc, fix_edge, fix_rmask): """Calculate the median coverage of each bin across multiple samples. Parameters ---------- filenames : list List of string filenames, corresponding to targetcoverage.cnn and antitargetcoverage.cnn files, as generated by 'coverage' or 'import-picard'. fa_fname : str Reference genome sequence in FASTA format, used to extract GC and RepeatMasker content of each genomic bin. is_male_reference : bool skip_low : bool fix_gc : bool fix_edge : bool fix_rmask : bool Returns ------- CopyNumArray One object summarizing the coverages of the input samples, including each bin's "average" coverage, "spread" of coverages, and GC content. """ columns = {} # Load coverage from target/antitarget files logging.info("Loading %s", filenames[0]) cnarr1 = tabio.read_cna(filenames[0]) if not len(cnarr1): # Just create an empty array with the right columns col_names = ['chromosome', 'start', 'end', 'gene', 'log2', 'depth'] if 'gc' in cnarr1 or fa_fname: col_names.append('gc') if fa_fname: col_names.append('rmask') col_names.append('spread') return CNA.from_rows([], col_names, {'sample_id': "reference"}) # Calculate GC and RepeatMasker content for each probe's genomic region if fa_fname and (fix_rmask or fix_gc): gc, rmask = get_fasta_stats(cnarr1, fa_fname) if fix_gc: columns['gc'] = gc if fix_rmask: columns['rmask'] = rmask elif 'gc' in cnarr1 and fix_gc: # Reuse .cnn GC values if they're already stored (via import-picard) gc = cnarr1['gc'] columns['gc'] = gc # Make the sex-chromosome coverages of male and female samples compatible is_chr_x = (cnarr1.chromosome == cnarr1._chr_x_label) is_chr_y = (cnarr1.chromosome == cnarr1._chr_y_label) flat_coverage = cnarr1.expect_flat_log2(is_male_reference) def shift_sex_chroms(cnarr): """Shift sample X and Y chromosomes to match the reference sex. Reference values: XY: chrX -1, chrY -1 XX: chrX 0, chrY -1 Plan: chrX: xx sample, xx ref: 0 (from 0) xx sample, xy ref: -= 1 (from -1) xy sample, xx ref: += 1 (from 0) +1 xy sample, xy ref: 0 (from -1) +1 chrY: xx sample, xx ref: = -1 (from -1) xx sample, xy ref: = -1 (from -1) xy sample, xx ref: 0 (from -1) +1 xy sample, xy ref: 0 (from -1) +1 """ is_xx = (cnarr.guess_xx() if is_female_sample is None else is_female_sample) cnarr['log2'] += flat_coverage if is_xx: # chrX has same ploidy as autosomes; chrY is just unusable noise cnarr[is_chr_y, 'log2'] = -1.0 # np.nan is worse else: # 1/2 #copies of each sex chromosome cnarr[is_chr_x | is_chr_y, 'log2'] += 1.0 edge_bias = fix.get_edge_bias(cnarr1, params.INSERT_SIZE) def bias_correct_coverage(cnarr): """Perform bias corrections on the sample.""" cnarr.center_all(skip_low=skip_low) shift_sex_chroms(cnarr) # Skip bias corrections if most bins have no coverage (e.g. user error) if (cnarr['log2'] > params.NULL_LOG2_COVERAGE - params.MIN_REF_COVERAGE ).sum() <= len(cnarr) // 2: logging.warn("WARNING: most bins have no or very low coverage; " "check that the right BED file was used") else: if 'gc' in columns and fix_gc: logging.info("Correcting for GC bias...") cnarr = fix.center_by_window(cnarr, .1, columns['gc']) if 'rmask' in columns and fix_rmask: logging.info("Correcting for RepeatMasker bias...") cnarr = fix.center_by_window(cnarr, .1, columns['rmask']) if fix_edge: logging.info("Correcting for density bias...") cnarr = fix.center_by_window(cnarr, .1, edge_bias) return cnarr['log2'] # Pseudocount of 1 "flat" sample all_depths = [cnarr1['depth'] if 'depth' in cnarr1 else np.exp2(cnarr1['log2'])] all_coverages = [flat_coverage, bias_correct_coverage(cnarr1)] for fname in filenames[1:]: logging.info("Loading target %s", fname) cnarrx = tabio.read_cna(fname) # Bin information should match across all files if not (len(cnarr1) == len(cnarrx) and (cnarr1.chromosome == cnarrx.chromosome).all() and (cnarr1.start == cnarrx.start).all() and (cnarr1.end == cnarrx.end).all() and (cnarr1['gene'] == cnarrx['gene']).all()): raise RuntimeError("%s probes do not match those in %s" % (fname, filenames[0])) all_depths.append(cnarrx['depth'] if 'depth' in cnarrx else np.exp2(cnarrx['log2'])) all_coverages.append(bias_correct_coverage(cnarrx)) all_coverages = np.vstack(all_coverages) logging.info("Calculating average bin coverages") cvg_centers = np.apply_along_axis(descriptives.biweight_location, 0, all_coverages) depth_centers = np.apply_along_axis(descriptives.biweight_location, 0, np.vstack(all_depths)) logging.info("Calculating bin spreads") spreads = np.asarray([descriptives.biweight_midvariance(a, initial=i) for a, i in zip(all_coverages.T, cvg_centers)]) columns.update({ 'chromosome': cnarr1.chromosome, 'start': cnarr1.start, 'end': cnarr1.end, 'gene': cnarr1['gene'], 'log2': cvg_centers, 'depth': depth_centers, 'spread': spreads, }) return CNA.from_columns(columns, {'sample_id': "reference"})
[docs]def warn_bad_probes(probes, max_name_width=50): """Warn about target probes where coverage is poor. Prints a formatted table to stderr. """ bad_probes = probes[fix.mask_bad_bins(probes)] fg_index = (bad_probes['gene'] != 'Background') fg_bad_probes = bad_probes[fg_index] if len(fg_bad_probes) > 0: bad_pct = 100 * len(fg_bad_probes) / sum(probes['gene'] != 'Background') logging.info("Targets: %d (%s) bins failed filters:", len(fg_bad_probes), "%.4f" % bad_pct + '%') gene_cols = min(max_name_width, max(map(len, fg_bad_probes['gene']))) labels = list(map(CNA.row2label, fg_bad_probes)) chrom_cols = max(map(len, labels)) last_gene = None for label, probe in zip(labels, fg_bad_probes): if probe.gene == last_gene: gene = ' "' else: gene = probe.gene last_gene = gene if len(gene) > max_name_width: gene = gene[:max_name_width-3] + '...' if 'rmask' in probes: logging.info(" %s %s log2=%.3f spread=%.3f rmask=%.3f", gene.ljust(gene_cols), label.ljust(chrom_cols), probe.log2, probe.spread, probe.rmask) else: logging.info(" %s %s log2=%.3f spread=%.3f", gene.ljust(gene_cols), label.ljust(chrom_cols), probe.log2, probe.spread) # Count the number of BG probes dropped, too (names are all "Background") bg_bad_probes = bad_probes[~fg_index] if len(bg_bad_probes) > 0: bad_pct = 100 * len(bg_bad_probes) / sum(probes['gene'] == 'Background') logging.info("Antitargets: %d (%s) bins failed filters", len(bg_bad_probes), "%.4f" % bad_pct + '%')
[docs]def get_fasta_stats(cnarr, fa_fname): """Calculate GC and RepeatMasker content of each bin in the FASTA genome.""" logging.info("Calculating GC and RepeatMasker content in %s ...", fa_fname) gc_rm_vals = [calculate_gc_lo(subseq) for subseq in fasta_extract_regions(fa_fname, cnarr)] gc_vals, rm_vals = zip(*gc_rm_vals) return np.asfarray(gc_vals), np.asfarray(rm_vals)
[docs]def calculate_gc_lo(subseq): """Calculate the GC and lowercase (RepeatMasked) content of a string.""" cnt_at_lo = subseq.count('a') + subseq.count('t') cnt_at_up = subseq.count('A') + subseq.count('T') cnt_gc_lo = subseq.count('g') + subseq.count('c') cnt_gc_up = subseq.count('G') + subseq.count('C') tot = float(cnt_gc_up + cnt_gc_lo + cnt_at_up + cnt_at_lo) if not tot: return 0.0, 0.0 frac_gc = (cnt_gc_lo + cnt_gc_up) / tot frac_lo = (cnt_at_lo + cnt_gc_lo) / tot return frac_gc, frac_lo
[docs]def fasta_extract_regions(fa_fname, intervals): """Extract an iterable of regions from an indexed FASTA file. Input: FASTA file name; iterable of (seq_id, start, end) (1-based) Output: iterable of string sequences. """ with pyfaidx.Fasta(fa_fname, as_raw=True) as fa_file: for chrom, subarr in intervals.by_chromosome(): logging.info("Extracting sequences from chromosome %s", chrom) for _chrom, start, end in subarr.coords(): yield fa_file[_chrom][start.item():end.item()]
[docs]def reference2regions(refarr): """Split reference into target and antitarget regions.""" is_bg = (refarr['gene'] == 'Background') regions = GA(refarr.data.loc[:, ('chromosome', 'start', 'end', 'gene')], {'sample_id': 'reference'}) targets = regions[~is_bg] antitargets = regions[is_bg] return targets, antitargets