# Source code for skchange.change_detectors._moving_window

"""The Moving Window algorithm for multiple changepoint detection."""

__author__ = ["Tveten"]
__all__ = ["MovingWindow"]

import numpy as np
import pandas as pd

from ..base import BaseIntervalScorer
from ..change_scores import CUSUM, to_change_score
from ..compose.penalised_score import PenalisedScore
from ..utils.numba import njit
from ..utils.numba.general import where
from ..utils.validation.data import check_data
from ..utils.validation.interval_scorer import check_interval_scorer
from ..utils.validation.parameters import check_in_interval, check_larger_than
from ..utils.validation.penalties import check_penalty
from .base import BaseChangeDetector


@njit
def make_extended_moving_window_cuts(
    n_samples: int,
    bandwidth: int,
    min_size: int,
) -> np.ndarray:
    """Build ``[start, split, end]`` cuts for the boundary-extended moving window.

    Each row pairs a candidate split point with window boundaries `bandwidth`
    samples to either side. Near the data edges the boundaries are clamped so
    that every window still spans the full ``2 * bandwidth`` samples
    (the CUSUM-type boundary extension).
    """
    split_points = np.arange(min_size, n_samples - min_size + 1)

    # Left boundary: bandwidth samples before the split, clamped into
    # [0, n_samples - 2 * bandwidth] so edge windows keep their full width.
    left_edges = split_points - bandwidth
    left_edges = np.maximum(left_edges, 0)
    left_edges = np.minimum(left_edges, n_samples - 2 * bandwidth)

    # Right boundary: mirror-image clamping into [2 * bandwidth, n_samples].
    right_edges = split_points + bandwidth
    right_edges = np.minimum(right_edges, n_samples)
    right_edges = np.maximum(right_edges, 2 * bandwidth)

    return np.column_stack((left_edges, split_points, right_edges))


def transform_moving_window(
    penalised_score: BaseIntervalScorer,
    bandwidth: int,
) -> np.ndarray:
    """Evaluate a fitted penalised change score over all moving-window cuts.

    Returns an array of length ``n_samples`` where entry ``t`` holds the
    penalised score for a change at split point ``t``. Positions too close to
    the data boundaries to host a split are left as NaN.
    """
    penalised_score.check_is_penalised()
    penalised_score.check_is_fitted()

    n_samples = penalised_score._X.shape[0]
    cuts = make_extended_moving_window_cuts(
        n_samples, bandwidth, penalised_score.min_size
    )
    # NaN everywhere except at the evaluated split points (middle cut column).
    scores = np.full(n_samples, np.nan)
    scores[cuts[:, 1]] = penalised_score.evaluate(cuts).reshape(-1)
    return scores


def transform_multiple_moving_window(
    penalised_score: BaseIntervalScorer,
    bandwidths: list,
) -> np.ndarray:
    """Compute moving-window score series for several bandwidths.

    Returns an ``(n_samples, len(bandwidths))`` array whose column ``j`` holds
    the moving-window scores for ``bandwidths[j]``.
    """
    n_samples = penalised_score._X.shape[0]
    all_scores = np.full((n_samples, len(bandwidths)), np.nan)
    for column, bandwidth in enumerate(bandwidths):
        all_scores[:, column] = transform_moving_window(penalised_score, bandwidth)
    return all_scores


@njit
def get_candidate_changepoints(
    scores: np.ndarray,
) -> tuple[list[int], list[tuple[int, int]]]:
    """Locate candidate changepoints within runs of positive scores.

    Each maximal interval of consecutive positive scores yields one candidate:
    the position of the maximum score within that interval. Returns the
    candidates together with their originating ``(start, end)`` intervals.
    """
    detection_intervals = where(scores > 0)
    changepoints = [
        interval_start + np.argmax(scores[interval_start:interval_end])
        for interval_start, interval_end in detection_intervals
    ]
    return changepoints, detection_intervals


@njit
def select_changepoints_by_detection_length(
    scores: np.ndarray, min_detection_interval: int
) -> list:
    """Select changepoints whose detection interval is sufficiently long.

    A candidate is kept only if its run of consecutive positive scores spans
    at least `min_detection_interval` samples (the epsilon-criterion).
    """
    candidate_cpts, detection_intervals = get_candidate_changepoints(scores)
    selected = []
    for cpt, interval in zip(candidate_cpts, detection_intervals):
        if interval[1] - interval[0] >= min_detection_interval:
            selected.append(cpt)
    return selected


@njit
def select_changepoints_by_local_optimum(
    scores: np.ndarray, selection_bandwidth: int
) -> list:
    """Select changepoints that are local score maxima (the eta-criterion).

    A candidate is kept only if its score matches the maximum score within a
    window of `selection_bandwidth` samples to either side of it.
    """
    candidate_cpts, _ = get_candidate_changepoints(scores)
    selected = []
    for cpt in candidate_cpts:
        window = scores[cpt - selection_bandwidth : cpt + selection_bandwidth + 1]
        if np.isclose(scores[cpt], np.max(window)):
            selected.append(cpt)
    return selected


@njit
def select_changepoints_by_bottom_up(
    scores: np.ndarray, bandwidths: list, local_optimum_fraction: float
) -> list:
    """Merge per-bandwidth candidate changepoints bottom-up.

    Candidates are collected per bandwidth (smallest bandwidth first) using the
    local optimum criterion. A candidate from a larger bandwidth is then only
    accepted if it is at least ``local_optimum_fraction * bandwidth`` samples
    away from every previously accepted changepoint.

    Parameters
    ----------
    scores : np.ndarray
        2D array of penalised scores with one column per bandwidth, columns
        ordered as in `bandwidths`.
    bandwidths : list
        Bandwidths corresponding to the columns of `scores`. Need not be
        sorted.
    local_optimum_fraction : float
        Fraction of the bandwidth used both as the local optimum neighbourhood
        and as the minimum distance between accepted changepoints.

    Returns
    -------
    list
        The selected changepoints.
    """
    # Process bandwidths from smallest to largest, but keep track of each
    # bandwidth's original column index. The previous implementation sorted
    # `bandwidths` and then indexed `scores` by the *sorted* position, pairing
    # each bandwidth with the wrong score column whenever the input list was
    # not already sorted.
    indexed_bandwidths = [(bandwidths[i], i) for i in range(len(bandwidths))]
    indexed_bandwidths.sort()

    candidate_cpts = []
    for bw, column in indexed_bandwidths:
        local_optimum_bandwidth = int(local_optimum_fraction * bw)
        candidate_cpts_bw = select_changepoints_by_local_optimum(
            scores[:, column], local_optimum_bandwidth
        )
        for candidate_cpt in candidate_cpts_bw:
            candidate_cpts.append((candidate_cpt, bw))

    # No positive-score regions at any bandwidth: nothing to merge. Guards the
    # `candidate_cpts[0]` access below, which previously raised IndexError.
    if len(candidate_cpts) == 0:
        return []

    cpts = [candidate_cpts[0][0]]
    for candidate_cpt, bw in candidate_cpts[1:]:
        distance_to_closest = np.min(np.abs(candidate_cpt - np.array(cpts)))
        local_optimum_bandwidth = int(local_optimum_fraction * bw)
        if distance_to_closest >= local_optimum_bandwidth:
            cpts.append(candidate_cpt)

    return cpts


class MovingWindow(BaseChangeDetector):
    """Moving window algorithm for multiple change-point detection.

    The MOSUM (moving sum) algorithm [1]_, but generalized to allow for any
    penalised and unpenalised change score. The basic algorithm runs a test
    statistic for a single change-point across the data in a moving window
    fashion. In each window, the data is split into two equal halves with
    `bandwidth` samples on either side of a split point. This process generates a
    time series of penalised scores, which are used to generate candidate
    change-points as local maxima within intervals where the penalised scores are
    all above zero. The final set of change-points is selected from the candidate
    change-points using one of the two selection methods described in [2]_.

    Several of the extensions available in the mosum R package [2]_ are also
    available in this implementation, including the ability to use multiple
    bandwidths. The CUSUM-type boundary extension for computing the test
    statistic for candidate change-points less than `bandwidth` samples from the
    start and end of the data is also implemented by default.

    Parameters
    ----------
    change_score : BaseIntervalScorer, optional, default=CUSUM()
        The change score to use in the algorithm. If a cost is given, it is
        converted to a change score using the `ChangeScore` class.
    penalty : np.ndarray or float, optional, default=None
        The penalty to use for change detection. If the penalty is penalised
        (`change_score.get_tag("is_penalised")`) the penalty will be ignored.
        The different types of penalties are as follows:

        * ``float``: A constant penalty applied to the sum of scores across all
          variables in the data.
        * ``np.ndarray``: A penalty array of the same length as the number of
          columns in the data, where element ``i`` of the array is the penalty
          for ``i+1`` variables being affected by a change. The penalty array
          must be positive and increasing (not strictly). A penalised score with
          a linear penalty array is faster to evaluate than a nonlinear penalty
          array.
        * ``None``: A default penalty is created in `predict` based on the
          fitted score using the `make_bic_penalty` function.
    bandwidth : int or list of int, default=20
        The bandwidth is the number of samples on either side of a candidate
        change-point. Must be 1 or greater. If a list of bandwidths is given,
        the algorithm will run for each bandwidth in the list and combine the
        results according to the "bottom-up" merging approach described in [2]_.
        A Fibonacci sequence of bandwidths is recommended for multiple
        bandwidths by the authors in [2]_.
    selection_method : str, default="local_optimum"
        The method used to select the final set of change-points from a set of
        candidate change-points. The options are:

        * ``"detection_length"``: Accepts a candidate change-point if the
          ``min_detection_fraction * bandwidth`` consecutive penalised scores
          are above zero. Corresponds to the epsilon-criterion in [2]_. This
          method is only available for a single bandwidth.
        * ``"local_optimum"``: Accepts a candidate change-point if it is the
          local maximum in the scores within a neighbourhood of size
          ``local_optimum_fraction * bandwidth``. Corresponds to the
          eta-criterion in [2]_. This method is used within the "bottom-up"
          merging approach if multiple bandwidths are given.
    min_detection_fraction : float, default=0.2
        The minimum size of the detection interval for a candidate change-point
        to be accepted in the ``"detection_length"`` selection method. Must be
        between ``0`` (exclusive) and ``1/2`` (exclusive).
    local_optimum_fraction : float, default=0.4
        The size of the neighbourhood around a candidate change-point used in
        the ``"local_optimum"`` selection method. Must be larger than or equal
        to ``0``.

    References
    ----------
    .. [1] Eichinger, B., & Kirch, C. (2018). A MOSUM procedure for the
       estimation of multiple random change points.
    .. [2] Meier, A., Kirch, C., & Cho, H. (2021). mosum: A package for moving
       sums in change-point analysis. Journal of Statistical Software, 97, 1-42.

    Examples
    --------
    >>> from skchange.change_detectors import MovingWindow
    >>> from skchange.datasets import generate_alternating_data
    >>> df = generate_alternating_data(n_segments=4, mean=10, segment_length=100, p=5)
    >>> detector = MovingWindow()
    >>> detector.fit_predict(df)
       ilocs
    0    100
    1    200
    2    300
    """

    _tags = {
        "authors": ["Tveten"],
        "maintainers": ["Tveten"],
        "fit_is_empty": True,
    }

    def __init__(
        self,
        change_score: BaseIntervalScorer | None = None,
        penalty: np.ndarray | float | None = None,
        bandwidth: int | list = 20,
        selection_method: str = "local_optimum",
        min_detection_fraction: float = 0.2,
        local_optimum_fraction: float = 0.4,
    ):
        self.change_score = change_score
        self.penalty = penalty
        self.bandwidth = bandwidth
        self.selection_method = selection_method
        self.min_detection_fraction = min_detection_fraction
        self.local_optimum_fraction = local_optimum_fraction
        super().__init__()

        _score = CUSUM() if change_score is None else change_score
        check_interval_scorer(
            _score,
            "change_score",
            "MovingWindow",
            required_tasks=["cost", "change_score"],
            allow_penalised=True,
        )
        _change_score = to_change_score(_score)

        check_penalty(penalty, "penalty", "MovingWindow")
        self._penalised_score = (
            _change_score.clone()  # need to avoid modifying the input change_score
            if _change_score.get_tag("is_penalised")
            else PenalisedScore(_change_score, penalty)
        )

        # Normalise `bandwidth` to a validated list in `self._bandwidth`.
        if isinstance(bandwidth, int):
            check_larger_than(1, bandwidth, "bandwidth")
            self._bandwidth = [bandwidth]
        elif isinstance(bandwidth, list):
            if len(bandwidth) == 0:
                raise ValueError("`bandwidth` must be a non-empty list.")
            if not all(isinstance(bw, int) for bw in bandwidth):
                raise TypeError("All elements of `bandwidth` must be integers.")
            if any(bw < 1 for bw in bandwidth):
                raise ValueError("All elements of `bandwidth` must be greater than 0.")
            self._bandwidth = bandwidth
        else:
            raise TypeError(
                "`bandwidth` must be an integer or a list of integers. "
                f"Got {type(bandwidth)}."
            )

        check_in_interval(
            pd.Interval(0, 1 / 2, closed="neither"),
            min_detection_fraction,
            "min_detection_fraction",
        )
        check_larger_than(0, local_optimum_fraction, "local_optimum_fraction")

        valid_selection_methods = ["local_optimum", "detection_length"]
        if selection_method not in valid_selection_methods:
            raise ValueError(
                f"`selection_method` must be one of {valid_selection_methods}."
                f" Got {selection_method}."
            )
        if len(self._bandwidth) > 1 and self.selection_method == "detection_length":
            raise ValueError(
                "The selection method `detection_length` is not supported for multiple"
                " bandwidths. Use `'local_optimum'` instead."
            )

        self.clone_tags(_change_score, ["distribution_type"])

    def _transform_scores(self, X: pd.DataFrame | pd.Series) -> pd.DataFrame:
        """Return scores for predicted labels on test/deployment data.

        Parameters
        ----------
        X : pd.DataFrame, pd.Series or np.ndarray
            Data to score (time series).

        Returns
        -------
        scores : pd.DataFrame with same index as X
            Scores for sequence `X`, one column per bandwidth.

        Attributes
        ----------
        fitted_score : BaseIntervalScorer
            The fitted penalised change score used for the detection.
        """
        X = check_data(
            X,
            min_length=2 * max(self._bandwidth),
            min_length_name="2*max(bandwidth)",
        )
        # Clone to avoid mutating the score configured in `__init__`.
        self.fitted_score: BaseIntervalScorer = self._penalised_score.clone()
        self.fitted_score.fit(X)
        scores = transform_multiple_moving_window(
            self.fitted_score,
            self._bandwidth,
        )
        formatted_scores = pd.DataFrame(
            scores,
            index=X.index,
            columns=pd.Index(self._bandwidth, name="bandwidth"),
        )
        return formatted_scores

    def _predict(self, X: pd.DataFrame | pd.Series) -> pd.DataFrame:
        """Detect events in test/deployment data.

        Parameters
        ----------
        X : pd.DataFrame
            Time series to detect change points in.

        Returns
        -------
        y_sparse : pd.DataFrame
            A `pd.DataFrame` with a range index and one column:
            * ``"ilocs"`` - integer locations of the changepoints.

        Attributes
        ----------
        fitted_score : BaseIntervalScorer
            The fitted penalised change score used for the detection.
        scores : pd.Series
            The detection scores obtained by the `transform_scores` method.
        """
        self.scores: pd.DataFrame = self.transform_scores(X)
        if self.selection_method == "detection_length":
            # Use the validated single bandwidth. The raw `self.bandwidth`
            # input may be a length-one list, in which case
            # `fraction * self.bandwidth` is list multiplication, not the
            # intended arithmetic.
            min_detection_length = int(
                self.min_detection_fraction * self._bandwidth[0]
            )
            changepoints = select_changepoints_by_detection_length(
                self.scores.values.reshape(-1), min_detection_length
            )
        elif self.selection_method == "local_optimum" and len(self._bandwidth) == 1:
            local_optimum_bandwidth = int(
                self.local_optimum_fraction * self._bandwidth[0]
            )
            changepoints = select_changepoints_by_local_optimum(
                self.scores.values.reshape(-1), local_optimum_bandwidth
            )
        else:
            # Multiple bandwidths: bottom-up merging of per-bandwidth results.
            changepoints = select_changepoints_by_bottom_up(
                self.scores.values, self._bandwidth, self.local_optimum_fraction
            )
        return self._format_sparse_output(changepoints)

    @classmethod
    def get_test_params(cls, parameter_set: str = "default"):
        """Return testing parameter settings for the estimator.

        Parameters
        ----------
        parameter_set : str, default="default"
            Name of the set of test parameters to return, for use in tests. If
            no special parameters are defined for a value, will return
            ``"default"`` set. There are currently no reserved values for
            annotators.

        Returns
        -------
        params : dict or list of dict, default = {}
            Parameters to create testing instances of the class
            Each dict are parameters to construct an "interesting" test
            instance, i.e., `MyClass(**params)` or `MyClass(**params[i])`
            creates a valid test instance.
            `create_test_instance` uses the first (or only) dictionary in
            `params`
        """
        from skchange.costs import GaussianCost, L2Cost

        params = [
            {"change_score": L2Cost(), "bandwidth": 5, "penalty": 20},
            {"change_score": GaussianCost(), "bandwidth": 5, "penalty": 30},
        ]
        return params