"""Import from other formats to the CNVkit format."""
from __future__ import absolute_import, division, print_function
import logging
import math
import os.path
import subprocess
import numpy as np
import pandas as pd
from . import core, params
from .cnary import CopyNumArray as CNA
# __________________________________________________________________________
# import-picard
TOO_MANY_NO_COVERAGE = 100
[docs]def find_picard_files(file_and_dir_names):
"""Search the given paths for 'targetcoverage' CSV files.
Per the convention we use in our Picard applets, the target coverage file
names end with '.targetcoverage.csv'; anti-target coverages end with
'.antitargetcoverage.csv'.
"""
filenames = []
for tgt in file_and_dir_names:
if os.path.isdir(tgt):
# Collect the target coverage files from this directory tree
fnames = subprocess.check_output(['find', tgt,
'-name', '*targetcoverage.csv']
).splitlines()
if not fnames:
raise RuntimeError("Given directory %s does not contain any "
"'*targetcoverage.csv' files."
% tgt)
filenames.extend(fnames)
elif os.path.isfile(tgt):
filenames.append(tgt)
else:
raise ValueError("Given path is neither a file nor a directory: %s"
% tgt)
filenames.sort()
return filenames
[docs]def import_picard_pertargetcoverage(fname):
"""Parse a Picard CalculateHsMetrics PER_TARGET_COVERAGE file.
Return a CopyNumArray.
Input column names:
chrom (str),
start, end, length (int),
name (str),
%gc, mean_coverage, normalized_coverage (float)
"""
dframe = pd.read_table(fname, na_filter=False)
coverages = np.asarray(dframe['mean_coverage'])
no_cvg_idx = (coverages == 0)
if sum(no_cvg_idx) > TOO_MANY_NO_COVERAGE:
logging.warn("*WARNING* Sample %s has >%d bins with no coverage",
fname, TOO_MANY_NO_COVERAGE)
# Avoid math domain error
coverages[no_cvg_idx] = 2**params.NULL_LOG2_COVERAGE
cnarr = CNA.from_columns({"chromosome": dframe["chrom"],
"start": dframe["start"] - 1,
"end": dframe["end"],
"gene": dframe["name"].apply(unpipe_name),
"gc": dframe["%gc"],
"log2": np.log2(coverages)},
{"sample_id": core.fbase(fname)})
cnarr.sort()
return cnarr
[docs]def unpipe_name(name):
"""Fix the duplicated gene names Picard spits out.
Return a string containing the single gene name, sans duplications and pipe
characters.
Picard CalculateHsMetrics combines the labels of overlapping intervals
by joining all labels with '|', e.g. 'BRAF|BRAF' -- no two distinct
targeted genes actually overlap, though, so these dupes are redundant.
Meaningless target names are dropped, e.g. 'CGH|FOO|-' resolves as 'FOO'.
In case of ambiguity, the longest name is taken, e.g. "TERT|TERT Promoter"
resolves as "TERT Promoter".
"""
if '|' not in name:
return name
gene_names = set(name.split('|'))
if len(gene_names) == 1:
return gene_names.pop()
cleaned_names = gene_names.difference(params.IGNORE_GENE_NAMES)
if cleaned_names:
gene_names = cleaned_names
new_name = sorted(gene_names, key=len, reverse=True)[0]
if len(gene_names) > 1:
logging.warn("*WARNING* Ambiguous gene name %r; using %r",
name, new_name)
return new_name
# __________________________________________________________________________
# import-seg
LOG2_10 = math.log(10, 2) # To convert log10 values to log2
[docs]def import_seg(segfname, chrom_names, chrom_prefix, from_log10):
"""Parse a SEG file as an iterable of CopyNumArray instances.
`chrom_names`:
Map (string) chromosome IDs to names. (Applied before chrom_prefix.)
e.g. {'23': 'X', '24': 'Y', '25': 'M'}
`chrom_prefix`: prepend this string to chromosome names
(usually 'chr' or None)
`from_log10`: Convert values from log10 to log2.
"""
dframe = pd.read_table(segfname, na_filter=False)
if len(dframe.columns) == 6:
dframe.columns = ['sample_id', 'chromosome', 'start', 'end', 'nprobes',
'mean']
elif len(dframe.columns) == 5:
dframe.columns = ['sample_id', 'chromosome', 'start', 'end', 'mean']
else:
raise ValueError("SEG format expects 5 or 6 columns; found {}: {}"
.format(len(dframe.columns), ' '.join(dframe.columns)))
# Calculate values for output columns
dframe['chromosome'] = dframe['chromosome'].apply(str)
if chrom_names:
dframe['chromosome'] = dframe['chromosome'].apply(lambda c:
chrom_names.get(c, c))
if chrom_prefix:
dframe['chromosome'] = dframe['chromosome'].apply(lambda c:
chrom_prefix + c)
if from_log10:
dframe['mean'] *= LOG2_10
dframe['gene'] = ["G" if mean >= 0 else "L" for mean in dframe['mean']]
for sid in pd.unique(dframe['sample_id']):
sample = dframe[dframe['sample_id'] == sid]
cols = {'chromosome': sample['chromosome'],
'start': sample['start'],
'end': sample['end'],
'gene': sample['gene'],
'log2': sample['mean']}
if 'nprobes' in dframe:
cols['probes'] = sample['nprobes']
cns = CNA.from_columns(cols, {'sample_id': sid})
cns.sort()
yield cns
# __________________________________________________________________________
# import-theta
[docs]def parse_theta_results(fname):
"""Parse THetA results into a data structure.
Columns: NLL, mu, C, p*
"""
with open(fname) as handle:
header = next(handle).rstrip().split('\t')
body = next(handle).rstrip().split('\t')
assert len(body) == len(header) == 4
# NLL
nll = float(body[0])
# mu
mu = body[1].split(',')
mu_normal = float(mu[0])
mu_tumors = map(float, mu[1:])
# C
copies = body[2].split(':')
if len(mu_tumors) == 1:
# 1D array of integers
# Replace X with None for "missing"
copies = [[int(c) if c.isdigit() else None
for c in copies]]
else:
# List of lists of integer-or-None (usu. 2 x #segments)
copies = [[int(c) if c.isdigit() else None
for c in subcop]
for subcop in zip(*[c.split(',') for c in copies])]
# p*
probs = body[3].split(',')
if len(mu_tumors) == 1:
# 1D array of floats, or None for "X" (missing/unknown)
probs = [float(p) if not p.isalpha() else None
for p in probs]
else:
probs = [[float(p) if not p.isalpha() else None
for p in subprob]
for subprob in zip(*[p.split(',') for p in probs])]
return {"NLL": nll,
"mu_normal": mu_normal,
"mu_tumors": mu_tumors,
"C": copies,
"p*": probs}