Source code for cnvlib.vary

"""An array of genomic intervals, treated as variant loci."""
from __future__ import absolute_import, division, print_function

import logging

import numpy as np
import pandas as pd
import vcf

from . import gary


[docs]class VariantArray(gary.GenomicArray): """An array of genomic intervals, treated as variant loci. Required columns: chromosome, start, end, ref, alt """ _required_columns = ("chromosome", "start", "end", "ref", "alt") _required_dtypes = ("string", "int", "int", "string", "string") # Extra: somatic, zygosity, depth, alt_count, alt_freq def __init__(self, data_table, meta_dict=None): gary.GenomicArray.__init__(self, data_table, meta_dict)
[docs] def mirrored_baf(self, above_half=True, tumor_boost=True): """Mirrored B-allele frequencies (BAFs). Flip BAFs to be all above 0.5 (if `above_half`) or below 0.5, for consistency. With `tumor_boost`, normalize tumor-sample allele frequencies to the matched normal allele frequencies. """ if tumor_boost and "n_alt_freq" in self: alt_freqs = self.tumor_boost() else: alt_freqs = self["alt_freq"] fromhalf = np.abs(alt_freqs - 0.5) if above_half: return fromhalf + 0.5 else: return 0.5 - fromhalf
[docs] def tumor_boost(self): """TumorBoost normalization of tumor-sample allele frequencies. De-noises the signal for detecting LOH. """ if "n_alt_freq" in self: n_freqs = self["n_alt_freq"] else: raise ValueError("TumorBoost requires a paired normal sample in " "the VCF.") t_freqs = self["alt_freq"] return _tumor_boost(t_freqs, n_freqs)
# I/O @classmethod
[docs] def read_vcf(cls, infile, sample_id=None, normal_id=None, min_depth=None, skip_hom=False, skip_reject=False, skip_somatic=False): """Parse SNV coordinates from a VCF file into a VariantArray.""" if isinstance(infile, basestring): vcf_reader = vcf.Reader(filename=infile) else: vcf_reader = vcf.Reader(infile) if not vcf_reader.samples: logging.warn("VCF file %s has no samples; parsing minimal info", infile) return cls._read_vcf_nosample(infile, sample_id, skip_reject) columns = [ "chromosome", "start", "end", "ref", "alt", "somatic", "zygosity", "depth", "alt_count"] sample_id, normal_id = _select_sample(vcf_reader, sample_id, normal_id) if normal_id: columns.extend(["n_zygosity", "n_depth", "n_alt_count"]) rows = _parse_records(vcf_reader, sample_id, normal_id, skip_reject) table = pd.DataFrame.from_records(rows, columns=columns) table["alt_freq"] = table["alt_count"] / table["depth"] if normal_id: table["n_alt_freq"] = table["n_alt_count"] / table["n_depth"] # Filter out records as requested cnt_depth = cnt_hom = cnt_som = 0 if min_depth: dkey = "n_depth" if "n_depth" in table else "depth" idx_depth = table[dkey] >= min_depth cnt_depth = (~idx_depth).sum() table = table[idx_depth] if skip_hom: zkey = "n_zygosity" if "n_zygosity" in table else "zygosity" idx_het = (table[zkey] != 0.0) & (table[zkey] != 1.0) cnt_hom = (~idx_het).sum() table = table[idx_het] if skip_somatic: idx_som = table["somatic"] cnt_som = idx_som.sum() table = table[~idx_som] logging.info("Skipped records: %d somatic, %d depth, %d homozygous", cnt_som, cnt_depth, cnt_hom) return cls(table, {"sample_id": sample_id})
@classmethod def _read_vcf_nosample(cls, vcf_file, sample_id=None, skip_reject=False): table = pd.read_table(vcf_file, comment="#", header=None, na_filter=False, names=["chromosome", "start", "_ID", "ref", "alt", "_QUAL", "filter", "info"], usecols=cls._required_columns, # usecols=["chromosome", "start", "ref", "alt", # # "filter", "info", # ], # ENH: converters=func -> to parse each col dtype=dict(zip(cls._required_columns, cls._required_dtypes)), ) # ENH: do things with filter, info # if skip_reject and record.FILTER and len(record.FILTER) > 0: table['end'] = table['start'] # TODO: _get_end table['start'] -= 1 table = table.loc[:, cls._required_columns] return cls(table, {"sample_id": sample_id})
# def write_vcf(self, outfile=sys.stdout): # """Write data to a file or handle in VCF format.""" # # grab from export.export_vcf() def _select_sample(vcf_reader, sample_id, normal_id): """Select a sample ID in the VCF; ensure it's valid.""" peds = list(_parse_pedigrees(vcf_reader)) if sample_id is None and normal_id is None and peds: # Use the VCF header to select the tumor sample # Take the first entry, if any sample_id, normal_id = peds[0] elif sample_id and normal_id: # Take the given IDs at face value, just verify below pass elif sample_id: if peds: for sid, nid in peds: if sid == sample_id: normal_id = nid break # if normal_id is None and len(vcf_reader.samples == 2): # normal_id = next(s for s in vcf_reader.samples if s != sample_id) elif normal_id: if peds: for sid, nid in peds: if nid == normal_id: sample_id = sid break else: try: sample_id = next(s for s in vcf_reader.samples if s != normal_id) except StopIteration: raise ValueError( "No other sample in VCF besides the specified normal " + normal_id + "; did you mean to use this as the sample_id " "instead?") else: sample_id = vcf_reader.samples[0] _confirm_unique(sample_id, vcf_reader.samples) if normal_id: _confirm_unique(normal_id, vcf_reader.samples) logging.info("Selected test sample " + sample_id + (" and control sample %s" % normal_id if normal_id else '')) return sample_id, normal_id def _parse_pedigrees(vcf_reader): """Extract tumor/normal pair sample IDs from the VCF header. Return an iterable of (tumor sample ID, normal sample ID). """ if "PEDIGREE" in vcf_reader.metadata: for tag in vcf_reader.metadata["PEDIGREE"]: if "Derived" in tag: sample_id = tag["Derived"] normal_id = tag["Original"] logging.debug("Found tumor sample %s and normal sample %s " "in the VCF header PEDIGREE tag", sample_id, normal_id) yield sample_id, normal_id elif "GATKCommandLine" in vcf_reader.metadata: for tag in vcf_reader.metadata["GATKCommandLine"]: if tag.get("ID") == "MuTect": # any others OK? options = dict(kv.split("=", 1) for kv in tag["CommandLineOptions"].split() if '=' in kv) sample_id = options.get('tumor_sample_name') normal_id = options['normal_sample_name'] logging.debug("Found tumor sample %s and normal sample " "%s in the MuTect VCF header", sample_id, normal_id) yield sample_id, normal_id def _confirm_unique(sample_id, samples): occurrences = [s for s in samples if s == sample_id] if len(occurrences) != 1: raise ValueError( "Did not find a single sample ID '%s' in: %s" % (sample_id, samples)) def _parse_records(vcf_reader, sample_id, normal_id, skip_reject): """Parse VCF records into DataFrame rows. Apply filters to skip records with low depth, homozygosity, the REJECT flag, or the SOMATIC info field. """ cnt_reject = 0 # For logging for record in vcf_reader: is_som = False if skip_reject and record.FILTER and len(record.FILTER) > 0: cnt_reject += 1 continue if record.INFO.get("SOMATIC"): is_som = True sample = record.genotype(sample_id) depth, zygosity, alt_count = _extract_genotype(sample) if normal_id: normal = record.genotype(normal_id) n_depth, n_zygosity, n_alt_count = _extract_genotype(normal) if n_zygosity == 0: is_som = True # Split multiallelics? # XXX Ensure sample genotypes are handled properly for alt in record.ALT: posn = record.POS - 1 end = _get_end(posn, alt, record.INFO) row = (record.CHROM, posn, end, record.REF, str(alt), is_som, zygosity, depth, alt_count, ) if normal_id: row += (n_zygosity, n_depth, n_alt_count) yield row if cnt_reject: logging.info('Filtered out %d records', cnt_reject) def _extract_genotype(sample): if "DP" in sample.data._fields: depth = sample.data.DP else: # SV, probably depth = alt_count = None if sample.is_het: zygosity = 0.5 elif sample.gt_type == 0: zygosity = 0.0 else: zygosity = 1.0 alt_count = (_get_alt_count(sample) if sample.gt_type else 0.0) return depth, zygosity, alt_count def _get_alt_count(sample): """Get the alternative allele count from a sample in a VCF record.""" if "AD" in sample.data._fields and sample.data.AD is not None: # GATK and other callers if isinstance(sample.data.AD, (list, tuple)): alt_count = float(sample.data.AD[1]) # VarScan else: alt_count = float(sample.data.AD) elif "CLCAD2" in sample.data._fields and sample.data.CLCAD2 is not None: # Qiagen CLC Genomics Server -- similar to GATK's AD alt_count = float(sample.data.CLCAD2[1]) elif "AO" in sample.data._fields: if sample.data.AO: if isinstance(sample.data.AO, (list, tuple)): alt_count = sum(map(float, sample.data.AO)) else: alt_count = float(sample.data.AO) else: alt_count = 0.0 else: logging.warn("Skipping %s:%d %s; " "unsure how to get alternative allele count: %s", sample.site.CHROM, sample.site.POS, sample.site.REF, sample.data) alt_count = None # or 0 or "missing data"? return alt_count def _get_end(posn, alt, info): """Get record end position.""" if "END" in info: # Structural variant return posn + info['END'] return posn + len(alt) def _tumor_boost(t_freqs, n_freqs): """Normalize tumor-sample allele frequencies. boosted = { 0.5 (t/n) if t < n 1 - 0.5(1-t)/(1-n) otherwise See: TumorBoost, Bengtsson et al. 2010 """ lt_mask = (t_freqs < n_freqs) lt_idx = np.nonzero(lt_mask)[0] gt_idx = np.nonzero(~lt_mask)[0] out = np.zeros_like(t_freqs) out[lt_idx] = 0.5 * t_freqs.take(lt_idx) / n_freqs.take(lt_idx) out[gt_idx] = 1 - 0.5 * (1 - t_freqs.take(gt_idx) ) / (1 - n_freqs.take(gt_idx)) return out def _allele_specific_copy_numbers(segarr, varr, ploidy=2): """Split total copy number between alleles based on BAF. See: PSCBS, Bentsson et al. 2011 """ seg_depths = ploidy * np.exp2(segarr["log2"]) seg_bafs = np.asarray([(np.median(subvarr.mirrored_baf()) if len(subvarr) else np.nan) for _seg, subvarr in varr.by_ranges(segarr)]) cn1 = 0.5 * (1 - seg_bafs) * seg_depths cn2 = seg_depths - cn1 # segout = segarr.copy() # segout.update({"baf": seg_bafs, "CN1": cn1, "CN2": cn2}) # return segout return pd.DataFrame({"baf": seg_bafs, "CN1": cn1, "CN2": cn2})