Source code for skchange.anomaly_detectors._capa

"""The collective and point anomalies (CAPA) algorithm."""

__author__ = ["Tveten"]
__all__ = ["CAPA"]

import numpy as np
import pandas as pd

from ..anomaly_scores import L2Saving, to_saving
from ..base import BaseIntervalScorer
from ..compose.penalised_score import PenalisedScore
from ..penalties import make_chi2_penalty, make_linear_chi2_penalty
from ..utils.numba import njit
from ..utils.validation.data import check_data
from ..utils.validation.interval_scorer import check_interval_scorer
from ..utils.validation.parameters import check_larger_than
from ..utils.validation.penalties import check_penalty
from .base import BaseSegmentAnomalyDetector


@njit
def get_anomalies(
    anomaly_starts: np.ndarray,
) -> tuple[list[tuple[int, int]], list[tuple[int, int]]]:
    segment_anomalies = []
    point_anomalies = []
    i = anomaly_starts.size - 1
    while i >= 0:
        start_i = anomaly_starts[i]
        size = i - start_i + 1
        if size > 1:
            segment_anomalies.append((int(start_i), i + 1))
            i = int(start_i)
        elif size == 1:
            point_anomalies.append((i, i + 1))
        i -= 1
    return segment_anomalies, point_anomalies
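

# Illustration of the backtracking above (hypothetical values, not part of the
# module): `anomaly_starts[t]` holds the optimal start of an anomaly ending at `t`,
# or NaN if no anomaly ends at `t` in the optimal solution. For example,
#
#     anomaly_starts = np.array([np.nan, np.nan, 2.0, 2.0, 2.0, 5.0, np.nan])
#     get_anomalies(anomaly_starts)  # -> ([(2, 5)], [(5, 6)])
#
# i.e. a segment anomaly over [2, 5) and a point anomaly at index 5.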


def get_affected_components(
    penalised_scorer: PenalisedScore,
    anomalies: list[tuple[int, int]],
) -> list[tuple[int, int, np.ndarray]]:
    penalised_scorer.check_is_penalised()
    penalised_scorer.check_is_fitted()
    new_anomalies = []
    for start, end in anomalies:
        saving_values = penalised_scorer.score_.evaluate(np.array([start, end]))[0]
        saving_order = np.argsort(-saving_values)  # Decreasing order.
        penalised_savings = (
            np.cumsum(saving_values[saving_order]) - penalised_scorer.penalty_
        )
        argmax = np.argmax(penalised_savings)
        new_anomalies.append((start, end, saving_order[: argmax + 1]))
    return new_anomalies
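

# Sketch of the component-subset selection above, with made-up numbers (not part of
# the module): suppose a detected anomaly has per-component savings [9.0, 0.5, 4.0]
# and the penalty array is [3.0, 5.0, 7.0]. Sorted decreasingly, the savings are
# [9.0, 4.0, 0.5] with component order [0, 2, 1], the cumulative penalised savings
# are [6.0, 8.0, 6.5], and the argmax at position 1 selects components [0, 2] as
# affected by the anomaly.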


def run_capa(
    segment_penalised_saving: PenalisedScore,
    point_penalised_saving: PenalisedScore,
    min_segment_length: int,
    max_segment_length: int,
    find_affected_components: bool = False,
) -> tuple[np.ndarray, list[tuple[int, int]], list[tuple[int, int]]]:
    segment_penalised_saving.check_is_penalised()
    segment_penalised_saving.check_is_fitted()
    point_penalised_saving.check_is_penalised()
    point_penalised_saving.check_is_fitted()
    n_samples = segment_penalised_saving._X.shape[0]
    if n_samples != point_penalised_saving._X.shape[0]:
        raise ValueError(
            "The segment and point saving costs must span the same number of samples."
        )

    opt_savings = np.zeros(n_samples + 1)

    # Store the optimal start of an anomaly ending at each t.
    # Used to recover the final set of anomalies after the loop.
    opt_anomaly_starts = np.repeat(np.nan, n_samples)
    starts = np.array([], dtype=int)
    max_segment_penalty = np.max(segment_penalised_saving.penalty_)

    ts = np.arange(min_segment_length - 1, n_samples)
    for t in ts:
        # Segment anomalies
        t_array = np.array([t])

        starts = np.concatenate((starts, t_array - min_segment_length + 1))
        ends = np.repeat(t + 1, len(starts))
        intervals = np.column_stack((starts, ends))
        segment_savings = segment_penalised_saving.evaluate(intervals).flatten()
        candidate_savings = opt_savings[starts] + segment_savings
        candidate_argmax = np.argmax(candidate_savings)
        opt_segment_saving = candidate_savings[candidate_argmax]
        opt_start = starts[candidate_argmax]

        # Point anomalies
        point_interval = np.column_stack((t_array, t_array + 1))
        point_savings = point_penalised_saving.evaluate(point_interval).flatten()
        opt_point_saving = opt_savings[t] + point_savings[0]

        # Combine and store results
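        # Dynamic programming recursion:
        #   opt_savings[t + 1] = max(
        #       opt_savings[t],                                      # no anomaly at t
        #       max_s {opt_savings[s] + segment_saving([s, t+1))},   # segment anomaly
        #       opt_savings[t] + point_saving([t, t+1)),             # point anomaly
        #   )
        # with penalised savings throughout.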
        savings = np.array([opt_savings[t], opt_segment_saving, opt_point_saving])
        argmax = np.argmax(savings)
        opt_savings[t + 1] = savings[argmax]
        if argmax == 1:
            opt_anomaly_starts[t] = opt_start
        elif argmax == 2:
            opt_anomaly_starts[t] = t

        # Pruning the admissible starts
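        # A start is discarded once its candidate saving, even with the maximum
        # penalty added back, cannot beat the current optimum (it can then never
        # become optimal again), or once a segment starting there would exceed
        # `max_segment_length` at the next iteration.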
        saving_too_low = candidate_savings + max_segment_penalty <= opt_savings[t + 1]
        too_long_segment = starts < t - max_segment_length + 2
        prune = saving_too_low | too_long_segment
        starts = starts[~prune]

    segment_anomalies, point_anomalies = get_anomalies(opt_anomaly_starts)

    if find_affected_components:
        segment_anomalies = get_affected_components(
            segment_penalised_saving, segment_anomalies
        )
        point_anomalies = get_affected_components(
            point_penalised_saving,
            point_anomalies,
        )
    return opt_savings[1:], segment_anomalies, point_anomalies
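

# A minimal usage sketch of `run_capa` outside the `CAPA` class (illustration only;
# assumes `X` is a 2D np.ndarray holding the data and the penalty values are
# hypothetical):
#
#     from skchange.anomaly_scores import L2Saving
#     from skchange.compose.penalised_score import PenalisedScore
#
#     segment_saving = PenalisedScore(L2Saving(), 20.0).fit(X)  # hypothetical penalty
#     point_saving = PenalisedScore(L2Saving(), 10.0).fit(X)    # hypothetical penalty
#     opt_savings, segment_anomalies, point_anomalies = run_capa(
#         segment_saving, point_saving, min_segment_length=2, max_segment_length=100
#     )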


def _make_chi2_penalty_from_score(score: BaseIntervalScorer) -> float:
    score.check_is_fitted()
    n = score._X.shape[0]
    p = score._X.shape[1]
    return make_chi2_penalty(score.get_model_size(p), n)


def _make_linear_chi2_penalty_from_score(score: BaseIntervalScorer) -> np.ndarray:
    score.check_is_fitted()
    n = score._X.shape[0]
    p = score._X.shape[1]
    return make_linear_chi2_penalty(score.get_model_size(p), n, p)
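

# These two helpers provide the default penalties used by `CAPA` below: a constant
# chi2-based penalty for the segment saving and a linear chi2-based penalty array
# for the point saving. A hypothetical sketch of what they return for a fitted
# saving on data of shape (n, p):
#
#     saving = L2Saving().fit(X)
#     _make_chi2_penalty_from_score(saving)         # a single float
#     _make_linear_chi2_penalty_from_score(saving)  # an array of length p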


def _check_capa_penalised_score(score: BaseIntervalScorer, name: str, caller_name: str):
    if score.get_tag("is_penalised") and not isinstance(score, PenalisedScore):
        raise ValueError(
            f"{caller_name} only supports a penalised `{name}` constructed"
            " by `PenalisedScore`."
        )


class CAPA(BaseSegmentAnomalyDetector):
    """The collective and point anomaly (CAPA) detection algorithm.

    An efficient implementation of the CAPA family of algorithms for anomaly
    detection. Supports both univariate data [1]_ (CAPA) and multivariate data with
    subset anomalies [2]_ (MVCAPA) by using the penalised saving formulation of the
    collective anomaly detection problem found in [2]_ and [3]_. For multivariate
    data, the algorithm can also be used to infer the affected components for each
    anomaly given a suitable penalty array.

    Parameters
    ----------
    segment_saving : BaseIntervalScorer, optional, default=L2Saving()
        The saving to use for segment anomaly detection. If a cost is given, the
        saving is constructed from the cost. The cost must have a fixed parameter
        that represents the baseline cost. If a penalised saving is given, it must
        be constructed from `PenalisedScore`.
    segment_penalty : np.ndarray or float, optional, default=None
        The penalty to use for segment anomaly detection. If the segment saving is
        penalised (`segment_saving.get_tag("is_penalised")`), the penalty is
        ignored. The different types of penalties are as follows:

        * ``float``: A constant penalty applied to the sum of scores across all
          variables in the data.
        * ``np.ndarray``: A penalty array of the same length as the number of
          columns in the data, where element ``i`` of the array is the penalty for
          ``i+1`` variables being affected by an anomaly. The penalty array must be
          positive and increasing (not strictly). A penalised score with a linear
          penalty array is faster to evaluate than a nonlinear penalty array.
        * ``None``: A default constant penalty is created in `predict` based on the
          fitted score using the `make_chi2_penalty` function.

    point_saving : BaseIntervalScorer, optional, default=L2Saving()
        The saving to use for point anomaly detection. Only savings with a minimum
        size of 1 are permitted. If a cost is given, the saving is constructed from
        the cost. The cost must have a fixed parameter that represents the baseline
        cost. If a penalised saving is given, it must be constructed from
        `PenalisedScore`.
    point_penalty : np.ndarray or float, optional, default=None
        The penalty to use for point anomaly detection. See the documentation for
        `segment_penalty` for details. For ``None`` input, the default is set using
        the `make_linear_chi2_penalty` function.
    min_segment_length : int, optional, default=2
        Minimum length of a segment.
    max_segment_length : int, optional, default=1000
        Maximum length of a segment.
    ignore_point_anomalies : bool, optional, default=False
        If ``True``, detected point anomalies are not returned by `predict`. I.e.,
        only segment anomalies are returned. If ``False``, point anomalies are
        included in the output as segment anomalies of length 1.
    find_affected_components : bool, optional, default=False
        If ``True``, the affected components for each segment anomaly are returned
        in the `predict` output. This is only relevant for multivariate data in
        combination with a penalty array.

    References
    ----------
    .. [1] Fisch, A. T., Eckley, I. A., & Fearnhead, P. (2022). A linear time method
       for the detection of collective and point anomalies. Statistical Analysis and
       Data Mining: The ASA Data Science Journal, 15(4), 494-508.

    .. [2] Fisch, A. T., Eckley, I. A., & Fearnhead, P. (2022). Subset multivariate
       collective and point anomaly detection. Journal of Computational and
       Graphical Statistics, 31(2), 574-585.

    .. [3] Tveten, M., Eckley, I. A., & Fearnhead, P. (2022).
       Scalable change-point and anomaly detection in cross-correlated data with an
       application to condition monitoring. The Annals of Applied Statistics, 16(2),
       721-743.

    Examples
    --------
    >>> from skchange.anomaly_detectors import CAPA
    >>> from skchange.datasets import generate_alternating_data
    >>> df = generate_alternating_data(n_segments=5, mean=10, segment_length=100)
    >>> detector = CAPA()
    >>> detector.fit_predict(df)
            ilocs  labels
    0  [100, 200)       1
    1  [300, 400)       2
    """

    _tags = {
        "capability:missing_values": False,
        "capability:multivariate": True,
        "fit_is_empty": True,
    }

    def __init__(
        self,
        segment_saving: BaseIntervalScorer | None = None,
        segment_penalty: np.ndarray | float | None = None,
        point_saving: BaseIntervalScorer | None = None,
        point_penalty: np.ndarray | float | None = None,
        min_segment_length: int = 2,
        max_segment_length: int = 1000,
        ignore_point_anomalies: bool = False,
        find_affected_components: bool = False,
    ):
        self.segment_saving = segment_saving
        self.segment_penalty = segment_penalty
        self.point_saving = point_saving
        self.point_penalty = point_penalty
        self.min_segment_length = min_segment_length
        self.max_segment_length = max_segment_length
        self.ignore_point_anomalies = ignore_point_anomalies
        self.find_affected_components = find_affected_components
        super().__init__()

        _segment_score = L2Saving() if segment_saving is None else segment_saving
        check_interval_scorer(
            _segment_score,
            "segment_saving",
            "CAPA",
            required_tasks=["cost", "saving"],
            allow_penalised=True,
        )
        _segment_saving = to_saving(_segment_score)
        check_penalty(segment_penalty, "segment_penalty", "CAPA")
        if _segment_saving.get_tag("is_penalised"):
            _check_capa_penalised_score(_segment_saving, "segment_saving", "CAPA")
            self._segment_penalised_saving = _segment_saving.clone()
        else:
            self._segment_penalised_saving = PenalisedScore(
                _segment_saving,
                segment_penalty,
                make_default_penalty=_make_chi2_penalty_from_score,
            )

        _point_score = L2Saving() if point_saving is None else point_saving
        check_interval_scorer(
            _point_score,
            "point_saving",
            "CAPA",
            required_tasks=["cost", "saving"],
            allow_penalised=True,
        )
        if _point_score.min_size != 1:
            raise ValueError("`point_saving` must have `min_size == 1`.")
        _point_saving = to_saving(_point_score)
        check_penalty(point_penalty, "point_penalty", "CAPA")
        if _point_saving.get_tag("is_penalised"):
            _check_capa_penalised_score(_point_saving, "point_saving", "CAPA")
            self._point_penalised_saving = _point_saving.clone()
        else:
            self._point_penalised_saving = PenalisedScore(
                _point_saving,
                point_penalty,
                make_default_penalty=_make_linear_chi2_penalty_from_score,
            )

        check_larger_than(2, min_segment_length, "min_segment_length")
        check_larger_than(min_segment_length, max_segment_length, "max_segment_length")

        self.clone_tags(_segment_saving, ["distribution_type"])
        self.set_tags(**{"capability:identify_variables": find_affected_components})

    def _predict(self, X: pd.DataFrame | pd.Series) -> pd.Series:
        """Detect events in test/deployment data.

        Parameters
        ----------
        X : pd.DataFrame
            Time series to detect anomalies in.

        Returns
        -------
        y_sparse: pd.DataFrame
            A `pd.DataFrame` with a range index and the following columns:

            * ``"ilocs"`` - left-closed ``pd.Interval``s of iloc based segments.
            * ``"labels"`` - integer labels ``1, ..., K`` for each segment anomaly.
            * ``"icolumns"`` - list of affected columns for each segment anomaly.
              Only included if `find_affected_components` is ``True``.

        Attributes
        ----------
        fitted_segment_saving : BaseIntervalScorer
            The fitted penalised segment saving used for the detection.
        fitted_point_saving : BaseIntervalScorer
            The fitted penalised point saving used for the detection.
        scores : pd.Series
            The cumulative optimal savings for the input data.
        """
        X = check_data(
            X,
            min_length=self.min_segment_length,
            min_length_name="min_segment_length",
        )

        self.fitted_segment_saving: BaseIntervalScorer = (
            self._segment_penalised_saving.clone().fit(X)
        )
        self.fitted_point_saving: BaseIntervalScorer = (
            self._point_penalised_saving.clone().fit(X)
        )
        opt_savings, segment_anomalies, point_anomalies = run_capa(
            segment_penalised_saving=self.fitted_segment_saving,
            point_penalised_saving=self.fitted_point_saving,
            min_segment_length=self.min_segment_length,
            max_segment_length=self.max_segment_length,
            find_affected_components=self.find_affected_components,
        )
        self.scores = pd.Series(opt_savings, index=X.index, name="score")

        anomalies = segment_anomalies
        if not self.ignore_point_anomalies:
            anomalies += point_anomalies
        anomalies = sorted(anomalies)

        return self._format_sparse_output(anomalies)

    def _transform_scores(self, X: pd.DataFrame | pd.Series) -> pd.DataFrame:
        """Return scores for predicted labels on test/deployment data.

        Parameters
        ----------
        X : pd.DataFrame, pd.Series or np.ndarray
            Data to score (time series).

        Returns
        -------
        scores : pd.DataFrame with same index as X
            Scores for sequence `X`.

        Notes
        -----
        The CAPA scores are the cumulative optimal savings, so the scores are
        increasing and are not per-observation scores.
        """
        self.predict(X)
        return self.scores
    @classmethod
    def get_test_params(cls, parameter_set="default"):
        """Return testing parameter settings for the estimator.

        Parameters
        ----------
        parameter_set : str, default="default"
            Name of the set of test parameters to return, for use in tests. If no
            special parameters are defined for a value, will return the
            ``"default"`` set. There are currently no reserved values for
            annotators.

        Returns
        -------
        params : dict or list of dict, default={}
            Parameters to create testing instances of the class. Each dict contains
            parameters to construct an "interesting" test instance, i.e.,
            `MyClass(**params)` or `MyClass(**params[i])` creates a valid test
            instance. `create_test_instance` uses the first (or only) dictionary in
            `params`.
        """
        from skchange.anomaly_scores import L2Saving
        from skchange.base import BaseIntervalScorer
        from skchange.compose.penalised_score import PenalisedScore
        from skchange.costs import L2Cost
        from skchange.penalties import make_nonlinear_chi2_penalty

        def _make_nonlinear_chi2_penalty_from_score(
            score: BaseIntervalScorer,
        ) -> np.ndarray:
            score.check_is_fitted()
            n = score._X.shape[0]
            p = score._X.shape[1]
            return make_nonlinear_chi2_penalty(score.get_model_size(p), n, p)

        params = [
            {
                "segment_saving": PenalisedScore(
                    L2Saving(),
                    make_default_penalty=_make_nonlinear_chi2_penalty_from_score,
                ),
                "min_segment_length": 5,
                "max_segment_length": 100,
                "find_affected_components": True,
            },
            {
                "segment_saving": L2Cost(param=0.0),
                "point_saving": L2Cost(param=0.0),
                "min_segment_length": 5,
                "max_segment_length": 100,
            },
            {
                "segment_saving": L2Cost(param=0.0),
                "point_saving": L2Cost(param=0.0),
                "min_segment_length": 2,
                "max_segment_length": 20,
            },
        ]
        return params
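

# A hedged end-to-end sketch (illustration only, not part of the module): apply CAPA
# to multivariate data and request the affected components per anomaly.
#
#     import numpy as np
#     import pandas as pd
#     from skchange.anomaly_detectors import CAPA
#
#     rng = np.random.default_rng(0)
#     X = rng.normal(size=(300, 3))
#     X[100:150, :2] += 5.0  # segment anomaly in the first two components
#     detector = CAPA(find_affected_components=True)
#     anomalies = detector.fit_predict(pd.DataFrame(X))
#     # With `find_affected_components=True`, the output gains an "icolumns" column
#     # listing the estimated affected components of each anomaly.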