Source code for skchange.costs._multivariate_gaussian_cost

"""Multivariate Gaussian likelihood cost."""

__author__ = ["johannvk", "Tveten"]
__all__ = ["MultivariateGaussianCost"]

import numpy as np

from ..utils.numba import njit, prange
from ..utils.numba.stats import log_det_covariance
from ._utils import CovType, MeanType, check_cov, check_mean
from .base import BaseCost


@njit
def _gaussian_ll_at_mle_params(
    X: np.ndarray,
    start: int,
    end: int,
) -> float:
    """Calculate the Gaussian log likelihood at the MLE parameters for a segment.

    Parameters
    ----------
    X : np.ndarray
        Data matrix. Rows are observations and columns are variables.
    start : int
        Start index of the segment (inclusive).
    end : int
        End index of the segment (exclusive).

    Returns
    -------
    log_likelihood : float
        The log likelihood of the observations in the
        interval ``[start, end)`` in the data matrix `X`,
        evaluated at the maximum likelihood parameter
        estimates for the mean and covariance matrix.
    """
    n = end - start
    p = X.shape[1]

    X_segment = X[start:end]
    log_det_cov = log_det_covariance(X_segment)

    if np.isnan(log_det_cov):
        raise RuntimeError(
            f"The covariance matrix of X[{start}:{end}] is not positive definite."
            + " Quick and dirty fix: Add a tiny amount of random noise to the data."
        )

    twice_log_likelihood = -n * p * np.log(2 * np.pi) - n * log_det_cov - p * n
    return twice_log_likelihood / 2.0


@njit
def gaussian_cost_mle_params(
    starts: np.ndarray, ends: np.ndarray, X: np.ndarray
) -> np.ndarray:
    """Calculate the Gaussian log likelihood at fixed parameters for a segment.

    Parameters
    ----------
    X : np.ndarray
        Data matrix. Rows are observations and columns are variables.
    start : int
        Start index of the segment (inclusive).
    end : int
        End index of the segment (exclusive).

    Returns
    -------
    costs : np.ndarray
        A 2D array of twice negated log likelihood costs. One row for each interval,
        and a single column.
    """
    num_starts = len(starts)
    costs = np.zeros(num_starts).reshape(-1, 1)
    for i in prange(num_starts):
        segment_log_likelihood = _gaussian_ll_at_mle_params(X, starts[i], ends[i])
        costs[i, 0] = -2.0 * segment_log_likelihood
    return costs


@njit
def _gaussian_ll_at_fixed_params(
    X: np.ndarray,
    start: int,
    end: int,
    mean: np.ndarray,
    log_det_cov: float,
    inv_cov: np.ndarray,
) -> float:
    """Calculate the Gaussian log likelihood at fixed parameters for a segment.

    Parameters
    ----------
    X : np.ndarray
        Data matrix. Rows are observations and columns are variables.
    start : int
        Start index of the segment (inclusive).
    end : int
        End index of the segment (exclusive).
    mean : np.ndarray
        Fixed mean for the cost calculation.
    log_det_cov : float
        Log determinant of the fixed covariance matrix.
    inv_cov : np.ndarray
        Inverse of the fixed covariance matrix.

    Returns
    -------
    log_likelihood : float
        The log likelihood of the observations in the
        interval ``[start, end)`` in the data matrix `X`,
        evaluated at the fixed mean and covariance matrix
        parameters provided.
    """
    n = end - start
    p = X.shape[1]

    X_segment = X[start:end]
    X_centered = X_segment - mean
    quadratic_form = np.sum(X_centered @ inv_cov * X_centered, axis=1)
    twice_log_likelihood = (
        -n * p * np.log(2 * np.pi) - n * log_det_cov - np.sum(quadratic_form)
    )
    return twice_log_likelihood / 2.0


@njit
def gaussian_cost_fixed_params(
    starts: np.ndarray,
    ends: np.ndarray,
    X: np.ndarray,
    mean: np.ndarray,
    log_det_cov: float,
    inv_cov: np.ndarray,
) -> np.ndarray:
    """Calculate the Gaussian log likelihood at fixed parameters for a segment.

    Parameters
    ----------
    mean : np.ndarray
        Fixed mean for the cost calculation.
    starts : np.ndarray
        Start indices of the segments.
    ends : np.ndarray
        End indices of the segments.
    X : np.ndarray
        Data matrix. Rows are observations and columns are variables.
    log_det_cov : float
        Log determinant of the fixed covariance matrix.
    inv_cov : np.ndarray
        Inverse of the fixed covariance matrix.

    Returns
    -------
    costs : np.ndarray
        A 2D array of twice negated log likelihood costs. One row for each interval,
        and a single column.
    """
    num_starts = len(starts)
    costs = np.zeros(num_starts).reshape(-1, 1)
    for i in prange(num_starts):
        segment_log_likelihood = _gaussian_ll_at_fixed_params(
            X, starts[i], ends[i], mean, log_det_cov, inv_cov
        )
        costs[i, 0] = -2.0 * segment_log_likelihood
    return costs


[docs] class MultivariateGaussianCost(BaseCost): """Multivariate Gaussian likelihood cost. Parameters ---------- param : 2-tuple of float or np.ndarray, or None (default=None) Fixed mean and covariance matrix for the cost calculation. If ``None``, the maximum likelihood estimates are used. """ _tags = { "authors": ["johannvk", "Tveten"], "maintainers": "johannvk", "supports_fixed_param": True, "is_aggregated": True, } def __init__(self, param: tuple[MeanType, CovType] | None = None): super().__init__(param) def _check_fixed_param( self, param: tuple[MeanType, CovType], X: np.ndarray ) -> np.ndarray: """Check if the fixed mean parameter is valid. Parameters ---------- param : 2-tuple of float or np.ndarray Fixed mean and covariance matrix for the cost calculation. X : np.ndarray Input data. Returns ------- mean : np.ndarray Fixed mean for the cost calculation. """ mean, cov = param mean = check_mean(mean, X) cov = check_cov(cov, X) return mean, cov @property def min_size(self) -> int | None: """Minimum size of the interval to evaluate. The size of each interval is defined as ``cuts[i, 1] - cuts[i, 0]``. """ if self.is_fitted: return self._X.shape[1] + 1 else: return None
[docs] def get_model_size(self, p: int) -> int: """Get the number of parameters in the cost function. Parameters ---------- p : int Number of variables in the data. """ return p + p * (p + 1) // 2
def _fit(self, X: np.ndarray, y=None): """Fit the cost. This method precomputes quantities that speed up the cost evaluation. Parameters ---------- X : np.ndarray Data to evaluate. Must be a 2D array. y: None Ignored. Included for API consistency by convention. """ self._param = self._check_param(self.param, X) if self.param is not None: self._mean, cov = self._param self._inv_cov = np.linalg.inv(cov) _, self._log_det_cov = np.linalg.slogdet(cov) return self def _evaluate_optim_param(self, starts: np.ndarray, ends: np.ndarray) -> np.ndarray: """Evaluate the cost for the optimal parameters. Parameters ---------- starts : np.ndarray Start indices of the intervals (inclusive). ends : np.ndarray End indices of the intervals (exclusive). Returns ------- costs : np.ndarray A 2D array of costs. One row for each interval. The number of columns is 1 since the MultivariateGaussianCost is inherently multivariate. """ return gaussian_cost_mle_params(starts, ends, self._X) def _evaluate_fixed_param(self, starts, ends): """Evaluate the cost for the fixed parameters. Parameters ---------- starts : np.ndarray Start indices of the intervals (inclusive). ends : np.ndarray End indices of the intervals (exclusive). Returns ------- costs : np.ndarray A 2D array of costs. One row for each interval. The number of columns is 1 since the MultivariateGaussianCost is inherently multivariate. """ return gaussian_cost_fixed_params( starts, ends, self._X, self._mean, self._log_det_cov, self._inv_cov )
[docs] @classmethod def get_test_params(cls, parameter_set="default"): """Return testing parameter settings for the estimator. Parameters ---------- parameter_set : str, default="default" Name of the set of test parameters to return, for use in tests. If no special parameters are defined for a value, will return ``"default"`` set. There are currently no reserved values for interval evaluators. Returns ------- params : dict or list of dict, default = {} Parameters to create testing instances of the class Each dict are parameters to construct an "interesting" test instance, i.e., `MyClass(**params)` or `MyClass(**params[i])` creates a valid test instance. `create_test_instance` uses the first (or only) dictionary in `params` """ params = [ {"param": None}, {"param": (0.0, 1.0)}, {"param": (np.zeros(1), np.eye(1))}, ] return params