"""Robust estimators of central tendency and scale.
See:
https://en.wikipedia.org/wiki/Robust_measures_of_scale
https://astropy.readthedocs.io/en/latest/_modules/astropy/stats/funcs.html
"""
from __future__ import annotations
import sys
from functools import wraps
from typing import TYPE_CHECKING, Optional, Union
import numpy as np
from scipy import stats
if TYPE_CHECKING:
from collections.abc import Callable
from numpy import float64, ndarray
# Decorators to coerce input and short-circuit trivial cases
[docs]
def on_array(default: Optional[int] = None) -> Callable:
"""Ensure `a` is a numpy array with no missing/NaN values."""
def outer(f):
@wraps(f)
def wrapper(a, **kwargs):
a = np.asarray(a, dtype=float)
a = a[~np.isnan(a)]
if not len(a):
return np.nan
if len(a) == 1:
if default is None:
return a[0]
return default
return f(a, **kwargs)
return wrapper
return outer
[docs]
def on_weighted_array(default: None = None) -> Callable:
"""Ensure `a` and `w` are equal-length numpy arrays with no NaN values.
For weighted descriptives -- `a` is the array of values, `w` is weights.
1. Drop any cells in `a` that are NaN from both `a` and `w`
2. Replace any remaining NaN cells in `w` with 0.
"""
def outer(f):
@wraps(f)
def wrapper(a, w, **kwargs):
if len(a) != len(w):
raise ValueError(f"Unequal array lengths: a={len(a)}, w={len(w)}")
if not len(a):
return np.nan
a = np.asarray(a, dtype=float)
w = np.asarray(w, dtype=float)
# Drop a's NaN indices from both arrays
a_nan = np.isnan(a)
if a_nan.any():
a = a[~a_nan]
if not len(a):
return np.nan
w = w[~a_nan]
if len(a) == 1:
if default is None:
return a[0]
return default
# Fill w's NaN indices
w_nan = np.isnan(w)
if w_nan.any():
w[w_nan] = 0.0
return f(a, w, **kwargs)
return wrapper
return outer
# M-estimators of central location
[docs]
@on_array()
def biweight_location(
a: ndarray,
initial: None = None,
c: float = 6.0,
epsilon: float = 1e-3,
max_iter: int = 5,
) -> float64:
"""Compute the biweight location for an array.
The biweight is a robust statistic for estimating the central location of a
distribution.
"""
def biloc_iter(a, initial):
# Weight the observations by distance from initial estimate
d = a - initial
mad = np.median(np.abs(d))
w = d / max(c * mad, epsilon)
w = (1 - w**2) ** 2
# Omit the outlier points
mask = w < 1
weightsum = w[mask].sum()
if weightsum == 0:
# Insufficient variation to improve the initial estimate
return initial
return initial + (d[mask] * w[mask]).sum() / weightsum
if initial is None:
initial = np.median(a)
for _i in range(max_iter):
result = biloc_iter(a, initial)
if abs(result - initial) <= epsilon:
break
initial = result
return result
[docs]
@on_array()
def modal_location(a: ndarray) -> float64:
"""Return the modal value of an array's values.
The "mode" is the location of peak density among the values, estimated using
a Gaussian kernel density estimator.
Parameters
----------
a : np.array
A 1-D array of floating-point values, e.g. bin log2 ratio values.
"""
sarr = np.sort(a)
kde = stats.gaussian_kde(sarr)
y = kde.evaluate(sarr)
peak = sarr[y.argmax()]
return peak
# Estimators of scale
[docs]
@on_array(0)
def biweight_midvariance(
a: ndarray,
initial: Optional[Union[int, float64]] = None,
c: float = 9.0,
epsilon: float = 1e-3,
) -> float64:
"""Compute the biweight midvariance for an array.
The biweight midvariance is a robust statistic for determining the
midvariance (i.e. the standard deviation) of a distribution.
See:
- https://en.wikipedia.org/wiki/Robust_measures_of_scale#The_biweight_midvariance
- https://astropy.readthedocs.io/en/latest/_modules/astropy/stats/funcs.html
"""
if initial is None:
initial = biweight_location(a)
# Difference of observations from initial location estimate
d = a - initial
# Weighting (avoid dividing by zero)
mad = np.median(np.abs(d))
w = d / max(c * mad, epsilon)
# Omit the outlier points
mask = np.abs(w) < 1
if w[mask].sum() == 0:
# Insufficient variation to improve on MAD
return mad * 1.4826
n = mask.sum()
d_ = d[mask]
w_ = (w**2)[mask]
return np.sqrt(
(n * (d_**2 * (1 - w_) ** 4).sum()) / (((1 - w_) * (1 - 5 * w_)).sum() ** 2)
)
[docs]
@on_array(0)
def gapper_scale(a):
"""Scale estimator based on gaps between order statistics.
See:
- Wainer & Thissen (1976)
- Beers, Flynn, and Gebhardt (1990)
"""
gaps = np.diff(np.sort(a))
n = len(a)
idx = np.arange(1, n)
weights = idx * (n - idx)
return (gaps * weights).sum() * np.sqrt(np.pi) / (n * (n - 1))
[docs]
@on_array(0)
def interquartile_range(a: ndarray) -> float64:
"""Compute the difference between the array's first and third quartiles."""
return np.percentile(a, 75) - np.percentile(a, 25)
[docs]
@on_weighted_array()
def weighted_mad(a, weights, scale_to_sd=True):
"""Median absolute deviation (MAD) with weights."""
a_median = weighted_median(a, weights)
mad = weighted_median(np.abs(a - a_median), weights)
if scale_to_sd:
mad *= 1.4826
return mad
[docs]
@on_weighted_array()
def weighted_std(a: ndarray, weights: ndarray) -> float64:
"""Standard deviation with weights."""
mean = np.average(a, weights=weights)
var = np.average((a - mean) ** 2, weights=weights)
return np.sqrt(var)
[docs]
@on_array(0)
def mean_squared_error(a, initial=None):
"""Mean squared error (MSE).
By default, assume the input array `a` is the residuals/deviations/error,
so MSE is calculated from zero. Another reference point for calculating the
error can be specified with `initial`.
"""
if initial is None:
initial = a.mean()
if initial:
a = a - initial
return (a**2).mean()
[docs]
@on_array(0)
def q_n(a):
"""Rousseeuw & Croux's (1993) Q_n, an alternative to MAD.
``Qn := Cn first quartile of (|x_i - x_j|: i < j)``
where Cn is a constant depending on n.
Finite-sample correction factors must be used to calibrate the
scale of Qn for small-to-medium-sized samples.
n E[Qn]
-- -----
10 1.392
20 1.193
40 1.093
60 1.064
80 1.048
100 1.038
200 1.019
"""
# First quartile of: (|x_i - x_j|: i < j)
vals = []
for i, x_i in enumerate(a):
for x_j in a[i + 1 :]:
vals.append(abs(x_i - x_j))
quartile = np.percentile(vals, 25)
# Cn: a scaling factor determined by sample size
n = len(a)
if n <= 10:
# ENH: warn when extrapolating beyond the data
# ENH: simulate for values up to 10
# (unless the equation below is reliable)
scale = 1.392
elif 10 < n < 400:
# I fitted the simulated values (above) to a power function in Excel:
# f(x) = 1.0 + 3.9559 * x ^ -1.0086
# This should be OK for interpolation. (Does it apply generally?)
scale = 1.0 + (4 / n)
else:
scale = 1.0
return quartile / scale