"""An array of genomic intervals, treated as variant loci."""
from __future__ import absolute_import, division, print_function
from builtins import map, next, str, zip
from past.builtins import basestring
import numpy as np
import pandas as pd
from . import gary
[docs]class VariantArray(gary.GenomicArray):
"""An array of genomic intervals, treated as variant loci.
Required columns: chromosome, start, end, ref, alt
"""
_required_columns = ("chromosome", "start", "end", "ref", "alt")
_required_dtypes = (str, int, int, str, str)
# Extra: somatic, zygosity, depth, alt_count, alt_freq
def __init__(self, data_table, meta_dict=None):
gary.GenomicArray.__init__(self, data_table, meta_dict)
[docs] def baf_by_ranges(self, ranges, summary_func=pd.Series.median,
above_half=None, tumor_boost=True):
"""Aggregate variant (b-allele) frequencies in each given bin.
Get the average BAF in each of the bins of another genomic array:
BAFs are mirrored (see `mirrored_baf`), grouped in each bin of `ranges`,
and summarized with `summary_func`, by default the median.
Parameters
----------
ranges : GenomicArray or subclass
Bins for grouping the variants in `self`.
summary_func : callable
Function to reduce BAF values to one number; by default the mirrored
BAF median.
above_half : bool
The same as in `mirrored_baf`.
tumor_boost : bool
The same as in `mirrored_baf`.
Returns
-------
float array
Average b-allele frequency in each range; same length as `ranges`.
May contain NaN values where no variants overlap a range.
"""
if tumor_boost and "n_alt_freq" in self:
self = self.copy()
self["alt_freq"] = self.tumor_boost()
return np.asarray([
(summary_func(_mirrored_baf(subvarr["alt_freq"], above_half))
if len(subvarr) else np.nan)
for _bin, subvarr in self.by_ranges(ranges, mode='outer',
keep_empty=True)])
[docs] def heterozygous(self, min_freq=None, max_freq=None):
"""Subset to only heterozygous variants.
If `min_freq` and `max_freq` are not specified, use "zygosity" genotype
values, excludes variants with value 0.0 or 1.0.
Parameters
----------
min_freq : float
Return only variants with alt allele frequency of at least this
value.
max_freq : float
Return only variants with alt allele frequency of at most this
value.
Returns
-------
VariantArray
The subset of `self` with heterozygous genotype, or allele frequency
between the specified thresholds.
"""
idx_het = None
if 'zygosity' in self and min_freq is None and max_freq is None:
# Use existing genotype/zygosity info
zygosity = self['n_zygosity' if 'n_zygosity' in self
else 'zygosity']
idx_het = (zygosity != 0.0) & (zygosity != 1.0)
elif 'alt_freq' in self and (min_freq or max_freq):
# Decide zygosity from allele frequency
freq = self['n_alt_freq' if 'n_alt_freq' in self
else 'alt_freq']
idx_het = True
if min_freq:
idx_het = idx_het & (freq >= min_freq)
if max_freq:
idx_het = idx_het & (freq <= max_freq)
if idx_het is not None and idx_het.any():
return self[idx_het]
else:
# Fallback -- ought to return something
return self
[docs] def mirrored_baf(self, above_half=None, tumor_boost=False):
"""Mirrored B-allele frequencies (BAFs).
Parameters
----------
above_half : bool or None
If specified, flip BAFs to be all above 0.5 (True) or below 0.5
(False), respectively, for consistency. Otherwise, if None, mirror
in the direction of the majority of BAFs.
tumor_boost : bool
Normalize tumor-sample allele frequencies to the matched normal
sample's allele frequencies.
Returns
-------
float array
Mirrored b-allele frequencies, the same length as `self`. May
contain NaN values.
"""
if tumor_boost and "n_alt_freq" in self:
alt_freq = self.tumor_boost()
else:
alt_freq = self["alt_freq"]
return _mirrored_baf(alt_freq, above_half)
[docs] def tumor_boost(self):
"""TumorBoost normalization of tumor-sample allele frequencies.
De-noises the signal for detecting LOH.
See: TumorBoost, Bengtsson et al. 2010
"""
if "n_alt_freq" in self:
n_freqs = self["n_alt_freq"]
else:
raise ValueError("TumorBoost requires a paired normal sample in "
"the VCF.")
t_freqs = self["alt_freq"]
return _tumor_boost(t_freqs, n_freqs)
def _mirrored_baf(vals, above_half=None):
shift = (vals - .5).abs()
if above_half is None:
above_half = (vals.median() > .5)
if above_half:
return .5 + shift
else:
return .5 - shift
def _tumor_boost(t_freqs, n_freqs):
"""Normalize tumor-sample allele frequencies.
boosted = { 0.5 (t/n) if t < n
1 - 0.5(1-t)/(1-n) otherwise
See: TumorBoost, Bengtsson et al. 2010
"""
lt_mask = (t_freqs < n_freqs)
lt_idx = np.nonzero(lt_mask)[0]
gt_idx = np.nonzero(~lt_mask)[0]
out = pd.Series(np.zeros_like(t_freqs))
out[lt_idx] = 0.5 * t_freqs.take(lt_idx) / n_freqs.take(lt_idx)
out[gt_idx] = 1 - 0.5 * (1 - t_freqs.take(gt_idx)
) / (1 - n_freqs.take(gt_idx))
return out
def _allele_specific_copy_numbers(segarr, varr, ploidy=2):
"""Split total copy number between alleles based on BAF.
See: PSCBS, Bentsson et al. 2011
"""
seg_depths = ploidy * np.exp2(segarr["log2"])
seg_bafs = varr.baf_by_ranges(segarr, above_half=True)
cn1 = 0.5 * (1 - seg_bafs) * seg_depths
cn2 = seg_depths - cn1
# segout = segarr.copy()
# segout.update({"baf": seg_bafs, "CN1": cn1, "CN2": cn2})
# return segout
return pd.DataFrame({"baf": seg_bafs, "cn1": cn1, "cn2": cn2})