#!/usr/bin/env python3 -u
# copyright: sktime developers, BSD-3-Clause License (see LICENSE file)
"""Base class template for detector base type for time series streams.
class name: BaseDetector
Scitype defining methods:
fitting - fit(self, X, y=None)
annotating - predict(self, X)
updating (temporal) - update(self, X, y=None)
update&annotate - update_predict(self, X, y=None)
Inspection methods:
hyper-parameter inspection - get_params()
fitted parameter inspection - get_fitted_params()
State:
fitted model/strategy - by convention, any attributes ending in "_"
fitted state flag - check_is_fitted()
"""
# todo 0.37.0: remove BaseSeriesAnnotator
__author__ = ["fkiraly", "tveten", "alex-jg3", "satya-pattnaik"]
__all__ = ["BaseDetector", "BaseSeriesAnnotator"]
import numpy as np
import pandas as pd
from sktime.base import BaseEstimator
from sktime.utils.validation.series import check_series
from sktime.utils.warnings import warn
class BaseDetector(BaseEstimator):
"""Base class for time series detectors.
Developers should set the task and learning_type tags in the derived class.
task : str {"segmentation", "change_point_detection", "anomaly_detection"}
The main detection task:
* If ``segmentation``, the detector divides timeseries into discrete chunks
based on certain criteria. The same label can be applied at multiple
disconnected regions of the timeseries.
* If ``change_point_detection``, the detector finds points where the
statistical properties of the timeseries change significantly.
* If ``anomaly_detection``, the detector finds points that differ significantly
from the normal statistical properties of the timeseries.
learning_type : str {"supervised", "unsupervised", "semi_supervised"}
Detection learning type:
* If ``supervised``, the detector learns from labelled data.
* If ``unsupervised``, the detector learns from unlabelled data.
* If ``semi_supervised``, the detector learns from a combination of labelled
and unlabelled data.
Notes
-----
The base series detector specifies the methods and method
signatures that all detectors have to implement.
Specific implementations of these methods are deferred to concrete detectors.
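Examples
--------
A minimal extension sketch, illustrative only: ``ToyChangePointDetector``
and its jump rule are hypothetical, not a shipped detector.
>>> import pandas as pd
>>> from sktime.detection.base import BaseDetector
>>> class ToyChangePointDetector(BaseDetector):
...     _tags = {
...         "task": "change_point_detection",
...         "learning_type": "unsupervised",
...     }
...     def _fit(self, X, y=None):
...         return self  # the toy rule is stateless, nothing to fit
...     def _predict(self, X):
...         # flag ilocs where the first column jumps by more than 1
...         jumps = X.iloc[:, 0].diff().abs() > 1
...         return pd.Series(pd.RangeIndex(len(X))[jumps.to_numpy()])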
"""
_tags = {
# packaging info
# --------------
"authors": "sktime developers", # author(s) of the object
"maintainers": "sktime developers", # current maintainer(s) of the object
"python_version": None, # PEP 440 python version specifier to limit versions
"python_dependencies": None, # str or list of str, package soft dependencies
# estimator tags
# --------------
# todo 0.37.0 switch order of series-annotator and detector
# todo 1.0.0 - remove series-annotator
"object_type": ["series-annotator", "detector"], # type of object
"learning_type": "None", # supervised, unsupervised
"task": "None", # anomaly_detection, change_point_detection, segmentation
"capability:multivariate": False,
"capability:missing_values": False,
"capability:update": False,
#
# todo: distribution_type does not seem to be used - refactor or remove
"distribution_type": "None",
"X_inner_mtype": "pd.DataFrame",
"fit_is_empty": False,
}
def __init__(self):
self._is_fitted = False
self._X = None
self._y = None
task = self.get_tag("task")
learning_type = self.get_tag("learning_type")
super().__init__()
self.set_tags(**{"task": task, "learning_type": learning_type})
def __rmul__(self, other):
"""Magic * method, return (left) concatenated DetectorPipeline.
Implemented for ``other`` being a transformer, otherwise returns
``NotImplemented``.
Parameters
----------
other: ``sktime`` transformer, must inherit from BaseTransformer
otherwise, ``NotImplemented`` is returned
Returns
-------
DetectorPipeline object,
concatenation of ``other`` (first) with ``self`` (last).
not nested, contains only non-DetectorPipeline ``sktime`` steps
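Examples
--------
A minimal sketch; the ``Differencer`` import path is an assumption for
illustration, and ``my_detector`` stands for any concrete detector instance:
>>> from sktime.transformations.series.difference import Differencer  # doctest: +SKIP
>>> pipe = Differencer() * my_detector  # doctest: +SKIP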
"""
from sktime.detection.compose import DetectorPipeline
from sktime.transformations.base import BaseTransformer
from sktime.transformations.series.adapt import TabularToSeriesAdaptor
from sktime.utils.sklearn import is_sklearn_transformer
# we wrap self in a pipeline, and concatenate with the other
# the TransformedTargetForecaster does the rest, e.g., dispatch on other
if isinstance(other, BaseTransformer):
self_as_pipeline = DetectorPipeline(steps=[self])
return other * self_as_pipeline
elif is_sklearn_transformer(other):
return TabularToSeriesAdaptor(other) * self
else:
return NotImplemented
# todo 0.37.0: remove the Y parameter and related handling
def fit(self, X, y=None, Y=None):
"""Fit to training data.
Parameters
----------
X : pd.DataFrame, pd.Series or np.ndarray
Training data to fit model to (time series).
y : pd.DataFrame with RangeIndex, optional.
Known events for training, in ``X``, if detector is supervised.
Each row of ``y`` is a known event.
Can have the following columns:
* ``"ilocs"`` - always. Values encode where/when the event takes place,
via ``iloc`` references to indices of ``X``,
or ranges of indices of ``X``, as below.
* ``"labels"`` - if the task, by tags, is supervised or semi-supervised
segmentation with labels, or segment clustering.
Entries in the ``"ilocs"`` and ``"labels"`` columns describe the
event in a given row as follows:
* If ``task`` is ``"anomaly_detection"`` or ``"change_point_detection"``,
``"ilocs"`` contains the iloc index at which the event takes place.
* If ``task`` is ``"segmentation"``, ``"ilocs"`` contains left-closed
intervals of iloc based segments, interpreted as the range
of indices over which the event takes place.
Labels (if present) in the ``"labels"`` column indicate the type of event.
Returns
-------
self :
Reference to self.
Notes
-----
Creates fitted model that updates attributes ending in "_". Sets
_is_fitted flag to True.
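Examples
--------
A sketch of a labelled training input ``y`` for a supervised change point
detector; the iloc values are illustrative:
>>> import pandas as pd
>>> y_train = pd.DataFrame({"ilocs": [3, 7]})  # two known change points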
"""
X_inner = self._check_X(X)
# skip inner _fit if fit is empty
# we also do not need to memorize data, since we do the same in _update
# basic checks (above) are still needed
if self.get_tag("fit_is_empty", False):
self._is_fitted = True
return self
if Y is not None:
warn(
"Warning: the Y parameter in detection/annotation algorithms "
"is deprecated and will be removed in the 0.37.0 release. "
"Users should use the y parameter instead. "
"Until the 0.37.0 release, the Y parameter will be used if "
"no y parameter is provided, ensuring backwards compatibility.",
stacklevel=2,
)
if Y is not None and y is None:
y = Y
self._X = X_inner
self._y = y
# fkiraly: insert checks/conversions here, after PR #1012 I suggest
if _method_has_arg(self._fit, "y"):
self._fit(X=X_inner, y=y)
elif _method_has_arg(self._fit, "Y"):
self._fit(X=X_inner, Y=y)
warn(
"Warning: the Y parameter in detection/annotation algorithms "
"is deprecated and will be removed in the 0.37.0 release. "
"Users should use the y parameter instead. "
f"The class {self.__class__.__name__} uses the Y parameter "
"internally in _fit, this should be replaced with y by a maintainer. "
f"Until the 0.37.0 release, this will raise no exceptions, "
"ensuring backwards compatibility.",
stacklevel=2,
)
else:
self._fit(X=X_inner)
# this should happen last
self._is_fitted = True
return self
def predict(self, X):
"""Create labels on test/deployment data.
This method returns a list-like type specific to the detection task,
e.g., segments for segmentation, anomalies for anomaly detection.
The encoding varies by task and learning_type (tags), see below.
For returns that are type consistent across tasks, see
``predict_points`` and ``predict_segments``.
Parameters
----------
X : pd.DataFrame, pd.Series or np.ndarray
Time series subject to detection, which will be assigned labels or scores.
Returns
-------
y : pd.DataFrame with RangeIndex
Detected or predicted events.
Each row of ``y`` is a detected or predicted event.
Can have the following columns:
* ``"ilocs"`` - always. Values encode where/when the event takes place,
via ``iloc`` references to indices of ``X``,
or ranges of indices of ``X``, as below.
* ``"labels"`` - if the task, by tags, is supervised or semi-supervised
segmentation with labels, or segment clustering.
Entries in the ``"ilocs"`` and ``"labels"`` columns describe the
event in a given row as follows:
* If ``task`` is ``"anomaly_detection"`` or ``"change_point_detection"``,
``"ilocs"`` contains the iloc index at which the event takes place.
* If ``task`` is ``"segmentation"``, ``"ilocs"`` contains left-closed
intervals of iloc based segments, interpreted as the range
of indices over which the event takes place.
Labels (if present) in the ``"labels"`` column indicate the type of event.
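Examples
--------
A hand-constructed sketch of the return format for segmentation tasks,
with illustrative values:
>>> import pandas as pd
>>> y = pd.DataFrame(
...     {"ilocs": pd.IntervalIndex.from_breaks([0, 3, 7], closed="left")}
... )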
"""
self.check_is_fitted()
X_inner = self._check_X(X)
# fkiraly: insert checks/conversions here, after PR #1012 I suggest
y = self._predict(X=X_inner)
# deal with legacy return format with intervals in index
y = self._coerce_to_df(y, columns=["ilocs"])
return y
def transform(self, X):
"""Create labels on test/deployment data.
Parameters
----------
X : pd.DataFrame, pd.Series or np.ndarray
Time series subject to detection, which will be assigned labels or scores.
Returns
-------
y : pd.DataFrame with same index as X
Labels for sequence ``X``.
* If ``task`` is ``"anomaly_detection"``, the values are integer labels.
A value of 0 indicates that ``X``, at the same time index, has no anomaly.
Other values indicate an anomaly.
Most detectors will return 0 or 1, but some may return more values,
if they can detect different types of anomalies.
indicating whether ``X``, at the same
index, is an anomaly, 0 for no, 1 for yes.
* If ``task`` is ``"changepoint_detection"``, the values are integer labels,
indicating labels for segments between changepoints.
Possible labels are integers starting from 0.
* If ``task`` is "segmentation", the values are integer labels of the
segments. Possible labels are integers starting from 0.
"""
y_sparse = self.predict(X)
y_dense = self.sparse_to_dense(y_sparse, pd.RangeIndex(len(X)))
y_dense = self._coerce_to_df(y_dense, columns=["labels"])
return y_dense
def predict_scores(self, X):
"""Return scores for predicted labels on test/deployment data.
Parameters
----------
X : pd.DataFrame, pd.Series or np.ndarray
Data to label (time series).
Returns
-------
scores : pd.DataFrame with same index as the return of ``predict``
Scores for prediction of sequence ``X``.
"""
self.check_is_fitted()
X_inner = self._check_X(X)
scores = self._predict_scores(X_inner)
return pd.DataFrame(scores)
def update(self, X, y=None, Y=None):
"""Update model with new data and optional ground truth labels.
Parameters
----------
X : pd.DataFrame, pd.Series or np.ndarray
Training data to update model with (time series).
y : pd.Series, optional
Ground truth labels for training if detector is supervised.
Returns
-------
self :
Reference to self.
Notes
-----
Updates fitted model that updates attributes ending in "_".
"""
self.check_is_fitted()
X_inner = self._check_X(X)
# no update needed if fit is empty
if self.get_tag("fit_is_empty", False):
return self
if Y is not None:
warn(
"Warning: the Y parameter in detection/annotation algorithms "
"is deprecated and will be removed in the 0.37.0 release. "
"Users should use the y parameter instead. "
"Until the 0.37.0 release, the Y parameter will be used if "
"no y parameter is provided, ensuring backwards compatibility.",
stacklevel=2,
)
if y is None and Y is not None:
y = Y
self._X = X_inner.combine_first(self._X)
if y is not None:
self._y = y.combine_first(self._y)
if _method_has_arg(self._update, "y"):
self._update(X=X_inner, y=y)
elif _method_has_arg(self._update, "Y"):
self._update(X=X_inner, Y=y)
else:
self._update(X=X_inner)
return self
def update_predict(self, X, y=None):
"""Update model with new data and create labels for it.
Parameters
----------
X : pd.DataFrame, pd.Series or np.ndarray
Training data to update model with, time series.
y : pd.DataFrame with RangeIndex, optional.
Known events for training, in ``X``, if detector is supervised.
Each row of ``y`` is a known event.
Can have the following columns:
* ``"ilocs"`` - always. Values encode where/when the event takes place,
via ``iloc`` references to indices of ``X``,
or ranges of indices of ``X``, as below.
* ``"labels"`` - if the task, by tags, is supervised or semi-supervised
segmentation with labels, or segment clustering.
Entries in the ``"ilocs"`` and ``"labels"`` columns describe the
event in a given row as follows:
* If ``task`` is ``"anomaly_detection"`` or ``"change_point_detection"``,
``"ilocs"`` contains the iloc index at which the event takes place.
* If ``task`` is ``"segmentation"``, ``"ilocs"`` contains left-closed
intervals of iloc based segments, interpreted as the range
of indices over which the event takes place.
Labels (if present) in the ``"labels"`` column indicate the type of event.
Returns
-------
y : pd.DataFrame with RangeIndex
Detected or predicted events.
Each row of ``y`` is a detected or predicted event.
Can have the following columns:
* ``"ilocs"`` - always. Values encode where/when the event takes place,
via ``iloc`` references to indices of ``X``,
or ranges of indices of ``X``, as below.
* ``"labels"`` - if the task, by tags, is supervised or semi-supervised
segmentation, or segment clustering.
Entries in the ``"ilocs"`` and ``"labels"`` columns describe the
event in a given row as follows:
* If ``task`` is ``"anomaly_detection"`` or ``"change_point_detection"``,
``"ilocs"`` contains the iloc index at which the event takes place.
* If ``task`` is ``"segmentation"``, ``"ilocs"`` contains left-closed
intervals of iloc based segments, interpreted as the range
of indices over which the event takes place.
Labels (if present) in the ``"labels"`` column indicate the type of event.
"""
self.update(X=X, y=y)
y = self.predict(X=X)
return y
# todo 0.37.0: remove Y argument
def fit_predict(self, X, y=None, Y=None):
"""Fit to data, then predict it.
Fits model to X and y with given detection parameters
and returns the detection labels produced by the model.
Parameters
----------
X : pd.DataFrame, pd.Series or np.ndarray
Data to be transformed
y : pd.DataFrame with RangeIndex, optional.
Known events for training, in ``X``, if detector is supervised.
Each row of ``y`` is a known event.
Can have the following columns:
* ``"ilocs"`` - always. Values encode where/when the event takes place,
via ``iloc`` references to indices of ``X``,
or ranges of indices of ``X``, as below.
* ``"labels"`` - if the task, by tags, is supervised or semi-supervised
segmentation with labels, or segment clustering.
Entries in the ``"ilocs"`` and ``"labels"`` columns describe the
event in a given row as follows:
* If ``task`` is ``"anomaly_detection"`` or ``"change_point_detection"``,
``"ilocs"`` contains the iloc index at which the event takes place.
* If ``task`` is ``"segmentation"``, ``"ilocs"`` contains left-closed
intervals of iloc based segments, interpreted as the range
of indices over which the event takes place.
Labels (if present) in the ``"labels"`` column indicate the type of event.
Returns
-------
y : pd.DataFrame with RangeIndex
Detected or predicted events.
Each row of ``y`` is a detected or predicted event.
Can have the following columns:
* ``"ilocs"`` - always. Values encode where/when the event takes place,
via ``iloc`` references to indices of ``X``,
or ranges of indices of ``X``, as below.
* ``"labels"`` - if the task, by tags, is supervised or semi-supervised
segmentation with labels, or segment clustering.
Entries in the ``"ilocs"`` and ``"labels"`` columns describe the
event in a given row as follows:
* If ``task`` is ``"anomaly_detection"`` or ``"change_point_detection"``,
``"ilocs"`` contains the iloc index at which the event takes place.
* If ``task`` is ``"segmentation"``, ``"ilocs"`` contains left-closed
intervals of iloc based segments, interpreted as the range
of indices over which the event takes place.
Labels (if present) in the ``"labels"`` column indicate the type of event.
"""
# Non-optimized default implementation; override when a better
# method is possible for a given algorithm.
return self.fit(X, y=y, Y=Y).predict(X)
# todo 0.37.0: remove Y argument
def fit_transform(self, X, y=None, Y=None):
"""Fit to data, then transform it.
Fits model to X and y with given detection parameters
and returns the detection labels made by the model.
Parameters
----------
X : pd.DataFrame, pd.Series or np.ndarray
Data to be transformed
y : pd.Series or np.ndarray, optional (default=None)
Target values of data to be predicted.
Returns
-------
y : pd.DataFrame with same index as X
Labels for sequence ``X``.
* If ``task`` is ``"anomaly_detection"``, the values are integer labels.
A value of 0 indicatesthat ``X``, at the same time index, has no anomaly.
Other values indicate an anomaly.
Most detectors will return 0 or 1, but some may return more values,
if they can detect different types of anomalies.
indicating whether ``X``, at the same
index, is an anomaly, 0 for no, 1 for yes.
* If ``task`` is ``"changepoint_detection"``, the values are integer labels,
indicating labels for segments between changepoints.
Possible labels are integers starting from 0.
* If ``task`` is "segmentation", the values are integer labels of the
segments. Possible labels are integers starting from 0.
"""
y_sparse = self.fit_predict(X, y=y, Y=Y)
y_dense = self.sparse_to_dense(y_sparse, index=pd.RangeIndex(len(X)))
y_dense = self._coerce_to_df(y_dense, columns=["labels"])
return y_dense
def _coerce_to_df(self, y, columns=None):
"""Coerce output to a DataFrame.
Also handles the following cases, for downwards compatibility:
* IntervalIndex containing segments -> DataFrame with "ilocs" column
"""
if not isinstance(y, (pd.Series, pd.DataFrame)):
y = pd.DataFrame(y, columns=columns, dtype="int64")
if isinstance(y.index, pd.IntervalIndex):
if isinstance(y, pd.Series):
y = pd.DataFrame(y.index, columns=columns)
elif isinstance(y, pd.DataFrame):
y_index = y.index
y_index = pd.DataFrame(y_index, columns=columns)
y = y.reset_index(drop=True)
y = pd.concat([y_index, y], axis=1)
if not isinstance(y, pd.DataFrame):
y = pd.DataFrame(y, columns=columns, dtype="int64")
return y
def _coerce_intervals_to_values(self, y):
if not isinstance(y, (pd.Series, pd.DataFrame)):
y = pd.Series(y, dtype="int64")
if isinstance(y.index, pd.IntervalIndex):
if isinstance(y, pd.Series):
y = pd.Series(y.index)
return y
def _check_X(self, X):
"""Check input data.
Parameters
----------
X : pd.DataFrame, pd.Series or np.ndarray
Data to be transformed
Returns
-------
X : X_inner_mtype
Data to be transformed
"""
return X
# this causes errors, we need to investigate
# X_inner_mtype = self.get_tag("X_inner_mtype")
# X_inner = convert_to(X, X_inner_mtype)
# return X_inner
def _fit(self, X, y=None):
"""Fit to training data.
private _fit containing the core logic, called from fit
Writes to self:
Sets fitted model attributes ending in "_".
Parameters
----------
X : pd.DataFrame
Training data to fit model to (time series).
y : pd.Series, optional
Ground truth labels for training if detector is supervised.
Returns
-------
self :
Reference to self.
"""
raise NotImplementedError("abstract method")
def _predict(self, X):
"""Create labels on test/deployment data.
private _predict containing the core logic, called from predict
Parameters
----------
X : pd.DataFrame
Time series subject to detection, which will be assigned labels or scores.
Returns
-------
y : pd.Series with RangeIndex
Labels for sequence ``X``, in sparse format.
Values are ``iloc`` references to indices of ``X``.
* If ``task`` is ``"anomaly_detection"`` or ``"change_point_detection"``,
the values are integer indices of the changepoints/anomalies.
* If ``task`` is "segmentation", the values are ``pd.Interval`` objects.
"""
raise NotImplementedError("abstract method")
def _predict_scores(self, X):
"""Return scores for predicted labels on test/deployment data.
core logic
Parameters
----------
X : pd.DataFrame
Time series subject to detection, which will be assigned labels or scores.
Returns
-------
Y : pd.Series
Scores for sequence ``X``; exact format depends on the detection type.
"""
raise NotImplementedError("abstract method")
def _transform_scores(self, X):
"""Return scores for predicted labels on test/deployment data.
core logic
Parameters
----------
X : pd.DataFrame
Time series subject to detection, which will be assigned labels or scores.
Returns
-------
scores : pd.DataFrame with same index as X
Scores for sequence ``X``.
"""
raise NotImplementedError("abstract method")
def _update(self, X, y=None):
"""Update model with new data and optional ground truth labels.
core logic
Parameters
----------
X : pd.DataFrame
Training data to update model with (time series).
y : pd.Series, optional
Ground truth labels for training if detector is supervised.
Returns
-------
self :
Reference to self.
Notes
-----
Updates fitted model that updates attributes ending in "_".
"""
# default/fallback: re-fit to all data
self._fit(self._X, self._y)
return self
def predict_segments(self, X):
"""Predict segments on test/deployment data.
The main difference to ``predict`` is that this method always returns
a ``pd.DataFrame`` with segments of interest, even if the task is not
segmentation.
Parameters
----------
X : pd.DataFrame
Time series subject to detection, which will be assigned labels or scores.
Returns
-------
y : pd.DataFrame with RangeIndex
``pd.DataFrame`` with the following columns:
* ``"ilocs"`` - always. Values are left-closed intervals with
left/right values being ``iloc`` references to indices of ``X``,
signifying segments.
* ``"labels"`` - if the task, by tags, is supervised or semi-supervised
segmentation, or segment clustering.
The meaning of segments in the ``"ilocs"`` column and ``"labels"``
column is as follows:
* If ``task`` is ``"anomaly_detection"`` or ``"change_point_detection"``,
the intervals are intervals between changepoints/anomalies, and
potential labels are consecutive integers starting from 0.
* If ``task`` is ``"segmentation"``, the values are segmentation labels.
"""
self.check_is_fitted()
X = check_series(X)
task = self.get_tag("task")
if task in ["anomaly_detection", "change_point_detection"]:
y_pred_pts = self.predict_points(X)
y_pred = self.change_points_to_segments(y_pred_pts, start=0, end=len(X))
elif task == "segmentation":
y_pred = self._predict_segments(X)
y_pred = self._coerce_to_df(y_pred, columns=["ilocs"])
return y_pred
def predict_points(self, X):
"""Predict changepoints/anomalies on test/deployment data.
The main difference to ``predict`` is that this method always returns
a ``pd.DataFrame`` with points of interest, even if the task is not
anomaly or change point detection.
Parameters
----------
X : pd.DataFrame
Time series subject to detection, which will be assigned labels or scores.
Returns
-------
y : pd.DataFrame with RangeIndex
``pd.DataFrame`` with the following columns:
* ``"ilocs"`` - always. Values are integers, ``iloc``
references to indices of ``X``, signifying points of interest.
* ``"labels"`` - if the task, by tags, is supervised or semi-supervised
segmentation, or anomaly clustering.
The meaning of entries in the ``"ilocs"`` column and ``"labels"``
column is as follows:
* If ``task`` is ``"anomaly_detection"`` or ``"change_point_detection"``,
the values are integer indices of the changepoints/anomalies.
* If ``task`` is ``"segmentation"``, the values are consecutive
segment boundaries.
The ``"labels"`` are potential labels for the points of interest.
"""
self.check_is_fitted()
X = check_series(X)
task = self.get_tag("task")
if task in ["anomaly_detection", "change_point_detection"]:
y_pred = self._predict_points(X)
elif task == "segmentation":
y_pred_seg = pd.DataFrame(self.predict_segments(X))
y_pred = self.segments_to_change_points(y_pred_seg)
y_pred = self._coerce_to_df(y_pred, columns=["ilocs"])
return y_pred
def _predict_segments(self, X):
"""Predict segments on test/deployment data.
Parameters
----------
X : pd.DataFrame
Time series subject to detection, which will be assigned labels or scores.
Returns
-------
Y : pd.Series
A series with an index of intervals. Each interval is the range of a
segment and the corresponding value is the label of the segment.
"""
return self._predict(X)
def _predict_points(self, X):
"""Predict changepoints/anomalies on test/deployment data.
Parameters
----------
X : pd.DataFrame
Time series subject to detection, which will be assigned labels or scores.
Returns
-------
Y : pd.Series
A series whose values are the changepoints/anomalies in X.
"""
return self._predict(X)
@staticmethod
def sparse_to_dense(y_sparse, index):
"""Convert the sparse output from an detector to a dense format.
Parameters
----------
y_sparse : pd.Series
* If ``y_sparse`` is a series with an index of intervals, it should
represent segments, where each value of the series is the label of a segment.
Unclassified intervals should be labelled -1. Segments must never have
the label 0.
* If the index of ``y_sparse`` is not a set of intervals, the values of the
series should represent the indexes of changepoints/anomalies.
index : array-like
Larger set of indices which contains event indices in ``y_sparse``,
to be used as the index of the returned series.
Returns
-------
pd.Series
A series with an index of ``index`` is returned.
* If ``y_sparse`` is a series of changepoints/anomalies, then the returned
series is labelled 0 or 1 depending on whether the index is associated
with an anomaly/changepoint, where 1 means anomaly/changepoint.
* If ``y_sparse`` is a series of segments then the returned series is
labelled depending on the segment its indexes fall into. Indexes that
fall into no segments are labelled -1.
Examples
--------
>>> import pandas as pd
>>> from sktime.detection.base import BaseDetector
>>> y_sparse = pd.Series([2, 5, 7]) # Indices of changepoints/anomalies
>>> index = range(0, 8)
>>> BaseDetector.sparse_to_dense(y_sparse, index=index)
0    0
1    0
2    1
3    0
4    0
5    1
6    0
7    1
dtype: int64
>>> y_sparse = pd.Series(
...     [1, 2, 1],
...     index=pd.IntervalIndex.from_arrays(
...         [0, 4, 6], [4, 6, 10], closed="left"
...     )
... )
>>> index = range(10)
>>> BaseDetector.sparse_to_dense(y_sparse, index=index)
0    1
1    1
2    1
3    1
4    2
5    2
6    1
7    1
8    1
9    1
dtype: int64
"""
if isinstance(y_sparse, pd.DataFrame):
y_sparse = y_sparse.iloc[:, 0]
if not isinstance(y_sparse, pd.Series):
y_sparse = pd.Series(y_sparse, dtype="int64")
if isinstance(y_sparse.index.dtype, pd.IntervalDtype):
# Segmentation case
y_dense = BaseDetector._sparse_segments_to_dense(y_sparse, index)
return y_dense
else:
# Anomaly/changepoint detection case
y_dense = BaseDetector._sparse_points_to_dense(y_sparse, index)
return y_dense
@staticmethod
def _sparse_points_to_dense(y_sparse, index):
"""Label the indexes in ``index`` if they are in ``y_sparse``.
Parameters
----------
y_sparse: pd.Series
The values of the series must be the indexes of changepoints/anomalies.
index: array-like
Array of indexes that are to be labelled according to ``y_sparse``.
Returns
-------
pd.Series
A series with an index of ``index``. Its values are 1 if the index is in
y_sparse and 0 otherwise.
"""
y_dense = pd.Series(np.zeros(len(index)), index=index, dtype="int64")
y_dense[y_sparse.values] = 1
return y_dense
@staticmethod
def _sparse_segments_to_dense(y_sparse, index):
"""Find the label for each index in ``index`` from sparse segments.
Parameters
----------
y_sparse : pd.Series
A sparse representation of segments. The index must be the pandas interval
datatype and the values must be the integer labels of the segments.
index : array-like
List of indexes that are to be labelled according to ``y_sparse``.
Returns
-------
pd.Series
A series with the same index as ``index`` where each index is labelled
according to ``y_sparse``. Indexes that do not fall within any interval are
labelled -1.
"""
if y_sparse.index.is_overlapping:
raise NotImplementedError(
"Cannot convert overlapping segments to a dense format yet."
)
interval_indexes = y_sparse.index.get_indexer(index)
# Negative indexes do not fall within any interval so they are ignored
interval_labels = y_sparse.iloc[
interval_indexes[interval_indexes >= 0]
].to_numpy()
# -1 is used to represent points that do not fall within a segment
labels_dense = interval_indexes.copy()
labels_dense[labels_dense >= 0] = interval_labels
y_dense = pd.Series(labels_dense, index=index)
return y_dense
@staticmethod
def dense_to_sparse(y_dense):
"""Convert the dense output from an detector to a sparse format.
Parameters
----------
y_dense : pd.Series
* If ``y_dense`` contains only 1's and 0's, the 1's represent change
points or anomalies.
* If ``y_dense`` contains only integers greater than 0, it is an
array of segments.
Returns
-------
pd.Series
* If ``y_dense`` is a series of changepoints/anomalies, a pandas series
will be returned containing the indexes of the changepoints/anomalies.
* If ``y_dense`` is a series of segments, a series with an interval
datatype index will be returned. The values of the series will be the
labels of segments.
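Examples
--------
Both conversion branches, on small hand-made inputs:
>>> import pandas as pd
>>> from sktime.detection.base import BaseDetector
>>> y_dense = pd.Series([0, 0, 1, 0, 1])  # 0/1 change point indicators
>>> BaseDetector.dense_to_sparse(y_dense)
0    2
1    4
dtype: int64
>>> y_dense = pd.Series([1, 1, 2, 2, 2])  # dense segment labels
>>> BaseDetector.dense_to_sparse(y_dense)
[0, 2)    1
[2, 4)    2
dtype: int64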
"""
if isinstance(y_dense, pd.DataFrame):
y_dense = y_dense.iloc[:, 0]
if not isinstance(y_dense, pd.Series):
y_dense = pd.Series(y_dense, dtype="int64")
if 0 in y_dense.values:
# y_dense is an indicator series; nonzero entries mark change points/anomalies
change_points = np.where(y_dense.values != 0)[0]
return pd.Series(change_points, dtype="int64")
else:
segment_start_indexes = np.where(y_dense.diff() != 0)[0]
segment_end_indexes = np.roll(segment_start_indexes, -1)
# The final index is always the end of a segment
segment_end_indexes[-1] = y_dense.index[-1]
segment_labels = y_dense.iloc[segment_start_indexes].to_numpy()
interval_index = pd.IntervalIndex.from_arrays(
segment_start_indexes, segment_end_indexes, closed="left"
)
y_sparse = pd.Series(segment_labels, index=interval_index)
# -1 represents unclassified regions so we remove them
y_sparse = y_sparse.loc[y_sparse != -1]
return y_sparse
@staticmethod
def _empty_sparse():
"""Return an empty sparse series in indicator format.
Returns
-------
pd.Series
An empty series with a RangeIndex.
"""
return pd.Series(index=pd.RangeIndex(0), dtype="int64")
@staticmethod
def _empty_segments():
"""Return an empty sparse series in segmentation format.
Returns
-------
pd.Series
An empty series with an IntervalIndex.
"""
return pd.Series(index=pd.IntervalIndex([]), dtype="int64")
@staticmethod
def change_points_to_segments(y_sparse, start=None, end=None):
"""Convert an series of change point indexes to segments.
Parameters
----------
y_sparse : pd.Series of int, sorted in ascending order
A series containing the iloc indexes of change points.
start : optional, default=0
Starting point of the first segment.
Must be before the first change point, i.e., < y_sparse[0].
end : optional, default=y_sparse[-1] + 1
End point of the last segment.
Must be after the last change point, i.e., > y_sparse[-1].
Returns
-------
pd.Series
A series with an interval index indicating the start and end points of the
segments. The values of the series are the labels of the segments.
Examples
--------
>>> import pandas as pd
>>> from sktime.detection.base import BaseDetector
>>> change_points = pd.Series([1, 2, 5])
>>> BaseDetector.change_points_to_segments(change_points, 0, 7)
[0, 1)    0
[1, 2)    1
[2, 5)    2
[5, 7)    3
dtype: int64
"""
if len(y_sparse) == 0:
return BaseDetector._empty_segments()
breaks = y_sparse.values
if start is not None and start > breaks.min():
raise ValueError("The start index must be before the first change point.")
if end is not None and end < breaks.max():
raise ValueError("The end index must be after the last change point.")
if start is None:
start = 0
if end is None:
end = breaks[-1] + 1
breaks = np.insert(breaks, 0, start)
breaks = np.append(breaks, end)
index = pd.IntervalIndex.from_breaks(breaks, copy=True, closed="left")
segments = pd.Series(0, index=index)
in_range = index.left >= start
number_of_segments = in_range.sum()
segments.loc[in_range] = range(0, number_of_segments)
return segments
@staticmethod
def segments_to_change_points(y_sparse):
"""Convert segments to change points.
Parameters
----------
y_sparse : pd.Series
A series of segments. The index must be the interval data type and the
values should be the integer labels of the segments.
Returns
-------
pd.Series
A series containing the indexes of the start of each segment.
Index is RangeIndex, and values are iloc references to the start of each
segment.
Examples
--------
>>> import pandas as pd
>>> from sktime.detection.base import BaseDetector
>>> segments = pd.Series(
... [3, -1, 2],
... index=pd.IntervalIndex.from_breaks([2, 5, 7, 9], closed="left")
... )
>>> BaseDetector.segments_to_change_points(segments)
0    2
1    5
2    7
dtype: int64
"""
if len(y_sparse) == 0:
return BaseDetector._empty_sparse()
change_points = pd.Series(y_sparse.index.left)
return change_points
class BaseSeriesAnnotator(BaseDetector):
"""Base class for time series detectors - DEPRECATED - use BaseDetector instead."""
def __init__(self):
super().__init__()
warn(
"Warning: BaseSeriesAnnotator is deprecated. "
"Extension developers should use BaseDetector instead, "
"from sktime.detection.base, this is a replacement with "
"equivalent functionality. "
"The BaseSeriesAnnotator will be removed in the 0.37.0 release.",
stacklevel=2,
)
# todo 0.37.0: remove this
def _method_has_arg(method, arg="y"):
"""Return if transformer.method has a parameter, and whether it has a default.
Parameters
----------
method : callable
method to check
arg : str, optional, default="y"
parameter name to check
Returns
-------
has_param : bool
whether the method ``method`` has a parameter with name ``arg``
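Examples
--------
>>> def f(X, y=None):
...     pass
>>> _method_has_arg(f, "y")
True
>>> _method_has_arg(f, "Y")
False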
"""
from inspect import signature
method_params = list(signature(method).parameters.keys())
return arg in method_params