Module tsflex.features.segmenter
Series segmentation submodule.
Expand source code
"""Series segmentation submodule."""
__author__ = "Jonas Van Der Donckt"
from .strided_rolling import StridedRolling
from .strided_rolling_factory import StridedRollingFactory
__all__ = [
"StridedRolling",
"StridedRollingFactory",
]
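Both classes are re-exported at the subpackage level, so they can be imported directly (a minimal sketch):

# Both segmenter classes are re-exported at the subpackage level (see __all__).
from tsflex.features.segmenter import StridedRolling, StridedRollingFactory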
API reference of tsflex.features.segmenter

Sub-modules

.strided_rolling
- Contains a (rather) fast implementation of an index-based strided rolling window …
.strided_rolling_factory
- Factory class for creating the proper StridedRolling instances …
Classes
class StridedRolling (data, window, strides=None, segment_start_idxs=None, segment_end_idxs=None, start_idx=None, end_idx=None, func_data_type=numpy.ndarray, window_idx='end', include_final_window=False, approve_sparsity=False)
-
Expand source code
class StridedRolling(ABC):
    """Custom time-based sliding window with stride.

    Parameters
    ----------
    data : Union[pd.Series, pd.DataFrame, List[Union[pd.Series, pd.DataFrame]]]
        ``pd.Series`` or ``pd.DataFrame`` to slide over; the index must be either
        numeric or a ``pd.DatetimeIndex``.
    window : Union[float, pd.Timedelta]
        Either an int, float, or ``pd.Timedelta``, representing the sliding window
        size in terms of the index (in case of an int or float) or the sliding
        window duration (in case of a ``pd.Timedelta``).
    strides : Union[float, pd.Timedelta, List[Union[float, pd.Timedelta]]], optional
        Either a list of int, float, or ``pd.Timedelta`` values, representing the
        stride sizes in terms of the index (in case of an int or float) or the
        stride durations (in case of ``pd.Timedelta``). By default None.
    segment_start_idxs : np.ndarray, optional
        The start indices for the segmented windows. If not provided, the start
        indices will be computed from the data using the passed ``strides``, or by
        using ``segment_end_idxs`` (if not None) + ``window``. By default None.
    segment_end_idxs : np.ndarray, optional
        The end indices for the segmented windows. If not provided, the end
        indices will be computed from either (1) the data, using the passed
        ``window`` + ``strides``, or (2) ``segment_start_idxs`` + ``window``.
        By default None.

        .. Note::
            When you pass arrays to both ``segment_start_idxs`` and
            ``segment_end_idxs``, the corresponding index-values of these arrays
            will be used as segment ranges. As a result, the following properties
            must be met:

            - both arrays should have equal length
            - all values in ``segment_start_idxs`` should be <= the corresponding
              values in ``segment_end_idxs``

    start_idx : Union[float, pd.Timestamp], optional
        The start-index which will be used for each series passed to `data`. This
        is especially useful if multiple ``StridedRolling`` instances are created
        and the user wants to ensure the same (start-)indexes for each of them.
    end_idx : Union[float, pd.Timestamp], optional
        The end-index which will be used as sliding end-limit for each series
        passed to `data`.
    func_data_type : Union[np.array, pd.Series], optional
        The data type of the stroll (either np.array or pd.Series), by default
        np.array.

        .. Note::
            Only set this argument to pd.Series when this is really required,
            since pd.Series strided-rolling is significantly less efficient. For a
            np.array it is possible to create very efficient views, but there is
            no such thing as a pd.Series view. Thus, for each stroll, a new series
            is created, inducing a lot of non-feature-calculation overhead.
    window_idx : str, optional
        The window's index position which will be used as index for the
        feature_window aggregation. Must be either of ``["begin", "middle",
        "end"]``, by default "end".
    include_final_window : bool, optional
        Whether the final (possibly incomplete) window should be included in the
        strided-window segmentation, by default False.

        .. Note::
            The remarks below apply when ``include_final_window`` is set to True.
            The user should be aware that the last window *might* be incomplete,
            i.e.:

            - when equally sampled, the last window *might* be smaller than the
              other windows;
            - when not equally sampled, the last window *might* not include all
              the data points (as the begin-time + window-size comes after the
              last data point).

            Note that, when equally sampled, the last window *will* be a full
            window when:

            - the stride is the sampling rate of the data (or stride = 1 for
              sample-based configurations). Remark: when `include_final_window`
              is set to False, this last (full) window will not be included!
            - *(len * sampling_rate - window_size) % stride = 0*. Remark that the
              above case is a base case of this.
    approve_sparsity : bool, optional
        Bool indicating whether the user acknowledges that there may be sparsity
        (i.e., irregularly sampled data), by default False. If False and sparsity
        is observed, a warning is raised.

    Notes
    -----
    * This instance holds a **read-only** view of the data's values.

    """

    # Class variables which are used by subclasses
    win_str_type: DataType
    reset_series_index_b4_segmenting: bool = False
    OUTSIDE_DATA_BOUNDS_WARNING: str = (
        "Some segment indexes are outside the range of the data's index."
    )

    # Create the named tuple
    _NumpySeriesContainer = namedtuple(  # type: ignore[name-match]
        "SeriesContainer", ["name", "values", "start_indexes", "end_indexes"]
    )

    def __init__(
        self,
        data: Union[pd.Series, pd.DataFrame, List[Union[pd.Series, pd.DataFrame]]],
        window: Optional[T],
        strides: Optional[Union[T, List[T]]] = None,
        segment_start_idxs: Optional[np.ndarray] = None,
        segment_end_idxs: Optional[np.ndarray] = None,
        start_idx: Optional[T] = None,
        end_idx: Optional[T] = None,
        func_data_type: Union[np.ndarray, pd.Series] = np.ndarray,
        window_idx: str = "end",
        include_final_window: bool = False,
        approve_sparsity: bool = False,
    ):
        if strides is not None:
            strides = to_list(strides)

        # Check the passed segment indices
        if segment_start_idxs is not None and segment_end_idxs is not None:
            _check_start_end_array(segment_start_idxs, segment_end_idxs)

        if window is not None:
            assert AttributeParser.check_expected_type(
                [window] + ([] if strides is None else strides), self.win_str_type
            )

        self.window = window  # type: ignore[var-annotated]
        self.strides = strides  # type: ignore[var-annotated]

        self.window_idx = window_idx
        self.include_final_window = include_final_window
        self.approve_sparsity = approve_sparsity

        assert func_data_type in SUPPORTED_STROLL_TYPES
        self.data_type = func_data_type

        # 0. Standardize the input
        series_list: List[pd.Series] = to_series_list(data)
        self.series_dtype = AttributeParser.determine_type(series_list)
        self.series_key: Tuple[str, ...] = tuple([str(s.name) for s in series_list])

        # 1. Determine the start index
        self.start, self.end = start_idx, end_idx  # type: ignore[var-annotated]
        if self.start is None or self.end is None:
            # We always pass start_idx and end_idx from the FeatureCollection.calculate
            # Hence, this code is only useful for testing purposes
            start, end = _determine_bounds("inner", series_list)

            # update self.start & self.end if it was not passed
            self.start = start if self.start is None else self.start
            self.end = end if self.end is None else self.end

        # Especially useful when the index dtype differs from the win-stride-dtype
        # e.g. -> performing an int-based stroll on time-indexed data
        # Note: this is very niche and thus requires advanced knowledge
        # TODO: this code can be omitted if we remove TimeIndexSampleStridedRolling
        self._update_start_end_indices_to_stroll_type(series_list)

        # 2. Construct the index ranges
        # Either use the passed segment indices or compute the start or end times
        # of the segments. The segment indices have precedence over the stride
        # (and window) for index computation.
        if segment_start_idxs is not None or segment_end_idxs is not None:
            self.strides = None
            if segment_start_idxs is not None and segment_end_idxs is not None:
                # When both the start and end points are passed, the window does
                # not matter.
                self.window = None
                np_start_times = self._parse_segment_idxs(segment_start_idxs)
                np_end_times = self._parse_segment_idxs(segment_end_idxs)
            elif segment_start_idxs is not None:  # segment_end_idxs is None
                np_start_times = self._parse_segment_idxs(segment_start_idxs)
                np_end_times = np_start_times + self._get_np_value(self.window)
            else:  # segment_end_idxs is not None and segment_start_idxs is None
                np_end_times = self._parse_segment_idxs(segment_end_idxs)  # type: ignore[arg-type]
                np_start_times = np_end_times - self._get_np_value(self.window)
        else:
            np_start_times = self._construct_start_idxs()
            np_end_times = np_start_times + self._get_np_value(self.window)

        # Check the numpy start and end indices
        _check_start_end_array(np_start_times, np_end_times)

        # 3. Create a new index which will be used for DataFrame reconstruction
        # Note: the index-name of the first passed series will be re-used as index-name
        self.index = self._get_output_index(
            np_start_times, np_end_times, name=series_list[0].index.name
        )

        # 4. Store the series containers
        self.series_containers = self._construct_series_containers(
            series_list, np_start_times, np_end_times
        )

        # 5. Check the sparsity assumption
        if not self.approve_sparsity and len(self.index):
            for container in self.series_containers:
                # Warn when min != max
                if np.ptp(container.end_indexes - container.start_indexes) != 0:
                    warnings.warn(
                        f"There are gaps in the sequence of the {container.name}"
                        f"-series!",
                        RuntimeWarning,
                    )

    def _calc_nb_segments_for_stride(self, stride: T) -> int:
        """Calculate the number of output items (segments) for a given single stride."""
        assert self.start is not None and self.end is not None  # for mypy
        nb_feats = max((self.end - self.start - self.window) // stride + 1, 0)
        # Add 1 if there is still some data after (including) the last window's
        # start index - this is only added when `include_final_window` is True.
        nb_feats += self.include_final_window * (
            self.start + stride * nb_feats <= self.end
        )
        return nb_feats

    def _get_np_start_idx_for_stride(self, stride: T) -> np.ndarray:
        """Compute the start indices for the given single stride."""
        # ---------- Efficient numpy code -------
        np_start = self._get_np_value(self.start)
        np_stride = self._get_np_value(stride)
        # Compute the start times (these remain the same for each series)
        return np.arange(
            start=np_start,
            stop=np_start + self._calc_nb_segments_for_stride(stride) * np_stride,
            step=np_stride,
        )

    def _construct_start_idxs(self) -> np.ndarray:
        """Construct the start indices of the segments (for all stride values).

        To realize this, we compute the start idxs for each stride and then merge
        them together (without duplicates) in a sorted array.
        """
        start_idxs = []
        for stride in self.strides:
            start_idxs += [self._get_np_start_idx_for_stride(stride)]
        # note - np.unique also sorts the array
        return np.unique(np.concatenate(start_idxs))

    def _get_output_index(
        self, start_idxs: np.ndarray, end_idxs: np.ndarray, name: str
    ) -> pd.Index:
        """Construct the output index."""
        if self.window_idx == "end":
            return pd.Index(end_idxs, name=name)
        elif self.window_idx == "middle":
            return pd.Index(
                start_idxs + ((end_idxs - start_idxs) / 2),
                name=name,
            )
        elif self.window_idx == "begin":
            return pd.Index(start_idxs, name=name)
        else:
            raise ValueError(
                f"window index {self.window_idx} must be either of: "
                "['end', 'middle', 'begin']"
            )

    def _construct_series_containers(
        self,
        series_list: List[pd.Series],
        np_start_times: np.ndarray,
        np_end_times: np.ndarray,
    ) -> List[StridedRolling._NumpySeriesContainer]:
        series_containers: List[StridedRolling._NumpySeriesContainer] = []
        for series in series_list:
            if not self.reset_series_index_b4_segmenting:
                np_idx_times = series.index.values
            else:
                np_idx_times = np.arange(len(series))
                # note: using pd.RangeIndex instead of arange gives the same
                # performance

            series_name = series.name
            if self.data_type is np.ndarray:
                # FuncWrapper.input_type is np.ndarray
                # -> create a non-writeable view of the series
                series = series.values  # the np.array is stored in the container
                series.flags.writeable = False
            elif self.data_type is pd.Series:
                # FuncWrapper.input_type is pd.Series
                # -> the pd.Series is stored in the SeriesContainer
                series.values.flags.writeable = False
                series.index.values.flags.writeable = False
            else:
                raise ValueError("unsupported datatype")

            series_containers.append(
                StridedRolling._NumpySeriesContainer(
                    name=series_name,
                    values=series,
                    # the slicing will be performed on [ t_start, t_end [
                    # TODO: this can maybe be optimized -> further look into this
                    # np_idx_times, np_start_times, & np_end_times are all sorted!
                    # as we assume & check that the time index is monotonically
                    # increasing & the latter 2 are created using `np.arange()`
                    start_indexes=np.searchsorted(np_idx_times, np_start_times, "left"),
                    end_indexes=np.searchsorted(np_idx_times, np_end_times, "left"),
                )
            )
        return series_containers

    def apply_func(self, func: FuncWrapper) -> pd.DataFrame:
        """Apply a function to the segmented series.

        Parameters
        ----------
        func : FuncWrapper
            The Callable wrapped function which will be applied.

        Returns
        -------
        pd.DataFrame
            The merged output of the function applied to every column in a
            new DataFrame. The DataFrame's column-names have the format:
            `<series_col_name(s)>__<feature_name>__w=<window>`.

        Raises
        ------
        ValueError
            If the passed ``func`` tries to modify the data's read-only view.

        Notes
        -----
        * If ``func`` is only a callable argument, with no additional logic, this
          will only work for a one-to-one mapping, i.e., no multiple
          feature-output columns are supported for this case!
        * If you want to calculate one-to-many, ``func`` should be a
          ``FuncWrapper`` instance and explicitly use the ``output_names``
          attribute of its constructor.

        """
        feat_names = func.output_names

        t_start = time.perf_counter()

        # --- Future work ---
        # would be nice if we could optimize this double for loop with something
        # more vectorized
        #
        # As for now we use a map to apply the function (as this evaluates its
        # expression only once, whereas a list comprehension evaluates its
        # expression every time).
        # See more why: https://stackoverflow.com/a/59838723
        out: np.ndarray
        if func.vectorized:
            # Vectorized function execution

            ## IMPL 1
            ## Results in a high memory peak as a new np.array is created (and
            ## thus no view is being used)
            # out = np.asarray(
            #     func(
            #         *[
            #             np.array([
            #                 sc.values[sc.start_indexes[idx]: sc.end_indexes[idx]]
            #                 for idx in range(len(self.index))
            #             ])
            #             for sc in self.series_containers
            #         ],
            #     )
            # )

            ## IMPL 2
            ## Is a good implementation (equivalent to the one below), will also
            ## fail in the same cases, but it does not perform clear assertions
            ## (with their accompanied clear messages).
            # out = np.asarray(
            #     func(
            #         *[
            #             _sliding_strided_window_1d(sc.values, self.window, self.stride)
            #             for sc in self.series_containers
            #         ],
            #     )
            # )

            views: List[np.ndarray] = []
            for sc in self.series_containers:
                if len(sc.start_indexes) == 0:
                    # There are no feature windows -> return empty array (see below)
                    views = []
                    break
                elif len(sc.start_indexes) == 1:
                    # There is only 1 feature window (bc no steps in the sliding window)
                    views.append(
                        np.expand_dims(
                            sc.values[sc.start_indexes[0] : sc.end_indexes[0]],
                            axis=0,
                        )
                    )
                else:
                    # There are >1 feature windows (bc >=1 steps in the sliding window)
                    windows = sc.end_indexes - sc.start_indexes
                    strides = sc.start_indexes[1:] - sc.start_indexes[:-1]
                    assert np.all(windows == windows[0]), (
                        "Vectorized functions require same number of samples in each "
                        + "segmented window!"
                    )
                    assert np.all(
                        strides == strides[0]
                    ), "Vectorized functions require same number of samples as stride!"
                    views.append(
                        _sliding_strided_window_1d(
                            sc.values[sc.start_indexes[0] :],
                            windows[0],
                            strides[0],
                            len(self.index),
                        )
                    )

            # Assign an empty array as output when there is no view to apply the
            # vectorized function on (this is the case when at least one series
            # has no feature windows)
            out = func(*views) if len(views) >= 1 else np.array([])

            out_type = type(out)
            out = np.asarray(out)
            # When multiple outputs are returned (= tuple) they should be
            # transposed when combining into an array
            out = out.T if out_type is tuple else out

        else:
            # Function execution over slices (default)
            out = np.array(
                list(
                    map(
                        func,
                        *[
                            [
                                sc.values[sc.start_indexes[idx] : sc.end_indexes[idx]]
                                for idx in range(len(self.index))
                            ]
                            for sc in self.series_containers
                        ],
                    )
                )
            )

        # Check if the function output is valid.
        # This assertion will be raised when e.g. np.max is applied vectorized
        # without specifying axis=1.
        assert out.ndim > 0, "Vectorized function returned only 1 (non-array) value!"

        # Aggregate function output in a dictionary
        output_names = list(map(self._create_feat_col_name, feat_names))
        feat_out = _process_func_output(out, self.index, output_names, str(func))

        # Log the function execution time
        log_strides = (
            "manual" if self.strides is None else tuple(map(str, self.strides))
        )
        log_window = "manual" if self.window is None else self.window
        _log_func_execution(
            t_start, func, self.series_key, log_window, log_strides, output_names  # type: ignore[arg-type]
        )

        return pd.DataFrame(feat_out, index=self.index)

    # --------------------------------- STATIC METHODS ---------------------------------
    @staticmethod
    def _get_np_value(val: Union[np.number, pd.Timestamp, pd.Timedelta]) -> np.number:
        # Convert pandas time-based values to their numpy counterparts
        if isinstance(val, pd.Timestamp):
            return val.to_datetime64()
        elif isinstance(val, pd.Timedelta):
            return val.to_timedelta64()
        else:
            return val

    @staticmethod
    def construct_output_index(
        series_keys: Union[str, Tuple[str, ...]], feat_name: str, win_str: str
    ) -> str:
        series_keys = to_tuple(series_keys)
        return f"{'|'.join(series_keys)}__{feat_name}__w={win_str}"

    # ----------------------------- OVERRIDE THESE METHODS -----------------------------
    @abstractmethod
    def _update_start_end_indices_to_stroll_type(
        self, series_list: List[pd.Series]
    ) -> None:
        # NOTE: This method will only be implemented (with code != pass) in the
        # TimeIndexSampleStridedRolling
        raise NotImplementedError

    @abstractmethod
    def _parse_segment_idxs(self, segment_idxs: np.ndarray) -> np.ndarray:
        """Check that the segment indexes array lies between self.start and
        self.end, and convert it to the correct dtype (if necessary)."""
        raise NotImplementedError

    @abstractmethod
    def _create_feat_col_name(self, feat_name: str) -> str:
        raise NotImplementedError
Custom time-based sliding window with stride.

Parameters

data : Union[pd.Series, pd.DataFrame, List[Union[pd.Series, pd.DataFrame]]]
- pd.Series or pd.DataFrame to slide over; the index must be either numeric or a pd.DatetimeIndex.
window : Union[float, pd.Timedelta]
- Either an int, float, or pd.Timedelta, representing the sliding window size in terms of the index (in case of an int or float) or the sliding window duration (in case of a pd.Timedelta).
strides : Union[float, pd.Timedelta, List[Union[float, pd.Timedelta]]], optional
- Either a list of int, float, or pd.Timedelta values, representing the stride sizes in terms of the index (in case of an int or float) or the stride durations (in case of pd.Timedelta). By default None.
segment_start_idxs : np.ndarray, optional
- The start indices for the segmented windows. If not provided, the start indices will be computed from the data using the passed strides, or by using segment_end_idxs (if not None) + window. By default None.
segment_end_idxs : np.ndarray, optional
- The end indices for the segmented windows. If not provided, the end indices will be computed from either (1) the data, using the passed window + strides, or (2) segment_start_idxs + window. By default None.

Note
When you pass arrays to both segment_start_idxs and segment_end_idxs, the corresponding index-values of these arrays will be used as segment ranges (see the sketch after this note). As a result, the following properties must be met:
- both arrays should have equal length
- all values in segment_start_idxs should be <= the corresponding values in segment_end_idxs
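To make these segment-range semantics concrete, here is a minimal, hypothetical sketch. It assumes SequenceStridedRolling (the concrete subclass the factory picks for numeric-indexed data, per the factory source further below) forwards these keyword arguments unchanged; the series and index values are illustrative.

import numpy as np
import pandas as pd
# Assumed import path of the concrete subclass for numeric-indexed data.
from tsflex.features.segmenter.strided_rolling import SequenceStridedRolling

s = pd.Series(np.arange(30, dtype=float), name="sig")
start = np.array([0, 10, 20])  # segment start positions (index values)
end = np.array([5, 15, 25])    # equal length, and start[i] <= end[i]

# window is ignored (set to None internally) when both arrays are passed.
stroll = SequenceStridedRolling(
    s, window=None, segment_start_idxs=start, segment_end_idxs=end
)
# -> three segments over the half-open ranges [0, 5[, [10, 15[, and [20, 25[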
start_idx : Union[float, pd.Timestamp], optional
- The start-index which will be used for each series passed to data. This is especially useful if multiple StridedRolling instances are created and the user wants to ensure the same (start-)indexes for each of them.
end_idx : Union[float, pd.Timestamp], optional
- The end-index which will be used as sliding end-limit for each series passed to data.
func_data_type : Union[np.array, pd.Series], optional
- The data type of the stroll (either np.array or pd.Series), by default np.array.

Note
Only set this argument to pd.Series when this is really required, since pd.Series strided-rolling is significantly less efficient. For a np.array it is possible to create very efficient views, but there is no such thing as a pd.Series view. Thus, for each stroll, a new series is created, inducing a lot of non-feature-calculation overhead.

window_idx : str, optional
- The window's index position which will be used as index for the feature_window aggregation. Must be either of ["begin", "middle", "end"], by default "end".
include_final_window : bool, optional
- Whether the final (possibly incomplete) window should be included in the strided-window segmentation, by default False. (A worked sketch of the window/stride arithmetic follows this parameter list.)

Note
The remarks below apply when include_final_window is set to True. The user should be aware that the last window might be incomplete, i.e.:
- when equally sampled, the last window might be smaller than the other windows;
- when not equally sampled, the last window might not include all the data points (as the begin-time + window-size comes after the last data point).
Note that, when equally sampled, the last window will be a full window when:
- the stride is the sampling rate of the data (or stride = 1 for sample-based configurations). Remark: when include_final_window is set to False, this last (full) window will not be included!
- (len * sampling_rate - window_size) % stride = 0. Remark that the above case is a base case of this.

approve_sparsity : bool, optional
- Bool indicating whether the user acknowledges that there may be sparsity (i.e., irregularly sampled data), by default False. If False and sparsity is observed, a warning is raised.

Notes

- This instance holds a read-only view of the data's values.
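A worked sketch (plain numpy) of the segment-index arithmetic described above; it mirrors _calc_nb_segments_for_stride and _get_np_start_idx_for_stride from the source, with illustrative numbers:

import numpy as np

start, end = 0, 10          # series index bounds
window, stride = 4, 3       # in index units
include_final_window = False

# Number of full windows: max((end - start - window) // stride + 1, 0)
nb = max((end - start - window) // stride + 1, 0)          # -> 3
# Optionally count one extra (possibly incomplete) final window
nb += include_final_window * (start + stride * nb <= end)  # unchanged here

start_idxs = np.arange(start, start + nb * stride, stride)  # [0, 3, 6]
end_idxs = start_idxs + window                              # [4, 7, 10]

# window_idx selects which position becomes the output index:
out_end = end_idxs                                  # "end" (default): [4, 7, 10]
out_mid = start_idxs + (end_idxs - start_idxs) / 2  # "middle": [2., 5., 8.]
out_begin = start_idxs                              # "begin": [0, 3, 6]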
Ancestors
- abc.ABC
Subclasses
- TimeStridedRolling
- SequenceStridedRolling
Class variables
var win_str_type : DataType
var reset_series_index_b4_segmenting : bool
var OUTSIDE_DATA_BOUNDS_WARNING : str
Static methods
def construct_output_index(series_keys, feat_name, win_str)
- Builds the output column name, formatted as <series_keys joined by '|'>__<feat_name>__w=<win_str>.
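A quick illustration of the f-string shown in the class source above (the series keys are joined by '|'; the values are illustrative):

# Joins the series keys with '|' and appends the feature name + window string.
StridedRolling.construct_output_index(("EDA", "TMP"), "mean", "30s")
# -> 'EDA|TMP__mean__w=30s'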
Methods
def apply_func(self, func)
-
Apply a function to the segmented series.

Parameters

func : FuncWrapper
- The Callable wrapped function which will be applied.

Returns

pd.DataFrame
- The merged output of the function applied to every column in a new DataFrame. The DataFrame's column-names have the format: <series_col_name(s)>__<feature_name>__w=<window>.

Raises

ValueError
- If the passed func tries to modify the data's read-only view.

Notes

- If func is only a callable argument, with no additional logic, this will only work for a one-to-one mapping, i.e., no multiple feature-output columns are supported for this case!
- If you want to calculate one-to-many, func should be a FuncWrapper instance and explicitly use the output_names attribute of its constructor (see the sketch below).
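A minimal usage sketch of both cases, assuming tsflex's public FuncWrapper and a segmenter built via StridedRollingFactory; the data, window, and stride values are illustrative, and the exact column names follow the format documented above:

import numpy as np
import pandas as pd
from tsflex.features import FuncWrapper
from tsflex.features.segmenter import StridedRollingFactory

s = pd.Series(np.random.randn(100), name="sig")  # numeric index -> sequence stroll
stroll = StridedRollingFactory.get_segmenter(s, window=20, strides=[10])

# One-to-one: a bare callable wrapped in a FuncWrapper, single output column.
df_mean = stroll.apply_func(FuncWrapper(np.mean))  # column: 'sig__mean__w=20'

# One-to-many: output_names must be set explicitly on the FuncWrapper.
minmax = FuncWrapper(lambda x: (x.min(), x.max()), output_names=["min", "max"])
df_minmax = stroll.apply_func(minmax)  # columns: 'sig__min__w=20', 'sig__max__w=20'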
class StridedRollingFactory
-
Expand source code
class StridedRollingFactory:
    """Factory class for creating the appropriate StridedRolling segmenter."""

    _datatype_to_stroll = {
        DataType.TIME: TimeStridedRolling,
        DataType.SEQUENCE: SequenceStridedRolling,
    }

    @staticmethod
    def get_segmenter(  # type: ignore[no-untyped-def]
        data: Union[pd.Series, pd.DataFrame, List[Union[pd.Series, pd.DataFrame]]],
        window: Union[int, float, pd.Timedelta],
        strides: Optional[List[Union[int, float, pd.Timedelta]]],
        **kwargs,
    ) -> StridedRolling:
        """Get the appropriate StridedRolling instance for the passed data.

        The returned instance is determined by the data's index type.

        Parameters
        ----------
        data : Union[pd.Series, pd.DataFrame, List[Union[pd.Series, pd.DataFrame]]]
            The data to segment.
        window : Union[int, float, pd.Timedelta]
            The window size to use for the segmentation.
        strides : Union[List[Union[int, float, pd.Timedelta]], None]
            The stride(s) to use for the segmentation.
        **kwargs : dict, optional
            Additional keyword arguments, see the `StridedRolling` documentation
            for more info.

        .. Note::
            When passing `time-based` data, but **int**-based window & stride
            params, the strided rolling will be a `TimeIndexSampleStridedRolling`.
            This class **assumes** that **all data series** _roughly_ have the
            **same sample frequency**, as the windows and strides are interpreted
            in terms of **number of samples**.

        Raises
        ------
        ValueError
            When incompatible data & window-stride data types are passed (e.g.,
            time-based window-stride args on a sequence data-index).

        Returns
        -------
        StridedRolling
            The constructed StridedRolling instance.
        """
        data_dtype = AttributeParser.determine_type(data)
        if strides is None:
            args_dtype = AttributeParser.determine_type(window)
        else:
            args_dtype = AttributeParser.determine_type([window] + strides)

        if window is None or data_dtype.value == args_dtype.value:
            return StridedRollingFactory._datatype_to_stroll[data_dtype](
                data, window, strides, **kwargs
            )
        elif data_dtype == DataType.TIME and args_dtype == DataType.SEQUENCE:
            # Note: this is very niche and thus requires advanced knowledge
            assert isinstance(window, int)
            if strides is not None:
                assert isinstance(strides, list) and all(
                    isinstance(s, int) for s in strides
                )
            return TimeIndexSampleStridedRolling(data, window, strides, **kwargs)
        elif data_dtype == DataType.SEQUENCE and args_dtype == DataType.TIME:
            raise ValueError("Cannot segment a sequence-series with a time window")

        # This should never happen
        raise ValueError(
            f"Cannot segment data of type {data_dtype} with window-stride of type "
            f"{args_dtype}"
        )
Factory class for creating the appropriate StridedRolling segmenter.
Static methods
def get_segmenter(data, window, strides, **kwargs)
-
Get the appropriate StridedRolling instance for the passed data.

The returned instance is determined by the data's index type.

Parameters

data : Union[pd.Series, pd.DataFrame, List[Union[pd.Series, pd.DataFrame]]]
- The data to segment.
window : Union[int, float, pd.Timedelta]
- The window size to use for the segmentation.
strides : Union[List[Union[int, float, pd.Timedelta]], None]
- The stride(s) to use for the segmentation.
**kwargs : dict, optional
- Additional keyword arguments, see the StridedRolling documentation for more info.

Note
When passing time-based data, but int-based window & stride params, the strided rolling will be a TimeIndexSampleStridedRolling. This class assumes that all data series roughly have the same sample frequency, as the windows and strides are interpreted in terms of number of samples.

Raises

ValueError
- When incompatible data & window-stride data types are passed (e.g., time-based window-stride args on a sequence data-index).

Returns

StridedRolling
- The constructed StridedRolling instance.
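A short sketch of these dispatch rules; the class names come from the factory source above, while the data and parameter values are illustrative:

import numpy as np
import pandas as pd
from tsflex.features.segmenter import StridedRollingFactory

values = np.random.randn(120)

# Time-indexed data + time-based window/stride -> TimeStridedRolling
ts = pd.Series(
    values, name="sig",
    index=pd.date_range("2022-01-01", periods=120, freq="1s"),
)
stroll_t = StridedRollingFactory.get_segmenter(
    ts, window=pd.Timedelta(seconds=30), strides=[pd.Timedelta(seconds=10)]
)

# Numeric-indexed data + int window/stride -> SequenceStridedRolling
seq = pd.Series(values, name="sig")
stroll_s = StridedRollingFactory.get_segmenter(seq, window=30, strides=[10])

# Time-indexed data + int window/stride -> TimeIndexSampleStridedRolling
# (assumes a roughly constant sample frequency across all series)
stroll_ts = StridedRollingFactory.get_segmenter(ts, window=30, strides=[10])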