Source code for cnvlib.segfilters

"""Filter copy number segments."""
import functools
import logging

import numpy as np
import pandas as pd

from .descriptives import weighted_median


def require_column(*colnames):
    """Wrapper to coordinate the segment-filtering functions.

    Verify that the given columns are in the CopyNumArray the wrapped function
    takes. Also log the number of rows in the array before and after
    filtration.
    """
    if len(colnames) == 1:
        msg = "'{}' filter requires column '{}'"
    else:
        msg = "'{}' filter requires columns " + \
            ", ".join(["'{}'"] * len(colnames))

    def wrap(func):
        @functools.wraps(func)
        def wrapped_f(segarr):
            filtname = func.__name__
            if any(c not in segarr for c in colnames):
                raise ValueError(msg.format(filtname, *colnames))
            result = func(segarr)
            logging.info("Filtered by '%s' from %d to %d rows",
                         filtname, len(segarr), len(result))
            return result
        return wrapped_f
    return wrap
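
# Illustrative sketch (not part of the original module): how a hypothetical
# filter function could use the decorator above. Calling `my_filter` on a
# segment array lacking either required column raises ValueError before the
# function body runs; otherwise the row counts are logged around the call.
#
#   >>> @require_column('log2', 'weight')
#   ... def my_filter(segarr):
#   ...     return segarr[segarr['weight'] > 0]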
def squash_by_groups(cnarr, levels, by_arm=False):
    """Reduce CopyNumArray rows to a single row within each given level."""
    # Enumerate runs of identical values
    change_levels = enumerate_changes(levels)
    assert (change_levels.index == levels.index).all()
    assert cnarr.data.index.is_unique
    assert levels.index.is_unique
    assert change_levels.index.is_unique
    if by_arm:
        # Enumerate chromosome arms
        arm_levels = []
        for i, (_chrom, cnarm) in enumerate(cnarr.by_arm()):
            arm_levels.append(np.repeat(i, len(cnarm)))
        change_levels += np.concatenate(arm_levels)
    else:
        # Enumerate chromosomes
        chrom_names = cnarr['chromosome'].unique()
        chrom_col = (cnarr['chromosome']
                     .replace(chrom_names, np.arange(len(chrom_names))))
        change_levels += chrom_col
    data = cnarr.data.assign(_group=change_levels)
    groupkey = ['_group']
    if 'cn1' in cnarr:
        # Keep allele-specific CNAs separate
        data['_g1'] = enumerate_changes(cnarr['cn1'])
        data['_g2'] = enumerate_changes(cnarr['cn2'])
        groupkey.extend(['_g1', '_g2'])
    data = (data.groupby(groupkey, as_index=False, group_keys=False,
                         sort=False)
            .apply(squash_region)
            .reset_index(drop=True))
    return cnarr.as_dataframe(data)
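
# Illustrative sketch (not part of the original module): the per-chromosome
# offset keeps a run with the same level from merging across a chromosome
# boundary. The levels and chromosome names below are assumed/hypothetical.
#
#   >>> levels = pd.Series([0, 0, 0, 0])
#   >>> chroms = pd.Series(['chr1', 'chr1', 'chr2', 'chr2'])
#   >>> change = enumerate_changes(levels)                 # 0, 0, 0, 0
#   >>> codes = chroms.replace(chroms.unique(), [0, 1])    # 0, 0, 1, 1
#   >>> (change + codes).tolist()
#   [0, 0, 1, 1]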
def enumerate_changes(levels):
    """Assign a unique integer to each run of identical values.

    Repeated but non-consecutive values will be assigned different integers.
    """
    return levels.diff().fillna(0).abs().cumsum().astype(int)
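
# Illustrative sketch (not part of the original module): each consecutive run
# gets one integer; a value that reappears later starts a new run. The input
# series is assumed/hypothetical.
#
#   >>> enumerate_changes(pd.Series([1, 1, 2, 2, 1])).tolist()
#   [0, 0, 1, 1, 2]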
def squash_region(cnarr):
    """Reduce a CopyNumArray to 1 row, keeping fields sensible.

    Most fields added by the `segmetrics` command will be dropped.
    """
    assert 'weight' in cnarr
    out = {'chromosome': [cnarr['chromosome'].iat[0]],
           'start': cnarr['start'].iat[0],
           'end': cnarr['end'].iat[-1],
          }
    region_weight = cnarr['weight'].sum()
    if region_weight > 0:
        out['log2'] = np.average(cnarr['log2'], weights=cnarr['weight'])
    else:
        out['log2'] = np.mean(cnarr['log2'])
    out['gene'] = ','.join(cnarr['gene'].drop_duplicates())
    out['probes'] = cnarr['probes'].sum() if 'probes' in cnarr else len(cnarr)
    out['weight'] = region_weight
    if 'depth' in cnarr:
        if region_weight > 0:
            out['depth'] = np.average(cnarr['depth'], weights=cnarr['weight'])
        else:
            out['depth'] = np.mean(cnarr['depth'])
    if 'baf' in cnarr:
        if region_weight > 0:
            out['baf'] = np.average(cnarr['baf'], weights=cnarr['weight'])
        else:
            out['baf'] = np.mean(cnarr['baf'])
    if 'cn' in cnarr:
        if region_weight > 0:
            out['cn'] = weighted_median(cnarr['cn'], cnarr['weight'])
        else:
            out['cn'] = np.median(cnarr['cn'])
        if 'cn1' in cnarr:
            if region_weight > 0:
                out['cn1'] = weighted_median(cnarr['cn1'], cnarr['weight'])
            else:
                out['cn1'] = np.median(cnarr['cn1'])
            out['cn2'] = out['cn'] - out['cn1']
    if 'p_bintest' in cnarr:
        # Only relevant for single-bin segments, but this seems safe/conservative
        out['p_bintest'] = cnarr['p_bintest'].max()
    return pd.DataFrame(out)
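
# Illustrative sketch (not part of the original module): the weighted-average
# collapse used for `log2` (and `depth`/`baf`) whenever the summed weight is
# positive. The values and weights are assumed/hypothetical.
#
#   >>> np.average([0.5, 1.0], weights=[10.0, 30.0])
#   0.875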
@require_column('cn')
def ampdel(segarr):
    """Merge segments by amplified/deleted/neutral copy number status.

    Follow the clinical reporting convention:

    - 5+ copies (2.5-fold gain) is amplification
    - 0 copies is homozygous/deep deletion
    - CNAs of lesser degree are not reported

    This is recommended only for selecting segments corresponding to
    actionable, usually focal, CNAs. Any real and potentially informative but
    lower-level CNAs will be dropped.
    """
    levels = np.zeros(len(segarr))
    levels[segarr['cn'] == 0] = -1
    levels[segarr['cn'] >= 5] = 1
    # or: segarr['log2'] >= np.log2(2.5)
    cnarr = squash_by_groups(segarr, pd.Series(levels))
    return cnarr[(cnarr['cn'] == 0) | (cnarr['cn'] >= 5)]
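
# Illustrative sketch (not part of the original module): how the level codes
# are assigned before squashing. The copy numbers are assumed/hypothetical.
#
#   >>> cn_vals = np.array([2, 0, 7, 3])
#   >>> levels = np.zeros(4)
#   >>> levels[cn_vals == 0] = -1
#   >>> levels[cn_vals >= 5] = 1
#   >>> levels.tolist()
#   [0.0, -1.0, 1.0, 0.0]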
@require_column('depth')
def bic(segarr):
    """Merge segments by Bayesian Information Criterion.

    See: BIC-seq (Xi 2011), doi:10.1073/pnas.1110574108
    """
    return NotImplemented
@require_column('ci_lo', 'ci_hi')
def ci(segarr):
    """Merge segments by confidence interval (overlapping 0).

    Segments with lower CI above 0 are kept as gains, upper CI below 0 as
    losses, and the rest with CI overlapping zero are collapsed as neutral.
    """
    levels = np.zeros(len(segarr))
    # if len(segarr) < 10:
    #     logging.warning("* segarr :=\n%s", segarr)
    #     logging.warning("* segarr['ci_lo'] :=\n%s", segarr['ci_lo'])
    #     logging.warning("* segarr['ci_lo']>0 :=\n%s", segarr['ci_lo'] > 0)
    levels[segarr['ci_lo'].values > 0] = 1
    levels[segarr['ci_hi'].values < 0] = -1
    return squash_by_groups(segarr, pd.Series(levels))
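
# Illustrative sketch (not part of the original module): segments whose CI
# excludes zero are coded +1 (gain) or -1 (loss); the rest stay neutral.
# The CI bounds are assumed/hypothetical.
#
#   >>> ci_lo = np.array([0.2, -0.5, -0.1])
#   >>> ci_hi = np.array([0.8, -0.1, 0.3])
#   >>> levels = np.zeros(3)
#   >>> levels[ci_lo > 0] = 1
#   >>> levels[ci_hi < 0] = -1
#   >>> levels.tolist()
#   [1.0, -1.0, 0.0]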
@require_column('cn')
def cn(segarr):
    """Merge segments by integer copy number."""
    return squash_by_groups(segarr, segarr['cn'])
@require_column('sem')
def sem(segarr, zscore=1.96):
    """Merge segments by Standard Error of the Mean (SEM).

    Use each segment's SEM value to estimate a 95% confidence interval (via
    `zscore`). Segments with lower CI above 0 are kept as gains, upper CI
    below 0 as losses, and the rest with CI overlapping zero are collapsed as
    neutral.
    """
    margin = segarr['sem'] * zscore
    levels = np.zeros(len(segarr))
    levels[segarr['log2'] - margin > 0] = 1
    levels[segarr['log2'] + margin < 0] = -1
    return squash_by_groups(segarr, pd.Series(levels))
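
# Illustrative sketch (not part of the original module): the same coding as
# `ci`, but with the interval derived from SEM * zscore. The log2 ratios and
# SEM values are assumed/hypothetical.
#
#   >>> log2 = np.array([0.8, -0.05, -0.9])
#   >>> margin = np.array([0.1, 0.1, 0.2]) * 1.96
#   >>> levels = np.zeros(3)
#   >>> levels[log2 - margin > 0] = 1
#   >>> levels[log2 + margin < 0] = -1
#   >>> levels.tolist()
#   [1.0, 0.0, -1.0]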