Source code for cnvlib.export

"""Export CNVkit objects and files to other formats."""
from __future__ import absolute_import, division, print_function

import collections
import math
import sys

from Bio._py3k import map, range, zip
import numpy

from . import core
from .cnarray import row2label, CopyNumArray as CNA

ProbeInfo = collections.namedtuple('ProbeInfo', 'label chrom start end gene')

[docs]def merge_samples(filenames): """Merge probe values from multiple samples into a 2D table (of sorts). Input: dict of {sample ID: (probes, values)} Output: list-of-tuples: (probe, log2 coverages...) """ handles = [] datastreams = [] # e.g. [list-of-pairs, list-of-pairs, ...] for fname in filenames: handle = open(fname) handles.append(handle) data = core.parse_tsv(handle) datastreams.append(data) # Emit the individual rows merged across samples, one per probe for rows in zip(*datastreams): yield merge_rows(rows) # Clean up for handle in handles: handle.close()
[docs]def merge_rows(rows): """Combine equivalent rows of coverage data across multiple samples. Check that probe info matches across all samples, then merge the log2 coverage values. Input: a list of individual rows corresponding to the same probes from different coverage files. Output: a list starting with the single common Probe object, followed by the log2 coverage values from each sample, in order. """ probe_infos, coverages = zip(*map(row_to_probe_coverage, rows)) probe_info = core.check_unique(probe_infos, "probe Name") combined_row = [probe_info] + list(coverages) return combined_row
[docs]def row_to_probe_coverage(row): """Repack a parsed row into a ProbeInfo instance and coverage value.""" chrom, start, end, gene, coverage = row[:5] label = "%s:%s-%s:%s" % (chrom, start, end, gene) probe_info = ProbeInfo(label, chrom, int(start), int(end), gene) return probe_info, float(coverage) # Supported formats:
[docs]def fmt_cdt(sample_ids, rows): """Format as CDT.""" outheader = ['GID', 'CLID', 'NAME', 'GWEIGHT'] + sample_ids header2 = ['AID', '', '', ''] header2.extend(['ARRY' + str(i).zfill(3) + 'X' for i in range(len(sample_ids))]) outrows = [header2] for i, row in enumerate(rows): probe, values = row[0], row[1:] outrow = ['GENE%dX' % i, 'IMAGE:%d' % i, probe.label, 1] # or probe.gene? outrow.extend(values) outrows.append(outrow) return outheader, outrows # TODO
[docs]def fmt_gct(sample_ids, rows): return NotImplemented
[docs]def fmt_jtv(sample_ids, rows): """Format for Java TreeView.""" outheader = ["CloneID", "Name"] + sample_ids outrows = [["IMAGE:", row[0].label] + row[1:] for row in rows] return outheader, outrows # TODO
[docs]def fmt_multi(sample_ids, rows): return NotImplemented # TODO
[docs]def fmt_vcf(sample_ids, rows): return NotImplemented # Special cases
[docs]def export_nexus_basic(sample_fname): """Biodiscovery Nexus Copy Number "basic" format. Only represents one sample per file. """ cnarr = CNA.read(sample_fname) outheader = ['probe', 'chromosome', 'start', 'end', 'gene', 'log2'] outrows = [[row2label(row)] + list(row)[:5] for row in cnarr.data] return outheader, outrows
[docs]def export_seg(sample_fnames): """SEG format for copy number segments. Segment breakpoints are not the same across samples, so samples are listed in serial with the sample ID as the left column. """ outrows = [] chrom_ids = None for fname in sample_fnames: segments = CNA.read(fname) if chrom_ids is None: # Create & store chrom_ids = create_chrom_ids(segments) else: # Verify core.assert_equal("Segment chromosome names differ", previous=chrom_ids.keys(), current=create_chrom_ids(segments).keys()) if 'probes' in segments.data.dtype.fields: outheader = ["ID", "Chromosome", "Start", "End", "NumProbes", "Mean"] def row2out(row): return (segments.sample_id, chrom_ids[row['chromosome']], row['start'], row['end'], row['probes'], row['coverage']) else: outheader = ["ID", "Chromosome", "Start", "End", "Mean"] def row2out(row): return (segments.sample_id, chrom_ids[row['chromosome']], row['start'], row['end'], row['coverage']) outrows.extend(row2out(row) for row in segments) return outheader, outrows
[docs]def create_chrom_ids(segments): """Map chromosome names to integers in the order encountered.""" mapping = collections.OrderedDict() curr_idx = 1 for chrom in segments.chromosome: if chrom not in mapping: mapping[chrom] = curr_idx curr_idx += 1 return mapping # _____________________________________________________________________________ # freebayes
[docs]def export_freebayes(sample_fname, args): """Export to FreeBayes --cnv-map format. Which is BED-like, for each region in each sample which does not have neutral copy number (equal to 2 or the value set by --ploidy), with columns: - reference sequence - start (0-indexed) - end - sample name - copy number """ if args.purity and not 0.0 < args.purity <= 1.0: raise RuntimeError("Purity must be between 0 and 1.") segs = CNA.read(sample_fname) is_sample_female = core.guess_xx(segs, args.male_normal, verbose=False) if args.gender: is_sample_female_given = (args.gender in ["f", "female"]) if is_sample_female != is_sample_female_given: print("Sample gender specified as", args.gender, "but chrX copy number looks like", "female" if is_sample_female else "male", file=sys.stderr) is_sample_female = is_sample_female_given print("Treating sample gender as", "female" if is_sample_female else "male", file=sys.stderr) bedrows = segments2freebayes(segs, args.name or segs.sample_id, args.ploidy, args.purity, args.male_normal, is_sample_female) return None, list(bedrows)
[docs]def segments2freebayes(segments, sample_name, ploidy, purity, is_reference_male, is_sample_female): """Convert a copy number array to a BED-like format.""" absolutes = cna_absolutes(segments, ploidy, purity, is_reference_male, is_sample_female) for row, abs_val in zip(segments, absolutes): ncopies = round_to_integer(abs_val, half_is_zero=purity is None) # Ignore regions of neutral copy number if ncopies != ploidy: yield (row["chromosome"], # reference sequence row["start"], # start (0-indexed) row["end"], # end sample_name, # sample name ncopies) # copy number
[docs]def round_to_integer(ncopies, half_is_zero=True, rounding_error=1e-7): """Round an absolute estimate of copy number to a positive integer. `half_is_zero` indicates the hack of encoding 0 copies (complete loss) as a half-copy in log2 scale (e.g. log2-ratio value of -2.0 for diploid) to avoid domain errors when log-transforming. If `half_is_zero`, a half-copy will be rounded down to zero rather than up to 1 copy. """ zero_cutoff = (.5 + rounding_error if half_is_zero else 0.0) if ncopies <= zero_cutoff: return 0 return int(round(ncopies)) # _____________________________________________________________________________ # theta
[docs]def export_theta(tumor, reference): """Convert tumor segments and normal .cnr or reference .cnn to THetA input. Follows the THetA segmentation import script but avoid repeating the pileups, since we already have the mean depth of coverage in each target bin. The options for average depth of coverage and read length do not matter crucially for proper operation of THetA; increased read counts per bin simply increase the confidence of THetA's results. THetA2 input format is tabular, with columns: ID, chrm, start, end, tumorCount, normalCount where chromosome IDs ("chrm") are integers 1 through 24. """ tumor_segs = CNA.read(tumor) ref_vals = CNA.read(reference) outheader = ["#ID", "chrm", "start", "end", "tumorCount", "normalCount"] outrows = [] # Convert chromosome names to 1-based integer indices prev_chrom = None chrom_id = 0 for seg, ref_rows in ref_vals.by_segment(tumor_segs): if seg["chromosome"] != prev_chrom: chrom_id += 1 prev_chrom = seg["chromosome"] fields = calculate_theta_fields(seg, ref_rows, chrom_id) outrows.append(fields) return outheader, outrows
[docs]def calculate_theta_fields(seg, ref_rows, chrom_id): """Convert a segment's info to a row of THetA input. For the normal/reference bin count, take the mean of the bin values within each segment so that segments match between tumor and normal. """ # These two scaling factors don't meaningfully affect THetA's calculation # unless they're too small expect_depth = 100 # Average exome-wide depth of coverage read_length = 100 # Similar number of reads in on-, off-target bins; treat them equally segment_size = 1000 * seg["probes"] def logratio2count(log2_ratio): """Calculate a segment's read count from log2-ratio. Math: nbases = read_length * read_count and nbases = segment_size * read_depth where read_depth = read_depth_ratio * expect_depth So: read_length * read_count = segment_size * read_depth read_count = segment_size * read_depth / read_length """ read_depth = (2 ** log2_ratio) * expect_depth read_count = segment_size * read_depth / read_length return int(round(read_count)) tumor_count = logratio2count(seg["coverage"]) ref_count = logratio2count(ref_rows["coverage"].mean()) # e.g. "start_1_93709:end_1_19208166" row_id = ("start_%d_%d:end_%d_%d" % (chrom_id, seg["start"], chrom_id, seg["end"])) return (row_id, # ID chrom_id, # chrm seg["start"], # start seg["end"], # end tumor_count, # tumorCount ref_count # normalCount ) # _____________________________________________________________________________ # Rescaling etc. # (XXX move these to cnarray or another module)
[docs]def rescale_copy_ratios(cnarr, purity=None, ploidy=2, is_sample_female=None, is_reference_male=True): """Rescale segment copy ratio values given a known tumor purity.""" if purity and not 0.0 < purity <= 1.0: raise ValueError("Purity must be between 0 and 1.") chr_x = core.guess_chr_x(cnarr) chr_y = ('chrY' if chr_x.startswith('chr') else 'Y') if is_sample_female is None: is_sample_female = core.guess_xx(cnarr, is_reference_male, chr_x, verbose=False) absolutes = cna_absolutes(cnarr, ploidy, purity, is_reference_male, is_sample_female) # Avoid a logarithm domain error absolutes[absolutes <= 0.0] = .5 abslog = numpy.log2(absolutes / float(ploidy)) newcnarr = cnarr.copy() newcnarr["coverage"] = abslog # Adjust sex chromosomes to be relative to the reference if is_reference_male: newcnarr['coverage'][newcnarr.chromosome == chr_x] += 1.0 newcnarr['coverage'][newcnarr.chromosome == chr_y] += 1.0 return newcnarr
[docs]def cna_absolutes(cnarr, ploidy, purity, is_reference_male, is_sample_female): """Calculate absolute copy number values from segment or bin log2 ratios.""" absolutes = numpy.zeros(len(cnarr), dtype=numpy.float_) for i, row in enumerate(cnarr): ref_copies, expect_copies = _reference_expect_copies( row["chromosome"], ploidy, is_sample_female, is_reference_male) absolutes[i] = _log2_ratio_to_absolute( row["coverage"], ref_copies, expect_copies, purity) return absolutes
def _reference_expect_copies(chrom, ploidy, is_sample_female, is_reference_male): """Determine the number copies of a chromosome expected and in reference. For sex chromosomes, these values may not be the same ploidy as the autosomes. Return a pair: number of copies in the reference and expected in the sample. """ chrom = chrom.lower() if chrom in ["chrx", "x"]: ref_copies = (ploidy // 2 if is_reference_male else ploidy) exp_copies = (ploidy if is_sample_female else ploidy // 2) elif chrom in ["chry", "y"]: ref_copies = ploidy // 2 exp_copies = (0 if is_sample_female else ploidy // 2) else: ref_copies = exp_copies = ploidy return ref_copies, exp_copies def _log2_ratio_to_absolute(log2_ratio, ref_copies, expect_copies, purity=None): """Transform a log2 ratio value to absolute linear scale. Math: log2_ratio = log2(ncopies / ploidy) 2^log2_ratio = ncopies / ploidy ncopies = ploidy * 2^log2_ratio With rescaling for purity: let v = log2 ratio value, p = tumor purity, r = reference ploidy, x = expected ploidy; v = log_2(p*n/r + (1-p)*x/r) 2^v = p*n/r + (1-p)*x/r n*p/r = 2^v - (1-p)*x/r n = (r*2^v - x*(1-p)) / p If purity adjustment is skipped (p=1; e.g. if THetA was run beforehand): n = r*2^v """ if purity and purity < 1.0: ncopies = (ref_copies * 2**log2_ratio - expect_copies * (1 - purity) ) / purity else: ncopies = ref_copies * 2 ** log2_ratio return ncopies # _____________________________________________________________________________ EXPORT_FORMATS = { 'cdt': fmt_cdt, # 'gct': fmt_gct, 'jtv': fmt_jtv, # 'vcf': fmt_vcf, 'nexus-basic': export_nexus_basic, # 'nexus-multi1': fmt_multi, 'seg': export_seg, 'freebayes': export_freebayes, 'theta': export_theta, }