Source code for skchange.anomaly_detectors._circular_binseg

"""Circular binary segmentation algorithm for multiple changepoint detection."""

__author__ = ["Tveten"]
__all__ = ["CircularBinarySegmentation"]

import numpy as np
import pandas as pd

from ..anomaly_scores import to_local_anomaly_score
from ..base import BaseIntervalScorer
from ..change_detectors._seeded_binseg import make_seeded_intervals
from ..compose.penalised_score import PenalisedScore
from ..costs import L2Cost
from ..penalties import make_bic_penalty
from ..utils.numba import njit
from ..utils.validation.data import check_data
from ..utils.validation.interval_scorer import check_interval_scorer
from ..utils.validation.parameters import check_in_interval, check_larger_than
from ..utils.validation.penalties import check_penalty
from .base import BaseSegmentAnomalyDetector


@njit
def greedy_anomaly_selection(
    penalised_scores: np.ndarray,
    anomaly_starts: np.ndarray,
    anomaly_ends: np.ndarray,
    starts: np.ndarray,
    ends: np.ndarray,
) -> list[tuple[int, int]]:
    """Greedily select the non-overlapping segment anomalies with the highest scores."""
    penalised_scores = penalised_scores.copy()
    anomalies = []
    while np.any(penalised_scores > 0):
        argmax = penalised_scores.argmax()
        anomaly_start = anomaly_starts[argmax]
        anomaly_end = anomaly_ends[argmax]
        anomalies.append((anomaly_start, anomaly_end))
        # remove intervals that overlap with the detected segment anomaly.
        penalised_scores[(anomaly_end > starts) & (anomaly_start < ends)] = 0.0
    anomalies.sort()
    return anomalies
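
For intuition, here is a toy run of ``greedy_anomaly_selection`` (an illustrative
sketch, not part of the module; all values are invented). The first two candidate
intervals overlap, so only the higher-scoring detection survives, while the disjoint
third one is kept:

import numpy as np

scores = np.array([3.0, 2.0, 1.5])
anomaly_starts = np.array([10, 12, 40])
anomaly_ends = np.array([20, 22, 50])
starts = np.array([5, 8, 35])  # outer interval starts
ends = np.array([25, 30, 55])  # outer interval ends
greedy_anomaly_selection(scores, anomaly_starts, anomaly_ends, starts, ends)
# [(10, 20), (40, 50)]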


@njit
def make_anomaly_intervals(
    interval_start: int, interval_end: int, min_segment_length: int = 1
) -> tuple[np.ndarray, np.ndarray]:
    """Generate candidate anomaly (inner) intervals within an outer interval."""
    starts = []
    ends = []
    for i in range(interval_start + 1, interval_end - min_segment_length + 2):
        # TODO: Add support for anomaly_intervals starting at interval_start and ending
        # at interval_end. Currently blocked by interval evaluators requiring
        # strictly increasing interval input.
        for j in range(i + min_segment_length, interval_end):
            baseline_n = interval_end - j + i - interval_start
            if baseline_n >= min_segment_length:
                starts.append(i)
                ends.append(j)
    return np.array(starts, dtype=np.int64), np.array(ends, dtype=np.int64)
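
As a sketch of the search space (values chosen for illustration), these are the
candidate inner intervals for an outer interval ``[0, 8)`` with
``min_segment_length=2``. Each ``(start, end)`` pair leaves at least
``min_segment_length`` baseline observations in the outer interval, and no candidate
touches the outer endpoints (see the TODO above):

starts, ends = make_anomaly_intervals(0, 8, min_segment_length=2)
list(zip(starts.tolist(), ends.tolist()))
# [(1, 3), (1, 4), (1, 5), (1, 6), (1, 7), (2, 4), ..., (5, 7)] - 15 candidates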


def run_circular_binseg(
    penalised_score: BaseIntervalScorer,
    min_segment_length: int,
    max_interval_length: int,
    growth_factor: float,
) -> tuple[list[tuple[int, int]], np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """Run circular binary segmentation over a set of seeded intervals."""
    penalised_score.check_is_penalised()
    penalised_score.check_is_fitted()
    n_samples = penalised_score._X.shape[0]

    starts, ends = make_seeded_intervals(
        n_samples,
        2 * min_segment_length,
        max_interval_length,
        growth_factor,
    )

    anomaly_scores = np.zeros(starts.size)
    anomaly_starts = np.zeros(starts.size, dtype=np.int64)
    anomaly_ends = np.zeros(starts.size, dtype=np.int64)
    for i, (start, end) in enumerate(zip(starts, ends)):
        anomaly_start_candidates, anomaly_end_candidates = make_anomaly_intervals(
            start, end, min_segment_length
        )
        intervals = np.column_stack(
            (
                np.repeat(start, anomaly_start_candidates.size),
                anomaly_start_candidates,
                anomaly_end_candidates,
                np.repeat(end, anomaly_start_candidates.size),
            )
        )
        scores = penalised_score.evaluate(intervals)
        agg_scores = np.sum(scores, axis=1)
        argmax = np.argmax(agg_scores)
        anomaly_scores[i] = agg_scores[argmax]
        anomaly_starts[i] = anomaly_start_candidates[argmax]
        anomaly_ends[i] = anomaly_end_candidates[argmax]

    maximizers = np.column_stack((anomaly_starts, anomaly_ends))
    anomalies = greedy_anomaly_selection(
        anomaly_scores, anomaly_starts, anomaly_ends, starts, ends
    )
    return anomalies, anomaly_scores, maximizers, starts, ends
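
``run_circular_binseg`` expects a fitted, penalised score. A minimal sketch of
driving it by hand (illustrative only: the data and the penalty value of 20.0 are
invented, and the positional penalty argument to ``PenalisedScore`` follows the usage
in ``CircularBinarySegmentation.__init__`` below; exact signatures may vary between
skchange versions):

import numpy as np

from skchange.anomaly_scores import to_local_anomaly_score
from skchange.compose.penalised_score import PenalisedScore
from skchange.costs import L2Cost

rng = np.random.default_rng(1)
x = rng.normal(size=(300, 1))
x[100:130] += 5.0  # plant a segment anomaly

penalised_score = PenalisedScore(to_local_anomaly_score(L2Cost()), 20.0)
penalised_score.fit(x)
anomalies, scores, maximizers, starts, ends = run_circular_binseg(
    penalised_score,
    min_segment_length=5,
    max_interval_length=100,
    growth_factor=1.5,
)
anomalies  # e.g. [(100, 130)]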


def _make_bic_penalty_from_score(score: BaseIntervalScorer) -> float:
    """Construct a default BIC penalty from a fitted interval scorer."""
    score.check_is_fitted()
    n = score._X.shape[0]
    p = score._X.shape[1]
    return make_bic_penalty(score.get_model_size(p), n, additional_cpts=2)


class CircularBinarySegmentation(BaseSegmentAnomalyDetector):
    """Circular binary segmentation algorithm for multiple segment anomaly detection.

    Binary segmentation type changepoint detection algorithms recursively split the
    data into two segments, and test whether the two segments are different. Circular
    binary segmentation [1]_ is a variant of binary segmentation where the statistical
    test (anomaly score) is applied to compare the data behaviour of an inner interval
    subset with the surrounding data contained in an outer interval. In other words,
    the null hypothesis within each outer interval is that the data is stationary,
    while the alternative hypothesis is that there is a segment anomaly within the
    outer interval.

    Parameters
    ----------
    anomaly_score : BaseIntervalScorer, optional, default=L2Cost()
        The local anomaly score to use for anomaly detection. If a cost is given, it
        is converted to a local anomaly score using the `LocalAnomalyScore` class.
    penalty : np.ndarray or float, optional, default=None
        The penalty to use for anomaly detection. If the anomaly score is already
        penalised (``anomaly_score.get_tag("is_penalised")``), the penalty will be
        ignored. The different types of penalties are as follows:

        * ``float``: A constant penalty applied to the sum of scores across all
          variables in the data.
        * ``np.ndarray``: A penalty array of the same length as the number of columns
          in the data, where element ``i`` of the array is the penalty for ``i+1``
          variables being affected by an anomaly. The penalty array must be positive
          and increasing (not strictly). A penalised score with a linear penalty array
          is faster to evaluate than one with a nonlinear penalty array.
        * ``None``: A default penalty is created in `predict` based on the fitted
          score using the `make_bic_penalty` function.
    min_segment_length : int, default=5
        Minimum length of a segment between two changepoints. Must be greater than or
        equal to 1.
    max_interval_length : int, default=1000
        The maximum length of an interval to estimate a changepoint in. Must be
        greater than or equal to ``2 * min_segment_length``.
    growth_factor : float, default=1.5
        The growth factor for the seeded intervals. Intervals grow in size according
        to ``interval_len=max(interval_len + 1, np.floor(growth_factor * interval_len))``,
        starting at ``interval_len=min_interval_length``. It also governs the amount
        of overlap between intervals of the same length, as the start of each interval
        is shifted by a factor of ``1 + 1 / growth_factor``. Must be a float in
        ``(1, 2]``.

    References
    ----------
    .. [1] Olshen, A. B., Venkatraman, E. S., Lucito, R., & Wigler, M. (2004).
       Circular binary segmentation for the analysis of array-based DNA copy number
       data. Biostatistics, 5(4), 557-572.

    Examples
    --------
    >>> from skchange.anomaly_detectors import CircularBinarySegmentation
    >>> from skchange.datasets import generate_alternating_data
    >>> df = generate_alternating_data(n_segments=5, mean=10, segment_length=20)
    >>> detector = CircularBinarySegmentation()
    >>> detector.fit_predict(df)
          ilocs  labels
    0  [20, 40)       1
    1  [60, 80)       2

    Notes
    -----
    Using costs to generate local anomaly scores will be significantly slower than
    using anomaly scores that are implemented directly. This is because the local
    anomaly score requires evaluating the cost at disjoint subsets of the data
    (before and after an anomaly), which is not a natural operation for costs
    implemented as interval evaluators.
    """

    _tags = {
        "capability:missing_values": False,
        "capability:multivariate": True,
        "fit_is_empty": True,
    }

    def __init__(
        self,
        anomaly_score: BaseIntervalScorer | None = None,
        penalty: float | np.ndarray | None = None,
        min_segment_length: int = 5,
        max_interval_length: int = 1000,
        growth_factor: float = 1.5,
    ):
        self.anomaly_score = anomaly_score
        self.penalty = penalty
        self.min_segment_length = min_segment_length
        self.max_interval_length = max_interval_length
        self.growth_factor = growth_factor
        super().__init__()

        _score = L2Cost() if anomaly_score is None else anomaly_score
        check_interval_scorer(
            _score,
            "anomaly_score",
            "CircularBinarySegmentation",
            required_tasks=["cost", "anomaly_score"],
            allow_penalised=True,
        )
        _anomaly_score = to_local_anomaly_score(_score)

        check_penalty(penalty, "penalty", "CircularBinarySegmentation")
        self._penalised_score = (
            _anomaly_score.clone()  # avoid modifying the input anomaly_score
            if _anomaly_score.get_tag("is_penalised")
            else PenalisedScore(
                _anomaly_score,
                penalty,
                make_default_penalty=_make_bic_penalty_from_score,
            )
        )

        check_larger_than(1.0, self.min_segment_length, "min_segment_length")
        check_larger_than(
            2 * self.min_segment_length,
            self.max_interval_length,
            "max_interval_length",
        )
        check_in_interval(
            pd.Interval(1.0, 2.0, closed="right"),
            self.growth_factor,
            "growth_factor",
        )

        self.clone_tags(_anomaly_score, ["distribution_type"])

    def _predict(self, X: pd.DataFrame | pd.Series) -> pd.DataFrame:
        """Detect events in test/deployment data.

        Parameters
        ----------
        X : pd.DataFrame
            Time series to detect anomalies in.

        Returns
        -------
        y_sparse : pd.DataFrame
            A `pd.DataFrame` with a range index and two columns:

            * ``"ilocs"`` - left-closed ``pd.Interval``s of iloc based segments.
            * ``"labels"`` - integer labels ``1, ..., K`` for each segment anomaly.

        Attributes
        ----------
        fitted_score : BaseIntervalScorer
            The fitted penalised local anomaly score used for the detection.
        scores : pd.DataFrame
            A `pd.DataFrame` with the following columns:

            * ``"interval_start"`` - start of the interval.
            * ``"interval_end"`` - end of the interval.
            * ``"argmax_anomaly_start"`` - start of the detected segment anomaly.
            * ``"argmax_anomaly_end"`` - end of the detected segment anomaly.
            * ``"score"`` - score for the detected segment anomaly.
        """
        self.fitted_score: BaseIntervalScorer = self._penalised_score.clone()
        self.fitted_score.fit(X)
        X = check_data(
            X,
            min_length=2 * self.fitted_score.min_size,
            min_length_name="2 * fitted_score.min_size",
        )
        anomalies, scores, maximizers, starts, ends = run_circular_binseg(
            penalised_score=self.fitted_score,
            min_segment_length=self.min_segment_length,
            max_interval_length=self.max_interval_length,
            growth_factor=self.growth_factor,
        )

        self.scores = pd.DataFrame(
            {
                "interval_start": starts,
                "interval_end": ends,
                "argmax_anomaly_start": maximizers[:, 0],
                "argmax_anomaly_end": maximizers[:, 1],
                "score": scores,
            }
        )
        return self._format_sparse_output(anomalies)
    @classmethod
    def get_test_params(cls, parameter_set: str = "default"):
        """Return testing parameter settings for the estimator.

        Parameters
        ----------
        parameter_set : str, default="default"
            Name of the set of test parameters to return, for use in tests. If no
            special parameters are defined for a value, will return the ``"default"``
            set. There are currently no reserved values for annotators.

        Returns
        -------
        params : dict or list of dict, default = {}
            Parameters to create testing instances of the class.
            Each dict contains parameters to construct an "interesting" test
            instance, i.e., `MyClass(**params)` or `MyClass(**params[i])` creates a
            valid test instance. `create_test_instance` uses the first (or only)
            dictionary in `params`.
        """
        from skchange.costs import L2Cost, MultivariateGaussianCost

        params = [
            {"anomaly_score": L2Cost(), "penalty": 20},
            {
                "anomaly_score": L2Cost(),
                "min_segment_length": 3,
                "max_interval_length": 50,
            },
            {
                "anomaly_score": MultivariateGaussianCost(),
                "min_segment_length": 5,
                "max_interval_length": 20,
            },
        ]
        return params
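
Finally, an end-to-end usage sketch with a non-default score and an explicit penalty
(the penalty value of 25.0 is illustrative; data generation follows the class
docstring example):

from skchange.anomaly_detectors import CircularBinarySegmentation
from skchange.costs import MultivariateGaussianCost
from skchange.datasets import generate_alternating_data

df = generate_alternating_data(n_segments=5, mean=10, segment_length=20)
detector = CircularBinarySegmentation(
    anomaly_score=MultivariateGaussianCost(),
    penalty=25.0,
    min_segment_length=5,
    max_interval_length=100,
)
detector.fit_predict(df)  # segment anomalies as left-closed intervals
detector.scores.head()  # per-interval search diagnostics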