# Source code for cnvlib.importers

"""Import from other formats to the CNVkit format."""
from __future__ import absolute_import, division, print_function
import math
import os.path
import subprocess

from . import core
from .params import NULL_LOG2_COVERAGE
from .ngfrills import echo
from .cnarray import CopyNumArray as CNA

# __________________________________________________________________________
# import-picard


def find_picard_files(file_and_dir_names):
    """Search the given paths for 'targetcoverage' CSV files.

    Per the convention we use in our Picard applets, the target coverage file
    names end with '.targetcoverage.csv'; anti-target coverages end with
    '.antitargetcoverage.csv'.

    Parameters
    ----------
    file_and_dir_names : iterable of str
        Paths to coverage files and/or directories to search recursively.

    Returns
    -------
    list of str
        Sorted list of the coverage file paths found.

    Raises
    ------
    RuntimeError
        If a given directory contains no '*targetcoverage.csv' files.
    ValueError
        If a given path is neither a file nor a directory.
    """
    filenames = []
    for tgt in file_and_dir_names:
        if os.path.isdir(tgt):
            # Collect the target coverage files from this directory tree.
            # universal_newlines=True makes check_output return decoded text
            # on Python 3 (it returns bytes by default, which would later mix
            # with str paths and break filenames.sort()).
            fnames = subprocess.check_output(['find', tgt, '-name',
                                              '*targetcoverage.csv'],
                                             universal_newlines=True
                                            ).splitlines()
            if not fnames:
                raise RuntimeError("Given directory %s does not contain any "
                                   "'*targetcoverage.csv' files." % tgt)
            filenames.extend(fnames)
        elif os.path.isfile(tgt):
            filenames.append(tgt)
        else:
            raise ValueError("Given path is neither a file nor a directory: %s"
                             % tgt)
    filenames.sort()
    return filenames
def load_targetcoverage_csv(fname):
    """Parse a target or antitarget coverage file (.csv) into a CopyNumArray.

    These files are generated by Picard CalculateHsMetrics. Despite the .csv
    extension, the fields are actually tab-separated.

    Column layout (by index): chrom (0, str), start (1), end (2), length (3,
    int), name (4, str), %gc (5), mean_coverage (6), normalized_coverage
    (7, float).
    """
    parsed_rows = []
    zero_cvg_count = 0  # Number of probes whose normalized coverage is zero
    for fields in core.parse_tsv(fname):
        assert len(fields) >= 8, \
            "Bad line in {}:\n{}".format(fname, fields)
        chrom, start_txt, end_txt = fields[0], fields[1], fields[2]
        gene = unpipe_name(fields[4])
        gc_frac = float(fields[5])
        # Field 7 is already normalized to an average of 1 by Picard
        normalized = float(fields[7])
        if normalized == 0:
            zero_cvg_count += 1
            log2_cvg = NULL_LOG2_COVERAGE
        else:
            log2_cvg = math.log(normalized, 2)
        parsed_rows.append((chrom, int(start_txt), int(end_txt), gene,
                            log2_cvg, gc_frac))
    # NOTE(review): TOO_MANY_NO_COVERAGE is not defined or imported in the
    # visible portion of this module -- confirm it exists at module scope.
    if zero_cvg_count > TOO_MANY_NO_COVERAGE:
        echo("*WARNING* Sample", fname, "has >", TOO_MANY_NO_COVERAGE,
             "probes with no coverage")
    cnarr = CNA.from_rows(core.fbase(fname), parsed_rows, ('gc',))
    cnarr.sort()
    return cnarr
def unpipe_name(name):
    """Fix the duplicated gene names Picard spits out.

    Return a string containing the single gene name, sans duplications and
    pipe characters.

    Picard CalculateHsMetrics joins the labels of overlapping intervals with
    '|', e.g. 'BRAF|BRAF'. No two distinct targeted genes actually overlap,
    so such duplicates are redundant. By our convention, 'CGH' labels mark
    selected intergenic regions rather than real genes, so 'CGH|FOO'
    resolves to 'FOO'.
    """
    candidates = set(name.split('|'))
    if len(candidates) > 1:
        if len(candidates) == 2 and 'CGH' in candidates:
            # Drop the placeholder label; the remaining name is the gene
            candidates.remove('CGH')
        else:
            echo("*WARNING* Ambiguous gene name:", name)
    return candidates.pop()
# Scale factor for base conversion: log10(x) * LOG2_10 == log2(x)
LOG2_10 = math.log(10, 2)  # To convert log10 values to log2
def import_seg(segfname, chrom_names, chrom_prefix, from_log10):
    """Parse a SEG file. Emit pairs of (sample ID, CopyNumArray)

    Values are converted from log10 to log2.

    `chrom_names`:
        Map (string) chromosome IDs to names. (Applied before chrom_prefix.)
        e.g. {'23': 'X', '24': 'Y', '25': 'M'}

    `chrom_prefix`: prepend this string to chromosome names
        (usually 'chr' or None)
    """
    active_sample = None   # Sample ID of the segments accumulated so far
    active_rows = []       # Segment rows belonging to active_sample
    with open(segfname) as handle:
        reader = iter(handle)
        next(reader)  # Discard the header line
        for raw_line in reader:
            sample_id, chrom, start, end, nprobes, seg_mean = raw_line.split()
            # Remap and/or prefix the chromosome label
            if chrom_names and chrom in chrom_names:
                chrom = chrom_names[chrom]
            if chrom_prefix:
                chrom = chrom_prefix + chrom
            value = float(seg_mean)
            if from_log10:
                value *= LOG2_10
            row = (chrom, int(start), int(end),
                   ("G" if value >= 0 else "L"),
                   value, int(nprobes))
            if sample_id == active_sample:
                # Still within the same sample's run of segments
                active_rows.append(row)
            else:
                if active_sample is not None:
                    assert len(active_rows)
                    # Emit the completed sample's segments
                    yield CNA.from_rows(active_sample, active_rows,
                                        ('probes',))
                active_sample = sample_id
                active_rows = [row]
    # Emit the final sample, if any rows were read at all
    if active_sample is not None:
        assert len(active_rows)
        yield CNA.from_rows(active_sample, active_rows, ('probes',))
def parse_theta_results(fname):
    """Parse THetA results into a data structure.

    The file has a one-line header and a one-line body, each with four
    tab-separated columns: NLL, mu, C, p*

    Parameters
    ----------
    fname : str
        Path to a THetA results file.

    Returns
    -------
    dict
        Keys: "NLL" (float), "mu_normal" (float), "mu_tumors" (list of
        float), "C" (list of lists of int-or-None), "p*" (floats-or-None;
        a flat list for one tumor subpopulation, nested for several).
    """
    with open(fname) as handle:
        header = next(handle).rstrip().split('\t')
        body = next(handle).rstrip().split('\t')
        assert len(body) == len(header) == 4
    # NLL: negative log-likelihood of the solution
    nll = float(body[0])
    # mu: comma-separated normal fraction followed by tumor fraction(s).
    # Materialize as a list -- on Python 3, map() returns an iterator and the
    # two len(mu_tumors) checks below would raise TypeError.
    mu = body[1].split(',')
    mu_normal = float(mu[0])
    mu_tumors = [float(m) for m in mu[1:]]
    # C: integer copy numbers per segment; 'X' means missing/unknown
    copies = body[2].split(':')
    if len(mu_tumors) == 1:
        # 1D array of integers; replace X with None for "missing"
        copies = [[int(c) if c.isdigit() else None
                   for c in copies]]
    else:
        # List of lists of integer-or-None (usu. 2 x #segments)
        copies = [[int(c) if c.isdigit() else None
                   for c in subcop]
                  for subcop in zip(*[c.split(',') for c in copies])]
    # p*: probabilities per segment; alphabetic 'X' means missing/unknown
    probs = body[3].split(',')
    if len(mu_tumors) == 1:
        # 1D array of floats, or None for "X" (missing/unknown)
        probs = [float(p) if not p.isalpha() else None
                 for p in probs]
    else:
        # NOTE(review): probs was already split on ',', so the inner
        # p.split(',') below is a no-op per element -- confirm against the
        # multi-tumor THetA p* column format (':'-separated like C?).
        probs = [[float(p) if not p.isalpha() else None
                  for p in subprob]
                 for subprob in zip(*[p.split(',') for p in probs])]
    return {"NLL": nll,
            "mu_normal": mu_normal,
            "mu_tumors": mu_tumors,
            "C": copies,
            "p*": probs}