"""Z-test for single-bin copy number alterations."""
from __future__ import annotations
import logging
import numpy as np
import pandas as pd
from scipy.stats import norm
from . import params, segfilters
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from cnvlib.cnary import CopyNumArray
from numpy import ndarray
[docs]
def do_bintest(
cnarr: CopyNumArray,
segments: Optional[CopyNumArray] = None,
alpha: float = 0.005,
target_only: bool = False,
) -> CopyNumArray:
"""Get a probability for each bin based on its Z-score.
Adds a column with p-values to the input .cnr. With `segments`, the Z-score is
relative to the enclosing segment's mean, otherwise it is relative to 0.
Bin p-values are corrected for multiple hypothesis testing by the
Benjamini-Hochberg method.
Parameters
----------
cnarr : CopyNumArray
Bin-level copy number ratios.
segments : CopyNumArray, optional
Segmented copy number data to use as baseline. If provided, Z-scores
are calculated relative to segment means.
alpha : float, optional
Significance threshold for filtering bins. Default is 0.005.
target_only : bool, optional
Test only on-target bins, excluding antitarget bins. Default is False.
Returns
-------
CopyNumArray
Bins where the probability is less than `alpha`.
"""
cnarr = cnarr.copy()
# Subtract segment means, if given, to report only the CNA bins that
# weren't already detected (including exon-size CNAs within a
# larger-scale, smaller-amplitude CNA)
resid = cnarr.residuals(segments)
if not resid.index.is_unique:
# Overlapping segments, maybe?
dup_idx = resid.index.duplicated(keep=False)
logging.warning(
"Segments may overlap at %d bins; dropping duplicate values", dup_idx.sum()
)
logging.debug(
"Duplicated indices: %s", " ".join(map(str, resid[dup_idx].head(50)))
)
resid = resid[~resid.index.duplicated()]
cnarr = cnarr.as_dataframe(cnarr.data.loc[resid.index])
if len(cnarr) != len(resid):
logging.info(
"Segments do not cover all bins (%d), only %d of them",
len(cnarr),
len(resid),
)
cnarr = cnarr.as_dataframe(cnarr.data.loc[resid.index])
cnarr["log2"] = resid
cnarr["probes"] = 1
if target_only:
antitarget_idx = cnarr["gene"].isin(params.ANTITARGET_ALIASES)
if antitarget_idx.any():
logging.info("Ignoring %d off-target bins", antitarget_idx.sum())
# NB: bins no longer match the original input
cnarr = cnarr[~antitarget_idx]
cnarr["p_bintest"] = z_prob(cnarr)
is_sig = cnarr["p_bintest"] < alpha
logging.info(
"Significant hits in {}/{} bins ({:.3g}%)".format(
is_sig.sum(), len(is_sig), 100 * is_sig.sum() / len(is_sig)
)
)
# if segments:
# return spike_into_segments(cnarr, segments, is_sig)
# May be empty
hits = cnarr[is_sig]
return hits
def z_prob(cnarr: CopyNumArray) -> ndarray:
"""Calculate z-test p-value at each bin."""
# Bin weights ~ 1-variance; bin log2 values already centered at 0.0
sd = np.sqrt(1 - cnarr["weight"])
# Convert to Z-scores
z = cnarr["log2"] / sd
# Two-sided survival function (1-CDF) probability
p = 2.0 * norm.cdf(-np.abs(z))
# Similar to the above -- which is better?
# p2 = 2 * norm.pdf(cnarr['log2'], loc=0, scale=sd)
# if not np.allclose(p, p2):
# print("Max diff:", np.abs(p - p2).max())
# print("Median diff:", np.median(np.abs(p - p2)))
# print("Ratio:", (p / p2).mean())
# Correct for multiple hypothesis tests
return p_adjust_bh(p)
def p_adjust_bh(p: ndarray) -> ndarray:
"""Benjamini-Hochberg p-value correction for multiple hypothesis testing."""
p = np.asarray(p, dtype=float)
by_descend = p.argsort()[::-1]
by_orig = by_descend.argsort()
steps = float(len(p)) / np.arange(len(p), 0, -1)
q = np.minimum(1, np.minimum.accumulate(steps * p[by_descend]))
return q[by_orig]
def spike_into_segments(cnarr, segments, is_sig):
"""Splice significant hits (if any) into the given segments."""
if not is_sig.any():
# Nothing to do
return segments
# NB: residuals() above ensures hits all occur within segments
cnarr["is_sig"] = is_sig
chunks = []
for segment, seghits in cnarr.by_ranges(segments, keep_empty=True):
if seghits["is_sig"].any():
# Merge each run of adjacent non-significant bins within this
# segment, leaving the significant hits as single-bin segments
levels = seghits["is_sig"].cumsum() * seghits["is_sig"]
chunks.append(
seghits.data.assign(_levels=levels)
.groupby("_levels", sort=False)
.apply(segfilters.squash_region)
.reset_index(drop=True)
)
else:
# Keep this segment as-is
chunks.append(
pd.DataFrame.from_records([segment], columns=segments.data.columns)
)
return cnarr.as_dataframe(pd.concat(chunks, sort=False))