"""The ESAC score for detecting changes in the mean of high-dimensional data."""
import numpy as np
from scipy.stats import norm
from ..base import BaseIntervalScorer
from ..utils.numba import njit
from ..utils.numba.stats import col_cumsum
from ..utils.validation.parameters import check_larger_than
from ._cusum import cusum_score
@njit
def transform_esac(
    cusum_scores: np.ndarray,
    a_s: np.ndarray,
    nu_s: np.ndarray,
    t_s: np.ndarray,
    threshold: np.ndarray,
) -> tuple[np.ndarray, np.ndarray]:
    r"""Compute penalised ESAC scores from raw CUSUM scores.

    Implements the penalised score of the ESAC algorithm, Equation (6) in
    [1]_: for each candidate change location, the per-variable CUSUM values
    are hard-thresholded at ``a_s[j]``, mean-centered by ``nu_s[j]`` and
    penalised by ``threshold[j]``, and the maximum over all candidate
    sparsity levels is returned.

    Parameters
    ----------
    cusum_scores : np.ndarray
        2D array where each row holds the per-variable CUSUM scores for one
        candidate change location.
    a_s : np.ndarray
        1D array of hard threshold values, a(t) in Equation (4) of [1]_, one
        per entry of ``t_s``.
    nu_s : np.ndarray
        1D array of mean-centering terms, nu(t) defined after Equation (4)
        in [1]_, one per entry of ``t_s``.
    t_s : np.ndarray
        1D array of candidate sparsity values corresponding to the entries
        of ``a_s`` and ``nu_s``.
    threshold : np.ndarray
        1D array of penalty values, \gamma(t) in Equation (4) of [1]_, one
        per entry of ``t_s``.

    Returns
    -------
    tuple[np.ndarray, np.ndarray]
        ``output_scores`` : 2D array with a single column; each element is
        the maximum penalised score over sparsity levels for one candidate
        change location.
        ``sargmax`` : 1D integer array with the sparsity level at which the
        maximum was attained for each candidate change location.

    References
    ----------
    .. [1] Per August Jarval Moen, Ingrid Kristine Glad, Martin Tveten. Efficient
    sparsity adaptive changepoint estimation. Electron. J. Statist. 18 (2)
    3975 - 4038, 2024. https://doi.org/10.1214/24-EJS2294.
    """
    n_rows = len(cusum_scores)
    n_levels = len(threshold)
    best_scores = np.zeros(n_rows, dtype=np.float64)
    best_sparsity = np.zeros(n_rows, dtype=np.int64)

    for row in range(n_rows):
        row_scores = cusum_scores[row]
        best = -np.inf
        for level in range(n_levels):
            # Keep only variables whose CUSUM magnitude exceeds the hard
            # threshold for this sparsity level.
            surviving = row_scores[np.abs(row_scores) > a_s[level]]
            if len(surviving) == 0:
                continue
            # Mean-center the squared survivors and subtract the penalty.
            penalised = np.sum(surviving**2 - nu_s[level]) - threshold[level]
            if penalised > best:
                best = penalised
                best_sparsity[row] = t_s[level]
        best_scores[row] = best

    return best_scores.reshape(-1, 1), best_sparsity
class ESACScore(BaseIntervalScorer):
    """The ESAC score for detecting changes in the mean of high-dimensional data.

    This is the sparsity adaptive penalised CUSUM score for a change in the mean.
    The ESAC score is a penalised version of the CUSUM score, where the CUSUM of
    each time series is thresholded, mean-centered and penalised by a sparsity
    dependent penalty. The score is defined in Equation (6) in [1]_.

    Parameters
    ----------
    threshold_dense : float, optional, default=1.5
        The leading constant in the penalty function taken as in (8) in [1]_ in the
        dense case where the candidate sparsity level t is greater than or equal to
        sqrt(p * log(n)).
    threshold_sparse : float, optional, default=1.0
        The leading constant in the penalty function taken as in (8) in [1]_ in the
        sparse case where the candidate sparsity level t is less than
        sqrt(p * log(n)).

    References
    ----------
    .. [1] Per August Jarval Moen, Ingrid Kristine Glad, Martin Tveten. Efficient
    sparsity adaptive changepoint estimation. Electron. J. Statist. 18 (2)
    3975 - 4038, 2024. https://doi.org/10.1214/24-EJS2294.
    """

    _tags = {
        "authors": ["peraugustmoen", "Tveten"],
        "maintainers": ["peraugustmoen", "Tveten"],
        "task": "change_score",
        "distribution_type": "Gaussian",
        "is_conditional": False,
        "is_aggregated": True,
        "is_penalised": True,
    }

    def __init__(
        self,
        threshold_dense=1.5,
        threshold_sparse=1.0,
    ):
        self.threshold_dense = threshold_dense
        self.threshold_sparse = threshold_sparse
        super().__init__()

        check_larger_than(0.0, self.threshold_dense, "threshold_dense")
        check_larger_than(0.0, self.threshold_sparse, "threshold_sparse")

    def _fit(self, X: np.ndarray, y=None):
        """Fit the change score evaluator.

        Precomputes the column-wise cumulative sums of ``X`` and the
        sparsity-dependent constants of the ESAC penalty: the hard thresholds
        ``a_s`` (a(t) in Equation (4) of [1]_), the mean-centering terms
        ``nu_s``, the candidate sparsity levels ``t_s``, and the penalties
        ``threshold`` (Equation (8) in [1]_).

        Parameters
        ----------
        X : np.ndarray
            Data to evaluate. Must be a 2D array.
        y : None
            Ignored. Included for API consistency by convention.

        Returns
        -------
        self :
            Reference to self.
        """
        self._sums = col_cumsum(X, init_zero=True)
        self.n = X.shape[0]
        self.p = X.shape[1]
        n = self.n
        p = self.p

        if p == 1:
            # Univariate data: a single sparsity level with no hard
            # thresholding (a = 0) and centering term nu = E[Z^2] = 1.
            self.a_s = np.array([0.0])
            self.nu_s = np.array([1.0])
            self.t_s = np.array([1], dtype=np.int64)
            self.threshold = np.array(
                [self.threshold_dense * (np.sqrt(p * np.log(n)) + np.log(n))]
            )
        else:
            # Largest sparse candidate sparsity level considered.
            max_s = min(np.sqrt(p * np.log(n)), p)
            # Generate log2ss from 0 to floor(log2(max_s)).
            log2ss = np.arange(0, np.floor(np.log2(max_s)) + 1)
            # Candidate sparsity values: powers of two up to max_s.
            ss = 2**log2ss
            # Add p as the dense candidate sparsity and reverse to decreasing
            # order. `ss` is float64 here (2**float and concatenation with p).
            ss = np.concatenate(([p], ss[::-1]))
            # Store candidate sparsities as integers: all entries are exact
            # integers, and this matches both the p == 1 branch and the
            # integer sparsity argmax returned by `transform_esac`.
            self.t_s = ss.astype(np.int64)

            # Hard thresholds a(t) as in Equation (4) of [1]_. The dense
            # level (t = p, first entry) uses no thresholding, i.e. a = 0.
            self.a_s = np.zeros_like(ss, dtype=float)
            self.a_s[1:] = np.sqrt(
                2 * np.log(np.exp(1) * p * 4 * np.log(n) / ss[1:] ** 2)
            )

            # Calculate nu_s using the following formula:
            # nu(a) = E(Z^2|Z^2 > a^2) = 1 + a * exp(log(phi(a)) - log(1 - Phi(a))),
            # where Z ~ N(0, 1), phi is the standard normal density function,
            # and Phi is the standard normal cumulative distribution function.
            # Working on the log scale avoids underflow in the Gaussian tail.
            log_dnorm = norm.logpdf(self.a_s)
            log_pnorm_upper = norm.logsf(self.a_s)
            self.nu_s = 1 + self.a_s * np.exp(log_dnorm - log_pnorm_upper)

            # Penalties gamma(t) as in Equation (8) of [1]_: the dense
            # penalty for t = p, the sparse penalty for the remaining levels.
            self.threshold = np.zeros_like(ss, dtype=float)
            self.threshold[0] = self.threshold_dense * (
                np.sqrt(4 * p * np.log(n)) + 4 * np.log(n)
            )
            self.threshold[1:] = self.threshold_sparse * (
                ss[1:] * np.log(np.exp(1) * p * 4 * np.log(n) / ss[1:] ** 2)
                + 4 * np.log(n)
            )

        return self

    def _evaluate(self, cuts: np.ndarray):
        """Evaluate the change score for a split within an interval.

        Evaluates the score for `X[start:split]` vs. `X[split:end]` for each
        start, split, end in cuts.

        Parameters
        ----------
        cuts : np.ndarray
            A 2D array with three columns of integer locations.
            The first column is the ``start``, the second is the ``split``, and the
            third is the ``end`` of the interval to evaluate.
            The difference between subsets ``X[start:split]`` and ``X[split:end]`` is
            evaluated for each row in `cuts`.

        Returns
        -------
        scores : np.ndarray
            A 2D array of change scores. One row for each cut. The number of
            columns is 1 if the change score is inherently multivariate. The number of
            columns is equal to the number of columns in the input data if the score is
            univariate. In this case, each column represents the univariate score for
            the corresponding input data column.
        """
        starts = cuts[:, 0]
        splits = cuts[:, 1]
        ends = cuts[:, 2]
        cusum_scores = cusum_score(starts, ends, splits, self._sums)
        thresholded_cusum_scores, sargmaxes = transform_esac(
            cusum_scores, self.a_s, self.nu_s, self.t_s, self.threshold
        )
        # Keep the per-cut sparsity argmaxes for inspection after evaluation.
        self.sargmaxes = sargmaxes
        return thresholded_cusum_scores

    @property
    def min_size(self) -> int | None:
        """Minimum size of the interval to evaluate.

        The size of each interval is defined as ``cuts[i, 1] - cuts[i, 0]``.

        Returns
        -------
        int or None
            The minimum valid size of an interval to evaluate. If ``None``, it is
            unknown what the minimum size is. E.g., the scorer may need to be fitted
            first to determine the minimum size.
        """
        return 1

    @classmethod
    def get_test_params(cls, parameter_set="default"):
        """Return testing parameter settings for the estimator.

        Parameters
        ----------
        parameter_set : str, default="default"
            Name of the set of test parameters to return, for use in tests. If no
            special parameters are defined for a value, will return `"default"` set.
            There are currently no reserved values for change scores.

        Returns
        -------
        params : dict or list of dict, default = {}
            Parameters to create testing instances of the class
            Each dict are parameters to construct an "interesting" test instance, i.e.,
            `MyClass(**params)` or `MyClass(**params[i])` creates a valid test instance.
            `create_test_instance` uses the first (or only) dictionary in `params`
        """
        params = [
            {"threshold_dense": 1.5, "threshold_sparse": 1.0},
            {"threshold_dense": 2.0, "threshold_sparse": 2.0},
        ]
        return params