"""Linear Regression cost function.
This module contains the LinearRegressionCost class, which is a cost function for
change point detection based on linear regression. The cost is the sum of squared
residuals from fitting a linear regression model within each segment.
"""
import numpy as np
from ..utils.numba import njit
from ..utils.validation.parameters import check_data_column
from .base import BaseCost
@njit
def linear_regression_cost(X: np.ndarray, y: np.ndarray) -> float:
"""Compute the cost for a linear regression model.
Parameters
----------
X : np.ndarray
Features.
y : np.ndarray
Target values.
Returns
-------
cost : float
Sum of squared residuals from the linear regression.
"""
# Returns: (coeffs, residuals, X_rank, X_singular_values)
coeffs, residuals, X_rank, _ = np.linalg.lstsq(X, y)
# If rank(X) < X.shape[1], or X.shape[0] <= X.shape[1],
# "residuals" is an empty array.
if X_rank < X.shape[1]:
# Underdetermined system, need to compute residuals manually.
manual_residuals = y - X @ coeffs
return np.sum(np.square(manual_residuals))
else:
# Full rank or overdetermined system, use lstsq residuals.
return np.sum(residuals)
@njit
def linear_regression_cost_fixed_params(
starts: np.ndarray,
ends: np.ndarray,
response_data: np.ndarray,
covariate_data: np.ndarray,
coeffs: np.ndarray,
) -> np.ndarray:
"""Evaluate the linear regression cost for fixed parameters.
Parameters
----------
starts : np.ndarray
Start indices of the intervals (inclusive).
ends : np.ndarray
End indices of the intervals (exclusive).
response_data : np.ndarray
Array containing the response variable values.
covariate_data : np.ndarray
Array containing the covariate/predictor values.
coeffs : np.ndarray
Fixed regression coefficients. Shape should be compatible with
covariate_data for matrix multiplication.
Returns
-------
costs : np.ndarray
A 2D array of costs with one row for each interval and one column.
"""
n_intervals = len(starts)
costs = np.zeros((n_intervals, 1))
for i in range(n_intervals):
start, end = starts[i], ends[i]
X_features = covariate_data[start:end]
y_response = response_data[start:end, None]
# Compute predictions using fixed parameters:
y_pred = X_features @ coeffs
# Calculate residual sum of squares:
costs[i, 0] = np.sum(np.square(y_response - y_pred))
return costs
@njit
def linear_regression_cost_intervals(
starts: np.ndarray,
ends: np.ndarray,
response_data: np.ndarray,
covariate_data: np.ndarray,
) -> np.ndarray:
"""Evaluate the linear regression cost for each interval.
Parameters
----------
starts : np.ndarray
Start indices of the intervals (inclusive).
ends : np.ndarray
End indices of the intervals (exclusive).
response_data : np.ndarray
Array containing the response variable values.
covariate_data : np.ndarray
Array containing the covariate/predictor values.
Returns
-------
costs : np.ndarray
A 2D array of costs with one row for each interval and one column.
"""
n_intervals = len(starts)
costs = np.zeros((n_intervals, 1))
for i in range(n_intervals):
start, end = starts[i], ends[i]
X_features = covariate_data[start:end]
y_response = response_data[start:end]
costs[i, 0] = linear_regression_cost(X_features, y_response)
return costs
[docs]
class LinearRegressionCost(BaseCost):
"""Linear Regression sum of squared residuals cost.
This cost computes the sum of squared residuals from fitting a linear
regression model within each segment. One column of the input data X is
used as the response variable, and the remaining columns are used as
predictors.
Parameters
----------
response_col : int, optional (default=0)
Index of column in X to use as the response variable.
covariate_cols : list of `int` or `str`, optional (default=None)
Indices of columns in X to use as predictors. If None, all columns
except the response column are used as predictors.
param : array-like, optional (default=None)
Fixed regression coefficients. If None, coefficients are estimated
for each interval using ordinary least squares. Must be an array
with the same length as `covariate_cols`, if provided, or the number of
columns in X minus one if `covariate_cols` is None.
"""
_tags = {
"authors": ["johannvk"],
"maintainers": "johannvk",
"supports_fixed_param": True,
"is_conditional": True, # The cost uses covariates.
"is_aggregated": True, # Only a single response column is supported.
}
def __init__(
self,
response_col: str | int,
covariate_cols: list[str | int] = None,
param=None,
):
super().__init__(param)
self.response_col = response_col
self.covariate_cols = covariate_cols
self._response_col_idx: int | None = None
self._covariate_col_indices: list[int] | None = None
self._response_data: np.ndarray | None = None
self._covariate_data: np.ndarray | None = None
self._coeffs: np.ndarray | None = None
def _fit(self, X: np.ndarray, y=None):
"""Fit the cost.
This method validates input data and stores it for cost evaluation.
Parameters
----------
X : np.ndarray
Data to evaluate. Must be a 2D array.
y: None
Ignored. Included for API consistency by convention.
"""
# Check that X has enough columns
if X.shape[1] <= 1:
raise ValueError(
"X must have at least 2 columns for linear regression "
"(1 for response and at least 1 for predictors)."
)
# Check that response_col is valid:
self._response_col_idx = check_data_column(
self.response_col, "Response", X, self._X_columns
)
if self.covariate_cols is not None:
self._covariate_col_indices = [
check_data_column(col, "Covariate", X, self._X_columns)
for col in self.covariate_cols
]
else:
# Use all columns except the response column as covariates:
self._covariate_col_indices = list(range(X.shape[1]))
self._covariate_col_indices.remove(self._response_col_idx)
self._response_data = X[:, self._response_col_idx]
self._covariate_data = X[:, self._covariate_col_indices]
# Check params after input column indices are set:
self._param = self._check_param(self.param, X)
if self.param is not None:
self._coeffs = self._param
def _evaluate_optim_param(self, starts: np.ndarray, ends: np.ndarray) -> np.ndarray:
"""Evaluate the linear regression cost for each interval.
Evaluates the cost for `X[start:end]` for each each start, end in starts, ends.
The cost is computed using the optimal L2 regression coefficients.
Parameters
----------
starts : np.ndarray
Start indices of the intervals (inclusive).
ends : np.ndarray
End indices of the intervals (exclusive).
Returns
-------
costs : np.ndarray
A 2D array of costs with one row for each interval and one column.
"""
return linear_regression_cost_intervals(
starts, ends, self._response_data, self._covariate_data
)
def _evaluate_fixed_param(self, starts, ends) -> np.ndarray:
"""Evaluate the cost for fixed regression coefficients.
Evaluates the cost for `X[start:end]` for each each start, end in starts, ends.
The cost is computed using the fixed regression coefficients provided.
Parameters
----------
starts : np.ndarray
Start indices of the intervals (inclusive).
ends : np.ndarray
End indices of the intervals (exclusive).
Returns
-------
costs : np.ndarray
A 2D array of costs with one row for each interval and one column.
"""
return linear_regression_cost_fixed_params(
starts, ends, self._response_data, self._covariate_data, self._coeffs
)
def _check_fixed_param(self, param, X: np.ndarray) -> np.ndarray:
"""Check if the fixed parameter is valid relative to the data.
Parameters
----------
param : array-like
Fixed regression coefficients.
X : np.ndarray
Input data.
Returns
-------
param: np.ndarray
Fixed regression coefficients for cost calculation.
"""
param = np.asarray(param)
expected_length = len(self._covariate_col_indices)
if param.size != expected_length:
raise ValueError(
f"Expected {expected_length} coefficients"
f" ({expected_length} predictors), got {param.size}."
)
if param.ndim != 1 and param.shape[1] != 1:
raise ValueError(
f"Coefficients must have shape ({expected_length}, 1) or"
f" ({expected_length},). Got shape {param.shape}."
)
return param.reshape(-1, 1)
@property
def min_size(self) -> int:
"""Minimum size of the interval to evaluate.
Returns
-------
int
The minimum valid size of an interval to evaluate.
"""
if self.is_fitted:
# Need at least as many samples as covariates:
return len(self._covariate_col_indices)
else:
return None
[docs]
def get_model_size(self, p: int) -> int:
"""Get the number of parameters in the cost function.
Parameters
----------
p : int
Number of variables in the data.
Returns
-------
int
Number of parameters in the cost function.
"""
# Number of parameters = all features except response variable.
if self.is_fitted:
# Could use fewer covariates than the total number of columns:
return len(self._covariate_col_indices)
else:
# Default to all columns except the response variable:
return p - 1
[docs]
@classmethod
def get_test_params(cls, parameter_set="default"):
"""Return testing parameter settings for the estimator.
Parameters
----------
parameter_set : str, default="default"
Name of the set of test parameters to return, for use in tests.
Returns
-------
params : list of dict
Parameters to create testing instances of the class
"""
params = [
{"param": None, "response_col": 0},
{"param": np.array([1.0, 0.5, -0.3]), "response_col": 1},
]
return params