Source code for skchange.datasets._generate

"""Data generators."""

__author__ = ["Tveten"]

from numbers import Number

import numpy as np
import pandas as pd
from scipy.stats import multivariate_normal


[docs] def generate_changing_data( n: int = 100, changepoints: int | list[int] = 50, means: float | list[float] | list[np.ndarray] = 0.0, variances: float | list[float] | list[np.ndarray] = 1.0, random_state: int = None, ): """ Generate piecewise multivariate normal data with changing means and variances. Parameters ---------- n : int, optional, default=100 Number of observations. changepoints : int or list of ints, optional, default=50 Changepoints in the data. means : list of floats or list of arrays, optional, default=0.0 List of means for each segment. variances : list of floats or list of arrays, optional, default=1.0 List of variances for each segment. random_state : int or `RandomState`, optional Seed or random state for reproducible results. Defaults to None. Returns ------- `pd.DataFrame` DataFrame with generated data. """ if isinstance(changepoints, int): changepoints = [changepoints] if isinstance(means, Number): means = [means] if isinstance(variances, Number): variances = [variances] means = [np.asarray(mean).reshape(-1) for mean in means] variances = [np.asarray(variance).reshape(-1) for variance in variances] n_segments = len(changepoints) + 1 if len(means) == 1: means = means * n_segments if len(variances) == 1: variances = variances * n_segments if n_segments != len(means) or n_segments != len(variances): raise ValueError( "Number of segments (len(changepoints) + 1)," + " means and variances must be the same." ) if any([changepoint > n - 1 for changepoint in changepoints]): raise ValueError( "Changepoints must be within the range of the data" + f" (n={n} and max(changepoints)={max(changepoints)})." 
) p = len(means[0]) x = multivariate_normal.rvs(np.zeros(p), np.eye(p), n, random_state) changepoints = [0] + changepoints + [n] for prev_cpt, next_cpt, mean, variance in zip( changepoints[:-1], changepoints[1:], means, variances ): x[prev_cpt:next_cpt] = mean + np.sqrt(variance) * x[prev_cpt:next_cpt] out_columns = [f"var{i}" for i in range(p)] df = pd.DataFrame(x, index=range(len(x)), columns=out_columns) return df
[docs] def generate_anomalous_data( n: int = 100, anomalies: tuple[int, int] | list[tuple[int, int]] = (70, 80), means: float | list[float] | list[np.ndarray] = 3.0, variances: float | list[float] | list[np.ndarray] = 1.0, random_state: int = None, ) -> pd.DataFrame: """ Generate multivariate normal data with anomalies. Parameters ---------- n : int, optional (default=100) Number of observations. anomalies : list of tuples, optional (default=[(71, 80)]) List of tuples of the form [start, end) indicating the start and end of an anomaly. means : list of floats or list of arrays, optional (default=[0.0]) List of means for each segment. variances : list of floats or list of arrays, optional (default=[1.0]) List of variances for each segment. random_state : int or `RandomState`, optional Seed or random state for reproducible results. Defaults to None. Returns ------- `pd.DataFrame` DataFrame with generated data. """ if isinstance(anomalies, tuple): anomalies = [anomalies] if isinstance(means, Number): means = [means] if isinstance(variances, Number): variances = [variances] means = [np.asarray(mean).reshape(-1) for mean in means] variances = [np.asarray(variance).reshape(-1) for variance in variances] if len(means) == 1: means = means * len(anomalies) if len(variances) == 1: variances = variances * len(anomalies) if len(anomalies) != len(means) or len(anomalies) != len(variances): raise ValueError("Number of anomalies, means and variances must be the same.") if any([len(anomaly) != 2 for anomaly in anomalies]): raise ValueError("Anomalies must be of length 2.") if any([anomaly[1] <= anomaly[0] for anomaly in anomalies]): raise ValueError("The start of an anomaly must be before its end.") if any([anomaly[1] > n for anomaly in anomalies]): raise ValueError("Anomalies must be within the range of the data.") p = len(means[0]) x = multivariate_normal.rvs(np.zeros(p), np.eye(p), n, random_state) for anomaly, mean, variance in zip(anomalies, means, variances): start, end = 
anomaly x[start:end] = mean + np.sqrt(variance) * x[start:end] out_columns = [f"var{i}" for i in range(p)] df = pd.DataFrame(x, index=range(len(x)), columns=out_columns) return df
def add_linspace_outliers(
    df: pd.DataFrame, n_outliers: int, outlier_size: float
) -> pd.DataFrame:
    """
    Add outliers to a DataFrame at evenly spaced positions.

    The outliers are added to entire rows: `outlier_size` is added to every column
    of `n_outliers` rows spread evenly from the first to the last row. The input
    DataFrame is modified in place and also returned.

    Parameters
    ----------
    df : `pd.DataFrame`
        DataFrame to add outliers to. Modified in place.
    n_outliers : int
        Number of outliers to add.
    outlier_size : float
        Size of the outliers.

    Returns
    -------
    `pd.DataFrame`
        The same DataFrame object with outliers added.
    """
    # Positions index rows, so they must be bounded by the number of rows and not
    # by `df.size` (rows * columns), which overshoots for multi-column data.
    outlier_positions = np.linspace(0, len(df) - 1, n_outliers, dtype=int)
    df.iloc[outlier_positions] += outlier_size
    return df
def generate_alternating_data(
    n_segments: int,
    segment_length: int,
    p: int = 1,
    mean: float = 0.0,
    variance: float = 1.0,
    affected_proportion: float = 1.0,
    random_state: int = None,
) -> pd.DataFrame:
    """
    Generate multivariate normal data that is alternating between two states.

    Even-numbered segments (starting with the first) are in the baseline state
    with mean 0 and variance 1. Odd-numbered segments are in the changed state,
    where the first ``round(p * affected_proportion)`` components have mean `mean`
    and variance `variance`, and the remaining components stay at baseline. All
    segments have the same length `segment_length`.

    Parameters
    ----------
    n_segments : int
        Number of segments to generate.
    segment_length : int
        Length of each segment.
    p : int, optional (default=1)
        Number of dimensions.
    mean : float, optional (default=0.0)
        Mean of the affected components in every other segment.
    variance : float, optional (default=1.0)
        Variance of the affected components in every other segment.
    affected_proportion : float, optional (default=1.0)
        Proportion of components {1, ..., p} that are affected by each change in
        every other segment.
    random_state : int or `RandomState`, optional
        Seed or random state for reproducible results. Defaults to None.

    Returns
    -------
    `pd.DataFrame`
        DataFrame with generated data.
    """
    n_affected = int(np.round(p * affected_proportion))
    n_unaffected = p - n_affected

    # Odd-numbered segments are the ones in the changed state.
    in_changed_state = [idx % 2 == 1 for idx in range(n_segments)]

    segment_means = [
        [mean] * n_affected + [0] * n_unaffected if changed else [0] * p
        for changed in in_changed_state
    ]
    segment_variances = [
        [variance] * n_affected + [1] * n_unaffected if changed else [1] * p
        for changed in in_changed_state
    ]

    total_length = segment_length * n_segments
    segment_starts = [segment_length * k for k in range(1, n_segments)]
    return generate_changing_data(
        total_length,
        segment_starts,
        segment_means,
        segment_variances,
        random_state,
    )