"""Multivariate T distribution likelihood cost."""
__author__ = ["johannvk"]
__all__ = ["MultivariateTCost"]
import numpy as np
import pandas as pd
from ..utils.numba import njit, numba_available, prange
from ..utils.numba.stats import (
col_median,
digamma,
kurtosis,
log_gamma,
trigamma,
)
from ..utils.validation.parameters import check_in_interval, check_larger_than
from ._multivariate_gaussian_cost import (
gaussian_cost_fixed_params,
gaussian_cost_mle_params,
)
from ._utils import CovType, MeanType, check_cov, check_mean
from .base import BaseCost
@njit
def _estimate_scale_matrix_trace(
centered_sample_squared_norms: np.ndarray,
non_zero_norm_mask: np.ndarray,
sample_dimension: int,
dof: float,
loo_index=-1,
):
"""Estimate the trace of the MLE multivariate T covariance matrix.
Using an isotropic estimate of the multivariate T distribution,
the authors of [1]_ developed an estimate of the trace of the
maximum likelihood estimate scale matrix, using the squared
norm of centered samples.
Parameters
----------
centered_sample_squared_norms : np.ndarray
The squared norms of centered samples from a multivariate t-distribution.
non_zero_norm_mask : np.ndarray
A boolean mask indicating which squared norms are non-zero.
sample_dimension : int
The dimension of the samples.
dof : float
The degrees of freedom of the multivariate t-distribution.
loo_index : int, optional (default=-1)
The index of the leave-one-out sample. If -1, the full covariance matrix
is estimated.
References
----------
.. [1] Aeschliman, Chad, & Johnny Park, & Avinash C. Kak. (2009). A Novel
Parameter Estimation Algorithm for the Multivariate T-Distribution and Its
Application to Computer Vision. In Computer Vision - ECCV 2010, 594-607.
Berlin, Heidelberg: Springer
Returns
-------
float
The estimated trace of the MLE covariance matrix.
"""
if loo_index >= 0:
# Zero out the leave-one-out sample squared norm.
centered_sample_squared_norms[loo_index] = 0.0
non_zero_norm_mask[loo_index] = False
z_bar = np.log(centered_sample_squared_norms[non_zero_norm_mask]).mean()
log_alpha = (
z_bar - np.log(dof) + digamma(0.5 * dof) - digamma(sample_dimension / 2.0)
)
return sample_dimension * np.exp(log_alpha)
@njit
def _initial_scale_matrix_estimate(
centered_samples: np.ndarray,
dof: float,
loo_index: int = -1,
):
"""Estimate the scale matrix given centered samples and degrees of freedom.
The direction of the scale matrix is estimated from standardized,
centered samples, and the trace of the scale matrix is estimated
using the squared norms of the centered samples, as described in [1]_.
Parameters
----------
centered_samples : np.ndarray
Centered samples from the multivariate t-distribution.
dof : float
The degrees of freedom of the multivariate t-distribution.
loo_index : int, optional (default=-1)
The index of the leave-one-out sample. If -1, the full covariance matrix
is estimated.
References
----------
.. [1] Aeschliman, Chad, & Johnny Park, & Avinash C. Kak. (2009). A Novel
Parameter Estimation Algorithm for the Multivariate T-Distribution and Its
Application to Computer Vision. In Computer Vision - ECCV 2010, 594-607.
Berlin, Heidelberg: Springer
Returns
-------
np.ndarray
The initial estimate of the scale matrix
"""
num_samples, sample_dimension = centered_samples.shape
# Estimate direction of the scale matrix from standardized, centered samples:
centered_sample_squared_norms = np.sum(centered_samples * centered_samples, axis=1)
non_zero_norm_mask = centered_sample_squared_norms > 1.0e-6
centered_sample_weights = np.ones(num_samples)
centered_sample_weights[non_zero_norm_mask] *= (
1.0 / centered_sample_squared_norms[non_zero_norm_mask]
)
weighted_samples = centered_samples * centered_sample_weights[:, np.newaxis]
mle_scale_estimate = weighted_samples.T @ centered_samples
if loo_index >= 0:
# Subtract contribution from the leave-one-out sample:
loo_sample = centered_samples[loo_index, :].reshape(-1, 1)
mle_scale_estimate -= (
centered_sample_weights[loo_index] * loo_sample @ loo_sample.T
)
# Scale by the number of samples minus the leave-one-out sample:
mle_scale_estimate /= num_samples - 1
else:
mle_scale_estimate /= num_samples
scale_trace_estimate = _estimate_scale_matrix_trace(
centered_sample_squared_norms=centered_sample_squared_norms,
non_zero_norm_mask=non_zero_norm_mask,
sample_dimension=sample_dimension,
dof=dof,
loo_index=loo_index,
)
mle_scale_estimate *= scale_trace_estimate / np.trace(mle_scale_estimate)
return mle_scale_estimate
@njit
def _scale_matrix_fixed_point_iteration(
scale_matrix: np.ndarray,
dof: float,
centered_samples: np.ndarray,
loo_index: int = -1,
):
"""Compute a multivariate T MLE scale matrix fixed point iteration.
Parameters
----------
scale_matrix : np.ndarray
The current estimate of the scale matrix.
dof : float
The degrees of freedom of the multivariate t-distribution.
centered_samples : np.ndarray
The centered samples from the multivariate t-distribution.
loo_index : int, optional (default=-1)
The index of the leave-one-out sample. If -1, the full covariance matrix
is estimated.
Returns
-------
np.ndarray
The updated estimate of the scale matrix.
"""
num_samples, sample_dim = centered_samples.shape
inverse_scale_matrix = np.linalg.solve(scale_matrix, np.eye(sample_dim))
mahalanobis_squared_distances = np.sum(
(centered_samples @ inverse_scale_matrix) * centered_samples, axis=1
)
sample_weights = (sample_dim + dof) / (dof + mahalanobis_squared_distances)
weighted_samples = centered_samples * sample_weights[:, np.newaxis]
reconstructed_scale_matrix = weighted_samples.T @ centered_samples
if loo_index >= 0:
# Subtract the leave-one-out sample contribution:
loo_sample = centered_samples[loo_index, :].reshape(-1, 1)
reconstructed_scale_matrix -= (
sample_weights[loo_index] * loo_sample @ loo_sample.T
)
# Scale by the number of samples minus the leave-one-out sample:
reconstructed_scale_matrix /= num_samples - 1
else:
reconstructed_scale_matrix /= num_samples
return reconstructed_scale_matrix
@njit
def _solve_for_mle_scale_matrix(
initial_scale_matrix: np.ndarray,
centered_samples: np.ndarray,
dof: float,
max_iter: int,
abs_tol: float,
rel_tol: float,
loo_index: int = -1,
) -> np.ndarray:
"""Perform fixed point iterations to compute the MLE scale matrix.
Parameters
----------
initial_scale_matrix : np.ndarray
The initial estimate of the scale matrix.
centered_samples : np.ndarray
The centered samples from the multivariate t-distribution.
dof : float
The degrees of freedom of the multivariate t-distribution.
max_iter : int
The maximum number of iterations to perform.
abs_tol : float
The absolute tolerance for convergence.
rel_tol : float
The relative tolerance for convergence.
loo_index : int, optional (default=-1)
The index of the leave-one-out sample. If -1, the full covariance matrix
is estimated.
Returns
-------
np.ndarray
The MLE scale matrix of the multivariate t-distribution.
"""
scale_matrix = initial_scale_matrix.copy()
for iteration in range(1, max_iter + 1):
temp_cov_matrix = _scale_matrix_fixed_point_iteration(
scale_matrix=scale_matrix,
dof=dof,
centered_samples=centered_samples,
loo_index=loo_index,
)
# Note: 'ord = None' computes the Frobenius norm.
absolute_residual_norm = np.linalg.norm(
temp_cov_matrix - scale_matrix, ord=None
)
relative_residual_norm = absolute_residual_norm / max(
np.linalg.norm(scale_matrix, ord=None), 1.0e-12
)
scale_matrix[:, :] = temp_cov_matrix[:, :]
if absolute_residual_norm < abs_tol or relative_residual_norm < rel_tol:
break
if iteration == max_iter:
raise RuntimeError(
f"MultivariateTCost: Maximum number of iterations reached, ({max_iter}) "
"in MLE scale matrix estimation. Relax the tolerance "
"(mle_scale_abs_tol, mle_scale_rel_tol), "
"or increase the maximum number of iterations (max_iter).",
)
return scale_matrix
@njit
def maximum_likelihood_mv_t_scale_matrix(
centered_samples: np.ndarray,
dof: float,
abs_tol: float,
rel_tol: float,
max_iter: int,
loo_index: int = -1,
) -> np.ndarray:
"""Compute the MLE scale matrix for a multivariate t-distribution.
Parameters
----------
centered_samples : np.ndarray
The centered samples from the multivariate t-distribution.
dof : float
The degrees of freedom of the multivariate t-distribution.
abs_tol : float
The absolute tolerance for convergence.
rel_tol : float
The relative tolerance for convergence.
max_iter : int
The maximum number of iterations to perform.
loo_index : int, optional (default=-1)
The index of the leave-one-out sample. If -1, the full covariance matrix
is estimated.
Returns
-------
np.ndarray
The MLE covariance matrix of the multivariate t-distribution.
"""
# Initialize the scale matrix maximum likelihood estimate:
initial_mle_scale_matrix = _initial_scale_matrix_estimate(
centered_samples,
dof,
loo_index=loo_index,
)
mle_scale_matrix = _solve_for_mle_scale_matrix(
initial_scale_matrix=initial_mle_scale_matrix,
centered_samples=centered_samples,
dof=dof,
loo_index=loo_index,
max_iter=max_iter,
abs_tol=abs_tol,
rel_tol=rel_tol,
)
return mle_scale_matrix
@njit
def _multivariate_t_log_likelihood(
scale_matrix: np.ndarray,
centered_samples: np.ndarray,
dof: float,
) -> float:
"""Calculate the log likelihood of a multivariate t-distribution.
Implemented from the definition of the multivariate t-distribution.
Inspired by the scipy implementation of the multivariate t-distribution,
but simplified.
Source: https://en.wikipedia.org/wiki/Multivariate_t-distribution
Parameters
----------
scale_matrix : np.ndarray
The scale matrix of the multivariate t-distribution.
centered_samples : np.ndarray
The centered samples from the multivariate t-distribution.
dof : float
The degrees of freedom of the multivariate t-distribution.
Returns
-------
float
The log likelihood of the multivariate t-distribution.
"""
sample_dim = centered_samples.shape[1]
sign_det, log_det_scale_matrix = np.linalg.slogdet(scale_matrix)
if sign_det <= 0:
return np.nan
inverse_scale_matrix = np.linalg.solve(scale_matrix, np.eye(sample_dim))
total_log_likelihood = _fixed_param_multivariate_t_log_likelihood(
centered_samples=centered_samples,
dof=dof,
inverse_scale_matrix=inverse_scale_matrix,
log_det_scale_matrix=log_det_scale_matrix,
)
return total_log_likelihood
@njit
def _fixed_param_multivariate_t_log_likelihood(
centered_samples: np.ndarray,
dof: float,
inverse_scale_matrix: np.ndarray,
log_det_scale_matrix: float,
) -> float:
"""Calculate the log likelihood of a multivariate t-distribution.
Directly from the definition of the multivariate t-distribution.
Implementation inspired by the scipy implementation of
the multivariate t-distribution, but simplified.
Source: https://en.wikipedia.org/wiki/Multivariate_t-distribution
Parameters
----------
centered_samples : np.ndarray
The centered samples from the multivariate t-distribution.
dof : float
The degrees of freedom of the multivariate t-distribution.
inverse_scale_matrix : np.ndarray
The inverse of the scale matrix of the multivariate t-distribution.
log_det_scale_matrix : float
The log determinant of the scale matrix of the multivariate t-distribution.
Returns
-------
float
The log likelihood of the multivariate t-distribution.
"""
num_samples, sample_dim = centered_samples.shape
mahalanobis_squared_distances = np.sum(
(centered_samples @ inverse_scale_matrix) * centered_samples, axis=1
)
# Normalization constants:
exponent = 0.5 * (dof + sample_dim)
A = log_gamma(exponent)
B = log_gamma(0.5 * dof)
C = 0.5 * sample_dim * np.log(dof * np.pi)
D = 0.5 * log_det_scale_matrix
normalization_contribution = num_samples * (A - B - C - D)
sample_contributions = -exponent * np.log1p(mahalanobis_squared_distances / dof)
total_log_likelihood = normalization_contribution + sample_contributions.sum()
return total_log_likelihood
@njit
def _mv_t_ll_at_mle_params(
X: np.ndarray,
start: int,
end: int,
dof: float,
mle_scale_abs_tol: float,
mle_scale_rel_tol: float,
mle_scale_max_iter: int,
) -> float:
"""Calculate multivariate T log likelihood at the MLE parameters for a segment.
Parameters
----------
X : np.ndarray
Data matrix. Rows are observations and columns are variables.
start : int
Start index of the segment (inclusive).
end : int
End index of the segment (exclusive).
dof : float
The degrees of freedom of the multivariate t-distribution.
mle_scale_abs_tol : float
The absolute tolerance for convergence in the MLE scale matrix estimation.
mle_scale_rel_tol : float
The relative tolerance for convergence in the MLE scale matrix estimation.
mle_scale_max_iter : int
The maximum number of iterations to perform for the MLE scale matrix estimation.
Returns
-------
float
The log likelihood of the observations in the
interval ``[start, end)`` in the data matrix `X`,
evaluated at the maximum likelihood parameter
estimates for the mean and scale matrix, given
the provided degrees of freedom.
"""
X_segment = X[start:end]
# Estimate the mean of each dimension through the sample medians:
sample_medians = col_median(X_segment)
X_centered = X_segment - sample_medians
mle_scale_matrix = maximum_likelihood_mv_t_scale_matrix(
X_centered,
dof,
abs_tol=mle_scale_abs_tol,
rel_tol=mle_scale_rel_tol,
max_iter=mle_scale_max_iter,
)
total_log_likelihood = _multivariate_t_log_likelihood(
scale_matrix=mle_scale_matrix, centered_samples=X_centered, dof=dof
)
return total_log_likelihood
@njit(parallel=True)
def multivariate_t_cost_mle_params(
starts: np.ndarray,
ends: np.ndarray,
X: np.ndarray,
dof: float,
mle_scale_abs_tol: float,
mle_scale_rel_tol: float,
mle_scale_max_iter: int,
) -> np.ndarray:
"""Calculate the multivariate T twice negative log likelihood cost.
At the maximum likelihood estimated mean and scale matrix values.
Parameters
----------
starts : np.ndarray
The start indices of the segments.
ends : np.ndarray
The end indices of the segments.
X : np.ndarray
The data matrix. Rows are observations and columns are variables.
dof : float
The degrees of freedom for the cost calculation.
mle_scale_abs_tol : float
The absolute tolerance for convergence in the MLE scale matrix estimation.
mle_scale_rel_tol : float
The relative tolerance for convergence in the MLE scale matrix estimation.
mle_scale_max_iter : int
The maximum number of iterations to perform for the MLE scale matrix estimation.
Returns
-------
np.ndarray
The twice negative log likelihood costs for each segment.
"""
num_starts = len(starts)
costs = np.zeros((num_starts, 1))
for i in prange(num_starts):
segment_log_likelihood = _mv_t_ll_at_mle_params(
X,
starts[i],
ends[i],
dof=dof,
mle_scale_abs_tol=mle_scale_abs_tol,
mle_scale_rel_tol=mle_scale_rel_tol,
mle_scale_max_iter=mle_scale_max_iter,
)
costs[i, 0] = -2.0 * segment_log_likelihood
return costs
@njit
def _mv_t_ll_at_fixed_params(
X: np.ndarray,
start: int,
end: int,
mean: np.ndarray,
inverse_scale_matrix: np.ndarray,
log_det_scale_matrix: float,
dof: float,
) -> float:
"""Calculate multivariate T log likelihood at the fixed parameters for a segment.
Parameters
----------
X : np.ndarray
Data matrix. Rows are observations and columns are variables.
start : int
Start index of the segment (inclusive).
end : int
End index of the segment (exclusive).
mean : np.ndarray
The mean of the multivariate t-distribution.
inverse_scale_matrix : np.ndarray
The inverse of the scale matrix of the multivariate t-distribution.
log_det_scale_matrix : float
The log determinant of the scale matrix of the multivariate t-distribution.
dof : float
The degrees of freedom of the multivariate t-distribution.
Returns
-------
float
The log likelihood of the observations in the
interval ``[start, end)`` in the data matrix `X`,
evaluated at the fixed parameter values for the
mean and scale matrix, given the provided degrees of freedom.
"""
X_centered = X[start:end] - mean
# Compute the log likelihood of the segment:
total_log_likelihood = _fixed_param_multivariate_t_log_likelihood(
inverse_scale_matrix=inverse_scale_matrix,
log_det_scale_matrix=log_det_scale_matrix,
centered_samples=X_centered,
dof=dof,
)
return total_log_likelihood
@njit(parallel=True)
def multivariate_t_cost_fixed_params(
starts: np.ndarray,
ends: np.ndarray,
X: np.ndarray,
mean: np.ndarray,
inverse_scale_matrix: np.ndarray,
log_det_scale_matrix: float,
dof: float,
) -> np.ndarray:
"""Calculate the multivariate T twice negative log likelihood cost.
At fixed parameter values.
Parameters
----------
starts : np.ndarray
The start indices of the segments.
ends : np.ndarray
The end indices of the segments.
X : np.ndarray
The data matrix.
mean : np.ndarray
The fixed mean for the cost calculation.
inverse_scale_matrix : np.ndarray
The fixed inverse scale matrix for the cost calculation.
log_det_scale_matrix : float
The log determinant of the scale matrix of the multivariate t-distribution.
dof : float
The fixed degrees of freedom for the cost calculation.
Returns
-------
np.ndarray
The twice negative log likelihood costs for each segment.
"""
num_starts = len(starts)
costs = np.zeros((num_starts, 1))
for i in prange(num_starts):
segment_log_likelihood = _mv_t_ll_at_fixed_params(
X,
starts[i],
ends[i],
mean=mean,
inverse_scale_matrix=inverse_scale_matrix,
log_det_scale_matrix=log_det_scale_matrix,
dof=dof,
)
costs[i, 0] = -2.0 * segment_log_likelihood
return costs
@njit
def _isotropic_mv_t_dof_estimate(
centered_samples: np.ndarray, infinite_dof_threshold, zero_norm_tol=1.0e-6
) -> float:
"""Estimate the degrees of freedom of a multivariate t-distribution.
From: A Novel Parameter Estimation Algorithm for the Multivariate
t-Distribution and Its Application to Computer Vision.
Parameters
----------
centered_samples : np.ndarray
The centered samples from the multivariate t-distribution.
infinite_dof_threshold : float
The threshold at which the degrees of freedom are considered infinite.
If the degrees of freedom are above this threshold,
the multivariate t-distribution is approximated with
a multivariate Gaussian distribution.
zero_norm_tol : float, optional (default=1.0e-6)
The tolerance for considering a squared norm as zero.
Returns
-------
float
The estimated degrees of freedom of the multivariate t-distribution.
"""
sample_dim = centered_samples.shape[1]
squared_norms = np.sum(centered_samples * centered_samples, axis=1)
log_norm_sq_var = np.log(squared_norms[squared_norms > zero_norm_tol**2]).var()
b = log_norm_sq_var - trigamma(sample_dim / 2.0)
inf_dof_b_threshold = (2 * infinite_dof_threshold + 4) / (infinite_dof_threshold**2)
if b < inf_dof_b_threshold:
# The dof estimate formula would exceed the infinite dof threshold,
# (or break down due to a negative value), so we return infinity.
return np.inf
dof_estimate = (1 + np.sqrt(1 + 4 * b)) / b
return dof_estimate
@njit
def _kurtosis_mv_t_dof_estimate(
centered_samples: np.ndarray, infinite_dof_threshold: float
) -> float:
"""Estimate the degrees of freedom of a multivariate t-distribution.
Parameters
----------
centered_samples : np.ndarray
The centered samples from the multivariate t-distribution.
infinite_dof_threshold : float
The threshold at which the degrees of freedom are considered infinite.
If the degrees of freedom are above this threshold,
the multivariate t-distribution is approximated with
a multivariate Gaussian distribution.
Returns
-------
float
The estimated degrees of freedom of the multivariate t-distribution.
"""
sample_ellipitical_kurtosis = kurtosis(centered_samples).mean() / 3.0
inf_dof_kurtosis_threshold = 2.0 / (infinite_dof_threshold - 4.0)
if sample_ellipitical_kurtosis < inf_dof_kurtosis_threshold:
# The elliptical kurtosis estimate is below the threshold
# which would lead to a degrees of freedom estimate above the
# infinite degrees of freedom threshold. We return infinity.
return np.inf
dof_estimate = (2.0 / sample_ellipitical_kurtosis) + 4.0
return dof_estimate
@njit
def _iterative_mv_t_dof_estimate(
centered_samples: np.ndarray,
initial_dof: float,
infinite_dof_threshold: float,
mle_scale_abs_tol: float,
mle_scale_rel_tol: float,
mle_scale_max_iter: int,
dof_abs_tol=1.0e-1,
dof_rel_tol=5.0e-2,
dof_max_iter=10,
) -> float:
"""Estimate dof. for a multivariate T distribution, iteratively.
Using algorithm 1 from [1]_, we iteratively estimate the degrees of freedom
parameter of a multivariate T distribution. The algorithm iteratively
computes the MLE scale matrix with the current degrees of freedom estimate,
and then updates the degrees of freedom estimate based on the relative
trace of the sample covariance matrix and the MLE scale matrix.
Parameters
----------
centered_samples : np.ndarray
The centered samples from the multivariate t-distribution.
initial_dof : float
The initial degrees of freedom estimate.
infinite_dof_threshold : float
The threshold at which the degrees of freedom are considered infinite.
If the degrees of freedom are above this threshold,
the multivariate t-distribution is approximated with
a multivariate Gaussian distribution.
mle_scale_abs_tol : float
The absolute tolerance for convergence in the MLE scale matrix estimation.
mle_scale_rel_tol : float
The relative tolerance for convergence in the MLE scale matrix estimation.
mle_scale_max_iter : int
The maximum number of iterations to perform for the MLE scale matrix estimation.
dof_abs_tol : float, optional (default=1.0e-1)
The absolute tolerance for convergence.
dof_rel_tol : float, optional (default=5.0e-2)
The relative tolerance for convergence.
dof_max_iter : int, optional (default=10)
The maximum number of iterations to perform.
Returns
-------
float
The estimated degrees of freedom of the multivariate t-distribution.
.. [1] Ollila, Esa, & Daniel P. Palomar, & Frédéric Pascal. (2020). Shrinking the
Eigenvalues of M-Estimators of Covariance Matrix. IEEE Transactions on Signal
Processing, 256-269.
"""
n = centered_samples.shape[0]
if initial_dof > infinite_dof_threshold:
return np.inf
inf_dof_nu_threshold = infinite_dof_threshold / (infinite_dof_threshold - 2.0)
sample_covariance = (centered_samples.T @ centered_samples) / n
mle_scale_matrix = maximum_likelihood_mv_t_scale_matrix(
centered_samples,
initial_dof,
max_iter=mle_scale_max_iter,
abs_tol=mle_scale_abs_tol,
rel_tol=mle_scale_rel_tol,
)
dof = initial_dof
for _ in range(dof_max_iter):
nu_i = np.trace(sample_covariance) / np.trace(mle_scale_matrix)
if nu_i < inf_dof_nu_threshold:
# The estimated degrees of freedom are high enough to approximate the
# multivariate T distribution with a Gaussian.
dof = np.inf
break
old_dof = dof
dof = 2 * nu_i / max((nu_i - 1), 1.0e-3)
mle_scale_matrix = _solve_for_mle_scale_matrix(
initial_scale_matrix=mle_scale_matrix,
centered_samples=centered_samples,
dof=dof,
abs_tol=mle_scale_abs_tol,
rel_tol=mle_scale_rel_tol,
max_iter=mle_scale_max_iter,
)
absolute_dof_diff = np.abs(dof - old_dof)
rel_tol_satisfied = absolute_dof_diff / old_dof < dof_rel_tol
abs_tol_satisfied = absolute_dof_diff < dof_abs_tol
if rel_tol_satisfied or abs_tol_satisfied:
break
return dof
@njit(parallel=True)
def _loo_iterative_mv_t_dof_estimate(
centered_samples: np.ndarray,
initial_dof: float,
infinite_dof_threshold: float,
mle_scale_abs_tol: float,
mle_scale_rel_tol: float,
mle_scale_max_iter: int,
dof_abs_tol=1.0e-1,
dof_rel_tol=5.0e-2,
dof_max_iter=5,
) -> float:
"""Leave-one-out iterative dof. estimate for a multivariate T distribution.
Using an improved estimator, based on the algorithm in:
'Improved estimation of the degree of freedom parameter of mv t-distribution''.
However, the algorithm computes one MLE scale matrix estimate per samples,
holding out one sample at a time, which increases computation cost.
Parameters
----------
centered_samples : np.ndarray
The centered samples from the multivariate t-distribution.
initial_dof : float
The initial degrees of freedom estimate.
infinite_dof_threshold : float
The threshold at which the degrees of freedom are considered infinite.
If the degrees of freedom are above this threshold,
the multivariate t-distribution is approximated with
a multivariate Gaussian distribution.
mle_scale_abs_tol : float
The absolute tolerance for convergence in the MLE scale matrix estimation.
mle_scale_rel_tol : float
The relative tolerance for convergence in the MLE scale matrix estimation.
mle_scale_max_iter : int
The maximum number of iterations to perform for the MLE scale matrix estimation.
dof_abs_tol : float, optional (default=1.0e-1)
The absolute tolerance for convergence.
dof_rel_tol : float, optional (default=5.0e-2)
The relative tolerance for convergence.
dof_max_iter : int, optional (default=5)
The maximum number of iterations to perform.
Returns
-------
float
The estimated degrees of freedom of the multivariate T distribution.
"""
if initial_dof > infinite_dof_threshold:
return np.inf
num_samples, sample_dimension = centered_samples.shape
inf_dof_theta_threshold = infinite_dof_threshold / (infinite_dof_threshold - 2.0)
sample_covariance = (centered_samples.T @ centered_samples) / num_samples
grand_mle_scale_matrix = maximum_likelihood_mv_t_scale_matrix(
centered_samples,
dof=initial_dof,
max_iter=mle_scale_max_iter,
abs_tol=mle_scale_abs_tol,
rel_tol=mle_scale_rel_tol,
)
contraction_estimate = np.trace(grand_mle_scale_matrix) / np.trace(
sample_covariance
)
current_dof = initial_dof
for _ in range(dof_max_iter):
total_loo_mahalanobis_squared_distance = 0.0
for sample in prange(num_samples):
# Extract the leave-one-out sample as a column vector:
loo_sample = centered_samples[sample, :].reshape(-1, 1)
loo_sample_outer_product = loo_sample @ loo_sample.T
# Initial estimate of the leave-one-out covariance matrix,
# subtracting the contracted contribution of the leave-one-out sample:
loo_scale_estimate = grand_mle_scale_matrix - contraction_estimate * (
loo_sample_outer_product / num_samples
)
loo_mle_scale_matrix = _solve_for_mle_scale_matrix(
initial_scale_matrix=loo_scale_estimate,
centered_samples=centered_samples,
dof=current_dof,
loo_index=sample,
abs_tol=mle_scale_abs_tol,
rel_tol=mle_scale_rel_tol,
max_iter=mle_scale_max_iter,
)
loo_mahalanobis_squared_distance = (
loo_sample.T @ np.linalg.solve(loo_mle_scale_matrix, loo_sample)
)[0, 0]
total_loo_mahalanobis_squared_distance += loo_mahalanobis_squared_distance
theta_k = (1 - sample_dimension / num_samples) * (
(total_loo_mahalanobis_squared_distance / num_samples) / sample_dimension
)
if theta_k < inf_dof_theta_threshold:
# The estimated degrees of freedom are high enough to approximate the
# multivariate T distribution with a Gaussian distribution.
current_dof = np.inf
break
new_dof = 2 * theta_k / (theta_k - 1)
abs_dof_difference = np.abs(new_dof - current_dof)
abs_tol_satisfied = abs_dof_difference < dof_abs_tol
rel_tol_satisfied = (abs_dof_difference / current_dof) < dof_rel_tol
current_dof = new_dof
if abs_tol_satisfied or rel_tol_satisfied:
break
return current_dof
@njit
def _estimate_mv_t_dof(
X: np.ndarray,
infinite_dof_threshold: float,
refine_dof_threshold: int,
mle_scale_abs_tol: float,
mle_scale_rel_tol: float,
mle_scale_max_iter: int,
) -> float:
"""
Estimate the degrees of freedom of a multivariate t-distribution.
Parameters
----------
X : np.ndarray
The data matrix, where rows are observations and columns are variables.
infinite_dof_threshold : float
The threshold at which the degrees of freedom are considered infinite.
If the degrees of freedom are above this threshold,
the multivariate t-distribution is approximated with
a multivariate Gaussian distribution.
refine_dof_threshold : int
The number of samples below which the degrees of freedom
estimate is refined using a leave-one-out iterative method.
mle_scale_abs_tol : float
The absolute tolerance for convergence in the MLE scale matrix estimation.
mle_scale_rel_tol : float
The relative tolerance for convergence in the MLE scale matrix estimation.
mle_scale_max_iter : int
The maximum number of iterations to perform for the MLE scale matrix estimation.
Returns
-------
float
The estimated degrees of freedom of the multivariate t-distribution.
"""
centered_samples = X - col_median(X)
isotropic_dof = _isotropic_mv_t_dof_estimate(
centered_samples, infinite_dof_threshold=infinite_dof_threshold
)
kurtosis_dof = _kurtosis_mv_t_dof_estimate(
centered_samples, infinite_dof_threshold=infinite_dof_threshold
)
if np.isfinite(isotropic_dof) and np.isfinite(kurtosis_dof):
# Initialize the iterative dof estimation method with the
# geometric mean of the isotropic and kurtosis estimates:
initial_dof_estimate = np.sqrt(isotropic_dof * kurtosis_dof)
elif np.isfinite(isotropic_dof):
initial_dof_estimate = isotropic_dof
elif np.isfinite(kurtosis_dof):
initial_dof_estimate = kurtosis_dof
else:
# Both initial estimates are infinite, start the
# iterative method with a reasonably high initial dof:
initial_dof_estimate = infinite_dof_threshold / 2.0
dof_estimate = _iterative_mv_t_dof_estimate(
centered_samples=centered_samples,
initial_dof=initial_dof_estimate,
infinite_dof_threshold=infinite_dof_threshold,
mle_scale_abs_tol=mle_scale_abs_tol,
mle_scale_rel_tol=mle_scale_rel_tol,
mle_scale_max_iter=mle_scale_max_iter,
)
num_samples = X.shape[0]
if num_samples <= refine_dof_threshold:
dof_estimate = _loo_iterative_mv_t_dof_estimate(
centered_samples=centered_samples,
initial_dof=dof_estimate,
infinite_dof_threshold=infinite_dof_threshold,
mle_scale_abs_tol=mle_scale_abs_tol,
mle_scale_rel_tol=mle_scale_rel_tol,
mle_scale_max_iter=mle_scale_max_iter,
)
return dof_estimate
[docs]
class MultivariateTCost(BaseCost):
"""Multivariate T twice negative log likelihood cost.
The multivariate T-distribution is a generalization of the multivariate
Gaussian distribution, allowing for heavier tails. The degrees of freedom
parameter controls the tail heaviness, with higher values leading to
distributions that are closer to the multivariate Gaussian distribution.
With Numba installed the runtime of this cost is is between 5 and 10 times
slower than the Gaussian likelihood cost, but more robust to outliers.
The cost is calculated as the twice negative log likelihood of the
multivariate T-distribution, given the data, mean, scale matrix, and
degrees of freedom. The degrees of freedom can be fixed or estimated
from the data. When estimating the degrees of freedom, we use several
methods. Initially we compute the geometric mean of an isotropic dof.
estimate from [1]_ and a kurtosis-based estimate from [2]_. This initial
estimate is then fed into an iterative dof. estimate from [2]_. If the
number of samples is below a given threshold (`refine_dof_threshold`),
we refine the dof. estimate using a leave-one-out iterative method from [3]_.
If the degrees of freedom are above a given threshold (`infinite_dof_threshold`),
the multivariate T-distribution is approximated with a multivariate Gaussian
distribution. The threshold is set to 50 by default, but can be adjusted
with the `infinite_dof_threshold` parameter.
As there is no analytical formula for the maximum likelihood estimate (MLE) of the
scale matrix of the multivariate T-distribution, we use fixed point iterations
to compute the MLE scale matrix within each segment. The tolerance parameters
`mle_scale_abs_tol` and `mle_scale_rel_tol` for the MLE scale matrix estimation can
be adjusted to control the convergence of the fixed point iterations, and when
either of the tolerances are achieved, the fixed point iterations stop.
The absolute tolerance (`mle_scale_abs_tol`) is achieved when the absolute change
in the norm of the scale matrix changed less than `mle_scale_abs_tol` after an
iteration, and the relative tolerance (`mle_scale_rel_tol`) is achieved when the
relative change in the norm of the scale matrix is less than `mle_scale_rel_tol`
after an iteration, relative to the pre-iteration norm.
The maximum number of iterations (`mle_scale_max_iter`) is used to safeguard against
non-convergence, and will raise a RuntimeError if the maximum number of iterations
is reached. Reaching the maximum number of iterations is a sign that the tolerance
parameters are too strict, and the fixed point iterations are not converging.
In this case, the tolerance parameters can be relaxed, or the maximum number of
iterations can be increased.
Parameters
----------
param : 2-tuple of float or np.ndarray, or None (default=None)
Fixed mean and scale matrix for the cost calculation.
If ``None``, the maximum likelihood estimates are used.
fixed_dof : float, optional (default=None)
Fixed degrees of freedom for the cost calculation.
If None, the degrees of freedom are estimated from the data.
refine_dof_threshold : int, optional
(default=1000 with Numba installed, 100 without)
The number of samples below which the degrees of freedom
estimate is refined using a leave-one-out iterative method.
infinite_dof_threshold : float, optional (default=50.0)
The threshold at which the degrees of freedom are considered infinite.
If the degrees of freedom are above this threshold,
the multivariate t-distribution is approximated with
a multivariate Gaussian distribution.
mle_scale_abs_tol : float, optional (default=1.0e-2)
The absolute tolerance for convergence in the MLE scale matrix estimation.
mle_scale_rel_tol : float, optional (default=1.0e-2)
The relative tolerance for convergence in the MLE scale matrix estimation.
mle_scale_max_iter : int, optional (default=100)
The maximum number of iterations to perform for the MLE scale matrix estimation.
Will raise a RuntimeError if the maximum number of iterations is reached.
References
----------
.. [1] Aeschliman, Chad, & Johnny Park, & Avinash C. Kak. (2009). A Novel
Parameter Estimation Algorithm for the Multivariate T-Distribution and Its
Application to Computer Vision. In Computer Vision - ECCV 2010, 594-607.
Berlin, Heidelberg: Springer
.. [2] Ollila, Esa, & Palomar, Daniel P. & Pascal, Frédéric. (2020). Shrinking the
Eigenvalues of M-Estimators of Covariance Matrix. IEEE Transactions on Signal
Processing, 256-269.
.. [3] Pascal, Frédéric & Ollila, Esa & Palomar, Daniel P. (2021) Improved
Estimation of the Degree of Freedom Parameter of Multivariate T-Distribution.
In 2021 29th European Signal Processing Conference (EUSIPCO), 860-864.
"""
_tags = {
"authors": ["johannvk"],
"maintainers": "johannvk",
"supports_fixed_param": True,
"is_aggregated": True,
}
def __init__(
self,
param: tuple[MeanType, CovType] | None = None,
fixed_dof=None,
refine_dof_threshold=None,
infinite_dof_threshold=5.0e1,
mle_scale_abs_tol=1.0e-2,
mle_scale_rel_tol=1.0e-2,
mle_scale_max_iter=100,
):
super().__init__(param)
# Provided fixed degrees of freedom:
self.fixed_dof = fixed_dof
self.infinite_dof_threshold = infinite_dof_threshold
self.refine_dof_threshold = refine_dof_threshold
# Tolerance parameters for the MLE scale matrix estimation:
self.mle_scale_abs_tol = mle_scale_abs_tol
self.mle_scale_rel_tol = mle_scale_rel_tol
self.mle_scale_max_iter = mle_scale_max_iter
check_in_interval(
interval=pd.Interval(0, np.inf),
value=self.fixed_dof,
name="fixed_dof",
allow_none=True,
)
check_larger_than(
min_value=0.0,
value=self.infinite_dof_threshold,
name="infinite_dof_threshold",
)
check_larger_than(
min_value=0,
value=self.refine_dof_threshold,
name="refine_dof_threshold",
allow_none=True,
)
check_in_interval(
interval=pd.Interval(0, np.inf, closed="left"),
value=self.mle_scale_abs_tol,
name="mle_scale_abs_tol",
)
check_in_interval(
interval=pd.Interval(0, np.inf, closed="left"),
value=self.mle_scale_rel_tol,
name="mle_scale_rel_tol",
)
check_larger_than(
min_value=0, value=self.mle_scale_max_iter, name="mle_scale_max_iter"
)
def _check_fixed_param(
self, param: tuple[MeanType, CovType], X: np.ndarray
) -> np.ndarray:
"""Check if the fixed mean parameter is valid.
The covariance matrix is checked for positive definiteness,
and forced to a floating point representation for numba compatibility.
Parameters
----------
param : 2-tuple of float or np.ndarray
Fixed mean and covariance matrix for the cost calculation.
Both are converted to float values or float arrays.
X : np.ndarray
Input data.
Returns
-------
mean : np.ndarray
Fixed mean for the cost calculation.
"""
mean, cov = param
mean = check_mean(mean, X)
# Require floating point representation of
# the covariance matrix for numba compatibility:
cov = check_cov(cov, X, force_float=True)
return mean, cov
@property
def min_size(self) -> int | None:
"""Minimum size of the interval to evaluate.
The size of each interval is defined as ``cuts[i, 1] - cuts[i, 0]``.
"""
if self.is_fitted:
return self._X.shape[1] + 1
else:
return None
[docs]
def get_model_size(self, p: int) -> int:
"""Get the number of parameters in the cost function.
Parameters
----------
p : int
Number of variables in the data.
"""
return 1 + p + p * (p + 1) // 2
def _fit(self, X: np.ndarray, y=None):
"""Fit the cost.
This method checks fixed distribution parameters, if provided, and
precomputes quantities that are used in the cost evaluation.
Additionally, the degrees of freedom for the multivariate T data generating
distribution it estimated, if the degrees of freedom parameters was not set
during the construction of the MultivariateTCost object.
Parameters
----------
X : np.ndarray
Data to evaluate. Must be a 2D array.
y: None
Ignored. Included for API consistency by convention.
"""
self._param = self._check_param(self.param, X)
if self.param is not None:
self._mean, scale_matrix = self._param
self._inv_scale_matrix = np.linalg.inv(scale_matrix)
_, self._log_det_scale_matrix = np.linalg.slogdet(scale_matrix)
if self.fixed_dof is None:
if self.refine_dof_threshold is None:
if numba_available:
self.refine_dof_threshold = int(1.0e3)
else:
self.refine_dof_threshold = 100
self.dof_ = _estimate_mv_t_dof(
X,
infinite_dof_threshold=self.infinite_dof_threshold,
refine_dof_threshold=self.refine_dof_threshold,
mle_scale_abs_tol=self.mle_scale_abs_tol,
mle_scale_rel_tol=self.mle_scale_rel_tol,
mle_scale_max_iter=self.mle_scale_max_iter,
)
else:
self.dof_ = self.fixed_dof
return self
def _evaluate_optim_param(self, starts: np.ndarray, ends: np.ndarray) -> np.ndarray:
"""Evaluate the cost for the MLE parameters.
Parameters
----------
starts : np.ndarray
Start indices of the intervals (inclusive).
ends : np.ndarray
End indices of the intervals (exclusive).
Returns
-------
costs : np.ndarray
A 2D array of costs. One row for each interval. The number of columns
is 1 since the MultivariateTCost is inherently multivariate.
"""
if np.isposinf(self.dof_):
return gaussian_cost_mle_params(starts, ends, X=self._X)
else:
return multivariate_t_cost_mle_params(
starts,
ends,
X=self._X,
dof=self.dof_,
mle_scale_abs_tol=self.mle_scale_abs_tol,
mle_scale_rel_tol=self.mle_scale_rel_tol,
mle_scale_max_iter=self.mle_scale_max_iter,
)
def _evaluate_fixed_param(self, starts, ends):
"""Evaluate the cost for the fixed parameters.
Parameters
----------
starts : np.ndarray
Start indices of the intervals (inclusive).
ends : np.ndarray
End indices of the intervals (exclusive).
Returns
-------
costs : np.ndarray
A 2D array of costs. One row for each interval. The number of columns
is 1 since the MultivariateGaussianCost is inherently multivariate.
"""
if np.isposinf(self.dof_):
return gaussian_cost_fixed_params(
starts,
ends,
self._X,
mean=self._mean,
inv_cov=self._inv_scale_matrix,
log_det_cov=self._log_det_scale_matrix,
)
else:
return multivariate_t_cost_fixed_params(
starts,
ends,
self._X,
mean=self._mean,
inverse_scale_matrix=self._inv_scale_matrix,
log_det_scale_matrix=self._log_det_scale_matrix,
dof=self.dof_,
)
[docs]
@classmethod
def get_test_params(cls, parameter_set="default"):
"""Return testing parameter settings for the estimator.
Parameters
----------
parameter_set : str, default="default"
Name of the set of test parameters to return, for use in tests. If no
special parameters are defined for a value, will return ``"default"`` set.
There are currently no reserved values for interval evaluators.
Returns
-------
params : dict or list of dict, default = {}
Parameters to create testing instances of the class
Each dict are parameters to construct an "interesting" test instance, i.e.,
`MyClass(**params)` or `MyClass(**params[i])` creates a valid test instance.
`create_test_instance` uses the first (or only) dictionary in `params`
"""
params = [
{"param": None},
{"param": (0.0, 1.0)},
{"param": (np.zeros(1), np.eye(1))},
]
return params