"""Seeded binary segmentation algorithm for multiple changepoint detection."""
__author__ = ["Tveten"]
__all__ = ["SeededBinarySegmentation"]
import numpy as np
import pandas as pd
from ..base import BaseIntervalScorer
from ..change_scores import CUSUM, to_change_score
from ..compose.penalised_score import PenalisedScore
from ..utils.numba import njit
from ..utils.validation.data import check_data
from ..utils.validation.interval_scorer import check_interval_scorer
from ..utils.validation.parameters import check_in_interval, check_larger_than
from ..utils.validation.penalties import check_penalty
from .base import BaseChangeDetector
@njit
def make_seeded_intervals(
n: int, min_length: int, max_length: int, growth_factor: float = 1.5
) -> tuple[np.ndarray, np.ndarray]:
starts = [0] # For numba to be able to compile type.
ends = [1] # For numba to be able to compile type.
step_factor = 1 - 1 / growth_factor
max_length = min(max_length, n)
n_lengths = int(np.ceil(np.log(max_length / min_length) / np.log(growth_factor)))
interval_lens = np.unique(np.round(np.geomspace(min_length, max_length, n_lengths)))
for interval_len in interval_lens:
step = max(1, np.round(step_factor * interval_len))
n_steps = int(np.ceil((n - interval_len) / step))
new_starts = [int(i * step) for i in range(n_steps + 1)]
starts += new_starts
new_ends = [int(min(i * step + interval_len, n)) for i in range(n_steps + 1)]
ends += new_ends
if ends[-1] - starts[-1] < min_length:
starts[-1] = n - min_length
return np.array(starts[1:]), np.array(ends[1:])
@njit
def greedy_selection(
max_scores: np.ndarray,
argmax_scores: np.ndarray,
starts: np.ndarray,
ends: np.ndarray,
) -> list[int]:
max_scores = max_scores.copy()
cpts = []
while np.any(max_scores > 0):
argmax = max_scores.argmax()
cpt = argmax_scores[argmax]
cpts.append(int(cpt))
# remove intervals that contain the detected changepoint.
max_scores[(cpt >= starts) & (cpt < ends)] = 0.0
cpts.sort()
return cpts
@njit
def narrowest_selection(
max_scores: np.ndarray,
argmax_scores: np.ndarray,
starts: np.ndarray,
ends: np.ndarray,
) -> list[int]:
cpts = []
scores_above_threshold = max_scores > 0
candidate_starts = starts[scores_above_threshold]
candidate_ends = ends[scores_above_threshold]
candidate_maximizers = argmax_scores[scores_above_threshold]
while len(candidate_starts) > 0:
argmin = np.argmin(candidate_ends - candidate_starts)
cpt = candidate_maximizers[argmin]
cpts.append(int(cpt))
# remove candidates that contain the detected changepoint.
cpt_not_in_interval = ~((cpt >= candidate_starts) & (cpt < candidate_ends))
candidate_starts = candidate_starts[cpt_not_in_interval]
candidate_ends = candidate_ends[cpt_not_in_interval]
candidate_maximizers = candidate_maximizers[cpt_not_in_interval]
cpts.sort()
return cpts
def run_seeded_binseg(
penalised_score: BaseIntervalScorer,
max_interval_length: int,
growth_factor: float,
selection_method: str = "greedy",
) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
penalised_score.check_is_penalised()
penalised_score.check_is_fitted()
n_samples = penalised_score._X.shape[0]
starts, ends = make_seeded_intervals(
n_samples,
2 * penalised_score.min_size,
max_interval_length,
growth_factor,
)
max_scores = np.zeros(starts.size)
argmax_scores = np.zeros(starts.size, dtype=np.int64)
for i, (start, end) in enumerate(zip(starts, ends)):
splits = np.arange(
start + penalised_score.min_size, end - penalised_score.min_size + 1
)
intervals = np.column_stack(
(np.repeat(start, splits.size), splits, np.repeat(end, splits.size))
)
scores = penalised_score.evaluate(intervals)
argmax = np.argmax(scores)
max_scores[i] = scores[argmax, 0] # index 0 to get a scalar value
argmax_scores[i] = splits[0] + argmax
if selection_method == "greedy":
cpts = greedy_selection(max_scores, argmax_scores, starts, ends)
elif selection_method == "narrowest":
cpts = narrowest_selection(max_scores, argmax_scores, starts, ends)
return cpts, max_scores, argmax_scores, starts, ends
[docs]
class SeededBinarySegmentation(BaseChangeDetector):
"""Seeded binary segmentation algorithm for multiple changepoint detection.
Binary segmentation type changepoint detection algorithms recursively split the data
into two segments, and test whether the two segments are different. The seeded
binary segmentation algorithm is an efficient version of such algorithms, which
tests for changepoints in intervals of exponentially growing length. It has the same
theoretical guarantees as the original binary segmentation algorithm, but runs
in log-linear time no matter the changepoint configuration.
Parameters
----------
change_score : BaseIntervalScorer, optional, default=CUSUM()
The change score to use in the algorithm. If a cost is given, it is
converted to a change score using the `ChangeScore` class.
penalty : np.ndarray or float, optional, default=None
The penalty to use for change detection. If the penalty is
penalised (`change_score.get_tag("is_penalised")`) the penalty will
be ignored. The different types of penalties are as follows:
* ``float``: A constant penalty applied to the sum of scores across all
variables in the data.
* ``np.ndarray``: A penalty array of the same length as the number of
columns in the data, where element ``i`` of the array is the penalty for
``i+1`` variables being affected by a change. The penalty array
must be positive and increasing (not strictly). A penalised score with a
linear penalty array is faster to evaluate than a nonlinear penalty array.
* ``None``: A default penalty is created in `predict` based on the fitted
score using the `make_bic_penalty` function.
max_interval_length : int, default=200
The maximum length of an interval to estimate a changepoint in. Must be greater
than or equal to ``2 * change_score.min_size``.
growth_factor : float, default=1.5
The growth factor for the seeded intervals. Intervals grow in size according to
``interval_len=max(interval_len + 1, np.floor(growth_factor * interval_len))``,
starting at ``interval_len=min_interval_length``. It also governs the amount
of overlap between intervals of the same length, as the start of each interval
is shifted by a factor of ``1 + 1 / growth_factor``. Must be a float in
``(1, 2]``.
selection_method : str, default="greedy"
The method to use for selecting changepoints. The options are:
* ``"greedy"``: Selects the changepoint with the highest score, then removes all
intervals that contain the detected changepoint. This process is repeated
until no intervals are left with a score above the threshold.
* ``"narrowest"``: Searches among the intervals with scores above the threshold,
and selects the one with the narrowest interval. It then removes all
intervals that contain the detected changepoint, and repeats these two steps
until no intervals are left with a score above the threshold.
References
----------
.. [1] Kovács, S., Bühlmann, P., Li, H., & Munk, A. (2023). Seeded binary
segmentation: a general methodology for fast and optimal changepoint detection.
Biometrika, 110(1), 249-256.
.. [2] Baranowski, R., Chen, Y., & Fryzlewicz, P. (2019). Narrowest-over-threshold
detection of multiple change points and change-point-like features. Journal of
the Royal Statistical Society Series B: Statistical Methodology, 81(3), 649-672.
Examples
--------
>>> from skchange.change_detectors import SeededBinarySegmentation
>>> from skchange.datasets import generate_alternating_data
>>> df = generate_alternating_data(n_segments=4, mean=10, segment_length=100, p=5)
>>> detector = SeededBinarySegmentation()
>>> detector.fit_predict(df)
ilocs
0 100
1 200
2 300
"""
_tags = {
"authors": ["Tveten"],
"maintainers": ["Tveten"],
"fit_is_empty": True,
}
def __init__(
self,
change_score: BaseIntervalScorer | None = None,
penalty: np.ndarray | float | None = None,
max_interval_length: int = 200,
growth_factor: float = 1.5,
selection_method: str = "greedy",
):
self.change_score = change_score
self.penalty = penalty
self.max_interval_length = max_interval_length
self.growth_factor = growth_factor
self.selection_method = selection_method
super().__init__()
_score = CUSUM() if change_score is None else change_score
check_interval_scorer(
_score,
"change_score",
"SeededBinarySegmentation",
required_tasks=["cost", "change_score"],
allow_penalised=True,
)
_change_score = to_change_score(_score)
check_penalty(penalty, "penalty", "SeededBinarySegmentation")
self._penalised_score = (
_change_score.clone() # need to avoid modifying the input change_score
if _change_score.get_tag("is_penalised")
else PenalisedScore(_change_score, penalty)
)
check_in_interval(
pd.Interval(1.0, 2.0, closed="right"),
self.growth_factor,
"growth_factor",
)
valid_selection_methods = ["greedy", "narrowest"]
if self.selection_method not in valid_selection_methods:
raise ValueError(
f"Invalid selection method. Must be one of {valid_selection_methods}."
)
self.clone_tags(_change_score, ["distribution_type"])
def _predict(self, X: pd.DataFrame | pd.Series) -> pd.Series:
"""Detect events in test/deployment data.
Parameters
----------
X : pd.DataFrame
Time series to detect change points in.
Returns
-------
y_sparse : pd.DataFrame
A `pd.DataFrame` with a range index and one column:
* ``"ilocs"`` - integer locations of the change points.
Attributes
----------
fitted_score : BaseIntervalScorer
The fitted penalised change score used for the detection.
scores: pd.DataFrame
A `pd.DataFrame` with the following columns:
* ``"start"`` - start of the interval.
* ``"end"`` - end of the interval.
* ``"argmax"`` - index of the maximum score in the interval.
* ``"max"`` - maximum score in the interval.
"""
self.fitted_score: BaseIntervalScorer = self._penalised_score.clone()
self.fitted_score.fit(X)
X = check_data(
X,
min_length=2 * self.fitted_score.min_size,
min_length_name="2 * fitted_change_score.min_size",
)
check_larger_than(
2 * self.fitted_score.min_size,
self.max_interval_length,
"max_interval_length",
)
cpts, scores, maximizers, starts, ends = run_seeded_binseg(
penalised_score=self.fitted_score,
max_interval_length=self.max_interval_length,
growth_factor=self.growth_factor,
selection_method=self.selection_method,
)
self.scores = pd.DataFrame(
{"start": starts, "end": ends, "argmax": maximizers, "max": scores}
)
return self._format_sparse_output(cpts)
[docs]
@classmethod
def get_test_params(cls, parameter_set="default"):
"""Return testing parameter settings for the estimator.
Parameters
----------
parameter_set : str, default="default"
Name of the set of test parameters to return, for use in tests. If no
special parameters are defined for a value, will return `"default"` set.
There are currently no reserved values for annotators.
Returns
-------
params : dict or list of dict, default = {}
Parameters to create testing instances of the class
Each dict are parameters to construct an "interesting" test instance, i.e.,
`MyClass(**params)` or `MyClass(**params[i])` creates a valid test instance.
`create_test_instance` uses the first (or only) dictionary in `params`
"""
from skchange.costs import L2Cost
params = [
{
"change_score": L2Cost(),
"max_interval_length": 100,
"penalty": 30,
},
{
"change_score": L2Cost(),
"max_interval_length": 20,
"penalty": 10,
},
{
"change_score": L2Cost(),
"max_interval_length": 20,
"penalty": 10,
"selection_method": "narrowest",
},
]
return params