"""Base classes for anomaly detectors.
classes:
BaseSegmentAnomalyDetector
By inheriting from these classes the remaining methods of the BaseDetector class to
implement to obtain a fully functional anomaly detector are given below.
Needs to be implemented:
_fit(self, X, y=None)
_predict(self, X)
Optional to implement:
_transform_scores(self, X)
_update(self, X, y=None)
"""
import numpy as np
import pandas as pd
from ..base import BaseDetector
[docs]
class BaseSegmentAnomalyDetector(BaseDetector):
"""Base class for segment anomaly detectors.
Segment anomaly detectors detect segments of data points that are considered
anomalous.
Output format of the `predict` method: See the `dense_to_sparse` method.
Output format of the `transform` method: See the `sparse_to_dense` method.
"""
_tags = {
"authors": ["Tveten"],
"maintainers": ["Tveten"],
"task": "segmentation",
}
[docs]
@staticmethod
def sparse_to_dense(
y_sparse: pd.DataFrame, index: pd.Index, columns: pd.Index = None
) -> pd.DataFrame:
"""Convert the sparse output from the `predict` method to a dense format.
Parameters
----------
y_sparse : pd.DataFrame with RangeIndex
Detected segment anomalies. Must have the following column:
* ``"ilocs"`` - left-closed intervals of iloc based segments.
Can also have the following columns:
* ``"icolumns"`` - array of identified variables for each anomaly.
index : array-like
Indices that are to be annotated according to `y_sparse`.
columns : array-like
Columns that are to be annotated according to `y_sparse`. Only relevant if
y_sparse contains the column ``"icolumns"`` with identified variables.
Returns
-------
pd.DataFrame with the input data index and one column:
* ``"label"`` - integer labels ``1, ..., K`` for each segment anomaly.
``0`` is reserved for the normal instances.
"""
if "icolumns" in y_sparse:
return BaseSegmentAnomalyDetector._sparse_to_dense_icolumns(
y_sparse, index, columns
)
return BaseSegmentAnomalyDetector._sparse_to_dense_ilocs(y_sparse, index)
[docs]
@staticmethod
def dense_to_sparse(y_dense: pd.DataFrame) -> pd.DataFrame:
"""Convert the dense output from the `transform` method to a sparse format.
Parameters
----------
y_dense : pd.DataFrame
The dense output from the `transform` method. It must either have the
following column:
* ``"labels"`` - integer labels ``1, ..., K`` for each segment anomaly and
label 0 for normal instances.
Or it must have columns of the form:
* ``"labels_<*>"`` with integer labels ``1, ..., K`` for each segment
anomaly, and 0 for normal instances.
Returns
-------
pd.DataFrame :
A ``pd.DataFrame`` with a range index and two columns:
* ``"ilocs"`` - left-closed `pd.Interval`s of iloc based segments.
* ``"labels"`` - integer labels ``1, ..., K`` for each segment anomaly.
Notes
-----
The start and end points of the intervals can be accessed by
``output["ilocs"].array.left`` and ``output["ilocs"].array.right``,
respectively.
"""
if "labels" in y_dense.columns:
return BaseSegmentAnomalyDetector._dense_to_sparse_ilocs(y_dense)
elif y_dense.columns.str.startswith("labels_").all():
return BaseSegmentAnomalyDetector._dense_to_sparse_icolumns(y_dense)
raise ValueError(
"Invalid columns in `y_dense`. Expected 'labels' or 'labels_*'."
f" Got: {y_dense.columns}"
)
def _format_sparse_output(
self,
segment_anomalies: list[tuple[int, int]] | list[tuple[int, int, np.ndarray]],
closed: str = "left",
) -> pd.DataFrame:
"""Format the sparse output of segment anomaly detectors.
Can be reused by subclasses to format the output of the `_predict` method.
Parameters
----------
segment_anomalies : list
List of tuples containing start and end indices of segment anomalies,
and optionally a ``np.ndarray`` of the identified
variables/components/columns.
closed : str
Whether the ``(start, end)`` tuple correspond to intervals that are closed
on the left, right, both, or neither.
Returns
-------
pd.DataFrame :
A ``pd.DataFrame`` with a range index and two columns:
* ``"ilocs"`` - left-closed ``pd.Interval``s of iloc based segments.
* ``"labels"`` - integer labels ``1, ..., K`` for each segment anomaly.
Notes
-----
The start and end points of the intervals can be accessed by
``output["ilocs"].array.left`` and ``output["ilocs"].array.right``,
respectively.
"""
# Cannot extract this from segment_anomalies as it may be an empty list.
if self.get_tag("capability:identify_variables"):
return self._format_sparse_output_icolumns(segment_anomalies, closed)
else:
return self._format_sparse_output_ilocs(segment_anomalies, closed)
@staticmethod
def _sparse_to_dense_ilocs(
y_sparse: pd.DataFrame, index: pd.Index, columns: pd.Index = None
) -> pd.DataFrame:
"""Convert the sparse output from the `predict` method to a dense format.
Parameters
----------
y_sparse : pd.DataFrame with RangeIndex
Detected segment anomalies. Must have the following column:
* ``"ilocs"`` - left-closed intervals of iloc based segments.
index : array-like
Indices that are to be annotated according to `y_sparse`.
columns: array-like
Not used. Only for API compatibility.
Returns
-------
pd.DataFrame with the input data index and one column:
* ``"label"`` - integer labels ``1, ..., K`` for each segment anomaly.
``0`` is reserved for the normal instances.
"""
labels = pd.IntervalIndex(y_sparse["ilocs"]).get_indexer(index)
# `get_indexer` return values 0 for the values inside the first interval, 1 to
# the values within the next interval and so on, and -1 for values outside any
# interval. The `skchange` convention is that 0 is normal and > 0 is anomalous,
# so we add 1 to the result.
labels += 1
return pd.DataFrame(labels, index=index, columns=["labels"], dtype="int64")
@staticmethod
def _dense_to_sparse_ilocs(y_dense: pd.DataFrame) -> pd.DataFrame:
"""Convert the dense output from the `transform` method to a sparse format.
Parameters
----------
y_dense : pd.DataFrame
The dense output from the `transform` method. Must have the following
column:
* ``"labels"`` - integer labels ``1, ..., K`` for each segment anomaly and
label ``0`` for normal instances.
Returns
-------
pd.DataFrame :
A `pd.DataFrame` with a range index and two columns:
* ``"ilocs"`` - left-closed ``pd.Interval``s of iloc based segments.
* ``"labels"`` - integer labels ``1, ..., K`` for each segment anomaly.
Notes
-----
The start and end points of the intervals can be accessed by
``output["ilocs"].array.left`` and ``output["ilocs"].array.right``,
respectively.
"""
# The sparse format only uses integer positions, so we reset the index.
y_dense = y_dense["labels"].reset_index(drop=True)
y_anomaly = y_dense.loc[y_dense.values > 0]
anomaly_locations_diff = y_anomaly.index.diff()
first_anomaly_start = y_anomaly.index[:1].to_numpy()
anomaly_starts = y_anomaly.index[anomaly_locations_diff > 1]
anomaly_starts = np.insert(anomaly_starts, 0, first_anomaly_start)
last_anomaly_end = y_anomaly.index[-1:].to_numpy() + 1
anomaly_ends = y_anomaly.index[np.roll(anomaly_locations_diff > 1, -1)] + 1
anomaly_ends = np.insert(anomaly_ends, len(anomaly_ends), last_anomaly_end)
anomaly_intervals = list(zip(anomaly_starts, anomaly_ends))
return BaseSegmentAnomalyDetector._format_sparse_output_ilocs(
anomaly_intervals, closed="left"
)
@staticmethod
def _format_sparse_output_ilocs(
anomaly_intervals: list[tuple[int, int]], closed: str = "left"
) -> pd.DataFrame:
"""Format the sparse output of segment anomaly detectors.
Can be reused by subclasses to format the output of the `_predict` method.
Parameters
----------
anomaly_intervals : list
List of tuples containing start and end indices of segment anomalies.
Returns
-------
pd.DataFrame :
A `pd.DataFrame` with a range index and two columns:
* ``"ilocs"`` - left-closed ``pd.Interval``s of iloc based segments.
* ``"labels"`` - integer labels ``1, ..., K`` for each segment anomaly.
Notes
-----
The start and end points of the intervals can be accessed by
``output["ilocs"].array.left`` and ``output["ilocs"].array.right``,
respectively.
"""
anomaly_intervals = [(int(start), int(end)) for start, end in anomaly_intervals]
return pd.DataFrame(
{
"ilocs": pd.IntervalIndex.from_tuples(anomaly_intervals, closed=closed),
"labels": pd.RangeIndex(1, len(anomaly_intervals) + 1),
},
)
@staticmethod
def _sparse_to_dense_icolumns(
y_sparse: pd.DataFrame, index: pd.Index, columns: pd.Index
) -> pd.DataFrame:
"""Convert the sparse output from the `predict` method to a dense format.
Parameters
----------
y_sparse : pd.DataFrame with RangeIndex
Detected segment anomalies. Must have the following columns:
* ``"ilocs"`` - left-closed intervals of iloc based segments.
* ``"icolumns"`` - array of identified variables for each anomaly.
index : array-like
Indices that are to be annotated according to `y_sparse`.
columns : array-like
Columns that are to be annotated according to `y_sparse`.
Returns
-------
pd.DataFrame with the input data index and as many columns as in X:
* ``"labels_<X.columns[i]>"`` for each column index i in ``X.columns``:
Integer labels starting from ``0``.
"""
anomaly_intervals = y_sparse["ilocs"].array
anomaly_starts = anomaly_intervals.left
anomaly_ends = anomaly_intervals.right
anomaly_columns = y_sparse["icolumns"]
start_is_open = anomaly_intervals.closed in ["neither", "right"]
if start_is_open:
anomaly_starts += 1 # Exclude the start index in the for loop below.
end_is_closed = anomaly_intervals.closed in ["both", "right"]
if end_is_closed:
anomaly_ends += 1 # Include the end index in the for loop below.
labels = np.zeros((len(index), len(columns)), dtype="int64")
anomalies = zip(anomaly_starts, anomaly_ends, anomaly_columns)
for i, (start, end, affected_columns) in enumerate(anomalies):
labels[start:end, affected_columns] = i + 1
prefixed_columns = [f"labels_{column}" for column in columns]
return pd.DataFrame(labels, index=index, columns=prefixed_columns)
@staticmethod
def _dense_to_sparse_icolumns(y_dense: pd.DataFrame):
"""Convert the dense output from the `transform` method to a sparse format.
Parameters
----------
y_dense : pd.DataFrame
The dense output from the `transform` method. Must have columns of the form:
* `"labels_<*>"` with integer labels ``1, ..., K`` for each segment anomaly,
and ``0`` for normal instances.
Returns
-------
pd.DataFrame :
A ``pd.DataFrame`` with a range index and three columns:
* ``"ilocs"`` - left-closed ``pd.Interval``s of iloc based segments.
* ``"labels"`` - integer labels ``1, ..., K`` for each segment anomaly.
* ``"icolumns"`` - list of affected columns for each anomaly.
"""
# The sparse format only uses integer positions, so we reset index and columns.
y_dense = y_dense.reset_index(drop=True)
y_dense.columns = range(y_dense.columns.size)
anomaly_intervals = []
unique_labels = np.unique(y_dense.values)
for i in unique_labels[unique_labels > 0]:
anomaly_mask = y_dense == i
which_columns = anomaly_mask.any(axis=0)
which_rows = anomaly_mask.any(axis=1)
anomaly_columns = anomaly_mask.columns[which_columns].to_list()
anomaly_start = anomaly_mask.index[which_rows][0]
anomaly_end = anomaly_mask.index[which_rows][-1]
anomaly_intervals.append((anomaly_start, anomaly_end + 1, anomaly_columns))
return BaseSegmentAnomalyDetector._format_sparse_output_icolumns(
anomaly_intervals, closed="left"
)
@staticmethod
def _format_sparse_output_icolumns(
segment_anomalies: list[tuple[int, int, np.ndarray]],
closed: str = "left",
) -> pd.DataFrame:
"""Format the sparse output of subset segment anomaly detectors.
Can be reused by subclasses to format the output of the `_predict` method.
Parameters
----------
segment_anomalies : list
List of tuples containing start and end indices of segment
anomalies and a ``np.array`` of the affected components/columns.
closed : str
Whether the ``(start, end)`` tuple correspond to intervals that are closed
on the left, right, both, or neither.
Returns
-------
pd.DataFrame :
A ``pd.DataFrame`` with a range index and three columns:
* ``"ilocs"`` - left-closed ``pd.Interval``s of iloc based segments.
* ``"labels"`` - integer labels ``1, ..., K`` for each segment anomaly.
* ``"icolumns"`` - list of affected columns for each anomaly.
"""
ilocs = [(int(start), int(end)) for start, end, _ in segment_anomalies]
icolumns = [
np.array(components, dtype="int64")
for _, _, components in segment_anomalies
]
return pd.DataFrame(
{
"ilocs": pd.IntervalIndex.from_tuples(ilocs, closed=closed),
"labels": pd.RangeIndex(1, len(ilocs) + 1),
"icolumns": icolumns,
}
)