# Source code for cnvlib.smoothing

"""Signal smoothing functions."""
from __future__ import absolute_import, division

from bisect import insort, bisect_left
from collections import deque
import math

import numpy

from . import core, metrics

[docs]def check_inputs(x, width): """Transform width into a half-window size. width is either a fraction of the length of x or an integer size of the whole window. The output half-window size is truncated to the length of x if needed. """ x = numpy.asfarray(x) if 0 < width < 1: wing = int(math.ceil(len(x) * width * 0.5)) elif width >= 2 and int(width) == width: wing = width // 2 else: raise ValueError("width must be between 0 and 1 (got %s)" % width) if wing > len(x): wing = len(x) - 1 assert wing > 0, "Wing must be greater than 0 (got %s)" % wing return x, wing
[docs]def rolling_median(x, width): """Rolling median. Contributed by Peter Otten to comp.lang.python. Source: https://bitbucket.org/janto/snippets/src/tip/running_median.py https://groups.google.com/d/msg/comp.lang.python/0OARyHF0wtA/SEs-glW4t6gJ """ x, wing = check_inputs(x, width) # Pre-allocate the result array # result = numpy.zeros_like(x) result = numpy.empty_like(x) # Keep a copy of the rolling window in original order; initially fill with # a mirrored copy of the first 'wing' points window = deque(numpy.concatenate((x[wing::-1], x[:wing]))) # Also keep a sorted copy of the rolling window values sortwin = sorted(window) # Pad the right edge of the original array with a mirror copy signal = numpy.concatenate((x[wing:], x[:-wing-1:-1])) # Calculate the rolling median at each subsequent point for i, item in enumerate(signal): old = window.popleft() window.append(item) del sortwin[bisect_left(sortwin, old)] insort(sortwin, item) result[i] = sortwin[wing] return result
[docs]def smoothed(x, width, do_fit_edges=False): """Smooth the values in x with the Kaiser windowed filter. See: http://en.wikipedia.org/wiki/Kaiser_window Parameters: x : array-like 1-dimensional numeric data set. width : float Fraction of x's total length to include in the rolling window (i.e. the proportional window width), or the integer size of the window. """ x, wing = check_inputs(x, width) # Pad the edges with mirror-image copies of the array signal = numpy.concatenate((x[wing-1::-1], x, x[:-wing:-1])) # Apply signal smoothing window = numpy.kaiser(2* wing + 1, 14) y = numpy.convolve(window / window.sum(), signal, mode='same') # Chop off the ends of the result so it has the original size y = y[wing:1-wing] if do_fit_edges: fit_edges(x, y, wing) # In-place return y
[docs]def fit_edges(x, y, wing, polyorder=3): """Apply polynomial interpolation to the edges of y, in-place. Calculates a polynomial fit (of order polyorder) of x within a window of width twice wing, then updates the smoothed values y in the half of the window closest to the edge. """ window_length = 2 * wing + 1 n = len(x) # Fit each of the two array edges (start and end) _fit_edge(x, y, 0, window_length, 0, wing, polyorder) _fit_edge(x, y, n - window_length, n, n - wing, n, polyorder) # TODO - fix the discontinuities at wing, n-wing
def _fit_edge(x, y, window_start, window_stop, interp_start, interp_stop, polyorder): """ Given a 1-D array x and the specification of a slice of x from window_start to window_stop, create an interpolating polynomial of the sliced sub-array, and evaluate that polynomial from interp_start to interp_stop. Put the result into the corresponding slice of y. """ # Get the edge into a (window_length, -1) array. x_edge = x[window_start:window_stop] # Fit the edges. poly_coeffs has shape (polyorder + 1, -1), # where '-1' is the same as in x_edge. poly_coeffs = numpy.polyfit(numpy.arange(0, window_stop - window_start), x_edge, polyorder) # Compute the interpolated values for the edge. i = numpy.arange(interp_start - window_start, interp_stop - window_start) values = numpy.polyval(poly_coeffs, i) # Put the values into the appropriate slice of y. y[interp_start:interp_stop] = values
[docs]def smooth_genome_coverages(probes, smooth_func, width): """Fit a trendline through probe coverages, handling chromosome boundaries. Returns an array of smoothed coverage values, calculated with smooth_func and width, equal in length to probes. """ # ENH: also split by centromeres (long internal gaps -- see PSCBS) out = {chrom: smooth_func(subprobes['coverage'], width) for chrom, subprobes in probes.by_chromosome()} return numpy.concatenate( [out[chrom] for chrom in sorted(out, key=core.sorter_chrom)]) # Outlier detection
[docs]def outlier_iqr(a, c=1.5): """Detect outliers as a multiple of the IQR from the median. By convention, "outliers" are points more than 1.5 * IQR from the median, and "extremes" or extreme outliers are those more than 3.0 * IQR. """ a = numpy.asarray(a) dists = numpy.abs(a - numpy.median(a)) iqr = metrics.interquartile_range(a) return dists > (c * iqr)
[docs]def outlier_mad_median(a): """MAD-Median rule for detecting outliers. Returns: a boolean array of the same size, where outlier indices are True. X_i is an outlier if:: | X_i - M | _____________ > K ~= 2.24 MAD / 0.6745 where $K = sqrt( X^2_{0.975,1} )$, the square root of the 0.975 quantile of a chi-squared distribution with 1 degree of freedom. This is a very robust rule with the highest possible breakdown point of 0.5. See: - Davies & Gather (1993) The Identification of Multiple Outliers. - Rand R. Wilcox (2012) Introduction to robust estimation and hypothesis testing. Ch.3: Estimating measures of location and scale. """ K = 2.24 a = numpy.asarray(a) dists = numpy.abs(a - numpy.median(a)) mad = metrics.median_absolute_deviation(a, scale_to_sd=False) return (dists / mad) > K