Source code for skchange.change_scores._multivariate_gaussian_score

"""The multivariate Gaussian test statistic for a change in mean and/or covariance."""

__author__ = ["johannvk"]
__all__ = ["MultivariateGaussianScore"]

import numpy as np

from ..base import BaseIntervalScorer
from ..costs._multivariate_gaussian_cost import MultivariateGaussianCost
from ..utils.numba import njit


@njit
def _half_integer_digamma(twice_n: int) -> float:
    """Calculate the digamma function for half integer values, i.e. ``twice_n/2``.

    The digamma function is the logarithmic derivative of the gamma function.
    This function is capable of calculating the
    digamma function for half integer values.

    Source: https://en.wikipedia.org/wiki/Digamma_function

    Parameters
    ----------
    twice_n : int
        Twice the integer value.

    Returns
    -------
    res : float
        Value of the digamma function for the half integer value.
    """
    assert twice_n > 0, "n must be a positive integer."

    if twice_n % 2 == 0:
        # Even integer: twice_n = 2n
        res = -np.euler_gamma
        n = twice_n // 2
        for k in range(0, n - 1):
            res += 1.0 / (k + 1.0)
    else:
        res = -2 * np.log(2) - np.euler_gamma
        # Odd integer: twice_n = 2n + 1
        n = (twice_n - 1) // 2
        for k in range(1, n + 1):
            res += 2.0 / (2.0 * k - 1.0)

    return res


@njit
def likelihood_ratio_expected_value(
    sequence_length: int, cut_point: int, dimension: int
) -> float:
    """Calculate the expected value of twice the negative log likelihood ratio.

    We check that the cut point is within the sequence length, and that both `k` and `n`
    are large enough relative to the dimension `p`, to ensure that the expected
    value is finite.
    Should at least have `p+1` points on each side of a split, for `p` dimensional data.

    Parameters
    ----------
    sequence_length : int
        Length of the sequence.
    cut_point : int
        Cut point of the sequence.
    dimension : int
        Dimension of the data.

    Returns
    -------
    g_k_n : float
        Expected value of twice the negative log likelihood ratio.
    """
    n, k, p = sequence_length, cut_point, dimension

    assert 0 < k < n, "Cut point `k` must be within the sequence length `n`."
    assert p > 0, "Dimension `p` must be a positive integer."
    assert k > (p + 1), "Cut point `k` must be larger than the dimension + 1."
    assert n - k > (
        p + 1
    ), "Run length after cut point `n - k` must be larger than dimension + 1."

    g_k_n = p * (
        np.log(2)
        + (n - 1) * np.log(n - 1)
        - (n - k - 1) * np.log(n - k - 1)
        - (k - 1) * np.log(k - 1)
    )

    for j in range(1, p + 1):
        g_k_n += (
            (n - 1) * _half_integer_digamma(n - j)
            - (k - 1) * _half_integer_digamma(k - j)
            - (n - k - 1) * _half_integer_digamma(n - k - j)
        )

    return g_k_n


@njit
def compute_bartlett_corrections(
    sequence_lengths: np.ndarray, cut_points: np.ndarray, dimension: int
):
    """Calculate the Bartlett correction for the twice negated log likelihood ratio.

    Parameters
    ----------
    twice_negated_log_lr : float
        Twice the negative log likelihood ratio.
    sequence_length : int
        Length of the sequence.
    cut_point : int
        Cut point of the sequence.
    dimension : int
        Dimension of the data.

    Returns
    -------
    bartlett_corr_log_lr : float
    """
    bartlett_corrections = np.zeros(
        shape=(sequence_lengths.shape[0], 1), dtype=np.float64
    )

    for i, (sequence_length, cut_point) in enumerate(zip(sequence_lengths, cut_points)):
        g_k_n = likelihood_ratio_expected_value(
            sequence_length=sequence_length, cut_point=cut_point, dimension=dimension
        )
        bartlett_correction_factor = dimension * (dimension + 3.0) / g_k_n
        bartlett_corrections[i] = bartlett_correction_factor

    return bartlett_corrections


[docs] class MultivariateGaussianScore(BaseIntervalScorer): """Multivariate Gaussian change score for a change in mean and/or covariance. Scores are calculated as the likelihood ratio scores for a change in mean and covariance under a multivariate Gaussian distribution[1]_. To stabilize the score, the Bartlett correction is applied by default, which adjusts the score for the relative sizes of the left and right segments. Applying the Bartlett correction results in a more stable score, especially for small sample sizes, ensuring that the scores approach the chi-squared distribution asymptotically. Parameters ---------- apply_bartlett_correction : bool, default=True Whether to apply the Bartlett correction to the change scores. References ---------- .. [1] Zamba, K. D., & Hawkins, D. M. (2009). A Multivariate Change-Point Model\ for Change in Mean Vector and/or Covariance Structure. \ Journal of Quality Technology, 41(3), 285-303. """ _tags = { "authors": ["johannvk"], "maintainers": "johannvk", "task": "change_score", "is_aggregated": True, } def __init__(self, apply_bartlett_correction: bool = True): super().__init__() self._cost = MultivariateGaussianCost() self.apply_bartlett_correction = apply_bartlett_correction @property def min_size(self) -> int: """Minimum size of the interval to evaluate.""" if self._is_fitted: return self._cost.min_size else: return None
[docs] def get_model_size(self, p: int) -> int: """Get the number of parameters to estimate over each interval. The primary use of this method is to determine an appropriate default penalty value in detectors. Parameters ---------- p : int Number of variables in the data. """ return p + p * (p + 1) // 2
def _fit(self, X: np.ndarray, y=None): """Fit the multivariate Gaussian change score evaluator. Parameters ---------- X : np.ndarray Data to evaluate. Must be a 2D array. y : None Ignored. Included for API consistency by convention. Returns ------- self : Reference to self. """ self._cost.fit(X) return self def _evaluate(self, cuts: np.ndarray): """Evaluate the change score for a split within an interval. Parameters ---------- cuts : np.ndarray A 2D array with three columns of integer locations. The first column is the ``start``, the second is the ``split``, and the third is the ``end`` of the interval to evaluate. The difference between subsets ``X[start:split]`` and ``X[split:end]`` is evaluated for each row in `cuts`. Returns ------- scores : np.ndarray A 2D array of change scores. One row for each cut. The number of columns is 1 if the change score is inherently multivariate. The number of columns is equal to the number of columns in the input data if the score is univariate. In this case, each column represents the univariate score for the corresponding input data column. """ start_intervals = cuts[:, [0, 1]] end_intervals = cuts[:, [1, 2]] total_intervals = cuts[:, [0, 2]] raw_scores = self._cost.evaluate(total_intervals) - ( self._cost.evaluate(start_intervals) + self._cost.evaluate(end_intervals) ) if self.apply_bartlett_correction: segment_lengths = cuts[:, 2] - cuts[:, 0] segment_splits = cuts[:, 1] - cuts[:, 0] bartlett_corrections = compute_bartlett_corrections( sequence_lengths=segment_lengths, cut_points=segment_splits, dimension=self._X.shape[1], ) return bartlett_corrections * raw_scores else: return raw_scores
[docs] @classmethod def get_test_params(cls, parameter_set="default"): """Return testing parameter settings for the estimator. Parameters ---------- parameter_set : str, default="default" Name of the set of test parameters to return, for use in tests. If no special parameters are defined for a value, will return `"default"` set. There are currently no reserved values for interval scorers. Returns ------- params : dict or list of dict, default = {} Parameters to create testing instances of the class Each dict are parameters to construct an "interesting" test instance, i.e., `MyClass(**params)` or `MyClass(**params[i])` creates a valid test instance. `create_test_instance` uses the first (or only) dictionary in `params` """ params = [ {"apply_bartlett_correction": False}, {"apply_bartlett_correction": True}, ] return params