Source code for cnvlib.cnary

"""CNVkit's core data structure, a copy number array."""
from __future__ import print_function, absolute_import, division
from builtins import map
from past.builtins import basestring

import logging

import numpy as np
import pandas as pd
from scipy.stats import median_test
from skgenome import GenomicArray

from . import core, descriptives, params, smoothing
from .segmetrics import segment_mean


[docs]class CopyNumArray(GenomicArray): """An array of genomic intervals, treated like aCGH probes. Required columns: chromosome, start, end, gene, log2 Optional columns: gc, rmask, spread, weight, probes """ _required_columns = ("chromosome", "start", "end", "gene", "log2") _required_dtypes = (str, int, int, str, float) # ENH: make gene optional # Extra columns, in order: # "depth", "gc", "rmask", "spread", "weight", "probes" def __init__(self, data_table, meta_dict=None): GenomicArray.__init__(self, data_table, meta_dict) @property def log2(self): return self.data["log2"] @log2.setter def log2(self, value): self.data["log2"] = value @property def _chr_x_label(self): if 'chr_x' in self.meta: return self.meta['chr_x'] if len(self): chr_x = ('chrX' if self.chromosome.iat[0].startswith('chr') else 'X') self.meta['chr_x'] = chr_x return chr_x return '' @property def _chr_y_label(self): if 'chr_y' in self.meta: return self.meta['chr_y'] if len(self): chr_y = ('chrY' if self._chr_x_label.startswith('chr') else 'Y') self.meta['chr_y'] = chr_y return chr_y return '' # More meta to store: # is_sample_male = bool # is_reference_male = bool # * invalidate 'chr_x' if .chromosome/['chromosome'] is set # Traversal
[docs] def by_gene(self, ignore=params.IGNORE_GENE_NAMES): """Iterate over probes grouped by gene name. Group each series of intergenic bins as an "Antitarget" gene; any "Antitarget" bins within a gene are grouped with that gene. Bins' gene names are split on commas to accommodate overlapping genes and bins that cover multiple genes. Parameters ---------- ignore : list or tuple of str Gene names to treat as "Antitarget" bins instead of real genes, grouping these bins with the surrounding gene or intergenic region. These bins will still retain their name in the output. Yields ------ tuple Pairs of: (gene name, CNA of rows with same name) """ ignore += params.ANTITARGET_ALIASES start_idx = end_idx = None for _chrom, subgary in self.by_chromosome(): prev_idx = 0 for gene, gene_idx in subgary._get_gene_map().items(): if gene not in ignore: if not len(gene_idx): logging.warning("Specified gene name somehow missing: " "%s", gene) continue start_idx = gene_idx[0] end_idx = gene_idx[-1] + 1 if prev_idx < start_idx: # Include intergenic regions yield params.ANTITARGET_NAME, subgary.as_dataframe( subgary.data.iloc[prev_idx:start_idx]) yield gene, subgary.as_dataframe( subgary.data.iloc[start_idx:end_idx]) prev_idx = end_idx if prev_idx < len(subgary) - 1: # Include the telomere yield params.ANTITARGET_NAME, subgary.as_dataframe( subgary.data.iloc[prev_idx:])
# Manipulation
[docs] def center_all(self, estimator=pd.Series.median, by_chrom=True, skip_low=False, verbose=False): """Re-center log2 values to the autosomes' average (in-place). Parameters ---------- estimator : str or callable Function to estimate central tendency. If a string, must be one of 'mean', 'median', 'mode', 'biweight' (for biweight location). Median by default. skip_low : bool Whether to drop very-low-coverage bins (via `drop_low_coverage`) before estimating the center value. by_chrom : bool If True, first apply `estimator` to each chromosome separately, then apply `estimator` to the per-chromosome values, to reduce the impact of uneven targeting or extreme aneuploidy. Otherwise, apply `estimator` to all log2 values directly. """ est_funcs = { "mean": pd.Series.mean, "median": pd.Series.median, "mode": descriptives.modal_location, "biweight": descriptives.biweight_location, } if isinstance(estimator, basestring): if estimator in est_funcs: estimator = est_funcs[estimator] else: raise ValueError("Estimator must be a function or one of: %s" % ", ".join(map(repr, est_funcs))) cnarr = (self.drop_low_coverage(verbose=verbose) if skip_low else self ).autosomes() if cnarr: if by_chrom: values = pd.Series([estimator(subarr['log2']) for _c, subarr in cnarr.by_chromosome() if len(subarr)]) else: values = cnarr['log2'] shift = -estimator(values) if verbose: logging.info("Shifting log2 values by %f", shift) self.data['log2'] += shift
[docs] def drop_low_coverage(self, verbose=False): """Drop bins with extremely low log2 coverage or copy ratio values. These are generally bins that had no reads mapped due to sample-specific issues. A very small log2 ratio or coverage value may have been substituted to avoid domain or divide-by-zero errors. """ min_cvg = params.NULL_LOG2_COVERAGE - params.MIN_REF_COVERAGE drop_idx = self.data['log2'] < min_cvg if 'depth' in self: drop_idx |= self.data['depth'] == 0 if verbose and drop_idx.any(): logging.info("Dropped %d low-coverage bins", drop_idx.sum()) return self[~drop_idx]
[docs] def squash_genes(self, summary_func=descriptives.biweight_location, squash_antitarget=False, ignore=params.IGNORE_GENE_NAMES): """Combine consecutive bins with the same targeted gene name. Parameters ---------- summary_func : callable Function to summarize an array of log2 values to produce a new log2 value for a "squashed" (i.e. reduced) region. By default this is the biweight location, but you might want median, mean, max, min or something else in some cases. squash_antitarget : bool If True, also reduce consecutive "Antitarget" bins into a single bin. Otherwise, keep "Antitarget" and ignored bins as they are in the output. ignore : list or tuple of str Bin names to be treated as "Antitarget" instead of as unique genes. Return ------ CopyNumArray Another, usually smaller, copy of `self` with each gene's bins reduced to a single bin with appropriate values. """ def squash_rows(name, rows): """Combine multiple rows (for the same gene) into one row.""" if len(rows) == 1: return tuple(rows.iloc[0]) chrom = core.check_unique(rows.chromosome, 'chromosome') start = rows.start.iat[0] end = rows.end.iat[-1] cvg = summary_func(rows.log2) outrow = [chrom, start, end, name, cvg] # Handle extra fields # ENH - no coverage stat; do weighted average as appropriate for xfield in ('depth', 'gc', 'rmask', 'spread', 'weight'): if xfield in self: outrow.append(summary_func(rows[xfield])) if 'probes' in self: outrow.append(sum(rows['probes'])) return tuple(outrow) outrows = [] for name, subarr in self.by_gene(ignore): if name in params.ANTITARGET_ALIASES and not squash_antitarget: outrows.extend(subarr.data.itertuples(index=False)) else: outrows.append(squash_rows(name, subarr.data)) return self.as_rows(outrows)
# Chromosomal sex
[docs] def shift_xx(self, male_reference=False, is_xx=None): """Adjust chrX log2 ratios to match the ploidy of the reference sex. I.e. add 1 to chrX log2 ratios for a male sample vs. female reference, or subtract 1 for a female sample vs. male reference, so that chrX log2 values are comparable across samples with different chromosomal sex. """ outprobes = self.copy() if is_xx is None: is_xx = self.guess_xx(male_reference=male_reference) if is_xx and male_reference: # Female: divide X coverages by 2 (in log2: subtract 1) outprobes[outprobes.chromosome == self._chr_x_label, 'log2'] -= 1.0 # Male: no change elif not is_xx and not male_reference: # Male: multiply X coverages by 2 (in log2: add 1) outprobes[outprobes.chromosome == self._chr_x_label, 'log2'] += 1.0 # Female: no change return outprobes
[docs] def guess_xx(self, male_reference=False, verbose=True): """Detect chromosomal sex; return True if a sample is probably female. Uses `compare_sex_chromosomes` to calculate coverage ratios of the X and Y chromosomes versus autosomes. Parameters ---------- male_reference : bool Was this sample normalized to a male reference copy number profile? verbose : bool If True, print (i.e. log to console) the ratios of the log2 coverages of the X and Y chromosomes versus autosomes, the "maleness" ratio of male vs. female expectations for each sex chromosome, and the inferred chromosomal sex. Returns ------- bool True if the coverage ratios indicate the sample is female. """ is_xy, stats = self.compare_sex_chromosomes(male_reference) if is_xy is None: return None elif verbose: logging.info("Relative log2 coverage of %s=%.3g, %s=%.3g " "(maleness=%.3g x %.3g = %.3g) --> assuming %s", self._chr_x_label, stats['chrx_ratio'], self._chr_y_label, stats['chry_ratio'], stats['chrx_male_lr'], stats['chry_male_lr'], stats['chrx_male_lr'] * stats['chry_male_lr'], 'male' if is_xy else 'female') return ~is_xy
[docs] def compare_sex_chromosomes(self, male_reference=False, skip_low=False): """Compare coverage ratios of sex chromosomes versus autosomes. Perform 4 Mood's median tests of the log2 coverages on chromosomes X and Y, separately shifting for assumed male and female chromosomal sex. Compare the chi-squared values obtained to infer whether the male or female assumption fits the data better. Parameters ---------- male_reference : bool Whether a male reference copy number profile was used to normalize the data. If so, a male sample should have log2 values of 0 on X and Y, and female +1 on X, deep negative (below -3) on Y. Otherwise, a male sample should have log2 values of -1 on X and 0 on Y, and female 0 on X, deep negative (below -3) on Y. skip_low : bool If True, drop very-low-coverage bins (via `drop_low_coverage`) before comparing log2 coverage ratios. Included for completeness, but shouldn't affect the result much since the M-W test is nonparametric and p-values are not used here. Returns ------- bool True if the sample appears male. dict Calculated values used for the inference: relative log2 ratios of chromosomes X and Y versus the autosomes; the Mann-Whitney U values from each test; and ratios of U values for male vs. female assumption on chromosomes X and Y. """ if not len(self): return None, {} chrx = self[self.chromosome == self._chr_x_label] if not len(chrx): logging.warning("No %s found in sample; is the input truncated?", self._chr_x_label) return None, {} auto = self.autosomes() if skip_low: chrx = chrx.drop_low_coverage() auto = auto.drop_low_coverage() auto_l = auto['log2'].values use_weight = ('weight' in self) auto_w = auto['weight'].values if use_weight else None def compare_to_auto(vals, weights): # Mood's median test stat is chisq -- near 0 for similar median try: stat, _p, _med, cont = median_test(auto_l, vals, ties='ignore', lambda_='log-likelihood') except ValueError: # "All values are below the grand median (0.0)" stat = None else: if stat == 0 and 0 in cont: stat = None # In case Mood's test failed for either sex if use_weight: med_diff = abs(descriptives.weighted_median(auto_l, auto_w) - descriptives.weighted_median(vals, weights)) else: med_diff = abs(np.median(auto_l) - np.median(vals)) return (stat, med_diff) def compare_chrom(vals, weights, female_shift, male_shift): """Calculate "maleness" ratio of test statistics. The ratio is of the female vs. male chi-square test statistics from the median test. If the median test fails for either sex, (due to flat/trivial input), use the ratio of the absolute difference in medians. """ female_stat, f_diff = compare_to_auto(vals + female_shift, weights) male_stat, m_diff = compare_to_auto(vals + male_shift, weights) # Statistic is smaller for similar-median sets if female_stat is not None and male_stat is not None: return female_stat / max(male_stat, 0.01) # Difference in medians is also smaller for similar-median sets return f_diff / max(m_diff, 0.01) female_x_shift, male_x_shift = (-1, 0) if male_reference else (0, +1) chrx_male_lr = compare_chrom(chrx['log2'].values, (chrx['weight'].values if use_weight else None), female_x_shift, male_x_shift) combined_score = chrx_male_lr # Similar for chrY if it's present chry = self[self.chromosome == self._chr_y_label] if len(chry): chry_male_lr = compare_chrom(chry['log2'].values, (chry['weight'].values if use_weight else None), +3, 0) if np.isfinite(chry_male_lr): combined_score *= chry_male_lr else: # If chrY is missing, don't sabotage the inference chry_male_lr = np.nan # Relative log2 values, for convenient reporting auto_mean = segment_mean(auto, skip_low=skip_low) chrx_mean = segment_mean(chrx, skip_low=skip_low) chry_mean = segment_mean(chry, skip_low=skip_low) return (combined_score > 1.0, dict(chrx_ratio=chrx_mean - auto_mean, chry_ratio=chry_mean - auto_mean, combined_score=combined_score, # For debugging, mainly chrx_male_lr=chrx_male_lr, chry_male_lr=chry_male_lr, ))
[docs] def expect_flat_log2(self, is_male_reference=None): """Get the uninformed expected copy ratios of each bin. Create an array of log2 coverages like a "flat" reference. This is a neutral copy ratio at each autosome (log2 = 0.0) and sex chromosomes based on whether the reference is male (XX or XY). """ if is_male_reference is None: is_male_reference = not self.guess_xx(verbose=False) cvg = np.zeros(len(self), dtype=np.float_) if is_male_reference: # Single-copy X, Y idx = ((self.chromosome == self._chr_x_label).values | (self.chromosome == self._chr_y_label).values) else: # Y will be all noise, so replace with 1 "flat" copy idx = (self.chromosome == self._chr_y_label).values cvg[idx] = -1.0 return cvg
# Reporting
[docs] def residuals(self, segments=None): """Difference in log2 value of each bin from its segment mean. Parameters ---------- segments : GenomicArray, CopyNumArray, or None Determines the "mean" value to which `self` log2 values are relative: - If CopyNumArray, use the log2 values as the segment means to subtract. - If GenomicArray with no log2 values, group `self` by these ranges and subtract each group's median log2 value. - If None, subtract each chromosome's median. Returns ------- array Residual log2 values from `self` relative to `segments`; same length as `self`. """ if not segments: resids = [subcna.log2 - subcna.log2.median() for _chrom, subcna in self.by_chromosome()] elif "log2" in segments: resids = [subcna.log2 - seg.log2 for seg, subcna in self.by_ranges(segments)] else: resids = [subcna.log2 - subcna.log2.median() for _seg, subcna in self.by_ranges(segments)] return np.concatenate(resids) if resids else np.array([])
[docs] def smoothed(self, window=None, by_arm=True): """Smooth log2 values with a sliding window. Account for chromosome and (optionally) centromere boundaries. Use bin weights if present. Returns ------- array Smoothed log2 values from `self`, the same length as `self`. """ if by_arm: parts = self.by_arm() else: parts = self.by_chromosome() if 'weight' in self: out = [smoothing.savgol(subcna['log2'], window, weights=subcna['weight']) for _chrom, subcna in parts] else: out = [smoothing.savgol(subcna['log2'], window) for _chrom, subcna in parts] return np.concatenate(out)
def _guess_average_depth(self, segments=None, window=100): """Estimate the effective average read depth from variance. Assume read depths are Poisson distributed, converting log2 values to absolute counts. Then the mean depth equals the variance , and the average read depth is the estimated mean divided by the estimated variance. Use robust estimators (Tukey's biweight location and midvariance) to compensate for outliers and overdispersion. With `segments`, take the residuals of this array's log2 values from those of the segments to remove the confounding effect of real CNVs. If `window` is an integer, calculate and subtract a smoothed trendline to remove the effect of CNVs without segmentation (skipped if `segments` are given). See: http://www.evanmiller.org/how-to-read-an-unlabeled-sales-chart.html """ # Try to drop allosomes cnarr = self.autosomes() if not len(cnarr): cnarr = self # Remove variations due to real/likely CNVs y_log2 = cnarr.residuals(segments) if segments is None and window: y_log2 -= smoothing.savgol(y_log2, window) # Guess Poisson parameter from absolute-scale values y = np.exp2(y_log2) # ENH: use weight argument to these stats loc = descriptives.biweight_location(y) spread = descriptives.biweight_midvariance(y, loc) if spread > 0: return loc / spread**2 return loc