"""Signal smoothing functions."""
from __future__ import absolute_import, division
from bisect import insort, bisect_left
from collections import deque
import math
import numpy
from . import core, metrics
[docs]def smoothed(x, width, do_fit_edges=False):
"""Smooth the values in `x` with the Kaiser windowed filter.
See: http://en.wikipedia.org/wiki/Kaiser_window
Parameters:
x : array-like
1-dimensional numeric data set.
width : float
Fraction of x's total length to include in the rolling window (i.e. the
proportional window width), or the integer size of the window.
"""
x, wing = check_inputs(x, width)
# Pad the edges with mirror-image copies of the array
signal = numpy.concatenate((x[wing-1::-1], x, x[:-wing:-1]))
# Apply signal smoothing
window = numpy.kaiser(2* wing + 1, 14)
y = numpy.convolve(window / window.sum(), signal, mode='same')
# Chop off the ends of the result so it has the original size
y = y[wing:1-wing]
if do_fit_edges:
fit_edges(x, y, wing) # In-place
return y
[docs]def fit_edges(x, y, wing, polyorder=3):
"""Apply polynomial interpolation to the edges of y, in-place.
Calculates a polynomial fit (of order `polyorder`) of `x` within a window of
width twice `wing`, then updates the smoothed values `y` in the half of the
window closest to the edge.
"""
window_length = 2 * wing + 1
n = len(x)
# Fit each of the two array edges (start and end)
_fit_edge(x, y, 0, window_length, 0, wing, polyorder)
_fit_edge(x, y, n - window_length, n, n - wing, n, polyorder)
# TODO - fix the discontinuities at wing, n-wing
def _fit_edge(x, y, window_start, window_stop, interp_start, interp_stop,
polyorder):
"""
Given a 1-D array `x` and the specification of a slice of `x` from
`window_start` to `window_stop`, create an interpolating polynomial of the
sliced sub-array, and evaluate that polynomial from `interp_start` to
`interp_stop`. Put the result into the corresponding slice of `y`.
"""
# Get the edge into a (window_length, -1) array.
x_edge = x[window_start:window_stop]
# Fit the edges. poly_coeffs has shape (polyorder + 1, -1),
# where '-1' is the same as in x_edge.
poly_coeffs = numpy.polyfit(numpy.arange(0, window_stop - window_start),
x_edge, polyorder)
# Compute the interpolated values for the edge.
i = numpy.arange(interp_start - window_start, interp_stop - window_start)
values = numpy.polyval(poly_coeffs, i)
# Put the values into the appropriate slice of y.
y[interp_start:interp_stop] = values
[docs]def smooth_genome_coverages(probes, smooth_func, width):
"""Fit a trendline through probe coverages, handling chromosome boundaries.
Returns an array of smoothed coverage values, calculated with `smooth_func`
and `width`, equal in length to `probes`.
"""
# ENH: also split by centromeres (long internal gaps -- see PSCBS)
out = {chrom: smooth_func(subprobes['coverage'], width)
for chrom, subprobes in probes.by_chromosome()}
return numpy.concatenate(
[out[chrom] for chrom in sorted(out, key=core.sorter_chrom)])
# Outlier detection
[docs]def outlier_iqr(a, c=1.5):
"""Detect outliers as a multiple of the IQR from the median.
By convention, "outliers" are points more than 1.5 * IQR from the median,
and "extremes" or extreme outliers are those more than 3.0 * IQR.
"""
a = numpy.asarray(a)
dists = numpy.abs(a - numpy.median(a))
iqr = metrics.interquartile_range(a)
return dists > (c * iqr)