Module tsflex.features.segmenter.strided_rolling
Withholds a (rather) fast implementation of an index-based strided rolling window.
TODO
Look into the implementation of a new func-input-data-type that is a Tuple[index, values]. This should be multitudes faster than using the series-datatype and the user can still leverage the index-awareness of the values.
Expand source code
"""
Withholds a (rather) fast implementation of an **index-based** strided rolling window.
.. TODO::
Look into the implementation of a new func-input-data-type that is a
Tuple[index, values]. This should be multitudes faster than using the
series-datatype and the user can still leverage the index-awareness of the values.
"""
from __future__ import annotations
__author__ = "Jonas Van Der Donckt, Jeroen Van Der Donckt"
import time
import warnings
from abc import ABC, abstractmethod
from collections import namedtuple
from typing import List, Optional, Tuple, TypeVar, Union
import numpy as np
import pandas as pd
from ...utils.argument_parsing import timedelta_to_str
from ...utils.attribute_parsing import AttributeParser, DataType
from ...utils.data import SUPPORTED_STROLL_TYPES, to_list, to_series_list, to_tuple
from ..function_wrapper import FuncWrapper
from ..utils import (
_check_start_end_array,
_determine_bounds,
_log_func_execution,
_process_func_output,
)
# Declare a type variable
T = TypeVar("T", int, float, pd.Timedelta)
class StridedRolling(ABC):
"""Custom time-based sliding window with stride.
Parameters
----------
data : Union[pd.Series, pd.DataFrame, List[Union[pd.Series, pd.DataFrame]]]
``pd.Series`` or ``pd.DataFrame`` to slide over, the index must be either
numeric or a ``pd.DatetimeIndex``.
window : Union[float, pd.Timedelta]
Either an int, float, or ``pd.Timedelta``, representing the sliding window size
in terms of the index (in case of a int or float) or the sliding window duration
(in case of ``pd.Timedelta``).
strides : Union[float, pd.Timedelta, List[Union[float, pd.Timedelta]]], optional
Either a list of int, float, or ``pd.Timedelta``, representing the stride sizes
in terms of the index (in case of a int or float) or the stride duration (in
case of ``pd.Timedelta``). By default None.
segment_start_idxs: np.ndarray, optional
The start indices for the segmented windows. If not provided, the start indices
will be computed from the data using the passed ``strides`` or by using the
``segment_end_idxs`` (if not none) + ``window``. By default None.
segment_end_idxs: np.ndarray, optional
The end indices for the segmented windows. If not provided, the end indices will
be computed from either (1) the data using the passed ``window`` + ``strides``
or (2) the ``segment_start_idxs`` + ``window``, By default None.
.. Note::
When you pass arrays to both ``segment_start_idxs`` and
``segment_end_idxs``, the corresponding index-values of these arrays will be
used as segment-ranges. As a result, the following properties must be met:\n
- both arrays should have equal length
- all values in ``segment_start_idxs`` should be <= ``segment_end_idxs``
start_idx: Union[float, pd.Timestamp], optional
The start-index which will be used for each series passed to `data`. This is
especially useful if multiple ``StridedRolling`` instances are created and the
user want to ensure same (start-)indexes for each of them.
end_idx: Union[float, pd.Timestamp], optional
The end-index which will be used as sliding end-limit for each series passed to
`data`.
func_data_type: Union[np.array, pd.Series], optional
The data type of the stroll (either np.array or pd.Series), by default np.array.
<br>
.. Note::
Make sure to only set this argument to pd.Series when this is really
required, since pd.Series strided-rolling is significantly less efficient.
For a np.array it is possible to create very efficient views, but there is
no such thing as a pd.Series view. Thus, for each stroll, a new series is
created, inducing a lot of non-feature calculation of overhead.
window_idx : str, optional
The window's index position which will be used as index for the
feature_window aggregation. Must be either of: `["begin", "middle", "end"]`, by
default "end".
include_final_window: bool, optional
Whether the final (possibly incomplete) window should be included in the
strided-window segmentation, by default False.
.. Note::
The remarks below apply when ``include_final_window`` is set to True.
The user should be aware that the last window *might* be incomplete, i.e.;
- when equally sampled, the last window *might* be smaller than the
the other windows.
- when not equally sampled, the last window *might* not include all the
data points (as the begin-time + window-size comes after the last data
point).
Note, that when equally sampled, the last window *will* be a full window
when:
- the stride is the sampling rate of the data (or stride = 1 for
sample-based configurations).<br>
**Remark**: that when `include_final_window` is set to False, the last
window (which is a full) window will not be included!
- *(len * sampling_rate - window_size) % stride = 0*. Remark that the above
case is a base case of this.
approve_sparsity: bool, optional
Bool indicating whether the user acknowledges that there may be sparsity (i.e.,
irregularly sampled data), by default False.
If False and sparsity is observed, a warning is raised.
Notes
-----
* This instance withholds a **read-only**-view of the data its values.
"""
# Class variables which are used by subclasses
win_str_type: DataType
reset_series_index_b4_segmenting: bool = False
OUTSIDE_DATA_BOUNDS_WARNING: str = (
"Some segment indexes are outside the range of the data its index."
)
# Create the named tuple
_NumpySeriesContainer = namedtuple( # type: ignore[name-match]
"SeriesContainer", ["name", "values", "start_indexes", "end_indexes"]
)
def __init__(
self,
data: Union[pd.Series, pd.DataFrame, List[Union[pd.Series, pd.DataFrame]]],
window: Optional[T],
strides: Optional[Union[T, List[T]]] = None,
segment_start_idxs: Optional[np.ndarray] = None,
segment_end_idxs: Optional[np.ndarray] = None,
start_idx: Optional[T] = None,
end_idx: Optional[T] = None,
func_data_type: Union[np.ndarray, pd.Series] = np.ndarray,
window_idx: str = "end",
include_final_window: bool = False,
approve_sparsity: bool = False,
):
if strides is not None:
strides = to_list(strides)
# Check the passed segment indices
if segment_start_idxs is not None and segment_end_idxs is not None:
_check_start_end_array(segment_start_idxs, segment_end_idxs)
if window is not None:
assert AttributeParser.check_expected_type(
[window] + ([] if strides is None else strides), self.win_str_type
)
self.window = window # type: ignore[var-annotated]
self.strides = strides # type: ignore[var-annotated]
self.window_idx = window_idx
self.include_final_window = include_final_window
self.approve_sparsity = approve_sparsity
assert func_data_type in SUPPORTED_STROLL_TYPES
self.data_type = func_data_type
# 0. Standardize the input
series_list: List[pd.Series] = to_series_list(data)
self.series_dtype = AttributeParser.determine_type(series_list)
self.series_key: Tuple[str, ...] = tuple([str(s.name) for s in series_list])
# 1. Determine the start index
self.start, self.end = start_idx, end_idx # type: ignore[var-annotated]
if self.start is None or self.end is None:
# We always pass start_idx and end_idx from the FeatureCollection.calculate
# Hence, this code is only useful for testing purposes
start, end = _determine_bounds("inner", series_list)
# update self.start & self.end if it was not passed
self.start = start if self.start is None else self.start
self.end = end if self.end is None else self.end
# Especially useful when the index dtype differs from the win-stride-dtype
# e.g. -> performing a int-based stroll on time-indexed data
# Note: this is very niche and thus requires advanced knowledge
# TODO: this code can be omitted if we remove TimeIndexSampleStridedRolling
self._update_start_end_indices_to_stroll_type(series_list)
# 2. Construct the index ranges
# Either use the passed segment indices or compute the start or end times of the
# segments. The segment indices have precedence over the stride (and window) for
# index computation.
if segment_start_idxs is not None or segment_end_idxs is not None:
self.strides = None
if segment_start_idxs is not None and segment_end_idxs is not None:
# When both the start and end points are passed, the window does not
# matter.
self.window = None
np_start_times = self._parse_segment_idxs(segment_start_idxs)
np_end_times = self._parse_segment_idxs(segment_end_idxs)
elif segment_start_idxs is not None: # segment_end_idxs is None
np_start_times = self._parse_segment_idxs(segment_start_idxs)
np_end_times = np_start_times + self._get_np_value(self.window)
else: # segment_end_idxs is not None and segment_start_idxs is None
np_end_times = self._parse_segment_idxs(segment_end_idxs) # type: ignore[arg-type]
np_start_times = np_end_times - self._get_np_value(self.window)
else:
np_start_times = self._construct_start_idxs()
np_end_times = np_start_times + self._get_np_value(self.window)
# Check the numpy start and end indices
_check_start_end_array(np_start_times, np_end_times)
# 3. Create a new-index which will be used for DataFrame reconstruction
# Note: the index-name of the first passed series will be re-used as index-name
self.index = self._get_output_index(
np_start_times, np_end_times, name=series_list[0].index.name
)
# 4. Store the series containers
self.series_containers = self._construct_series_containers(
series_list, np_start_times, np_end_times
)
# 5. Check the sparsity assumption
if not self.approve_sparsity and len(self.index):
for container in self.series_containers:
# Warn when min != max
if np.ptp(container.end_indexes - container.start_indexes) != 0:
warnings.warn(
f"There are gaps in the sequence of the {container.name}"
f"-series!",
RuntimeWarning,
)
def _calc_nb_segments_for_stride(self, stride: T) -> int:
"""Calculate the number of output items (segments) for a given single stride."""
assert self.start is not None and self.end is not None # for mypy
nb_feats = max((self.end - self.start - self.window) // stride + 1, 0)
# Add 1 if there is still some data after (including) the last window its
# start index - this is only added when `include_last_window` is True.
nb_feats += self.include_final_window * (
self.start + stride * nb_feats <= self.end
)
return nb_feats
def _get_np_start_idx_for_stride(self, stride: T) -> np.ndarray:
"""Compute the start index for the given single stride."""
# ---------- Efficient numpy code -------
np_start = self._get_np_value(self.start)
np_stride = self._get_np_value(stride)
# Compute the start times (these remain the same for each series)
return np.arange(
start=np_start,
stop=np_start + self._calc_nb_segments_for_stride(stride) * np_stride,
step=np_stride,
)
def _construct_start_idxs(self) -> np.ndarray:
"""Construct the start indices of the segments (for all stride values).
To realize this, we compute the start idxs for each stride and then merge them
together (without duplicates) in a sorted array.
"""
start_idxs = []
for stride in self.strides:
start_idxs += [self._get_np_start_idx_for_stride(stride)]
# note - np.unique also sorts the array
return np.unique(np.concatenate(start_idxs))
def _get_output_index(
self, start_idxs: np.ndarray, end_idxs: np.ndarray, name: str
) -> pd.Index:
"""Construct the output index."""
if self.window_idx == "end":
return pd.Index(end_idxs, name=name)
elif self.window_idx == "middle":
return pd.Index(
start_idxs + ((end_idxs - start_idxs) / 2),
name=name,
)
elif self.window_idx == "begin":
return pd.Index(start_idxs, name=name)
else:
raise ValueError(
f"window index {self.window_idx} must be either of: "
"['end', 'middle', 'begin']"
)
def _construct_series_containers(
self,
series_list: List[pd.Series],
np_start_times: np.ndarray,
np_end_times: np.ndarray,
) -> List[StridedRolling._NumpySeriesContainer]:
series_containers: List[StridedRolling._NumpySeriesContainer] = []
for series in series_list:
if not self.reset_series_index_b4_segmenting:
np_idx_times = series.index.values
else:
np_idx_times = np.arange(len(series))
# note: using pd.RangeIndex instead of arange gives the same performance
series_name = series.name
if self.data_type is np.ndarray: # FuncWrapper.input_type is np.ndarray
# create a non-writeable view of the series
series = series.values # np.array will be stored in the SeriesContainer
series.flags.writeable = False
elif self.data_type is pd.Series: # FuncWrapper.input_type is pd.Series
# pd.Series will be stored in the SeriesContainer
series.values.flags.writeable = False
series.index.values.flags.writeable = False
else:
raise ValueError("unsupported datatype")
series_containers.append(
StridedRolling._NumpySeriesContainer(
name=series_name,
values=series,
# the slicing will be performed on [ t_start, t_end [
# TODO: this can maybe be optimized -> further look into this
# np_idx_times, np_start_times, & np_end_times are all sorted!
# as we assume & check that the time index is monotonically
# increasing & the latter 2 are created using `np.arange()`
start_indexes=np.searchsorted(np_idx_times, np_start_times, "left"),
end_indexes=np.searchsorted(np_idx_times, np_end_times, "left"),
)
)
return series_containers
def apply_func(self, func: FuncWrapper) -> pd.DataFrame:
"""Apply a function to the segmented series.
Parameters
----------
func : FuncWrapper
The Callable wrapped function which will be applied.
Returns
-------
pd.DataFrame
The merged output of the function applied to every column in a
new DataFrame. The DataFrame's column-names have the format:
`<series_col_name(s)>_<feature_name>__w=<window>`.
Raises
------
ValueError
If the passed ``func`` tries to adjust the data its read-only view.
Notes
-----
* If ``func`` is only a callable argument, with no additional logic, this
will only work for a one-to-one mapping, i.e., no multiple feature-output
columns are supported for this case!<br>
* If you want to calculate one-to-many, ``func`` should be
a ``FuncWrapper`` instance and explicitly use
the ``output_names`` attributes of its constructor.
"""
feat_names = func.output_names
t_start = time.perf_counter()
# --- Future work ---
# would be nice if we could optimize this double for loop with something
# more vectorized
#
# As for now we use a map to apply the function (as this evaluates its
# expression only once, whereas a list comprehension evaluates its expression
# every time).
# See more why: https://stackoverflow.com/a/59838723
out: np.ndarray
if func.vectorized:
# Vectorized function execution
## IMPL 1
## Results in a high memory peak as a new np.array is created (and thus no
## view is being used)
# out = np.asarray(
# func(
# *[
# np.array([
# sc.values[sc.start_indexes[idx]: sc.end_indexes[idx]]
# for idx in range(len(self.index))
# ])
# for sc in self.series_containers
# ],
# )
# )
## IMPL 2
## Is a good implementation (equivalent to the one below), will also fail in
## the same cases, but it does not perform clear assertions (with their
## accompanied clear messages).
# out = np.asarray(
# func(
# *[
# _sliding_strided_window_1d(sc.values, self.window, self.stride)
# for sc in self.series_containers
# ],
# )
# )
views: List[np.ndarray] = []
for sc in self.series_containers:
if len(sc.start_indexes) == 0:
# There are no feature windows -> return empty array (see below)
views = []
break
elif len(sc.start_indexes) == 1:
# There is only 1 feature window (bc no steps in the sliding window)
views.append(
np.expand_dims(
sc.values[sc.start_indexes[0] : sc.end_indexes[0]],
axis=0,
)
)
else:
# There are >1 feature windows (bc >=1 steps in the sliding window)
windows = sc.end_indexes - sc.start_indexes
strides = sc.start_indexes[1:] - sc.start_indexes[:-1]
assert np.all(windows == windows[0]), (
"Vectorized functions require same number of samples in each "
+ "segmented window!"
)
assert np.all(
strides == strides[0]
), "Vectorized functions require same number of samples as stride!"
views.append(
_sliding_strided_window_1d(
sc.values[sc.start_indexes[0] :],
windows[0],
strides[0],
len(self.index),
)
)
# Assign empty array as output when there is no view to apply the vectorized
# function on (this is the case when there is at least for one series no
# feature windows)
out = func(*views) if len(views) >= 1 else np.array([])
out_type = type(out)
out = np.asarray(out)
# When multiple outputs are returned (= tuple) they should be transposed
# when combining into an array
out = out.T if out_type is tuple else out
else:
# Function execution over slices (default)
out = np.array(
list(
map(
func,
*[
[
sc.values[sc.start_indexes[idx] : sc.end_indexes[idx]]
for idx in range(len(self.index))
]
for sc in self.series_containers
],
)
)
)
# Check if the function output is valid.
# This assertion will be raised when e.g. np.max is applied vectorized without
# specifying axis=1.
assert out.ndim > 0, "Vectorized function returned only 1 (non-array) value!"
# Aggregate function output in a dictionary
output_names = list(map(self._create_feat_col_name, feat_names))
feat_out = _process_func_output(out, self.index, output_names, str(func))
# Log the function execution time
log_strides = (
"manual" if self.strides is None else tuple(map(str, self.strides))
)
log_window = "manual" if self.window is None else self.window
_log_func_execution(
t_start, func, self.series_key, log_window, log_strides, output_names # type: ignore[arg-type]
)
return pd.DataFrame(feat_out, index=self.index)
# --------------------------------- STATIC METHODS ---------------------------------
@staticmethod
def _get_np_value(val: Union[np.number, pd.Timestamp, pd.Timedelta]) -> np.number:
# Convert everything to int64
if isinstance(val, pd.Timestamp):
return val.to_datetime64()
elif isinstance(val, pd.Timedelta):
return val.to_timedelta64()
else:
return val
@staticmethod
def construct_output_index(
series_keys: Union[str, Tuple[str, ...]], feat_name: str, win_str: str
) -> str:
series_keys = to_tuple(series_keys)
return f"{'|'.join(series_keys)}__{feat_name}__w={win_str}"
# ----------------------------- OVERRIDE THESE METHODS -----------------------------
@abstractmethod
def _update_start_end_indices_to_stroll_type(
self, series_list: List[pd.Series]
) -> None:
# NOTE: This method will only be implemented (with code != pass) in the
# TimeIndexSampleStridedRolling
raise NotImplementedError
@abstractmethod
def _parse_segment_idxs(self, segment_idxs: np.ndarray) -> np.ndarray:
"""Check the segment indexes array to lie between self.start and self.end and
convert it to the correct dtype (if necessary)."""
raise NotImplementedError
@abstractmethod
def _create_feat_col_name(self, feat_name: str) -> str:
raise NotImplementedError
class SequenceStridedRolling(StridedRolling):
def __init__( # type: ignore[no-untyped-def]
self,
data: Union[pd.Series, pd.DataFrame, List[Union[pd.Series, pd.DataFrame]]],
window: Union[int, float],
strides: Optional[Union[int, float, List[int], List[float]]] = None,
*args,
**kwargs,
):
# Set the data type & call the super constructor
self.win_str_type = DataType.SEQUENCE
super().__init__(data, window, strides, *args, **kwargs)
# ------------------------------- Overridden methods -------------------------------
def _update_start_end_indices_to_stroll_type(
self, series_list: List[pd.Series]
) -> None:
pass
def _parse_segment_idxs(self, segment_idxs: np.ndarray) -> np.ndarray:
if any((segment_idxs < self.start) | (segment_idxs > self.end)):
warnings.warn(self.OUTSIDE_DATA_BOUNDS_WARNING, RuntimeWarning)
return segment_idxs
def _create_feat_col_name(self, feat_name: str) -> str:
if self.window is not None:
win_str = str(self.window)
else:
win_str = "manual"
return self.construct_output_index(
series_keys=self.series_key, feat_name=feat_name, win_str=win_str
)
class TimeStridedRolling(StridedRolling):
def __init__( # type: ignore[no-untyped-def]
self,
data: Union[pd.Series, pd.DataFrame, List[Union[pd.Series, pd.DataFrame]]],
window: pd.Timedelta,
strides: Optional[Union[pd.Timedelta, List[pd.Timedelta]]] = None,
*args,
**kwargs,
):
# Check that each series / dataframe has the same tz
data = to_series_list(data)
tz_index = data[0].index.tz
for data_entry in to_series_list(data)[1:]:
assert (
data_entry.index.tz == tz_index
), "strided rolling input data must all have same timezone"
self._tz_index = tz_index
# Set the data type & call the super constructor
self.win_str_type = DataType.TIME
super().__init__(data, window, strides, *args, **kwargs)
# -------------------------------- Extended methods --------------------------------
def _get_output_index(
self, start_idxs: np.ndarray, end_idxs: np.ndarray, name: str
) -> pd.Index:
assert start_idxs.dtype.type == np.datetime64
assert end_idxs.dtype.type == np.datetime64
if not len(start_idxs): # to fix "datetime64 values must have a unit specified"
assert not len(end_idxs)
start_idxs = start_idxs.astype("datetime64[ns]")
end_idxs = end_idxs.astype("datetime64[ns]")
start_idxs = pd.to_datetime(start_idxs, utc=True).tz_convert(self._tz_index)
end_idxs = pd.to_datetime(end_idxs, utc=True).tz_convert(self._tz_index)
return super()._get_output_index(start_idxs, end_idxs, name)
# ------------------------------- Overridden methods -------------------------------
def _update_start_end_indices_to_stroll_type(
self, series_list: List[pd.Series]
) -> None:
pass
def _parse_segment_idxs(self, segment_idxs: np.ndarray) -> np.ndarray:
segment_idxs = segment_idxs.astype("datetime64")
start_, end_ = self.start, self.end
if start_.tz is not None:
# Convert to UTC (allowing comparison with the segment_idxs)
assert end_.tz is not None
start_ = start_.tz_convert(None)
end_ = end_.tz_convert(None)
if any((segment_idxs < start_) | (segment_idxs > end_)):
warnings.warn(self.OUTSIDE_DATA_BOUNDS_WARNING, RuntimeWarning)
return segment_idxs
def _create_feat_col_name(self, feat_name: str) -> str:
# Convert win to time-string if available :)
if self.window is not None:
win_str = timedelta_to_str(self.window)
else:
win_str = "manual"
return self.construct_output_index(
series_keys=self.series_key, feat_name=feat_name, win_str=win_str
)
class TimeIndexSampleStridedRolling(SequenceStridedRolling):
def __init__( # type: ignore[no-untyped-def]
self,
# TODO -> update arguments
data: Union[pd.Series, pd.DataFrame, List[Union[pd.Series, pd.DataFrame]]],
window: int,
strides: Optional[Union[int, List[int]]] = None,
segment_start_idxs: Optional[np.ndarray] = None,
segment_end_idxs: Optional[np.ndarray] = None,
*args,
**kwargs,
):
"""
.. Warning::
When `data` consists of multiple independently sampled series
(e.g. feature functions which take multiple series as input),
The time-**index of each series**: \n
- must _roughly_ **share** the same **sample frequency**.
- will be first time-aligned before transitioning to sample-segmentation by
using the inner bounds
.. Note::
`TimeIndexSampleStridedRolling` **does not support** the
``segment_start_idxs`` and ``segment_end_idxs`` arguments. Setting these
will raise a NotImplementedError.
"""
if segment_start_idxs is not None or segment_end_idxs is not None:
raise NotImplementedError(
"TimeIndexSampleStridedRolling is not implemented to support passing"
+ "segment_start_idxs or segment_end_idxs"
)
# We want to reset the index as its type differs from the passed win-stride
# configs
self.reset_series_index_b4_segmenting = True
series_list = to_series_list(data)
if isinstance(data, list) and len(data) > 1:
# Slice data into its inner range so that the start position
# is aligned (when we will use sample-based methodologies)
start, end = _determine_bounds("inner", series_list)
series_list = [s[start:end] for s in series_list]
kwargs.update({"start_idx": start, "end_idx": end})
# We retain the first series list to stitch back the output index
self._series_index = series_list[0].index
# pass the sliced series list instead of data
super().__init__(series_list, window, strides, *args, **kwargs)
assert self.series_dtype == DataType.TIME
# we want to assure that the window-stride arguments are integers (samples)
assert all(isinstance(p, int) for p in [self.window] + self.strides)
def apply_func(self, func: FuncWrapper) -> pd.DataFrame:
# Apply the function and stitch back the time-index
df = super().apply_func(func)
df.index = self._series_index[df.index]
return df
# ---------------------------- Overridden methods ------------------------------
def _update_start_end_indices_to_stroll_type(
self, series_list: List[pd.Series]
) -> None:
# update the start and end times to the sequence datatype
self.start, self.end = np.searchsorted(
series_list[0].index.values,
[self.start.to_datetime64(), self.end.to_datetime64()],
"left",
)
def _sliding_strided_window_1d(
data: np.ndarray, window: int, step: int, nb_segments: int
) -> np.ndarray:
"""View based sliding strided-window for 1-dimensional data.
Parameters
----------
data: np.array
The 1-dimensional series to slide over.
window: int
The window size, in number of samples.
step: int
The step size (i.e., the stride), in number of samples.
nb_segments: int
The number of sliding window steps, this is equal to the number of feature
windows.
Returns
-------
nd.array
A view of the sliding strided window of the data.
"""
# window and step in samples
assert data.ndim == 1, "data must be 1 dimensional"
assert isinstance(window, (int, np.integer)), "window must be an integer"
assert isinstance(step, (int, np.integer)), "step must be an integer"
assert (step >= 1) & (window < len(data))
shape = [
nb_segments,
window,
]
strides = [
data.strides[0] * step,
data.strides[0],
]
return np.lib.stride_tricks.as_strided(
data, shape=shape, strides=strides, writeable=False
)
Classes
class StridedRolling (data, window, strides=None, segment_start_idxs=None, segment_end_idxs=None, start_idx=None, end_idx=None, func_data_type=numpy.ndarray, window_idx='end', include_final_window=False, approve_sparsity=False)
-
Expand source code
class StridedRolling(ABC): """Custom time-based sliding window with stride. Parameters ---------- data : Union[pd.Series, pd.DataFrame, List[Union[pd.Series, pd.DataFrame]]] ``pd.Series`` or ``pd.DataFrame`` to slide over, the index must be either numeric or a ``pd.DatetimeIndex``. window : Union[float, pd.Timedelta] Either an int, float, or ``pd.Timedelta``, representing the sliding window size in terms of the index (in case of a int or float) or the sliding window duration (in case of ``pd.Timedelta``). strides : Union[float, pd.Timedelta, List[Union[float, pd.Timedelta]]], optional Either a list of int, float, or ``pd.Timedelta``, representing the stride sizes in terms of the index (in case of a int or float) or the stride duration (in case of ``pd.Timedelta``). By default None. segment_start_idxs: np.ndarray, optional The start indices for the segmented windows. If not provided, the start indices will be computed from the data using the passed ``strides`` or by using the ``segment_end_idxs`` (if not none) + ``window``. By default None. segment_end_idxs: np.ndarray, optional The end indices for the segmented windows. If not provided, the end indices will be computed from either (1) the data using the passed ``window`` + ``strides`` or (2) the ``segment_start_idxs`` + ``window``, By default None. .. Note:: When you pass arrays to both ``segment_start_idxs`` and ``segment_end_idxs``, the corresponding index-values of these arrays will be used as segment-ranges. As a result, the following properties must be met:\n - both arrays should have equal length - all values in ``segment_start_idxs`` should be <= ``segment_end_idxs`` start_idx: Union[float, pd.Timestamp], optional The start-index which will be used for each series passed to `data`. This is especially useful if multiple ``StridedRolling`` instances are created and the user want to ensure same (start-)indexes for each of them. end_idx: Union[float, pd.Timestamp], optional The end-index which will be used as sliding end-limit for each series passed to `data`. func_data_type: Union[np.array, pd.Series], optional The data type of the stroll (either np.array or pd.Series), by default np.array. <br> .. Note:: Make sure to only set this argument to pd.Series when this is really required, since pd.Series strided-rolling is significantly less efficient. For a np.array it is possible to create very efficient views, but there is no such thing as a pd.Series view. Thus, for each stroll, a new series is created, inducing a lot of non-feature calculation of overhead. window_idx : str, optional The window's index position which will be used as index for the feature_window aggregation. Must be either of: `["begin", "middle", "end"]`, by default "end". include_final_window: bool, optional Whether the final (possibly incomplete) window should be included in the strided-window segmentation, by default False. .. Note:: The remarks below apply when ``include_final_window`` is set to True. The user should be aware that the last window *might* be incomplete, i.e.; - when equally sampled, the last window *might* be smaller than the the other windows. - when not equally sampled, the last window *might* not include all the data points (as the begin-time + window-size comes after the last data point). Note, that when equally sampled, the last window *will* be a full window when: - the stride is the sampling rate of the data (or stride = 1 for sample-based configurations).<br> **Remark**: that when `include_final_window` is set to False, the last window (which is a full) window will not be included! - *(len * sampling_rate - window_size) % stride = 0*. Remark that the above case is a base case of this. approve_sparsity: bool, optional Bool indicating whether the user acknowledges that there may be sparsity (i.e., irregularly sampled data), by default False. If False and sparsity is observed, a warning is raised. Notes ----- * This instance withholds a **read-only**-view of the data its values. """ # Class variables which are used by subclasses win_str_type: DataType reset_series_index_b4_segmenting: bool = False OUTSIDE_DATA_BOUNDS_WARNING: str = ( "Some segment indexes are outside the range of the data its index." ) # Create the named tuple _NumpySeriesContainer = namedtuple( # type: ignore[name-match] "SeriesContainer", ["name", "values", "start_indexes", "end_indexes"] ) def __init__( self, data: Union[pd.Series, pd.DataFrame, List[Union[pd.Series, pd.DataFrame]]], window: Optional[T], strides: Optional[Union[T, List[T]]] = None, segment_start_idxs: Optional[np.ndarray] = None, segment_end_idxs: Optional[np.ndarray] = None, start_idx: Optional[T] = None, end_idx: Optional[T] = None, func_data_type: Union[np.ndarray, pd.Series] = np.ndarray, window_idx: str = "end", include_final_window: bool = False, approve_sparsity: bool = False, ): if strides is not None: strides = to_list(strides) # Check the passed segment indices if segment_start_idxs is not None and segment_end_idxs is not None: _check_start_end_array(segment_start_idxs, segment_end_idxs) if window is not None: assert AttributeParser.check_expected_type( [window] + ([] if strides is None else strides), self.win_str_type ) self.window = window # type: ignore[var-annotated] self.strides = strides # type: ignore[var-annotated] self.window_idx = window_idx self.include_final_window = include_final_window self.approve_sparsity = approve_sparsity assert func_data_type in SUPPORTED_STROLL_TYPES self.data_type = func_data_type # 0. Standardize the input series_list: List[pd.Series] = to_series_list(data) self.series_dtype = AttributeParser.determine_type(series_list) self.series_key: Tuple[str, ...] = tuple([str(s.name) for s in series_list]) # 1. Determine the start index self.start, self.end = start_idx, end_idx # type: ignore[var-annotated] if self.start is None or self.end is None: # We always pass start_idx and end_idx from the FeatureCollection.calculate # Hence, this code is only useful for testing purposes start, end = _determine_bounds("inner", series_list) # update self.start & self.end if it was not passed self.start = start if self.start is None else self.start self.end = end if self.end is None else self.end # Especially useful when the index dtype differs from the win-stride-dtype # e.g. -> performing a int-based stroll on time-indexed data # Note: this is very niche and thus requires advanced knowledge # TODO: this code can be omitted if we remove TimeIndexSampleStridedRolling self._update_start_end_indices_to_stroll_type(series_list) # 2. Construct the index ranges # Either use the passed segment indices or compute the start or end times of the # segments. The segment indices have precedence over the stride (and window) for # index computation. if segment_start_idxs is not None or segment_end_idxs is not None: self.strides = None if segment_start_idxs is not None and segment_end_idxs is not None: # When both the start and end points are passed, the window does not # matter. self.window = None np_start_times = self._parse_segment_idxs(segment_start_idxs) np_end_times = self._parse_segment_idxs(segment_end_idxs) elif segment_start_idxs is not None: # segment_end_idxs is None np_start_times = self._parse_segment_idxs(segment_start_idxs) np_end_times = np_start_times + self._get_np_value(self.window) else: # segment_end_idxs is not None and segment_start_idxs is None np_end_times = self._parse_segment_idxs(segment_end_idxs) # type: ignore[arg-type] np_start_times = np_end_times - self._get_np_value(self.window) else: np_start_times = self._construct_start_idxs() np_end_times = np_start_times + self._get_np_value(self.window) # Check the numpy start and end indices _check_start_end_array(np_start_times, np_end_times) # 3. Create a new-index which will be used for DataFrame reconstruction # Note: the index-name of the first passed series will be re-used as index-name self.index = self._get_output_index( np_start_times, np_end_times, name=series_list[0].index.name ) # 4. Store the series containers self.series_containers = self._construct_series_containers( series_list, np_start_times, np_end_times ) # 5. Check the sparsity assumption if not self.approve_sparsity and len(self.index): for container in self.series_containers: # Warn when min != max if np.ptp(container.end_indexes - container.start_indexes) != 0: warnings.warn( f"There are gaps in the sequence of the {container.name}" f"-series!", RuntimeWarning, ) def _calc_nb_segments_for_stride(self, stride: T) -> int: """Calculate the number of output items (segments) for a given single stride.""" assert self.start is not None and self.end is not None # for mypy nb_feats = max((self.end - self.start - self.window) // stride + 1, 0) # Add 1 if there is still some data after (including) the last window its # start index - this is only added when `include_last_window` is True. nb_feats += self.include_final_window * ( self.start + stride * nb_feats <= self.end ) return nb_feats def _get_np_start_idx_for_stride(self, stride: T) -> np.ndarray: """Compute the start index for the given single stride.""" # ---------- Efficient numpy code ------- np_start = self._get_np_value(self.start) np_stride = self._get_np_value(stride) # Compute the start times (these remain the same for each series) return np.arange( start=np_start, stop=np_start + self._calc_nb_segments_for_stride(stride) * np_stride, step=np_stride, ) def _construct_start_idxs(self) -> np.ndarray: """Construct the start indices of the segments (for all stride values). To realize this, we compute the start idxs for each stride and then merge them together (without duplicates) in a sorted array. """ start_idxs = [] for stride in self.strides: start_idxs += [self._get_np_start_idx_for_stride(stride)] # note - np.unique also sorts the array return np.unique(np.concatenate(start_idxs)) def _get_output_index( self, start_idxs: np.ndarray, end_idxs: np.ndarray, name: str ) -> pd.Index: """Construct the output index.""" if self.window_idx == "end": return pd.Index(end_idxs, name=name) elif self.window_idx == "middle": return pd.Index( start_idxs + ((end_idxs - start_idxs) / 2), name=name, ) elif self.window_idx == "begin": return pd.Index(start_idxs, name=name) else: raise ValueError( f"window index {self.window_idx} must be either of: " "['end', 'middle', 'begin']" ) def _construct_series_containers( self, series_list: List[pd.Series], np_start_times: np.ndarray, np_end_times: np.ndarray, ) -> List[StridedRolling._NumpySeriesContainer]: series_containers: List[StridedRolling._NumpySeriesContainer] = [] for series in series_list: if not self.reset_series_index_b4_segmenting: np_idx_times = series.index.values else: np_idx_times = np.arange(len(series)) # note: using pd.RangeIndex instead of arange gives the same performance series_name = series.name if self.data_type is np.ndarray: # FuncWrapper.input_type is np.ndarray # create a non-writeable view of the series series = series.values # np.array will be stored in the SeriesContainer series.flags.writeable = False elif self.data_type is pd.Series: # FuncWrapper.input_type is pd.Series # pd.Series will be stored in the SeriesContainer series.values.flags.writeable = False series.index.values.flags.writeable = False else: raise ValueError("unsupported datatype") series_containers.append( StridedRolling._NumpySeriesContainer( name=series_name, values=series, # the slicing will be performed on [ t_start, t_end [ # TODO: this can maybe be optimized -> further look into this # np_idx_times, np_start_times, & np_end_times are all sorted! # as we assume & check that the time index is monotonically # increasing & the latter 2 are created using `np.arange()` start_indexes=np.searchsorted(np_idx_times, np_start_times, "left"), end_indexes=np.searchsorted(np_idx_times, np_end_times, "left"), ) ) return series_containers def apply_func(self, func: FuncWrapper) -> pd.DataFrame: """Apply a function to the segmented series. Parameters ---------- func : FuncWrapper The Callable wrapped function which will be applied. Returns ------- pd.DataFrame The merged output of the function applied to every column in a new DataFrame. The DataFrame's column-names have the format: `<series_col_name(s)>_<feature_name>__w=<window>`. Raises ------ ValueError If the passed ``func`` tries to adjust the data its read-only view. Notes ----- * If ``func`` is only a callable argument, with no additional logic, this will only work for a one-to-one mapping, i.e., no multiple feature-output columns are supported for this case!<br> * If you want to calculate one-to-many, ``func`` should be a ``FuncWrapper`` instance and explicitly use the ``output_names`` attributes of its constructor. """ feat_names = func.output_names t_start = time.perf_counter() # --- Future work --- # would be nice if we could optimize this double for loop with something # more vectorized # # As for now we use a map to apply the function (as this evaluates its # expression only once, whereas a list comprehension evaluates its expression # every time). # See more why: https://stackoverflow.com/a/59838723 out: np.ndarray if func.vectorized: # Vectorized function execution ## IMPL 1 ## Results in a high memory peak as a new np.array is created (and thus no ## view is being used) # out = np.asarray( # func( # *[ # np.array([ # sc.values[sc.start_indexes[idx]: sc.end_indexes[idx]] # for idx in range(len(self.index)) # ]) # for sc in self.series_containers # ], # ) # ) ## IMPL 2 ## Is a good implementation (equivalent to the one below), will also fail in ## the same cases, but it does not perform clear assertions (with their ## accompanied clear messages). # out = np.asarray( # func( # *[ # _sliding_strided_window_1d(sc.values, self.window, self.stride) # for sc in self.series_containers # ], # ) # ) views: List[np.ndarray] = [] for sc in self.series_containers: if len(sc.start_indexes) == 0: # There are no feature windows -> return empty array (see below) views = [] break elif len(sc.start_indexes) == 1: # There is only 1 feature window (bc no steps in the sliding window) views.append( np.expand_dims( sc.values[sc.start_indexes[0] : sc.end_indexes[0]], axis=0, ) ) else: # There are >1 feature windows (bc >=1 steps in the sliding window) windows = sc.end_indexes - sc.start_indexes strides = sc.start_indexes[1:] - sc.start_indexes[:-1] assert np.all(windows == windows[0]), ( "Vectorized functions require same number of samples in each " + "segmented window!" ) assert np.all( strides == strides[0] ), "Vectorized functions require same number of samples as stride!" views.append( _sliding_strided_window_1d( sc.values[sc.start_indexes[0] :], windows[0], strides[0], len(self.index), ) ) # Assign empty array as output when there is no view to apply the vectorized # function on (this is the case when there is at least for one series no # feature windows) out = func(*views) if len(views) >= 1 else np.array([]) out_type = type(out) out = np.asarray(out) # When multiple outputs are returned (= tuple) they should be transposed # when combining into an array out = out.T if out_type is tuple else out else: # Function execution over slices (default) out = np.array( list( map( func, *[ [ sc.values[sc.start_indexes[idx] : sc.end_indexes[idx]] for idx in range(len(self.index)) ] for sc in self.series_containers ], ) ) ) # Check if the function output is valid. # This assertion will be raised when e.g. np.max is applied vectorized without # specifying axis=1. assert out.ndim > 0, "Vectorized function returned only 1 (non-array) value!" # Aggregate function output in a dictionary output_names = list(map(self._create_feat_col_name, feat_names)) feat_out = _process_func_output(out, self.index, output_names, str(func)) # Log the function execution time log_strides = ( "manual" if self.strides is None else tuple(map(str, self.strides)) ) log_window = "manual" if self.window is None else self.window _log_func_execution( t_start, func, self.series_key, log_window, log_strides, output_names # type: ignore[arg-type] ) return pd.DataFrame(feat_out, index=self.index) # --------------------------------- STATIC METHODS --------------------------------- @staticmethod def _get_np_value(val: Union[np.number, pd.Timestamp, pd.Timedelta]) -> np.number: # Convert everything to int64 if isinstance(val, pd.Timestamp): return val.to_datetime64() elif isinstance(val, pd.Timedelta): return val.to_timedelta64() else: return val @staticmethod def construct_output_index( series_keys: Union[str, Tuple[str, ...]], feat_name: str, win_str: str ) -> str: series_keys = to_tuple(series_keys) return f"{'|'.join(series_keys)}__{feat_name}__w={win_str}" # ----------------------------- OVERRIDE THESE METHODS ----------------------------- @abstractmethod def _update_start_end_indices_to_stroll_type( self, series_list: List[pd.Series] ) -> None: # NOTE: This method will only be implemented (with code != pass) in the # TimeIndexSampleStridedRolling raise NotImplementedError @abstractmethod def _parse_segment_idxs(self, segment_idxs: np.ndarray) -> np.ndarray: """Check the segment indexes array to lie between self.start and self.end and convert it to the correct dtype (if necessary).""" raise NotImplementedError @abstractmethod def _create_feat_col_name(self, feat_name: str) -> str: raise NotImplementedError
Custom time-based sliding window with stride.
Parameters
data
:Union[pd.Series, pd.DataFrame, List[Union[pd.Series, pd.DataFrame]]]
pd.Series
orpd.DataFrame
to slide over, the index must be either numeric or apd.DatetimeIndex
.window
:Union[float, pd.Timedelta]
- Either an int, float, or
pd.Timedelta
, representing the sliding window size in terms of the index (in case of a int or float) or the sliding window duration (in case ofpd.Timedelta
). strides
:Union[float, pd.Timedelta, List[Union[float, pd.Timedelta]]]
, optional- Either a list of int, float, or
pd.Timedelta
, representing the stride sizes in terms of the index (in case of a int or float) or the stride duration (in case ofpd.Timedelta
). By default None. segment_start_idxs
:np.ndarray
, optional- The start indices for the segmented windows. If not provided, the start indices
will be computed from the data using the passed
strides
or by using thesegment_end_idxs
(if not none) +window
. By default None. segment_end_idxs
:np.ndarray
, optional- The end indices for the segmented windows. If not provided, the end indices will
be computed from either (1) the data using the passed
window
+strides
or (2) thesegment_start_idxs
+window
, By default None.Note
When you pass arrays to bothsegment_start_idxs
andsegment_end_idxs
, the corresponding index-values of these arrays will be used as segment-ranges. As a result, the following properties must be met:- both arrays should have equal length
- all values in
segment_start_idxs
should be <=segment_end_idxs
start_idx
:Union[float, pd.Timestamp]
, optional- The start-index which will be used for each series passed to
data
. This is especially useful if multipleStridedRolling
instances are created and the user want to ensure same (start-)indexes for each of them. end_idx
:Union[float, pd.Timestamp]
, optional- The end-index which will be used as sliding end-limit for each series passed to
data
. func_data_type
:Union[np.array, pd.Series]
, optional- The data type of the stroll (either np.array or pd.Series), by default np.array.
Note
Make sure to only set this argument to pd.Series when this is really required, since pd.Series strided-rolling is significantly less efficient. For a np.array it is possible to create very efficient views, but there is no such thing as a pd.Series view. Thus, for each stroll, a new series is created, inducing a lot of non-feature calculation of overhead. window_idx
:str
, optional- The window's index position which will be used as index for the
feature_window aggregation. Must be either of:
["begin", "middle", "end"]
, by default "end". include_final_window
:bool
, optional-
Whether the final (possibly incomplete) window should be included in the strided-window segmentation, by default False.
Note
The remarks below apply when
include_final_window
is set to True. The user should be aware that the last window might be incomplete, i.e.;- when equally sampled, the last window might be smaller than the the other windows.
- when not equally sampled, the last window might not include all the data points (as the begin-time + window-size comes after the last data point).
Note, that when equally sampled, the last window will be a full window when:
- the stride is the sampling rate of the data (or stride = 1 for
sample-based configurations).
Remark: that wheninclude_final_window
is set to False, the last window (which is a full) window will not be included! - (len * sampling_rate - window_size) % stride = 0. Remark that the above case is a base case of this.
approve_sparsity
:bool
, optional- Bool indicating whether the user acknowledges that there may be sparsity (i.e., irregularly sampled data), by default False. If False and sparsity is observed, a warning is raised.
Notes
- This instance withholds a read-only-view of the data its values.
Ancestors
- abc.ABC
Subclasses
Class variables
var win_str_type
var reset_series_index_b4_segmenting
var OUTSIDE_DATA_BOUNDS_WARNING
Static methods
def construct_output_index(series_keys, feat_name, win_str)
-
Expand source code
@staticmethod def construct_output_index( series_keys: Union[str, Tuple[str, ...]], feat_name: str, win_str: str ) -> str: series_keys = to_tuple(series_keys) return f"{'|'.join(series_keys)}__{feat_name}__w={win_str}"
Methods
def apply_func(self, func)
-
Expand source code
def apply_func(self, func: FuncWrapper) -> pd.DataFrame: """Apply a function to the segmented series. Parameters ---------- func : FuncWrapper The Callable wrapped function which will be applied. Returns ------- pd.DataFrame The merged output of the function applied to every column in a new DataFrame. The DataFrame's column-names have the format: `<series_col_name(s)>_<feature_name>__w=<window>`. Raises ------ ValueError If the passed ``func`` tries to adjust the data its read-only view. Notes ----- * If ``func`` is only a callable argument, with no additional logic, this will only work for a one-to-one mapping, i.e., no multiple feature-output columns are supported for this case!<br> * If you want to calculate one-to-many, ``func`` should be a ``FuncWrapper`` instance and explicitly use the ``output_names`` attributes of its constructor. """ feat_names = func.output_names t_start = time.perf_counter() # --- Future work --- # would be nice if we could optimize this double for loop with something # more vectorized # # As for now we use a map to apply the function (as this evaluates its # expression only once, whereas a list comprehension evaluates its expression # every time). # See more why: https://stackoverflow.com/a/59838723 out: np.ndarray if func.vectorized: # Vectorized function execution ## IMPL 1 ## Results in a high memory peak as a new np.array is created (and thus no ## view is being used) # out = np.asarray( # func( # *[ # np.array([ # sc.values[sc.start_indexes[idx]: sc.end_indexes[idx]] # for idx in range(len(self.index)) # ]) # for sc in self.series_containers # ], # ) # ) ## IMPL 2 ## Is a good implementation (equivalent to the one below), will also fail in ## the same cases, but it does not perform clear assertions (with their ## accompanied clear messages). # out = np.asarray( # func( # *[ # _sliding_strided_window_1d(sc.values, self.window, self.stride) # for sc in self.series_containers # ], # ) # ) views: List[np.ndarray] = [] for sc in self.series_containers: if len(sc.start_indexes) == 0: # There are no feature windows -> return empty array (see below) views = [] break elif len(sc.start_indexes) == 1: # There is only 1 feature window (bc no steps in the sliding window) views.append( np.expand_dims( sc.values[sc.start_indexes[0] : sc.end_indexes[0]], axis=0, ) ) else: # There are >1 feature windows (bc >=1 steps in the sliding window) windows = sc.end_indexes - sc.start_indexes strides = sc.start_indexes[1:] - sc.start_indexes[:-1] assert np.all(windows == windows[0]), ( "Vectorized functions require same number of samples in each " + "segmented window!" ) assert np.all( strides == strides[0] ), "Vectorized functions require same number of samples as stride!" views.append( _sliding_strided_window_1d( sc.values[sc.start_indexes[0] :], windows[0], strides[0], len(self.index), ) ) # Assign empty array as output when there is no view to apply the vectorized # function on (this is the case when there is at least for one series no # feature windows) out = func(*views) if len(views) >= 1 else np.array([]) out_type = type(out) out = np.asarray(out) # When multiple outputs are returned (= tuple) they should be transposed # when combining into an array out = out.T if out_type is tuple else out else: # Function execution over slices (default) out = np.array( list( map( func, *[ [ sc.values[sc.start_indexes[idx] : sc.end_indexes[idx]] for idx in range(len(self.index)) ] for sc in self.series_containers ], ) ) ) # Check if the function output is valid. # This assertion will be raised when e.g. np.max is applied vectorized without # specifying axis=1. assert out.ndim > 0, "Vectorized function returned only 1 (non-array) value!" # Aggregate function output in a dictionary output_names = list(map(self._create_feat_col_name, feat_names)) feat_out = _process_func_output(out, self.index, output_names, str(func)) # Log the function execution time log_strides = ( "manual" if self.strides is None else tuple(map(str, self.strides)) ) log_window = "manual" if self.window is None else self.window _log_func_execution( t_start, func, self.series_key, log_window, log_strides, output_names # type: ignore[arg-type] ) return pd.DataFrame(feat_out, index=self.index)
Apply a function to the segmented series.
Parameters
func
:FuncWrapper
- The Callable wrapped function which will be applied.
Returns
pd.DataFrame
- The merged output of the function applied to every column in a
new DataFrame. The DataFrame's column-names have the format:
<series_col_name(s)>_<feature_name>__w=<window>
.
Raises
ValueError
- If the passed
func
tries to adjust the data its read-only view.
Notes
- If
func
is only a callable argument, with no additional logic, this will only work for a one-to-one mapping, i.e., no multiple feature-output columns are supported for this case! - If you want to calculate one-to-many,
func
should be aFuncWrapper
instance and explicitly use theoutput_names
attributes of its constructor.
class SequenceStridedRolling (data, window, strides=None, *args, **kwargs)
-
Expand source code
class SequenceStridedRolling(StridedRolling): def __init__( # type: ignore[no-untyped-def] self, data: Union[pd.Series, pd.DataFrame, List[Union[pd.Series, pd.DataFrame]]], window: Union[int, float], strides: Optional[Union[int, float, List[int], List[float]]] = None, *args, **kwargs, ): # Set the data type & call the super constructor self.win_str_type = DataType.SEQUENCE super().__init__(data, window, strides, *args, **kwargs) # ------------------------------- Overridden methods ------------------------------- def _update_start_end_indices_to_stroll_type( self, series_list: List[pd.Series] ) -> None: pass def _parse_segment_idxs(self, segment_idxs: np.ndarray) -> np.ndarray: if any((segment_idxs < self.start) | (segment_idxs > self.end)): warnings.warn(self.OUTSIDE_DATA_BOUNDS_WARNING, RuntimeWarning) return segment_idxs def _create_feat_col_name(self, feat_name: str) -> str: if self.window is not None: win_str = str(self.window) else: win_str = "manual" return self.construct_output_index( series_keys=self.series_key, feat_name=feat_name, win_str=win_str )
Custom time-based sliding window with stride.
Parameters
data
:Union[pd.Series, pd.DataFrame, List[Union[pd.Series, pd.DataFrame]]]
pd.Series
orpd.DataFrame
to slide over, the index must be either numeric or apd.DatetimeIndex
.window
:Union[float, pd.Timedelta]
- Either an int, float, or
pd.Timedelta
, representing the sliding window size in terms of the index (in case of a int or float) or the sliding window duration (in case ofpd.Timedelta
). strides
:Union[float, pd.Timedelta, List[Union[float, pd.Timedelta]]]
, optional- Either a list of int, float, or
pd.Timedelta
, representing the stride sizes in terms of the index (in case of a int or float) or the stride duration (in case ofpd.Timedelta
). By default None. segment_start_idxs
:np.ndarray
, optional- The start indices for the segmented windows. If not provided, the start indices
will be computed from the data using the passed
strides
or by using thesegment_end_idxs
(if not none) +window
. By default None. segment_end_idxs
:np.ndarray
, optional- The end indices for the segmented windows. If not provided, the end indices will
be computed from either (1) the data using the passed
window
+strides
or (2) thesegment_start_idxs
+window
, By default None.Note
When you pass arrays to bothsegment_start_idxs
andsegment_end_idxs
, the corresponding index-values of these arrays will be used as segment-ranges. As a result, the following properties must be met:- both arrays should have equal length
- all values in
segment_start_idxs
should be <=segment_end_idxs
start_idx
:Union[float, pd.Timestamp]
, optional- The start-index which will be used for each series passed to
data
. This is especially useful if multipleStridedRolling
instances are created and the user want to ensure same (start-)indexes for each of them. end_idx
:Union[float, pd.Timestamp]
, optional- The end-index which will be used as sliding end-limit for each series passed to
data
. func_data_type
:Union[np.array, pd.Series]
, optional- The data type of the stroll (either np.array or pd.Series), by default np.array.
Note
Make sure to only set this argument to pd.Series when this is really required, since pd.Series strided-rolling is significantly less efficient. For a np.array it is possible to create very efficient views, but there is no such thing as a pd.Series view. Thus, for each stroll, a new series is created, inducing a lot of non-feature calculation of overhead. window_idx
:str
, optional- The window's index position which will be used as index for the
feature_window aggregation. Must be either of:
["begin", "middle", "end"]
, by default "end". include_final_window
:bool
, optional-
Whether the final (possibly incomplete) window should be included in the strided-window segmentation, by default False.
Note
The remarks below apply when
include_final_window
is set to True. The user should be aware that the last window might be incomplete, i.e.;- when equally sampled, the last window might be smaller than the the other windows.
- when not equally sampled, the last window might not include all the data points (as the begin-time + window-size comes after the last data point).
Note, that when equally sampled, the last window will be a full window when:
- the stride is the sampling rate of the data (or stride = 1 for
sample-based configurations).
Remark: that wheninclude_final_window
is set to False, the last window (which is a full) window will not be included! - (len * sampling_rate - window_size) % stride = 0. Remark that the above case is a base case of this.
approve_sparsity
:bool
, optional- Bool indicating whether the user acknowledges that there may be sparsity (i.e., irregularly sampled data), by default False. If False and sparsity is observed, a warning is raised.
Notes
- This instance withholds a read-only-view of the data its values.
Ancestors
- StridedRolling
- abc.ABC
Subclasses
Class variables
var win_str_type
var reset_series_index_b4_segmenting
var OUTSIDE_DATA_BOUNDS_WARNING
Methods
def apply_func(self, func)
-
Inherited from:
StridedRolling
.apply_func
Apply a function to the segmented series …
class TimeStridedRolling (data, window, strides=None, *args, **kwargs)
-
Expand source code
class TimeStridedRolling(StridedRolling): def __init__( # type: ignore[no-untyped-def] self, data: Union[pd.Series, pd.DataFrame, List[Union[pd.Series, pd.DataFrame]]], window: pd.Timedelta, strides: Optional[Union[pd.Timedelta, List[pd.Timedelta]]] = None, *args, **kwargs, ): # Check that each series / dataframe has the same tz data = to_series_list(data) tz_index = data[0].index.tz for data_entry in to_series_list(data)[1:]: assert ( data_entry.index.tz == tz_index ), "strided rolling input data must all have same timezone" self._tz_index = tz_index # Set the data type & call the super constructor self.win_str_type = DataType.TIME super().__init__(data, window, strides, *args, **kwargs) # -------------------------------- Extended methods -------------------------------- def _get_output_index( self, start_idxs: np.ndarray, end_idxs: np.ndarray, name: str ) -> pd.Index: assert start_idxs.dtype.type == np.datetime64 assert end_idxs.dtype.type == np.datetime64 if not len(start_idxs): # to fix "datetime64 values must have a unit specified" assert not len(end_idxs) start_idxs = start_idxs.astype("datetime64[ns]") end_idxs = end_idxs.astype("datetime64[ns]") start_idxs = pd.to_datetime(start_idxs, utc=True).tz_convert(self._tz_index) end_idxs = pd.to_datetime(end_idxs, utc=True).tz_convert(self._tz_index) return super()._get_output_index(start_idxs, end_idxs, name) # ------------------------------- Overridden methods ------------------------------- def _update_start_end_indices_to_stroll_type( self, series_list: List[pd.Series] ) -> None: pass def _parse_segment_idxs(self, segment_idxs: np.ndarray) -> np.ndarray: segment_idxs = segment_idxs.astype("datetime64") start_, end_ = self.start, self.end if start_.tz is not None: # Convert to UTC (allowing comparison with the segment_idxs) assert end_.tz is not None start_ = start_.tz_convert(None) end_ = end_.tz_convert(None) if any((segment_idxs < start_) | (segment_idxs > end_)): warnings.warn(self.OUTSIDE_DATA_BOUNDS_WARNING, RuntimeWarning) return segment_idxs def _create_feat_col_name(self, feat_name: str) -> str: # Convert win to time-string if available :) if self.window is not None: win_str = timedelta_to_str(self.window) else: win_str = "manual" return self.construct_output_index( series_keys=self.series_key, feat_name=feat_name, win_str=win_str )
Custom time-based sliding window with stride.
Parameters
data
:Union[pd.Series, pd.DataFrame, List[Union[pd.Series, pd.DataFrame]]]
pd.Series
orpd.DataFrame
to slide over, the index must be either numeric or apd.DatetimeIndex
.window
:Union[float, pd.Timedelta]
- Either an int, float, or
pd.Timedelta
, representing the sliding window size in terms of the index (in case of a int or float) or the sliding window duration (in case ofpd.Timedelta
). strides
:Union[float, pd.Timedelta, List[Union[float, pd.Timedelta]]]
, optional- Either a list of int, float, or
pd.Timedelta
, representing the stride sizes in terms of the index (in case of a int or float) or the stride duration (in case ofpd.Timedelta
). By default None. segment_start_idxs
:np.ndarray
, optional- The start indices for the segmented windows. If not provided, the start indices
will be computed from the data using the passed
strides
or by using thesegment_end_idxs
(if not none) +window
. By default None. segment_end_idxs
:np.ndarray
, optional- The end indices for the segmented windows. If not provided, the end indices will
be computed from either (1) the data using the passed
window
+strides
or (2) thesegment_start_idxs
+window
, By default None.Note
When you pass arrays to bothsegment_start_idxs
andsegment_end_idxs
, the corresponding index-values of these arrays will be used as segment-ranges. As a result, the following properties must be met:- both arrays should have equal length
- all values in
segment_start_idxs
should be <=segment_end_idxs
start_idx
:Union[float, pd.Timestamp]
, optional- The start-index which will be used for each series passed to
data
. This is especially useful if multipleStridedRolling
instances are created and the user want to ensure same (start-)indexes for each of them. end_idx
:Union[float, pd.Timestamp]
, optional- The end-index which will be used as sliding end-limit for each series passed to
data
. func_data_type
:Union[np.array, pd.Series]
, optional- The data type of the stroll (either np.array or pd.Series), by default np.array.
Note
Make sure to only set this argument to pd.Series when this is really required, since pd.Series strided-rolling is significantly less efficient. For a np.array it is possible to create very efficient views, but there is no such thing as a pd.Series view. Thus, for each stroll, a new series is created, inducing a lot of non-feature calculation of overhead. window_idx
:str
, optional- The window's index position which will be used as index for the
feature_window aggregation. Must be either of:
["begin", "middle", "end"]
, by default "end". include_final_window
:bool
, optional-
Whether the final (possibly incomplete) window should be included in the strided-window segmentation, by default False.
Note
The remarks below apply when
include_final_window
is set to True. The user should be aware that the last window might be incomplete, i.e.;- when equally sampled, the last window might be smaller than the the other windows.
- when not equally sampled, the last window might not include all the data points (as the begin-time + window-size comes after the last data point).
Note, that when equally sampled, the last window will be a full window when:
- the stride is the sampling rate of the data (or stride = 1 for
sample-based configurations).
Remark: that wheninclude_final_window
is set to False, the last window (which is a full) window will not be included! - (len * sampling_rate - window_size) % stride = 0. Remark that the above case is a base case of this.
approve_sparsity
:bool
, optional- Bool indicating whether the user acknowledges that there may be sparsity (i.e., irregularly sampled data), by default False. If False and sparsity is observed, a warning is raised.
Notes
- This instance withholds a read-only-view of the data its values.
Ancestors
- StridedRolling
- abc.ABC
Class variables
var win_str_type
var reset_series_index_b4_segmenting
var OUTSIDE_DATA_BOUNDS_WARNING
Methods
def apply_func(self, func)
-
Inherited from:
StridedRolling
.apply_func
Apply a function to the segmented series …
class TimeIndexSampleStridedRolling (data, window, strides=None, segment_start_idxs=None, segment_end_idxs=None, *args, **kwargs)
-
Expand source code
class TimeIndexSampleStridedRolling(SequenceStridedRolling): def __init__( # type: ignore[no-untyped-def] self, # TODO -> update arguments data: Union[pd.Series, pd.DataFrame, List[Union[pd.Series, pd.DataFrame]]], window: int, strides: Optional[Union[int, List[int]]] = None, segment_start_idxs: Optional[np.ndarray] = None, segment_end_idxs: Optional[np.ndarray] = None, *args, **kwargs, ): """ .. Warning:: When `data` consists of multiple independently sampled series (e.g. feature functions which take multiple series as input), The time-**index of each series**: \n - must _roughly_ **share** the same **sample frequency**. - will be first time-aligned before transitioning to sample-segmentation by using the inner bounds .. Note:: `TimeIndexSampleStridedRolling` **does not support** the ``segment_start_idxs`` and ``segment_end_idxs`` arguments. Setting these will raise a NotImplementedError. """ if segment_start_idxs is not None or segment_end_idxs is not None: raise NotImplementedError( "TimeIndexSampleStridedRolling is not implemented to support passing" + "segment_start_idxs or segment_end_idxs" ) # We want to reset the index as its type differs from the passed win-stride # configs self.reset_series_index_b4_segmenting = True series_list = to_series_list(data) if isinstance(data, list) and len(data) > 1: # Slice data into its inner range so that the start position # is aligned (when we will use sample-based methodologies) start, end = _determine_bounds("inner", series_list) series_list = [s[start:end] for s in series_list] kwargs.update({"start_idx": start, "end_idx": end}) # We retain the first series list to stitch back the output index self._series_index = series_list[0].index # pass the sliced series list instead of data super().__init__(series_list, window, strides, *args, **kwargs) assert self.series_dtype == DataType.TIME # we want to assure that the window-stride arguments are integers (samples) assert all(isinstance(p, int) for p in [self.window] + self.strides) def apply_func(self, func: FuncWrapper) -> pd.DataFrame: # Apply the function and stitch back the time-index df = super().apply_func(func) df.index = self._series_index[df.index] return df # ---------------------------- Overridden methods ------------------------------ def _update_start_end_indices_to_stroll_type( self, series_list: List[pd.Series] ) -> None: # update the start and end times to the sequence datatype self.start, self.end = np.searchsorted( series_list[0].index.values, [self.start.to_datetime64(), self.end.to_datetime64()], "left", )
Custom time-based sliding window with stride.
Parameters
data
:Union[pd.Series, pd.DataFrame, List[Union[pd.Series, pd.DataFrame]]]
pd.Series
orpd.DataFrame
to slide over, the index must be either numeric or apd.DatetimeIndex
.window
:Union[float, pd.Timedelta]
- Either an int, float, or
pd.Timedelta
, representing the sliding window size in terms of the index (in case of a int or float) or the sliding window duration (in case ofpd.Timedelta
). strides
:Union[float, pd.Timedelta, List[Union[float, pd.Timedelta]]]
, optional- Either a list of int, float, or
pd.Timedelta
, representing the stride sizes in terms of the index (in case of a int or float) or the stride duration (in case ofpd.Timedelta
). By default None. segment_start_idxs
:np.ndarray
, optional- The start indices for the segmented windows. If not provided, the start indices
will be computed from the data using the passed
strides
or by using thesegment_end_idxs
(if not none) +window
. By default None. segment_end_idxs
:np.ndarray
, optional- The end indices for the segmented windows. If not provided, the end indices will
be computed from either (1) the data using the passed
window
+strides
or (2) thesegment_start_idxs
+window
, By default None.Note
When you pass arrays to bothsegment_start_idxs
andsegment_end_idxs
, the corresponding index-values of these arrays will be used as segment-ranges. As a result, the following properties must be met:- both arrays should have equal length
- all values in
segment_start_idxs
should be <=segment_end_idxs
start_idx
:Union[float, pd.Timestamp]
, optional- The start-index which will be used for each series passed to
data
. This is especially useful if multipleStridedRolling
instances are created and the user want to ensure same (start-)indexes for each of them. end_idx
:Union[float, pd.Timestamp]
, optional- The end-index which will be used as sliding end-limit for each series passed to
data
. func_data_type
:Union[np.array, pd.Series]
, optional- The data type of the stroll (either np.array or pd.Series), by default np.array.
Note
Make sure to only set this argument to pd.Series when this is really required, since pd.Series strided-rolling is significantly less efficient. For a np.array it is possible to create very efficient views, but there is no such thing as a pd.Series view. Thus, for each stroll, a new series is created, inducing a lot of non-feature calculation of overhead. window_idx
:str
, optional- The window's index position which will be used as index for the
feature_window aggregation. Must be either of:
["begin", "middle", "end"]
, by default "end". include_final_window
:bool
, optional-
Whether the final (possibly incomplete) window should be included in the strided-window segmentation, by default False.
Note
The remarks below apply when
include_final_window
is set to True. The user should be aware that the last window might be incomplete, i.e.;- when equally sampled, the last window might be smaller than the the other windows.
- when not equally sampled, the last window might not include all the data points (as the begin-time + window-size comes after the last data point).
Note, that when equally sampled, the last window will be a full window when:
- the stride is the sampling rate of the data (or stride = 1 for
sample-based configurations).
Remark: that wheninclude_final_window
is set to False, the last window (which is a full) window will not be included! - (len * sampling_rate - window_size) % stride = 0. Remark that the above case is a base case of this.
approve_sparsity
:bool
, optional- Bool indicating whether the user acknowledges that there may be sparsity (i.e., irregularly sampled data), by default False. If False and sparsity is observed, a warning is raised.
Notes
- This instance withholds a read-only-view of the data its values.
Warning
When
data
consists of multiple independently sampled series (e.g. feature functions which take multiple series as input), The time-index of each series:- must roughly share the same sample frequency.
- will be first time-aligned before transitioning to sample-segmentation by using the inner bounds
Note
TimeIndexSampleStridedRolling
does not support thesegment_start_idxs
andsegment_end_idxs
arguments. Setting these will raise a NotImplementedError.Ancestors
- SequenceStridedRolling
- StridedRolling
- abc.ABC
Class variables
var win_str_type
var reset_series_index_b4_segmenting
var OUTSIDE_DATA_BOUNDS_WARNING
Methods
def apply_func(self, func)
-
Expand source code
def apply_func(self, func: FuncWrapper) -> pd.DataFrame: # Apply the function and stitch back the time-index df = super().apply_func(func) df.index = self._series_index[df.index] return df
Inherited from:
SequenceStridedRolling
.apply_func
Apply a function to the segmented series …