Source code for skchange.anomaly_detectors._anomalisers

"""Anomaly detectors composed of change detectors and some conversion logic."""

from typing import Callable

import numpy as np
import pandas as pd

from ..change_detectors.base import BaseChangeDetector
from .base import BaseSegmentAnomalyDetector


[docs] class StatThresholdAnomaliser(BaseSegmentAnomalyDetector): """Anomaly detection based on thresholding the values of segment statistics. Parameters ---------- change_detector : BaseChangeDetector Change detector to use for detecting segments. stat : callable, optional (default=np.mean) Statistic to calculate per segment. A function that takes in a 1D array and returns a float. stat_lower : float, optional (default=-1.0) Segments with a statistic lower than this value are considered anomalous. stat_upper : float, optional (default=1.0) Segments with a statistic higher than this value are considered anomalous. """ _tags = { "capability:missing_values": False, "capability:multivariate": False, "fit_is_empty": False, } def __init__( self, change_detector: BaseChangeDetector, stat: Callable[[np.ndarray], float] = np.mean, stat_lower: float = -1.0, stat_upper: float = 1.0, ): self.change_detector = change_detector self.stat = stat self.stat_lower = stat_lower self.stat_upper = stat_upper super().__init__() if self.stat_lower > self.stat_upper: message = f"stat_lower ({self.stat_lower}) must be less" +f" than or equal to stat_upper ({self.stat_upper})." raise ValueError(message) self.clone_tags(change_detector, ["distribution_type"]) def _fit(self, X: pd.DataFrame, y: pd.DataFrame | None = None): """Fit to training data. Parameters ---------- X : pd.DataFrame Training data to fit the detector to. y : pd.Series, optional Does nothing. Only here to make the fit method compatible with `sktime` and `scikit-learn`. Returns ------- self : Reference to self. State change ------------ Creates fitted model that updates attributes ending in "_". """ self.change_detector_: BaseChangeDetector = self.change_detector.clone() self.change_detector_.fit(X, y) return self def _predict(self, X: pd.DataFrame | pd.Series) -> pd.Series: """Detect events in test/deployment data. Parameters ---------- X : pd.DataFrame Time series to detect anomalies in. Returns ------- y_sparse: pd.DataFrame A `pd.DataFrame` with a range index and two columns: * ``"ilocs"`` - left-closed ``pd.Interval``s of iloc based segments. * ``"labels"`` - integer labels ``1, ..., K`` for each segment anomaly. """ # This is the required output format for the rest of the code to work. segments = self.change_detector_.transform(X)["labels"] df = pd.concat([X, segments], axis=1) anomalies = [] for _, segment in df.reset_index(drop=True).groupby("labels"): segment_stat = self.stat(segment.iloc[:, 0].values) if (segment_stat < self.stat_lower) | (segment_stat > self.stat_upper): anomalies.append((int(segment.index[0]), int(segment.index[-1] + 1))) return self._format_sparse_output(anomalies)
[docs] @classmethod def get_test_params(cls, parameter_set="default"): """Return testing parameter settings for the estimator. Parameters ---------- parameter_set : str, default="default" Name of the set of test parameters to return, for use in tests. If no special parameters are defined for a value, will return "default" set. There are currently no reserved values for annotators. Returns ------- params : dict or list of dict, default = {} Parameters to create testing instances of the class Each dict are parameters to construct an "interesting" test instance, i.e., MyClass(**params) or MyClass(**params[i]) creates a valid test instance. create_test_instance uses the first (or only) dictionary in params """ from skchange.change_detectors import MovingWindow params = [ { "change_detector": MovingWindow(bandwidth=3), "stat": np.mean, "stat_lower": -1.0, "stat_upper": 1.0, }, { "change_detector": MovingWindow(bandwidth=5), "stat": np.median, "stat_lower": -2.0, "stat_upper": 2.0, }, ] return params