"""Signal smoothing functions."""
from __future__ import absolute_import, division
import math
import warnings
import numpy as np
import pandas as pd
from . import metrics
def rolling_quantile(x, width, quantile):
    """Rolling quantile (0--1) with mirrored edges.

    Parameters:
        x : array-like
            1-dimensional numeric data set.
        width : float
            Window width; passed with `x` to `check_inputs`, which yields the
            half-window size ("wing") used here.
        quantile : float
            Quantile (0--1) to compute within each window.

    Returns:
        The rolling quantile over a centered window of ``2 * wing + 1``
        points, with the mirrored padding trimmed off again.
    """
    x, wing = check_inputs(x, width)
    window_size = 2 * wing + 1
    # Pad the edges of the original array with mirror copies.
    # NOTE(review): these slices assume wing >= 1; with wing == 0 the first
    # slice would be the whole array reversed -- confirm check_inputs
    # guarantees a positive wing.
    signal = np.concatenate((x[wing-1::-1], x, x[:-wing-1:-1]))
    if hasattr(pd.Series, "rolling"):
        # pandas 0.18+: the module-level pd.rolling_* functions were
        # deprecated and later removed in favour of the .rolling() API.
        rolled = np.asarray(
            pd.Series(signal).rolling(window_size, center=True)
            .quantile(quantile))
    else:
        # Older pandas releases that only have the module-level function
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", FutureWarning)
            rolled = pd.rolling_quantile(signal, window_size, quantile,
                                         center=True)
    # Drop the padded positions (these are also where the centered window
    # is incomplete) so only the original positions remain
    return rolled[wing:-wing]
def rolling_std(x, width):
    """Rolling standard deviation with mirrored edges.

    Parameters:
        x : array-like
            1-dimensional numeric data set.
        width : float
            Window width; passed with `x` to `check_inputs`, which yields the
            half-window size ("wing") used here.

    Returns:
        The rolling standard deviation (pandas default settings) over a
        centered window of ``2 * wing + 1`` points, with the mirrored padding
        trimmed off again.
    """
    x, wing = check_inputs(x, width)
    window_size = 2 * wing + 1
    # Pad the edges of the original array with mirror copies.
    # NOTE(review): these slices assume wing >= 1 -- confirm check_inputs
    # guarantees a positive wing.
    signal = np.concatenate((x[wing-1::-1], x, x[:-wing-1:-1]))
    if hasattr(pd.Series, "rolling"):
        # pandas 0.18+: the module-level pd.rolling_* functions were
        # deprecated and later removed in favour of the .rolling() API.
        rolled = np.asarray(
            pd.Series(signal).rolling(window_size, center=True).std())
    else:
        # Older pandas releases that only have the module-level function
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", FutureWarning)
            rolled = pd.rolling_std(signal, window_size, center=True)
    # Drop the padded positions so only the original positions remain
    return rolled[wing:-wing]
def smoothed(x, width, do_fit_edges=False):
    """Return `x` smoothed by convolution with a normalized Kaiser window.

    See: https://en.wikipedia.org/wiki/Kaiser_window

    Parameters:
        x : array-like
            1-dimensional numeric data set.
        width : float
            Fraction of x's total length to include in the rolling window
            (i.e. the proportional window width), or the integer size of the
            window.
        do_fit_edges : bool
            If true, pass the result through `fit_edges`, which overwrites
            the values nearest each end of the array in-place.

    Returns:
        The smoothed values, trimmed back to the original positions.
    """
    x, wing = check_inputs(x, width)
    # Reflect `wing` points at each end so the kernel always has data
    head_mirror = x[wing-1::-1]
    tail_mirror = x[:-wing-1:-1]
    padded = np.concatenate((head_mirror, x, tail_mirror))
    # Kaiser kernel (beta=14), scaled so its weights sum to 1
    kernel = np.kaiser(2 * wing + 1, 14)
    convolved = np.convolve(kernel / kernel.sum(), padded, mode='same')
    # Discard the mirrored padding again
    y = convolved[wing:-wing]
    if do_fit_edges:
        fit_edges(x, y, wing)  # Modifies y in-place
    return y
def fit_edges(x, y, wing, polyorder=3):
    """Apply polynomial interpolation to the edges of y, in-place.

    Calculates a polynomial fit (of order `polyorder`) of `x` within a window
    of ``2 * wing + 1`` points at each end of the array, then updates the
    smoothed values `y` in the `wing` points closest to that edge.

    Parameters:
        x : array
            1-dimensional original (unsmoothed) values.
        y : array
            Smoothed values to update in-place; expected to be the same
            length as `x` (as produced by `smoothed`).
        wing : int
            Half-window size.
        polyorder : int
            Order of the fitted polynomial.

    Returns:
        None; `y` is modified in-place. Nothing is done if `x` is empty or
        `wing` is not positive.
    """
    n = len(x)
    if n == 0 or wing <= 0:
        # No edge region to fit
        return
    # Never let the fitting window or the rewritten edge exceed the data:
    # a window wider than x would push the right-hand window start negative
    # and make np.polyfit fail on mismatched lengths.
    window_length = min(2 * wing + 1, n)
    edge = min(wing, n)
    # A polynomial through k points needs order <= k - 1; a higher order is
    # under-determined and makes np.polyfit emit a RankWarning.
    order = min(polyorder, window_length - 1)
    # Fit each of the two array edges (start and end)
    _fit_edge(x, y, 0, window_length, 0, edge, order)
    _fit_edge(x, y, n - window_length, n, n - edge, n, order)
    # TODO - fix the discontinuities at wing, n-wing


def _fit_edge(x, y, window_start, window_stop, interp_start, interp_stop,
              polyorder):
    """Fit a polynomial to one window of `x` and write part of it into `y`.

    Given a 1-D array `x` and the specification of a slice of `x` from
    `window_start` to `window_stop`, fit a polynomial of order `polyorder` to
    the sliced sub-array, and evaluate that polynomial from `interp_start` to
    `interp_stop`. Put the result into the corresponding slice of `y`.
    """
    x_edge = x[window_start:window_stop]
    # Fit against positions counted from the start of the window (0, 1, ...)
    poly_coeffs = np.polyfit(np.arange(0, window_stop - window_start),
                             x_edge, polyorder)
    # Evaluate at the target positions, expressed in the same
    # window-relative coordinates used for the fit
    i = np.arange(interp_start - window_start, interp_stop - window_start)
    values = np.polyval(poly_coeffs, i)
    # Put the values into the appropriate slice of y.
    y[interp_start:interp_stop] = values
# Outlier detection
def outlier_iqr(a, c=3.0):
    """Flag values lying more than `c` interquartile ranges from the median.

    By convention, "outliers" are points more than 1.5 * IQR from the median,
    and "extremes" or extreme outliers are those more than 3.0 * IQR.

    Parameters:
        a : array-like
            Numeric data set.
        c : float
            Multiple of the IQR (from `metrics.interquartile_range`) used as
            the cutoff.

    Returns:
        Boolean mask, true where the absolute deviation from the median
        exceeds ``c * IQR``.
    """
    values = np.asarray(a)
    center = np.median(values)
    deviations = np.abs(values - center)
    cutoff = c * metrics.interquartile_range(values)
    return deviations > cutoff
def rolling_outlier_iqr(x, width, c=3.0):
    """Flag outliers as a multiple of the rolling IQR of the trend residuals.

    The residuals of `x` from its smoothed trend line (`smoothed`) are
    compared against `c` times the rolling interquartile range of those
    residuals (rolling 75th minus rolling 25th quantile).

    By convention, "outliers" are points more than 1.5 * IQR from the median
    (~2 SD if values are normally distributed), and "extremes" or extreme
    outliers are those more than 3.0 * IQR (~4 SD).

    Returns a boolean mask; if `x` has no more than `width` elements, the
    mask is all false.
    """
    if len(x) <= width:
        # Not enough points for a window: flag nothing
        return np.zeros(len(x), dtype=np.bool_)
    residuals = x - smoothed(x, width)
    upper_quartile = rolling_quantile(residuals, width, .75)
    lower_quartile = rolling_quantile(residuals, width, .25)
    spread = upper_quartile - lower_quartile
    return np.abs(residuals) > spread * c
def rolling_outlier_quantile(x, width, q, m):
    """Return a boolean mask of outliers by multiples of a quantile in a window.

    Outliers are the array elements whose absolute deviation from the
    smoothed trend line exceeds `m` times the rolling `q`'th quantile of
    those absolute deviations. (For example, take the magnitude of the 95th
    quantile times 5, and mark any elements greater than that value as
    outliers.)

    This is the smoothing method used in BIC-seq (doi:10.1073/pnas.1110574108)
    with the parameters width=200, q=.95, m=5 for WGS.

    If `x` has no more than `width` elements, the mask is all false.
    """
    if len(x) <= width:
        # Not enough points for a window: flag nothing
        return np.zeros(len(x), dtype=np.bool_)
    trend = smoothed(x, width)
    abs_residuals = np.abs(x - trend)
    window_quantiles = rolling_quantile(abs_residuals, width, q)
    return abs_residuals > window_quantiles * m
def rolling_outlier_std(x, width, stdevs):
    """Return a boolean mask of outliers by stdev within a rolling window.

    Outliers are the array elements whose deviation from the smoothed trend
    line is larger in magnitude than `stdevs` times the rolling standard
    deviation of the trend line residuals.

    If `x` has no more than `width` elements, the mask is all false.
    """
    if len(x) <= width:
        # Not enough points for a window: flag nothing
        return np.zeros(len(x), dtype=np.bool_)
    trend = smoothed(x, width)
    residuals = x - trend
    local_std = rolling_std(residuals, width)
    return np.abs(residuals) > local_std * stdevs