Source code for cnvlib.importers

"""Import from other formats to the CNVkit format."""

from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Union

import numpy as np
from skgenome import tabio

from . import params

if TYPE_CHECKING:
    from collections.abc import Iterator
    from cnvlib.cnary import CopyNumArray


# __________________________________________________________________________
# import-picard


[docs] def do_import_picard(fname, too_many_no_coverage=100): """Import coverage data from Picard CalculateHsMetrics output. Converts Picard CalculateHsMetrics output to CNVkit format by reading the interval-level coverage data, cleaning up gene names, and calculating log2 ratios. Parameters ---------- fname : str Path to Picard CalculateHsMetrics output file (typically ends in .per_target_coverage or .interval_summary). too_many_no_coverage : int, optional Threshold for warning about excessive zero-coverage bins. Default: 100. Returns ------- CopyNumArray CNVkit-compatible genomic array with columns including 'gene', 'log2', and 'ratio' (coverage relative to average). Notes ----- Bins with zero coverage are assigned the NULL_LOG2_COVERAGE value to avoid math domain errors in log2 transformation. Gene names from overlapping intervals are deduplicated (see `unpipe_name`). See Also -------- unpipe_name : Cleans up duplicated gene names from Picard output """ garr = tabio.read(fname, "picardhs") garr["gene"] = garr["gene"].apply(unpipe_name) # Create log2 column from coverages, avoiding math domain error coverages = garr["ratio"].copy() no_cvg_idx = coverages == 0 if no_cvg_idx.sum() > too_many_no_coverage: logging.warning( "WARNING: Sample %s has >%d bins with no coverage", garr.sample_id, too_many_no_coverage, ) coverages[no_cvg_idx] = 2**params.NULL_LOG2_COVERAGE garr["log2"] = np.log2(coverages) return garr
[docs] def unpipe_name(name): """Fix the duplicated gene names Picard spits out. Return a string containing the single gene name, sans duplications and pipe characters. Picard CalculateHsMetrics combines the labels of overlapping intervals by joining all labels with '|', e.g. 'BRAF|BRAF' -- no two distinct targeted genes actually overlap, though, so these dupes are redundant. Meaningless target names are dropped, e.g. 'CGH|FOO|-' resolves as 'FOO'. In case of ambiguity, the longest name is taken, e.g. "TERT|TERT Promoter" resolves as "TERT Promoter". """ if "|" not in name: return name gene_names = set(name.split("|")) if len(gene_names) == 1: return gene_names.pop() cleaned_names = gene_names.difference(params.IGNORE_GENE_NAMES) if cleaned_names: gene_names = cleaned_names new_name = sorted(gene_names, key=len, reverse=True)[0] if len(gene_names) > 1: logging.warning("WARNING: Ambiguous gene name %r; using %r", name, new_name) return new_name
# __________________________________________________________________________ # import-theta
[docs] def do_import_theta( segarr: CopyNumArray, theta_results_fname: str, ploidy: int = 2 ) -> Iterator[CopyNumArray]: """Import absolute copy number calls from THetA results. THetA (Tumor Heterogeneity Analysis) infers tumor purity and clonal/subclonal copy number profiles. This function converts THetA's output into CNVkit segment format with absolute copy numbers. Parameters ---------- segarr : CopyNumArray Input segmented copy number data (.cns file). Typically the segmentation results before calling absolute copy numbers. theta_results_fname : str Path to THetA results file (.results or .withBounds file). ploidy : int, optional Expected baseline ploidy (usually 2 for diploid). Used to calculate log2 ratios from absolute copy numbers. Default: 2. Yields ------ CopyNumArray One or more segment arrays with 'cn' (absolute copy number) and recalculated 'log2' values. THetA may output multiple solutions, each yielded separately. Notes ----- - Only autosomal segments are processed; sex chromosomes are excluded because THetA doesn't handle them well. - Segments with unknown copy number (marked as None/X in THetA output) are dropped. - Copy number 0 is treated as 0.5 for log2 calculation to avoid -inf. See Also -------- parse_theta_results : Parses the THetA results file format """ theta = parse_theta_results(theta_results_fname) # THetA doesn't handle sex chromosomes well segarr = segarr.autosomes() for copies in theta["C"]: if len(copies) != len(segarr): copies = copies[: len(segarr)] # Drop any segments where the C value is None mask_drop = np.array([c is None for c in copies], dtype="bool") segarr = segarr[~mask_drop].copy() ok_copies = np.asarray([c for c in copies if c is not None], dtype=float) # Replace remaining segment values with these integers segarr["cn"] = ok_copies.astype("int") ok_copies[ok_copies == 0] = 0.5 segarr["log2"] = np.log2(ok_copies / ploidy) segarr.sort_columns() yield segarr
[docs] def parse_theta_results( fname: str, ) -> dict[str, Union[float, list[float], list[list[int]], list[list[float]]]]: """Parse THetA results into a data structure. Columns: NLL, mu, C, p* """ with open(fname) as handle: header = next(handle).rstrip().split("\t") body = next(handle).rstrip().split("\t") assert len(body) == len(header) == 4 # NLL nll = float(body[0]) # mu mu = body[1].split(",") mu_normal = float(mu[0]) mu_tumors = list(map(float, mu[1:])) # C copies = body[2].split(":") if len(mu_tumors) == 1: # 1D array of integers # Replace X with None for "missing" copies = [[int(c) if c.isdigit() else None for c in copies]] else: # List of lists of integer-or-None (usu. 2 x #segments) copies = [ [int(c) if c.isdigit() else None for c in subcop] for subcop in zip(*[c.split(",") for c in copies], strict=False) ] # p* probs = body[3].split(",") if len(mu_tumors) == 1: # 1D array of floats, or None for "X" (missing/unknown) probs = [float(p) if not p.isalpha() else None for p in probs] else: probs = [ [float(p) if not p.isalpha() else None for p in subprob] for subprob in zip(*[p.split(",") for p in probs], strict=False) ] return { "NLL": nll, "mu_normal": mu_normal, "mu_tumors": mu_tumors, "C": copies, "p*": probs, }