Module tsflex.features.feature_collection

FeatureCollection class for bookkeeping and calculation of time-series features.

Methods worth looking at, besides .calculate():

  • .serialize() - serialize the FeatureCollection to a file
  • .reduce() - reduce the number of features after feature selection
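A minimal usage sketch (the `FeatureDescriptor(function, series_name, window, stride)` argument names are assumed here for illustration):

```python
import numpy as np
import pandas as pd

from tsflex.features import FeatureCollection, FeatureDescriptor

# A dummy, numerically indexed series named "temperature"
temperature = pd.Series(np.random.randn(1_000), name="temperature")

fc = FeatureCollection(
    FeatureDescriptor(function=np.mean, series_name="temperature", window=100, stride=50)
)

features_df = fc.calculate(data=temperature, return_df=True)  # strided-rolling mean
fc.serialize("feature_collection.pkl")                        # persist the collection
fc_reduced = fc.reduce(list(features_df.columns))             # keep only selected outputs
```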
Expand source code
"""FeatureCollection class for bookkeeping and calculation of time-series features.

Methods worth looking at, besides `FeatureCollection.calculate()`: \n
* `FeatureCollection.serialize()` - serialize the FeatureCollection to a file
* `FeatureCollection.reduce()` - reduce the number of features after feature selection

"""

from __future__ import annotations

import warnings

__author__ = "Jonas Van Der Donckt, Emiel Deprost, Jeroen Van Der Donckt"

import logging
import os
import time
import traceback
import uuid
from copy import deepcopy
from pathlib import Path
from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence, Tuple, Union

import dill
import numpy as np
import pandas as pd
from multiprocess import Pool
from pandas.api.types import is_datetime64_any_dtype
from tqdm.auto import tqdm

from ..features.function_wrapper import FuncWrapper
from ..utils.argument_parsing import parse_n_jobs, parse_time_arg, timedelta_to_str
from ..utils.attribute_parsing import AttributeParser
from ..utils.data import flatten, to_list, to_series_list
from ..utils.logging import add_logging_handler, delete_logging_handlers
from .feature import FeatureDescriptor, MultipleFeatureDescriptors
from .logger import logger
from .segmenter import StridedRolling, StridedRollingFactory
from .utils import (
    _check_start_end_array,
    _determine_bounds,
    _log_func_execution,
    _process_func_output,
)


class FeatureCollection:
    """Create a FeatureCollection.

    Parameters
    ----------
    feature_descriptors : Union[FeatureDescriptor, MultipleFeatureDescriptors, FeatureCollection, List[Union[FeatureDescriptor, MultipleFeatureDescriptors, FeatureCollection]]], optional
        Initial (list of) feature(s) to add to the collection, by default None.

    Notes
    -----
    * The `series_name` property of the `FeatureDescriptor`s should **not contain a
      "|" character**, since "|" is used to join the series names of features which
      use multiple series as input.<br>
      e.g.<br>
        * `ACC|x` is **not** allowed as series name, as this is ambiguous and could
          indicate that the feature is constructed from a combination of the `ACC`
          and `x` series.<br>
          Note that `max|feat` is allowed as feature output name.
    * Both the `series_name` and `output_name` properties of the `FeatureDescriptor`s
      **should not contain "__"** (a double underscore) in their string
      representations. This constraint is mainly made for readability purposes.

    Both constraints above are asserted when features are added to the collection.
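
    Example (a minimal sketch of the naming constraints; the
    `FeatureDescriptor(function, series_name, window, stride)` argument order is
    assumed here for illustration):

    ```python
    import numpy as np

    fc = FeatureCollection()
    fc.add(FeatureDescriptor(np.max, "ACC_x", 100, 50))     # OK
    # fc.add(FeatureDescriptor(np.max, "ACC|x", 100, 50))   # AssertionError: "|" in series name
    # fc.add(FeatureDescriptor(np.max, "ACC__x", 100, 50))  # AssertionError: "__" in series name
    ```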

    """

    def __init__(
        self,
        feature_descriptors: Optional[
            Union[
                FeatureDescriptor,
                MultipleFeatureDescriptors,
                FeatureCollection,
                Sequence[
                    Union[
                        FeatureDescriptor, MultipleFeatureDescriptors, FeatureCollection
                    ]
                ],
            ]
        ] = None,
    ):
        # The feature collection is a dict with keys of type:
        #   tuple(tuple(str), float OR pd.timedelta)
        # The outer tuple's values correspond to (series_key(s), window)
        self._feature_desc_dict: Dict[
            Tuple[Tuple[str, ...], Union[float, pd.Timedelta]], List[FeatureDescriptor]
        ] = {}

        if feature_descriptors:
            self.add(feature_descriptors)

    def get_required_series(self) -> List[str]:
        """Return all required series names for this feature collection.

        Return the list of series names that are required in order to calculate all the
        features (defined by the `FeatureDescriptor` objects) of this feature
        collection.

        Returns
        -------
        List[str]
            List of all the required series names.

        """
        return list(
            set(flatten([fr_key[0] for fr_key in self._feature_desc_dict.keys()]))
        )

    def get_nb_output_features(self) -> int:
        """Return the number of output features in this feature collection.

        Returns
        -------
        int
            The number of output features in this feature collection.

        """
        fd_list: Iterable[FeatureDescriptor] = flatten(self._feature_desc_dict.values())
        return sum(fd.get_nb_output_features() for fd in fd_list)

    def _get_nb_output_features_without_window(self) -> int:
        """Return the number of output features in this feature collection, without
        using the window as a unique identifier.

        This is relevant for when the window value(s) are overridden by passing
        `segment_start_idxs` and `segment_end_idxs` to the `calculate` method.

        Returns
        -------
        int
            The number of output features in this feature collection without using the
            window as a unique identifier.

        """
        return len(
            set(
                (series, o)
                for (series, _), fd_list in self._feature_desc_dict.items()
                for fd in fd_list
                for o in fd.function.output_names
            )
        )

    def _get_nb_feat_funcs(self) -> int:
        return sum(map(len, self._feature_desc_dict.values()))

    @staticmethod
    def _get_collection_key(
        feature: FeatureDescriptor,
    ) -> Tuple[Tuple[str, ...], Union[pd.Timedelta, float, None]]:
        # Note: `window` property can be either a pd.Timedelta or a float or None
        # assert feature.window is not None
        return feature.series_name, feature.window

    def _check_feature_descriptors(
        self,
        skip_none: bool,
        calc_stride: Optional[Union[float, pd.Timedelta, None]] = None,
    ) -> None:
        """Verify whether all added FeatureDescriptors imply the same-input data type.

        If this condition is not met, a warning will be raised.

        Parameters
        ----------
        skip_none: bool
            Whether to skip None stride values in the checks (for such descriptors,
            only the window type is then checked).
        calc_stride: Union[float, pd.Timedelta, None], optional
            The stride argument of `FeatureCollection.calculate`, by default None.
            When not None, this stride takes precedence over a `FeatureDescriptor`'s
            stride.

        """
        dtype_set = set()
        for series_names, win in self._feature_desc_dict.keys():
            for fd in self._feature_desc_dict[(series_names, win)]:
                stride = calc_stride if calc_stride is not None else fd.stride
                if skip_none and stride is None:
                    dtype_set.add(AttributeParser.determine_type(win))
                else:
                    dtype_set.add(
                        AttributeParser.determine_type([win] + to_list(stride))
                    )

        if len(dtype_set) > 1:
            warnings.warn(
                "There are multiple FeatureDescriptor window-stride "
                + f"datatypes present in this FeatureCollection, i.e.: {dtype_set}",
                category=RuntimeWarning,
            )

    def _add_feature(self, feature: FeatureDescriptor) -> None:
        """Add a `FeatureDescriptor` instance to the collection.

        Parameters
        ----------
        feature : FeatureDescriptor
            The feature that will be added to this feature collection.

        """
        # Check that the `|` character is not present in the series names
        assert not any("|" in s_name for s_name in feature.get_required_series())
        # Check that '__' is not present in the series names and function output names
        assert not any(
            "__" in output_name for output_name in feature.function.output_names
        )
        assert not any("__" in s_name for s_name in feature.get_required_series())

        series_win_stride_key = self._get_collection_key(feature)
        if series_win_stride_key in self._feature_desc_dict.keys():
            added_output_names = flatten(
                f.function.output_names
                for f in self._feature_desc_dict[series_win_stride_key]
            )
            # Check that not a feature with the same output_name(s) is already added
            # for the series_win_stride_key
            assert not any(
                output_name in added_output_names
                for output_name in feature.function.output_names
            ), f"Feature with output name(s) {feature.function.output_names} duplicated"
            self._feature_desc_dict[series_win_stride_key].append(feature)
        else:
            self._feature_desc_dict[series_win_stride_key] = [feature]

    def add(
        self,
        features: Union[
            FeatureDescriptor,
            MultipleFeatureDescriptors,
            FeatureCollection,
            Sequence[
                Union[FeatureDescriptor, MultipleFeatureDescriptors, FeatureCollection]
            ],
        ],
    ) -> None:
        """Add feature(s) to the FeatureCollection.

        Parameters
        ----------
        features : Union[FeatureDescriptor, MultipleFeatureDescriptors, FeatureCollection, List[Union[FeatureDescriptor, MultipleFeatureDescriptors, FeatureCollection]]]
            Feature(s) (containers) whose contained features will be added.

        Raises
        ------
        TypeError
            Raised when an item within `features` is not an instance of
            [`MultipleFeatureDescriptors`, `FeatureDescriptor`, `FeatureCollection`].
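
        Example (a minimal sketch; the `MultipleFeatureDescriptors(functions,
        series_names, windows, strides)` argument names are assumed here for
        illustration):

        ```python
        import numpy as np

        from tsflex.features import FeatureCollection, MultipleFeatureDescriptors

        fc = FeatureCollection()
        fc.add(
            MultipleFeatureDescriptors(
                functions=[np.mean, np.std],
                series_names=["temperature", "humidity"],
                windows=100,
                strides=50,
            )
        )
        ```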

        """
        # Convert to list if necessary
        features = to_list(features)

        for feature in features:
            if isinstance(feature, MultipleFeatureDescriptors):
                self.add(feature.feature_descriptions)
            elif isinstance(feature, FeatureDescriptor):
                self._add_feature(feature)
            elif isinstance(feature, FeatureCollection):
                # List needs to be flattened
                self.add(list(flatten(feature._feature_desc_dict.values())))
            else:
                raise TypeError(f"type: {type(feature)} is not supported - {feature}")

        # After adding the features, check whether the descriptors are compatible
        self._check_feature_descriptors(skip_none=True)

    @staticmethod
    def _executor_stroll(idx: int) -> pd.DataFrame:
        """Executor function for the StridedRolling.apply_func method.

        Strided-rolling feature calculation occurs when either:
        - a `window` and `stride` argument are stored in the `FeatureDescriptor` object
        - the `window` is stored in the `FeatureDescriptor` object and the `stride`
          argument is passed to the `calculate` method (potentially overriding the
          descriptor's stride)
        - segment indices are passed to the `calculate` method
        - a `group_by_consecutive` argument is passed to the `calculate` method (since
          we calculate the segment indices for the consecutive groups)

        This method uses the global `get_stroll_func` function, which returns the
        StridedRolling object and the function that needs to be applied to the
        StridedRolling object. Using a global function is necessary to facilitate
        multiprocessing.
        """
        # Uses the global get_stroll_func
        stroll, function = get_stroll_func(idx)
        return stroll.apply_func(function)  # execution time is logged in apply_func

    @staticmethod
    def _executor_grouped(idx: int) -> pd.DataFrame:
        """Executor function for grouped feature calculation.

        Grouped feature calculation occurs when either:
        - a `group_by_all` argument is passed to the `calculate` method
        - a `DataFrameGroupBy` is passed as `data` argument to the `calculate` method

        Note that passing a `group_by_consecutive` argument to the `calculate` method
        will not use this executor function, but will use the `_executor_stroll` as
        executor function (since we calculate the segment indices for the consecutive
        groups).

        This method uses the global `get_group_func` function, which returns a
        pd.DataFrame (containing only the necessary data for the function) and the
        function that needs to be applied to the pd.DataFrame. In addition, the global
        `group_indices` and `group_id_name` are used to access the grouped data and the
        group id name respectively. Using a global function is necessary to facilitate
        multiprocessing.
        """
        # Uses the global get_group_func, group_indices, and group_id_name
        data, function = get_group_func(idx)
        group_ids = group_indices.keys()  # group_ids are the keys of the group_indices
        cols_tuple = tuple(data.columns.values)

        t_start = time.perf_counter()

        # Wrap the function to handle multiple inputs and convert the inputs to numpy
        # array if necessary
        f = function
        if function.input_type is np.array:

            def f(x: pd.DataFrame) -> Any:
                # pass the inputs as positional arguments of numpy array type
                return function(*[x[c].values for c in cols_tuple])

        else:  # function.input_type is pd.Series

            def f(x: pd.DataFrame) -> Any:
                # pass the inputs as positional arguments of pd.Series type
                return function(*[x[c] for c in cols_tuple])

        # Function execution over the grouped data (accessed by using the group_indices)
        out = np.array(list(map(f, [data.iloc[idx] for idx in group_indices.values()])))

        # Aggregate function output in a dictionary
        output_names = [
            StridedRolling.construct_output_index(
                cols_tuple, feat_name, win_str="manual"
            )
            for feat_name in function.output_names
        ]
        feat_out = _process_func_output(out, group_ids, output_names, str(function))

        # Log the function execution time
        _log_func_execution(
            t_start, function, cols_tuple, "manual", "manual", output_names
        )

        return pd.DataFrame(feat_out, index=group_ids).rename_axis(index=group_id_name)

    def _stroll_feat_generator(
        self,
        series_dict: Dict[str, pd.Series],
        calc_stride: Union[List[Union[float, pd.Timedelta]], None],
        segment_start_idxs: Union[np.ndarray, None],
        segment_end_idxs: Union[np.ndarray, None],
        start_idx: Any,
        end_idx: Any,
        window_idx: str,
        include_final_window: bool,
        approve_sparsity: bool,
    ) -> Callable[[int], Tuple[StridedRolling, FuncWrapper]]:
        # --- Future work ---
        # We could also make the StridedRolling creation multithreaded
        # Very low priority because the STROLL __init__ is rather efficient!
        keys_wins_strides = list(self._feature_desc_dict.keys())
        lengths = np.cumsum(
            [len(self._feature_desc_dict[k]) for k in keys_wins_strides]
        )

        def get_stroll_function(idx: int) -> Tuple[StridedRolling, FuncWrapper]:
            key_idx = np.searchsorted(lengths, idx, "right")  # right bc idx starts at 0
            key, win = keys_wins_strides[key_idx]

            feature = self._feature_desc_dict[keys_wins_strides[key_idx]][
                idx - lengths[key_idx]
            ]
            stride = feature.stride if calc_stride is None else calc_stride
            function: FuncWrapper = feature.function
            # The factory method will instantiate the right StridedRolling object
            stroll_arg_dict = dict(
                data=[series_dict[k] for k in key],
                window=win,
                strides=stride,
                segment_start_idxs=segment_start_idxs,
                segment_end_idxs=segment_end_idxs,
                start_idx=start_idx,
                end_idx=end_idx,
                window_idx=window_idx,
                include_final_window=include_final_window,
                approve_sparsity=approve_sparsity,
                func_data_type=function.input_type,
            )
            stroll = StridedRollingFactory.get_segmenter(**stroll_arg_dict)
            return stroll, function

        return get_stroll_function

    def _group_feat_generator(
        self,
        grouped_df: pd.api.typing.DataFrameGroupBy,
    ) -> Callable[[int], Tuple[pd.DataFrame, FuncWrapper]]:
        """Return a function that returns the necessary columns of the grouped data and
        the function that needs to be applied to the grouped data.

        Note that the function does not return groups, but rather the necessary columns
        of the grouped data (i.e. the data on which the function needs to be applied).
        To access the groups, the global `group_indices` and `group_id_name` are used.
        """
        keys_wins = list(self._feature_desc_dict.keys())
        lengths = np.cumsum([len(self._feature_desc_dict[k]) for k in keys_wins])

        def get_group_function(idx: int) -> Tuple[pd.DataFrame, FuncWrapper]:
            key_idx = np.searchsorted(lengths, idx, "right")  # right bc idx starts at 0
            key, win = keys_wins[key_idx]

            feature = self._feature_desc_dict[keys_wins[key_idx]][
                idx - lengths[key_idx]
            ]
            function: FuncWrapper = feature.function
            return grouped_df.obj[list(key)], function

        return get_group_function

    def _check_no_multiple_windows(self, error_case: str) -> None:
        """Check whether there are no multiple windows in the feature collection.

        Parameters
        ----------
        error_case : str
            The case in which no multiple windows are allowed.

        """
        assert (
            self._get_nb_output_features_without_window()
            == self.get_nb_output_features()
        ), (
            error_case
            + "; each output name - series_input combination can only have 1 window"
            + " (or None)"
        )

    @staticmethod
    def _data_to_series_dict(
        data: Union[pd.DataFrame, pd.Series, List[Union[pd.Series, pd.DataFrame]]],
        required_series: List[str],
    ) -> Dict[str, pd.Series]:
        series_dict: Dict[str, pd.Series] = {}
        for s in to_series_list(data):
            if not s.index.is_monotonic_increasing:
                warnings.warn(
                    f"The index of series '{s.name}' is not monotonic increasing. "
                    + "The series will be sorted by the index.",
                    RuntimeWarning,
                )
                s = s.sort_index(ascending=True, inplace=False, ignore_index=False)

            # Assert the assumptions we make!
            assert s.index.is_monotonic_increasing

            if s.name in required_series:
                series_dict[str(s.name)] = s

        return series_dict

    @staticmethod
    def _process_segment_idxs(
        segment_idxs: Union[list, np.ndarray, pd.Series, pd.Index]
    ) -> np.ndarray:
        if hasattr(segment_idxs, "values"):
            segment_idxs = segment_idxs.values
        segment_idxs = np.asarray(segment_idxs)
        if segment_idxs.ndim > 1:
            segment_idxs = segment_idxs.squeeze()  # remove singleton dimensions
        return segment_idxs

    @staticmethod
    def _group_by_all(
        series_dict: Dict[str, pd.Series], col_name: str
    ) -> pd.api.typing.DataFrameGroupBy:
        """Group all `column_name` values and return the grouped data.

        GroupBy ignores all rows with NaN values for the column on which we group.

        Parameters
        ----------
        series_dict : Dict[str, pd.Series]
            Input data.
        col_name : str
            The column name on which the grouping will need to take place.

        Returns
        -------
        pd.api.typing.DataFrameGroupBy
            A `DataFrameGroupBy` object whose `.indices` attribute maps each group
            name to the row positions of that group.

        """
        df = pd.DataFrame(series_dict)
        assert col_name in df.columns

        # Check if there are nan values in the column on which we group
        if df[col_name].isna().any():
            warnings.warn(
                f"NaN values were found in the column '{col_name}' (when expanding the "
                + f"data to a pd.DataFrame which contains {df.columns}. "
                + "Rows with NaN values for the grouping column will be ignored.",
                RuntimeWarning,
            )

        # GroupBy ignores all rows with NaN values for the column on which we group
        return df.groupby(col_name)

    def _calculate_group_by_all(
        self,
        grouped_data: pd.api.typing.DataFrameGroupBy,
        return_df: bool,
        show_progress: bool,
        n_jobs: Optional[int],
        f_handler: Optional[logging.FileHandler],
    ) -> Union[List[pd.DataFrame], pd.DataFrame]:
        """Calculate features on each group of the grouped data.

        Parameters
        ----------
        grouped_data : pd.api.typing.DataFrameGroupBy
            The grouped data.
        return_df: bool, optional
            Whether the output needs to be a DataFrame or a list thereof.
        show_progress: bool, optional
            Whether to show a progress bar.
        n_jobs: int, optional
            The number of jobs to run in parallel.
        f_handler: logging.FileHandler, optional
            The file handler that is used to log the function execution times.

        .. Note::
            Is comparable to following pseudo-SQL code:
            ```sql
            SELECT func(x)
            FROM `data`
            GROUP BY ...
            ```
            where `func` is the FeatureDescriptor function and `x` is the name
            on which the FeatureDescriptor operates. The group by is already done by
            passing a `DataFrameGroupBy` object to this method.
        """
        global group_indices, group_id_name, get_group_func
        group_indices = grouped_data.indices  # dict - group_id as key; indices as value
        # use `_grouper` if available, as `grouper` is deprecated in newer pandas versions
        group_attr = "_grouper" if hasattr(grouped_data, "_grouper") else "grouper"
        group_id_name = getattr(grouped_data, group_attr).names  # name of group col(s)
        get_group_func = self._group_feat_generator(grouped_data)

        # sort_output_index can be set to False, since we want to keep the same order as
        # the group_indices
        return self._calculate_feature_list(
            self._executor_grouped, n_jobs, show_progress, return_df, False, f_handler
        )

    @staticmethod
    def _group_by_consecutive(
        df: Union[pd.Series, pd.DataFrame], col_name: Optional[str] = None
    ) -> pd.DataFrame:
        """Group consecutive `col_name` values in a single DataFrame.

        This is especially useful if you want to represent sparse data in a more
        compact format.

        Parameters
        ----------
        df : Union[pd.Series, pd.DataFrame]
            Input data.
        col_name : str, optional
            If a DataFrame is passed, you need to specify the `col_name` on which
            the consecutive grouping will take place.

        Returns
        -------
        pd.DataFrame
            A new `DataFrame` with columns [`start`, `end`, `col_name`], representing
            the start and end index of each consecutive range together with the
            corresponding `col_name` value.
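
        Example (a rough sketch of the expected output shape, assuming a default
        integer index):

        ```python
        s = pd.Series(["a", "a", "b", "b", "a"], name="label")
        FeatureCollection._group_by_consecutive(s)
        #    start  end label
        # 0      0    1     a
        # 1      2    3     b
        # 2      4    4     a
        ```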
        """
        if isinstance(df, pd.Series):
            col_name = df.name
            df = df.to_frame()

        assert col_name in df.columns
        assert col_name not in [
            "start",
            "end",
        ], "Grouping column cannot be 'start' or 'end'"

        # Check if there are nan values in the column on which we group
        if df[col_name].isna().any():
            warnings.warn(
                f"NaN values were found in the column '{col_name}' (when expanding the "
                + f"data to a pd.DataFrame which contains {df.columns}. "
                + "Rows with NaN values for the grouping column will be ignored.",
                RuntimeWarning,
            )

        # Drop all rows with NaN values for the column on which we group
        df.dropna(subset=[col_name], inplace=True)

        df_cum = (
            (df[col_name] != df[col_name].shift(1))
            .astype("int")
            .cumsum()
            .rename("value_grp")
            .to_frame()
        )
        df_cum["sequence_idx"] = df.index
        df_cum[col_name] = df[col_name]

        df_cum_grouped = df_cum.groupby("value_grp")
        df_grouped = pd.DataFrame(
            {
                "start": df_cum_grouped["sequence_idx"].first(),
                "end": df_cum_grouped["sequence_idx"].last(),
                col_name: df_cum_grouped[col_name].first(),
            }
        ).reset_index(drop=True)

        return df_grouped

    def _calculate_group_by_consecutive(  # type: ignore[no-untyped-def]
        self,
        data: Union[pd.Series, pd.DataFrame, List[Union[pd.Series, pd.DataFrame]]],
        group_by: str,
        return_df: bool = False,
        **calculate_kwargs,
    ) -> Union[List[pd.DataFrame], pd.DataFrame]:
        """Calculate features on each consecutive group of the data.

        Parameters
        ----------
        data : Union[pd.Series, pd.DataFrame, List[Union[pd.Series, pd.DataFrame]]]
            Must be time-indexed!
        group_by: str
            Name of column by which to group values.
        return_df: bool, optional
            Whether the output needs to be a DataFrame or a list thereof, by default
            False. If `True` the output dataframes will be merged to a DataFrame with an
            outer merge.
        **calculate_kwargs:
            Keyword arguments that will be passed to the `calculate` method.

        .. Note::
            Is comparable to following pseudo-SQL code:
            ```sql
            SELECT func(x)
            FROM `data`
            GROUP BY `group_by`
            ```
            where `func` is the FeatureDescriptor function and `x` is the name
            on which the FeatureDescriptor operates. Note however that the grouping is
            done on consecutive values of `group_by` (i.e. `group_by` values that are
            the same and are next to each other are grouped together).
        """
        # 0. Transform to dataframe
        series_dict = FeatureCollection._data_to_series_dict(
            data, self.get_required_series() + [group_by]
        )
        df = pd.DataFrame(series_dict)
        # 1. Group by `group_by` column
        consecutive_grouped_by_df = self._group_by_consecutive(df, col_name=group_by)
        # 2. Get start and end idxs of consecutive groups
        start_segment_idxs = consecutive_grouped_by_df["start"]
        end_segment_idxs = start_segment_idxs.shift(-1)
        # fill the nan value with the last end idx
        end_segment_idxs.iloc[-1] = consecutive_grouped_by_df["end"].iloc[-1]
        # because segment end idxs are exclusive, we need to add an offset to the last
        # end idx so that all data gets used
        segment_vals = end_segment_idxs.values
        if is_datetime64_any_dtype(segment_vals):
            segment_vals[-1] += pd.Timedelta(days=1)
        else:
            segment_vals[-1] += 1
        # 3. Calculate features
        try:
            # Filter out the warnings that are raised when segment indices are passed
            # (since users expect irregular window sizes when grouping)
            with warnings.catch_warnings():
                warnings.filterwarnings(
                    "ignore", category=RuntimeWarning, message="^.*segment indexes.*$"
                )
                warnings.filterwarnings(
                    "ignore", category=RuntimeWarning, message="^.*gaps.*$"
                )
                # 3.1. Calculate features using the groupby segment idxs
                calc_results = self.calculate(
                    data=df,
                    segment_start_idxs=start_segment_idxs,
                    segment_end_idxs=end_segment_idxs,
                    **calculate_kwargs,
                )

            # 3.2 Concatenate results and add the group_by column as well as the
            # start and end idxs of the segments
            calc_result = pd.concat(calc_results, join="outer", copy=False, axis=1)
            calc_result.reset_index(inplace=True, drop=True)
            calc_result[group_by] = consecutive_grouped_by_df[group_by]
            calc_result["__start"] = consecutive_grouped_by_df["start"]
            calc_result["__end"] = consecutive_grouped_by_df["end"]

            if return_df:
                return calc_result
            else:
                return [calc_result[col] for col in calc_result.columns]

        except Exception as e:
            raise RuntimeError(
                f"An exception was raised during feature extraction:\n{e}"
            )

    @staticmethod
    def _process_njobs(n_jobs: Union[int, None], nb_funcs: int) -> int:
        """Process the number of jobs to run in parallel.

        On Windows no multiprocessing is supported, see
        https://github.com/predict-idlab/tsflex/issues/51

        Parameters
        ----------
        n_jobs : Union[int, None]
            The number of jobs to run in parallel.
        nb_funcs : int
            The number of feature functions.

        Returns
        -------
        int
            The number of jobs to run in parallel.

        """
        if os.name == "nt":  # On Windows no multiprocessing is supported
            n_jobs = 1
        else:
            n_jobs = parse_n_jobs(n_jobs)
        return min(n_jobs, nb_funcs)

    def _calculate_feature_list(
        self,
        executor: Callable[[int], pd.DataFrame],
        n_jobs: Union[int, None],
        show_progress: bool,
        return_df: bool,
        sort_output_index: bool,
        f_handler: Optional[logging.FileHandler],
    ) -> Union[List[pd.DataFrame], pd.DataFrame]:
        """Calculate the features for the given executor.

        Parameters
        ----------
        executor : Callable[[int], pd.DataFrame]
            The executor function that will be used to calculate the features.
        n_jobs : Union[int, None]
            The number of jobs to run in parallel.
        show_progress : bool
            Whether to show a progress bar.
        return_df : bool
            Whether to return a DataFrame or a list of DataFrames.
        sort_output_index : bool
            Whether to sort the output index. Note that this is only relevant when
            `return_df` is set to `True`.
        f_handler : logging.FileHandler
            The file handler that is used to log the function execution times.

        Returns
        -------
        Union[List[pd.DataFrame], pd.DataFrame]
            The calculated features.

        """
        nb_feat_funcs = self._get_nb_feat_funcs()
        n_jobs = FeatureCollection._process_njobs(n_jobs, nb_feat_funcs)

        calculated_feature_list: Optional[List[pd.DataFrame]] = None

        if n_jobs in [0, 1]:
            # No multiprocessing
            idxs = range(nb_feat_funcs)
            if show_progress:
                idxs = tqdm(idxs)
            try:
                calculated_feature_list = [executor(idx) for idx in idxs]
            except Exception:
                traceback.print_exc()
        else:
            # Multiprocessing
            with Pool(processes=n_jobs) as pool:
                results = pool.imap_unordered(executor, range(nb_feat_funcs))
                if show_progress:
                    results = tqdm(results, total=nb_feat_funcs)
                try:
                    calculated_feature_list = [f for f in results]
                except Exception:
                    traceback.print_exc()
                    pool.terminate()
                finally:
                    # Close & join because: https://github.com/uqfoundation/pathos/issues/131
                    pool.close()
                    pool.terminate()
                    pool.join()

        # Close the file handler (this avoids PermissionError: [WinError 32])
        if f_handler is not None:
            f_handler.close()
            logger.removeHandler(f_handler)

        if calculated_feature_list is None:
            raise RuntimeError(
                "Feature Extraction halted due to error while extracting one "
                + "(or multiple) feature(s)! See stack trace above."
            )

        if return_df:
            # Concatenate & sort the columns
            df = pd.concat(
                calculated_feature_list,
                axis=1,
                join="outer",
                copy=False,
                sort=sort_output_index,
            )
            return df.reindex(sorted(df.columns), axis=1)
        else:
            return calculated_feature_list

    def calculate(
        self,
        data: Union[
            pd.Series,
            pd.DataFrame,
            List[Union[pd.Series, pd.DataFrame]],
            pd.core.groupby.DataFrameGroupBy,
        ],
        stride: Optional[Union[float, str, pd.Timedelta, List, None]] = None,
        segment_start_idxs: Optional[
            Union[list, np.ndarray, pd.Series, pd.Index]
        ] = None,
        segment_end_idxs: Optional[Union[list, np.ndarray, pd.Series, pd.Index]] = None,
        return_df: bool = False,
        window_idx: str = "end",
        include_final_window: bool = False,
        group_by_all: Optional[str] = None,  # TODO: support multiple columns
        group_by_consecutive: Optional[str] = None,  # TODO: support multiple columns
        bound_method: str = "inner",
        approve_sparsity: bool = False,
        show_progress: bool = False,
        logging_file_path: Optional[Union[str, Path]] = None,
        n_jobs: Optional[int] = None,
    ) -> Union[List[pd.DataFrame], pd.DataFrame]:
        """Calculate features on the passed data.

        Parameters
        ----------
        data : Union[pd.Series, pd.DataFrame, List[Union[pd.Series, pd.DataFrame]], pd.core.groupby.DataFrameGroupBy]
            Dataframe or Series or list thereof, with all the required data for the
            feature calculation. \n
            **Assumptions**: \n
            * each Series / DataFrame must have a sortable index. This index represents
            the sequence position of the corresponding values, the index can be either
            numeric or a ``pd.DatetimeIndex``.
            * each Series / DataFrame index must be comparable with all others
            * we assume that each series-name / dataframe-column-name is unique. \n
            `data` can also be a `DataFrameGroupBy` object, in which case the expected
            behaviour is similar to grouping by all values in `group_by_all` (i.e.,
            for each group, the features are calculated on the group's data).
        stride: Union[float, str, pd.Timedelta, List[Union[float, str, pd.Timedelta]], None], optional
            The stride size. By default None. This argument supports multiple types: \n
            * If None, the stride of the `FeatureDescriptor` objects will be used.
            * If the type is a `float` or an `int`, its value represents the series:\n
                - its stride **range** when a **non time-indexed** series is passed.
                - the stride in **number of samples**, when a **time-indexed** series
                is passed (must then be an `int`)
            * If the stride's type is a `pd.Timedelta`, the stride size represents
            the stride-time delta. The passed data **must have a time-index**.
            * If a `str`, it must represent a stride-time-delta-string. Hence, the
            **passed data must have a time-index**. \n
            .. Note::
                When set, this stride argument takes precedence over the stride property
                of the `FeatureDescriptor`s in this `FeatureCollection` (i.e., when a
                non-None value for `stride` is passed to this method).
        segment_start_idxs: Union[list, np.ndarray, pd.Series, pd.Index], optional
            The start indices of the segments. If None, the start indices will be
            computed from the data using either:\n
            - the `segment_end_idxs` - the `window` size property of the
                `FeatureDescriptor` in this `FeatureCollection` (if `segment_end_idxs`
                is not None)
            - strided-window rolling on the data using `window` and `stride` of the
                `FeatureDescriptor` in this `FeatureCollection` (if `segment_end_idxs`
                 is also None). (Note that the `stride` argument of this method takes
                 precedence over the `stride` property of the `FeatureDescriptor`s).
            By default None.
        segment_end_idxs: Union[list, np.ndarray, pd.Series, pd.Index], optional
            The end indices for the segmented windows. If None, the end indices will be
            computed from the data using either:\n
            - the `segment_start_idxs` + the `window` size property of the
                `FeatureDescriptor` in this `FeatureCollection` (if `segment_start_idxs`
                is not None)
            - strided-window rolling on the data using `window` and `stride` of the
                `FeatureDescriptor` in this `FeatureCollection` (if `segment_start_idxs`
                 is also None). (Note that the `stride` argument of this method takes
                 precedence over the `stride` property of the `FeatureDescriptor`s).
            By default None.

            .. Note::
                When passing both `segment_start_idxs` and `segment_end_idxs`, these two
                arguments must have the same length and every start index must be <=
                the corresponding end index.
                Note that passing both arguments discards any meaning of the `window`
                and `stride` values (as these segment indices define the segmented data,
                and thus no strided-window rolling index calculation has to be executed).
                As such, the user can create variable-length segmented windows. However,
                in such cases, the user should make sure that the feature functions can
                handle these (potentially variable-length) windows.
        return_df : bool, optional
            Whether the output needs to be a DataFrame or a list thereof, by default
            False. If `True` the output dataframes will be merged to a DataFrame with an
            outer merge.
        window_idx : str, optional
            The window's index position which will be used as index for the
            feature_window aggregation. Must be either of: `["begin", "middle", "end"]`,
            by default **"end"**. All features in this collection will use the same
            window_idx.

            .. Note::
                `window_idx`="end" uses the window's end (= right open bound) as
                output index. \n
                `window_idx`="begin" uses the window's start idx (= left closed bound)
                as output index.
        include_final_window : bool, optional
            Whether the final (possibly incomplete) window should be included in the
            strided-window segmentation, by default False.

            .. Note::
                The remarks below apply when `include_final_window` is set to True.
                The user should be aware that the last window *might* be incomplete,
                i.e.:

                - when equally sampled, the last window *might* be smaller than the
                  other windows.
                - when not equally sampled, the last window *might* not include all the
                  data points (as the begin-time + window-size comes after the last data
                  point).

                Note that when equally sampled, the last window *will* be a full window
                when:

                - the stride is the sampling rate of the data (or stride = 1 for
                  sample-based configurations).<br>
                  **Remark**: when `include_final_window` is set to False, this last
                  (full) window will not be included!
                - *(len * sampling_rate - window_size) % stride = 0*. Remark that the
                  case above is a special case of this.
        group_by_all : str, optional
            The name of the column by which to perform grouping. For each group, the
            features will be calculated. The output that is returned contains this
            `group_by` column as index to allow identifying the groups.
            If this parameter is used, the parameters `stride`, `segment_start_idxs`,
            `segment_end_idxs`, `window_idx` and `include_final_window` will be ignored.
            Rows with NaN values for this column will not be considered (as pandas
            ignores these rows when grouping).
            .. note::
                This is similar to passing a `DataFrameGroupBy` object as `data`
                argument to the `calculate` method, where the `DataFrameGroupBy` object
                is created by calling `data.groupby(group_by_all)`.
        group_by_consecutive: str, optional
            The name of the column by which to perform consecutive grouping. A
            consecutive group is a group of values that are the same and are next to
            each other. For each consecutive group, the features will be calculated.
            The output that is returned contains this `group_by` column to allow
            identifying the groups, and also contains the fields [`__start`, `__end`],
            which hold the start and end of the range for each result row.
            If this parameter is used, the parameters `stride`, `segment_start_idxs`,
            `segment_end_idxs`, `window_idx` and `include_final_window` will be ignored.
            Rows with NaN values for this column will not be considered (as we do not
            deem NaN a groupable value).
            Note that for consecutive grouping, the same group value can appear multiple
            times if its occurrences are separated by other values.

            Example output:

            ```python
                number_sold__sum__w=manual  store     __start       __end
            0                          845      0  2019-01-01  2019-01-01
            1                          357      3  2019-01-02  2019-01-02
            2                          904      6  2019-01-03  2019-01-03
            3                          599      3  2019-01-04  2019-01-05
            4                          871      0  2019-01-06  2019-01-06
            ...                        ...    ...         ...         ...
            ```
        bound_method: str, optional
            The start-end bound methodology which is used to generate the slice ranges
            when ``data`` consists of multiple series / columns.
            Must be either of: `["inner", "inner-outer", "outer"]`, by default "inner".

            * if ``inner``, the inner-bounds of the series are returned.
            * if ``inner-outer``, the left-inner and right-outer bounds of the series
              are returned.
            * if ``outer``, the outer-bounds of the series are returned.
        approve_sparsity: bool, optional
            Bool indicating whether the user acknowledges that there may be sparsity
            (i.e., irregularly sampled data), by default False.
            If False and sparsity is observed, a warning is raised.
        show_progress: bool, optional
            If True, the progress will be shown with a progressbar, by default False.
        logging_file_path : Union[str, Path], optional
            The file path where the logged messages are stored. If `None`, then no
            logging `FileHandler` will be used and the logging messages are only pushed
            to stdout. Otherwise, a logging `FileHandler` will write the logged messages
            to the given file path. See also the `tsflex.features.logger` module.
        n_jobs : int, optional
            The number of processes used for the feature calculation. If `None`, then
            the number returned by _os.cpu_count()_ is used, by default None. \n
            If n_jobs is either 0 or 1, the code will be executed sequentially without
            creating a process pool. This is very useful when debugging, as the stack
            trace will be more comprehensible.
            .. note::
                Multiprocessed execution is not supported on Windows. Even when
                `n_jobs` is set > 1, the feature extraction will still be executed
                sequentially.
                For the reason why multiprocessing is not supported on Windows, see
                this issue: https://github.com/predict-idlab/tsflex/issues/51

            .. tip::
                It takes on avg. _300ms_ to schedule everything with
                multiprocessing. So if your sequential feature extraction code runs
                faster than ~1s, it might not be worth it to parallelize the process
                (and it is thus better to leave `n_jobs` at 0 or 1).

        Returns
        -------
        Union[List[pd.DataFrame], pd.DataFrame]
            The calculated features.

        Raises
        ------
        KeyError
            Raised when a required key is not found in `data`.

        Notes
        ------
        * The (column-)names of the series in `data` represent the `series_names`.
        * If a `logging_file_path` is provided, the execution (time) info can be
          retrieved by calling `logger.get_feature_logs(logging_file_path)`.
          Be aware that the `logging_file_path` gets cleared before the logger pushes
          logged messages. Hence, one should use a separate logging file for each
          constructed processing and feature instance with this library.
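
        Examples
        --------
        A minimal sketch (here `df` is assumed to be a DataFrame that contains all
        the required series and, for the last call, an additional `"user_id"` column):

        ```python
        # Strided-rolling calculation, overriding the descriptors' stride
        out = fc.calculate(data=df, stride=50, return_df=True, n_jobs=1)

        # Variable-length segments; the window / stride values are ignored
        out = fc.calculate(
            data=df,
            segment_start_idxs=[0, 500],
            segment_end_idxs=[250, 1_000],
            return_df=True,
        )

        # Grouped calculation: one feature row per "user_id" value
        out = fc.calculate(data=df, group_by_all="user_id", return_df=True)
        ```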


        """

        # Check valid data
        if isinstance(data, list):
            assert all(
                isinstance(d, (pd.Series, pd.DataFrame)) for d in data
            ), "All elements of the data list must be either a Series or a DataFrame!"
        else:
            assert isinstance(
                data, (pd.Series, pd.DataFrame, pd.core.groupby.DataFrameGroupBy)
            ), "The data must be either a Series, a DataFrame or a DataFrameGroupBy!"

        # check valid group_by
        assert group_by_all is None or group_by_consecutive is None, (
            "Only max one of the following parameters can be set: "
            + "`group_by_all` or `group_by_consecutive`"
        )
        assert not (
            (group_by_all is not None or group_by_consecutive is not None)
            and isinstance(data, pd.core.groupby.DataFrameGroupBy)
        ), (
            "Cannot use `group_by_all` or `group_by_consecutive` when `data` is"
            + " already a grouped DataFrame!"
        )

        # Delete other logging handlers
        delete_logging_handlers(logger)
        # Add logging handler (if path provided)
        f_handler = None
        if logging_file_path:
            f_handler = add_logging_handler(logger, logging_file_path)

        if (
            group_by_all
            or group_by_consecutive
            or isinstance(data, pd.core.groupby.DataFrameGroupBy)
        ):
            self._check_no_multiple_windows(
                error_case="When using the groupby behavior"
            )

            # The grouping column must be part of the required series
            if group_by_all:
                # group_by_consecutive should be None (checked by asserts above)
                # data should not be a grouped DataFrame (checked by asserts above)
                assert (
                    group_by_all not in self.get_required_series()
                ), "The `group_by_all` column cannot be part of the required series!"
            elif group_by_consecutive:
                # group_by_all should be None (checked by asserts above)
                # data should not be a grouped DataFrame (checked by asserts above)
                assert group_by_consecutive not in self.get_required_series(), (
                    "The `group_by_consecutive` column cannot be part of the required "
                    + "series!"
                )
                # __start and __end should not be part of the required series
                assert "__start" not in self.get_required_series()
                assert "__end" not in self.get_required_series()

            # if any of the following params differ from their default value, warn that
            # they won't be used in the grouped calculation
            ignored_params = [
                ("stride", None),
                ("segment_start_idxs", None),
                ("segment_end_idxs", None),
                ("window_idx", "end"),
                ("include_final_window", False),
            ]
            local_params = locals()

            for ip, default_value in ignored_params:
                if local_params[ip] != default_value:
                    warnings.warn(
                        f"Parameter `{ip}` will be ignored in case of GroupBy feature"
                        + " calculation."
                    )

            if group_by_consecutive:
                # Strided-rolling feature extraction will take place
                return self._calculate_group_by_consecutive(
                    data,
                    group_by_consecutive,
                    return_df,
                    bound_method=bound_method,
                    approve_sparsity=approve_sparsity,
                    show_progress=show_progress,
                    logging_file_path=logging_file_path,
                    n_jobs=n_jobs,
                )
            else:
                # Grouped feature extraction will take place
                if not isinstance(data, pd.core.groupby.generic.DataFrameGroupBy):
                    # group_by_all should not be None (checked by asserts above)
                    assert group_by_all is not None
                    # 0. Transform to dataframe
                    series_dict = FeatureCollection._data_to_series_dict(
                        data, self.get_required_series() + [group_by_all]
                    )
                    # 1. Group by `group_by_all` column
                    data = self._group_by_all(series_dict, col_name=group_by_all)

                return self._calculate_group_by_all(
                    data,  # should be a DataFrameGroupBy
                    return_df,
                    show_progress=show_progress,
                    n_jobs=n_jobs,
                    f_handler=f_handler,
                )

        # Sort output index if segment indices are not provided
        sort_output_index = segment_start_idxs is None and segment_end_idxs is None

        # Convert to numpy array (if necessary)
        if segment_start_idxs is not None:
            segment_start_idxs = FeatureCollection._process_segment_idxs(
                segment_start_idxs
            )
        if segment_end_idxs is not None:
            segment_end_idxs = FeatureCollection._process_segment_idxs(segment_end_idxs)

        if segment_start_idxs is not None and segment_end_idxs is not None:
            # Check if segment indices have same length and whether every start idx
            # <= end idx
            _check_start_end_array(segment_start_idxs, segment_end_idxs)
            # Check that there is either 1 or no (None) window value for every output
            # name - input_series combination
            self._check_no_multiple_windows(
                error_case="When using both `segment_start_idxs` and `segment_end_idxs`"
            )

        if segment_start_idxs is None or segment_end_idxs is None:
            assert all(
                fd.window is not None
                for fd in flatten(self._feature_desc_dict.values())
            ), (
                "Each feature descriptor must have a window when not both "
                + "segment_start_idxs and segment_end_idxs are provided"
            )

        if stride is None and segment_start_idxs is None and segment_end_idxs is None:
            assert all(
                fd.stride is not None
                for fd in flatten(self._feature_desc_dict.values())
            ), (
                "Each feature descriptor must have a stride when no stride or "
                + "segment indices are passed to this method!"
            )
        elif stride is not None and (
            segment_start_idxs is not None or segment_end_idxs is not None
        ):
            raise ValueError(
                "The stride and any segment index argument cannot be set together! "
                + "At least one of both should be None."
            )

        if stride is not None:
            # Verify whether the stride complies with the input data dtype
            stride = [
                parse_time_arg(s) if isinstance(s, str) else s for s in to_list(stride)
            ]
            self._check_feature_descriptors(skip_none=False, calc_stride=stride)

        # Convert the data to a series_dict
        series_dict = FeatureCollection._data_to_series_dict(
            data, self.get_required_series()
        )

        # Determine the bounds of the series dict items and slice on them
        # TODO: is this really needed here? We also do this in the strided rolling
        start, end = _determine_bounds(bound_method, list(series_dict.values()))
        series_dict = {
            n: s.loc[
                s.index.dtype.type(start) : s.index.dtype.type(end)
            ]  # TODO: check memory efficiency of this
            for n, s in series_dict.items()
        }

        # Note: this variable has a global scope so this is shared in multiprocessing
        # TODO: try to make this more efficient (but is not really the bottleneck)
        global get_stroll_func
        get_stroll_func = self._stroll_feat_generator(
            series_dict,
            calc_stride=stride,
            segment_start_idxs=segment_start_idxs,
            segment_end_idxs=segment_end_idxs,
            start_idx=start,
            end_idx=end,
            window_idx=window_idx,
            include_final_window=include_final_window,
            approve_sparsity=approve_sparsity,
        )

        return self._calculate_feature_list(
            self._executor_stroll,
            n_jobs,
            show_progress,
            return_df,
            sort_output_index,
            f_handler,
        )

    def serialize(self, file_path: Union[str, Path]) -> None:
        """Serialize this FeatureCollection instance.

        Parameters
        ----------
        file_path : Union[str, Path]
            The path where the `FeatureCollection` will be serialized.

        Note
        -----
        As we use [Dill](https://github.com/uqfoundation/dill){:target="_blank"} to
        serialize the files, we can **also serialize functions which are defined in
        the local scope, like lambdas.**
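
        A serialized FeatureCollection can be loaded again with dill, e.g. (a minimal
        sketch):

        ```python
        import dill

        with open("feature_collection.pkl", "rb") as f:
            fc = dill.load(f)
        ```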

        """
        with open(file_path, "wb") as f:
            dill.dump(self, f, recurse=True)

    def reduce(self, feat_cols_to_keep: List[str]) -> FeatureCollection:
        """Create a reduced FeatureCollection instance based on `feat_cols_to_keep`.

        For example, this is useful to optimize feature-extraction inference
        (for your selected features) after performing a feature-selection procedure.

        Parameters
        ----------
        feat_cols_to_keep: List[str]
            A subset of this feature collection's output column names.
            This corresponds to the columns / names of the output of the `calculate`
            method that you want to keep.

        Returns
        -------
        FeatureCollection
            A new FeatureCollection object, which only retains the FeatureDescriptors
            that constitute the `feat_cols_to_keep` output.

        Note
        ----
        Some FeatureDescriptor objects may have multiple **output-names**.<br>
        Hence, if you only want to retain _a subset_ of that FeatureDescriptor's
        feature outputs, you will still get **all features**, as the new
        FeatureCollection is constructed by filtering the FeatureDescriptor list and
        we thus do not alter the FeatureDescriptor objects themselves.

        """
        # dict in which we store all the { output_col_name : (UUID, FeatureDescriptor) }
        # items of our current FeatureCollection object
        manual_window = False
        if any(c.endswith("w=manual") for c in feat_cols_to_keep):
            assert all(c.endswith("w=manual") for c in feat_cols_to_keep)
            # As the windows are created manually, the FeatureCollection cannot contain
            # multiple windows for the same output name - input_series combination
            self._check_no_multiple_windows(
                error_case="When reducing a FeatureCollection with manual windows"
            )
            manual_window = True
        feat_col_fd_mapping: Dict[str, Tuple[str, FeatureDescriptor]] = {}
        for (s_names, window), fd_list in self._feature_desc_dict.items():
            window = "manual" if manual_window else self._ws_to_str(window)
            for fd in fd_list:
                # As a single FeatureDescriptor can have multiple output col names, we
                # create a unique identifier for each FeatureDescriptor (on which we
                # will apply set-like operations later on to retain only the unique
                # FeatureDescriptors)
                uuid_str = str(uuid.uuid4())
                for output_name in fd.function.output_names:
                    # Reconstruct the feature column name
                    feat_col_name = StridedRolling.construct_output_index(
                        series_keys=s_names, feat_name=output_name, win_str=window
                    )
                    feat_col_fd_mapping[feat_col_name] = (uuid_str, fd)

        assert all(fc in feat_col_fd_mapping for fc in feat_cols_to_keep)

        # Collect (uuid, FeatureDescriptor) for the feat_cols_to_keep
        fd_subset: List[Tuple[str, FeatureDescriptor]] = [
            feat_col_fd_mapping[fc] for fc in feat_cols_to_keep
        ]

        # Reduce to unique feature descriptor objects (based on uuid) and create a new
        # FeatureCollection for their deepcopy's.
        seen_uuids = set()
        fds = []
        for uuid_str, fd in fd_subset:
            if uuid_str not in seen_uuids:
                seen_uuids.add(uuid_str)
                fds.append(deepcopy(fd))

        return FeatureCollection(feature_descriptors=fds)
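
    # Illustrative sketch of reducing after feature selection (the column names are
    # hypothetical; they must match the output columns of `calculate`):
    #
    #     selected = ["EDA__mean__w=30s", "TMP__std__w=30s"]
    #     fc_reduced = fc.reduce(feat_cols_to_keep=selected)
    #     # fc_reduced only retains the descriptors behind these two columns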

    @staticmethod
    def _ws_to_str(window_or_stride: Any) -> str:
        """Convert the window/stride value to a (shortend) string representation."""
        if isinstance(window_or_stride, pd.Timedelta):
            return timedelta_to_str(window_or_stride)
        else:
            return str(window_or_stride)

    def __repr__(self) -> str:
        """Representation string of a FeatureCollection."""
        feature_keys = sorted(set(k[0] for k in self._feature_desc_dict.keys()))
        output_str = ""
        for feature_key in feature_keys:
            output_str += f"{'|'.join(feature_key)}: ("
            keys = (x for x in self._feature_desc_dict.keys() if x[0] == feature_key)
            for _, win_size in keys:
                output_str += "\n\twin: "
                win_str = self._ws_to_str(win_size)
                output_str += f"{win_str:<6}: ["
                for feat_desc in self._feature_desc_dict[feature_key, win_size]:
                    stride_str = feat_desc.stride
                    if stride_str is not None:
                        stride_str = [self._ws_to_str(s) for s in stride_str]
                    output_str += f"\n\t\t{feat_desc._func_str}"
                    output_str += f"    stride: {stride_str},"
                output_str += "\n\t]"
            output_str += "\n)\n"
        return output_str
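
A minimal end-to-end usage sketch (not taken from the source above; the series name,
window, and stride values are illustrative assumptions):

```python
import numpy as np
import pandas as pd

from tsflex.features import FeatureCollection, FeatureDescriptor

# Hypothetical time-indexed input series named "EDA"
eda = pd.Series(
    np.random.randn(10_000),
    index=pd.date_range("2023-01-01", periods=10_000, freq="1s"),
    name="EDA",
)

fc = FeatureCollection(
    FeatureDescriptor(function=np.mean, series_name="EDA", window="30s", stride="10s")
)
features_df = fc.calculate(eda, return_df=True)
# -> columns such as "EDA__mean__w=30s", indexed on the windows' end timestamps
```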

Classes

class FeatureCollection (feature_descriptors=None)
Expand source code
class FeatureCollection:
    """Create a FeatureCollection.

    Parameters
    ----------
    feature_descriptors : Union[FeatureDescriptor, MultipleFeatureDescriptors, FeatureCollection, List[Union[FeatureDescriptor, MultipleFeatureDescriptors, FeatureCollection]]], optional
        Initial (list of) feature(s) to add to collection, by default None

    Notes
    -----
    * The `series_name` property of the `FeatureDescriptor`s should **not withhold a "|"
      character**, since "|" is used to join the series names of features which use
      multiple series as input).<br>
      e.g.<br>
        * `ACC|x` is **not** allowed as series name, as this is ambiguous and could
          represent that this feature is constructed with a combination of the `ACC`
          and `x` signal.<br>
          Note that `max|feat` is allowed as feature output name.
    * Both the `series_name` and `output_name` property of the `FeatureDescriptor`s
      **should not withhold "__"** in its string representations. This constraint is
      mainly made for readability purposes.

    The two statements above will be asserted

    """

    def __init__(
        self,
        feature_descriptors: Optional[
            Union[
                FeatureDescriptor,
                MultipleFeatureDescriptors,
                FeatureCollection,
                Sequence[
                    Union[
                        FeatureDescriptor, MultipleFeatureDescriptors, FeatureCollection
                    ]
                ],
            ]
        ] = None,
    ):
        # The feature collection is a dict with keys of type:
        #   tuple(tuple(str), float OR pd.timedelta)
        # The outer tuple's values correspond to (series_key(s), window)
        self._feature_desc_dict: Dict[
            Tuple[Tuple[str, ...], Union[float, pd.Timedelta]], List[FeatureDescriptor]
        ] = {}

        if feature_descriptors:
            self.add(feature_descriptors)

    def get_required_series(self) -> List[str]:
        """Return all required series names for this feature collection.

        Return the list of series names that are required in order to calculate all the
        features (defined by the `FeatureDescriptor` objects) of this feature
        collection.

        Returns
        -------
        List[str]
            List of all the required series names.

        """
        return list(
            set(flatten([fr_key[0] for fr_key in self._feature_desc_dict.keys()]))
        )

    def get_nb_output_features(self) -> int:
        """Return the number of output features in this feature collection.

        Returns
        -------
        int
            The number of output features in this feature collection.

        """
        fd_list: Iterable[FeatureDescriptor] = flatten(self._feature_desc_dict.values())
        return sum(fd.get_nb_output_features() for fd in fd_list)

    def _get_nb_output_features_without_window(self) -> int:
        """Return the number of output features in this feature collection, without
        using the window as a unique identifier.

        This is relevant for when the window value(s) are overridden by passing
        `segment_start_idxs` and `segment_end_idxs` to the `calculate` method.

        Returns
        -------
        int:
            The number of output features in this feature collection without using the
            window as a unique identifier.

        """
        return len(
            set(
                (series, o)
                for (series, _), fd_list in self._feature_desc_dict.items()
                for fd in fd_list
                for o in fd.function.output_names
            )
        )

    def _get_nb_feat_funcs(self) -> int:
        return sum(map(len, self._feature_desc_dict.values()))

    @staticmethod
    def _get_collection_key(
        feature: FeatureDescriptor,
    ) -> Tuple[Tuple[str, ...], Union[pd.Timedelta, float, None]]:
        # Note: `window` property can be either a pd.Timedelta or a float or None
        # assert feature.window is not None
        return feature.series_name, feature.window

    def _check_feature_descriptors(
        self,
        skip_none: bool,
        calc_stride: Optional[Union[float, pd.Timedelta, None]] = None,
    ) -> None:
        """Verify whether all added FeatureDescriptors imply the same-input data type.

        If this condition is not met, a warning will be raised.

        Parameters
        ----------
        skip_none: bool
            Whether to skip None stride values in the checks.
        calc_stride: Union[float, pd.Timedelta, None], optional
            The stride argument of `FeatureCollection.calculate`, by default None.
            This stride takes precedence over a `FeatureDescriptor`'s stride when
            it is not None.

        """
        dtype_set = set()
        for series_names, win in self._feature_desc_dict.keys():
            for fd in self._feature_desc_dict[(series_names, win)]:
                stride = calc_stride if calc_stride is not None else fd.stride
                if skip_none and stride is None:
                    dtype_set.add(AttributeParser.determine_type(win))
                else:
                    dtype_set.add(
                        AttributeParser.determine_type([win] + to_list(stride))
                    )

        if len(dtype_set) > 1:
            warnings.warn(
                "There are multiple FeatureDescriptor window-stride "
                + f"datatypes present in this FeatureCollection, i.e.: {dtype_set}",
                category=RuntimeWarning,
            )

    def _add_feature(self, feature: FeatureDescriptor) -> None:
        """Add a `FeatureDescriptor` instance to the collection.

        Parameters
        ----------
        feature : FeatureDescriptor
            The feature that will be added to this feature collection.

        """
        # Check whether the `|` is not present in the series
        assert not any("|" in s_name for s_name in feature.get_required_series())
        # Check whether the '__" is not present in the series and function output names
        assert not any(
            "__" in output_name for output_name in feature.function.output_names
        )
        assert not any("__" in s_name for s_name in feature.get_required_series())

        series_win_stride_key = self._get_collection_key(feature)
        if series_win_stride_key in self._feature_desc_dict.keys():
            added_output_names = flatten(
                f.function.output_names
                for f in self._feature_desc_dict[series_win_stride_key]
            )
            # Check that not a feature with the same output_name(s) is already added
            # for the series_win_stride_key
            assert not any(
                output_name in added_output_names
                for output_name in feature.function.output_names
            ), f"Feature with output name(s) {feature.function.output_names} duplicated"
            self._feature_desc_dict[series_win_stride_key].append(feature)
        else:
            self._feature_desc_dict[series_win_stride_key] = [feature]

    def add(
        self,
        features: Union[
            FeatureDescriptor,
            MultipleFeatureDescriptors,
            FeatureCollection,
            Sequence[
                Union[FeatureDescriptor, MultipleFeatureDescriptors, FeatureCollection]
            ],
        ],
    ) -> None:
        """Add feature(s) to the FeatureCollection.

        Parameters
        ----------
        features : Union[FeatureDescriptor, MultipleFeatureDescriptors, FeatureCollection, List[Union[FeatureDescriptor, MultipleFeatureDescriptors, FeatureCollection]]]
            Feature(s) (containers) whose contained features will be added.

        Raises
        ------
        TypeError
            Raised when an item within `features` is not an instance of
            [`MultipleFeatureDescriptors`, `FeatureDescriptors`, `FeatureCollection`].

        """
        # Convert to list if necessary
        features = to_list(features)

        for feature in features:
            if isinstance(feature, MultipleFeatureDescriptors):
                self.add(feature.feature_descriptions)
            elif isinstance(feature, FeatureDescriptor):
                self._add_feature(feature)
            elif isinstance(feature, FeatureCollection):
                # List needs to be flattened
                self.add(list(flatten(feature._feature_desc_dict.values())))
            else:
                raise TypeError(f"type: {type(feature)} is not supported - {feature}")

        # After adding the features, check whether the descriptors are compatible
        self._check_feature_descriptors(skip_none=True)
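
    # Illustrative sketch of adding features (names and values are hypothetical):
    #
    #     fc.add(
    #         MultipleFeatureDescriptors(
    #             functions=[np.mean, np.std],
    #             series_names=["EDA", "TMP"],
    #             windows="60s",
    #             strides="30s",
    #         )
    #     )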

    @staticmethod
    def _executor_stroll(idx: int) -> pd.DataFrame:
        """Executor function for the StridedRolling.apply_func method.

        Strided rolling feature calculation occurs when either:
        - a `window` and `stride` argument are stored in the `FeatureDescriptor` object
        - the `window` is stored in the `FeatureDescriptor` object and the `stride`
          argument is passed to the `calculate` method, potentially overriding the
          `stride`
        - segment indices are passed to the `calculate` method
        - a `group_by_consecutive` argument is passed to the `calculate` method (since
          we calculate the segment indices for the consecutive groups)

        This method uses the global `get_stroll_func` function, which returns the
        StridedRolling object and the function that needs to be applied to the
        StridedRolling object. Using a global function is necessary to facilitate
        multiprocessing.
        """
        # Uses the global get_stroll_func
        stroll, function = get_stroll_func(idx)
        return stroll.apply_func(function)  # execution time is logged in apply_func

    @staticmethod
    def _executor_grouped(idx: int) -> pd.DataFrame:
        """Executor function for grouped feature calculation.

        Grouped feature calculation occurs when either:
        - a `group_by_all` argument is passed to the `calculate` method
        - a `DataFrameGroupBy` is passed as `data` argument to the `calculate` method

        Note that passing a `group_by_consecutive` argument to the `calculate` method
        will not use this executor function, but will use the `_executor_stroll` as
        executor function (since we calculate the segment indices for the consecutive
        groups).

        This method uses the global `get_group_func` function, which returns a
        pd.DataFrame (containing only the necessary data for the function) and the
        function that needs to be applied to the pd.DataFrame. In addition, the global
        `group_indices` and `group_id_name` are used to access the grouped data and the
        group id name respectively. Using a global function is necessary to facilitate
        multiprocessing.
        """
        # Uses the global get_group_func, group_indices, and group_id_name
        data, function = get_group_func(idx)
        group_ids = group_indices.keys()  # group_ids are the keys of the group_indices
        cols_tuple = tuple(data.columns.values)

        t_start = time.perf_counter()

        # Wrap the function to handle multiple inputs and convert the inputs to numpy
        # array if necessary
        f = function
        if function.input_type is np.array:

            def f(x: pd.DataFrame) -> Any:
                # pass the inputs as positional arguments of numpy array type
                return function(*[x[c].values for c in cols_tuple])

        else:  # function.input_type is pd.Series

            def f(x: pd.DataFrame) -> Any:
                # pass the inputs as positional arguments of pd.Series type
                return function(*[x[c] for c in cols_tuple])

        # Function execution over the grouped data (accessed by using the group_indices)
        out = np.array(list(map(f, [data.iloc[idx] for idx in group_indices.values()])))

        # Aggregate function output in a dictionary
        output_names = [
            StridedRolling.construct_output_index(
                cols_tuple, feat_name, win_str="manual"
            )
            for feat_name in function.output_names
        ]
        feat_out = _process_func_output(out, group_ids, output_names, str(function))

        # Log the function execution time
        _log_func_execution(
            t_start, function, cols_tuple, "manual", "manual", output_names
        )

        return pd.DataFrame(feat_out, index=group_ids).rename_axis(index=group_id_name)

    def _stroll_feat_generator(
        self,
        series_dict: Dict[str, pd.Series],
        calc_stride: Union[List[Union[float, pd.Timedelta]], None],
        segment_start_idxs: Union[np.ndarray, None],
        segment_end_idxs: Union[np.ndarray, None],
        start_idx: Any,
        end_idx: Any,
        window_idx: str,
        include_final_window: bool,
        approve_sparsity: bool,
    ) -> Callable[[int], Tuple[StridedRolling, FuncWrapper]]:
        # --- Future work ---
        # We could also make the StridedRolling creation multithreaded
        # Very low priority because the STROLL __init__ is rather efficient!
        keys_wins_strides = list(self._feature_desc_dict.keys())
        lengths = np.cumsum(
            [len(self._feature_desc_dict[k]) for k in keys_wins_strides]
        )

        def get_stroll_function(idx: int) -> Tuple[StridedRolling, FuncWrapper]:
            key_idx = np.searchsorted(lengths, idx, "right")  # right bc idx starts at 0
            key, win = keys_wins_strides[key_idx]

            feature = self._feature_desc_dict[keys_wins_strides[key_idx]][
                idx - lengths[key_idx]
            ]
            stride = feature.stride if calc_stride is None else calc_stride
            function: FuncWrapper = feature.function
            # The factory method will instantiate the right StridedRolling object
            stroll_arg_dict = dict(
                data=[series_dict[k] for k in key],
                window=win,
                strides=stride,
                segment_start_idxs=segment_start_idxs,
                segment_end_idxs=segment_end_idxs,
                start_idx=start_idx,
                end_idx=end_idx,
                window_idx=window_idx,
                include_final_window=include_final_window,
                approve_sparsity=approve_sparsity,
                func_data_type=function.input_type,
            )
            stroll = StridedRollingFactory.get_segmenter(**stroll_arg_dict)
            return stroll, function

        return get_stroll_function

    def _group_feat_generator(
        self,
        grouped_df: pd.api.typing.DataFrameGroupBy,
    ) -> Callable[[int], Tuple[pd.DataFrame, FuncWrapper]]:
        """Return a function that returns the necessary columns of the grouped data and
        the function that needs to be applied to the grouped data.

        Note that the function does not return groups, but rather the necessary columns
        of the grouped data (i.e. the data on which the function needs to be applied).
        To access the groups, the global `group_indices` and `group_id_name` are used.
        """
        keys_wins = list(self._feature_desc_dict.keys())
        lengths = np.cumsum([len(self._feature_desc_dict[k]) for k in keys_wins])

        def get_group_function(
            idx: int,
        ) -> Tuple[pd.DataFrame, FuncWrapper]:
            key_idx = np.searchsorted(lengths, idx, "right")  # right bc idx starts at 0
            key, win = keys_wins[key_idx]

            feature = self._feature_desc_dict[keys_wins[key_idx]][
                idx - lengths[key_idx]
            ]
            function: FuncWrapper = feature.function
            return grouped_df.obj[list(key)], function

        return get_group_function

    def _check_no_multiple_windows(self, error_case: str) -> None:
        """Check whether there are no multiple windows in the feature collection.

        Parameters
        ----------
        error_case : str
            The case in which no multiple windows are allowed.

        """
        assert (
            self._get_nb_output_features_without_window()
            == self.get_nb_output_features()
        ), (
            error_case
            + "; each output name - series_input combination can only have 1 window"
            + " (or None)"
        )

    @staticmethod
    def _data_to_series_dict(
        data: Union[pd.DataFrame, pd.Series, List[Union[pd.Series, pd.DataFrame]]],
        required_series: List[str],
    ) -> Dict[str, pd.Series]:
        series_dict: Dict[str, pd.Series] = {}
        for s in to_series_list(data):
            if not s.index.is_monotonic_increasing:
                warnings.warn(
                    f"The index of series '{s.name}' is not monotonic increasing. "
                    + "The series will be sorted by the index.",
                    RuntimeWarning,
                )
                s = s.sort_index(ascending=True, inplace=False, ignore_index=False)

            # Assert the assumptions we make!
            assert s.index.is_monotonic_increasing

            if s.name in required_series:
                series_dict[str(s.name)] = s

        return series_dict

    @staticmethod
    def _process_segment_idxs(
        segment_idxs: Union[list, np.ndarray, pd.Series, pd.Index]
    ) -> np.ndarray:
        if hasattr(segment_idxs, "values"):
            segment_idxs = segment_idxs.values
        segment_idxs = np.asarray(segment_idxs)
        if segment_idxs.ndim > 1:
            segment_idxs = segment_idxs.squeeze()  # remove singleton dimensions
        return segment_idxs

    @staticmethod
    def _group_by_all(
        series_dict: Dict[str, pd.Series], col_name: str
    ) -> pd.api.typing.DataFrameGroupBy:
        """Group all `column_name` values and return the grouped data.

        GroupBy ignores all rows with NaN values for the column on which we group.

        Parameters
        ----------
        series_dict : Dict[str, pd.Series]
            Input data.
        col_name : str
            The column name on which the grouping will need to take place.

        Returns
        -------
        pd.api.typing.DataFrameGroupBy
            A `DataFrameGroupBy` object, with the group names as keys and the indices
            as values.

        """
        df = pd.DataFrame(series_dict)
        assert col_name in df.columns

        # Check if there are nan values in the column on which we group
        if df[col_name].isna().any():
            warnings.warn(
                f"NaN values were found in the column '{col_name}' (when expanding the "
                + f"data to a pd.DataFrame which contains {df.columns}. "
                + "Rows with NaN values for the grouping column will be ignored.",
                RuntimeWarning,
            )

        # GroupBy ignores all rows with NaN values for the column on which we group
        return df.groupby(col_name)

    def _calculate_group_by_all(
        self,
        grouped_data: pd.api.typing.DataFrameGroupBy,
        return_df: bool,
        show_progress: bool,
        n_jobs: Optional[int],
        f_handler: Optional[logging.FileHandler],
    ) -> Union[List[pd.DataFrame], pd.DataFrame]:
        """Calculate features on each group of the grouped data.

        Parameters
        ----------
        grouped_data : pd.api.typing.DataFrameGroupBy
            The grouped data.
        return_df: bool, optional
            Whether the output needs to be a DataFrame or a list thereof.
        show_progress: bool, optional
            Whether to show a progress bar.
        n_jobs: int, optional
            The number of jobs to run in parallel.
        f_handler: logging.FileHandler, optional
            The file handler that is used to log the function execution times.

        .. Note::
            Is comparable to the following pseudo-SQL code:
            ```sql
            SELECT func(x)
            FROM `data`
            GROUP BY ...
            ```
            where `func` is the FeatureDescriptor function and `x` is the name
            on which the FeatureDescriptor operates. The group by is already done by
            passing a `DataFrameGroupBy` object to this method.
        """
        global group_indices, group_id_name, get_group_func
        group_indices = grouped_data.indices  # dict - group_id as key; indices as value
        # since in future versions of pandas grouper will be deprecated
        group_attr = "_grouper" if hasattr(grouped_data, "_grouper") else "grouper"
        group_id_name = getattr(grouped_data, group_attr).names  # name of group col(s)
        get_group_func = self._group_feat_generator(grouped_data)

        # sort_output_index can be set to False, since we want to keep the same order as
        # the group_indices
        return self._calculate_feature_list(
            self._executor_grouped, n_jobs, show_progress, return_df, False, f_handler
        )

    @staticmethod
    def _group_by_consecutive(
        df: Union[pd.Series, pd.DataFrame], col_name: Optional[str] = None
    ) -> pd.DataFrame:
        """Group consecutive `col_name` values in a single DataFrame.

        This is especially useful if you want to represent sparse data in a more
        compact format.

        Parameters
        ----------
        df : Union[pd.Series, pd.DataFrame]
            Input data.
        col_name : str, optional
            If a DataFrame is passed, you will need to specify the `col_name` on which
            the consecutive-grouping will need to take place.

        Returns
        -------
        pd.DataFrame
            A new `DataFrame` view, with columns:
            [`start`, `end`, `col_name`], representing the
            start and end time of the consecutive range, and the col_name's consecutive
            values.
        """
        if type(df) == pd.Series:
            col_name = df.name
            df = df.to_frame()

        assert col_name in df.columns
        assert col_name not in [
            "start",
            "end",
        ], "Grouping column cannot be 'start' or 'end'"

        # Check if there are nan values in the column on which we group
        if df[col_name].isna().any():
            warnings.warn(
                f"NaN values were found in the column '{col_name}' (when expanding the "
                + f"data to a pd.DataFrame which contains {df.columns}. "
                + "Rows with NaN values for the grouping column will be ignored.",
                RuntimeWarning,
            )

        # Drop all rows with NaN values for the column on which we group
        df.dropna(subset=[col_name], inplace=True)

        df_cum = (
            (df[col_name] != df[col_name].shift(1))
            .astype("int")
            .cumsum()
            .rename("value_grp")
            .to_frame()
        )
        df_cum["sequence_idx"] = df.index
        df_cum[col_name] = df[col_name]

        df_cum_grouped = df_cum.groupby("value_grp")
        df_grouped = pd.DataFrame(
            {
                "start": df_cum_grouped["sequence_idx"].first(),
                "end": df_cum_grouped["sequence_idx"].last(),
                col_name: df_cum_grouped[col_name].first(),
            }
        ).reset_index(drop=True)

        return df_grouped
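
    # Illustrative sketch of what consecutive grouping produces (hypothetical data):
    #
    #     s = pd.Series(["A", "A", "B", "B", "A"], name="label")
    #     FeatureCollection._group_by_consecutive(s)
    #     #    start  end label
    #     # 0      0    1     A
    #     # 1      2    3     B
    #     # 2      4    4     A
    #
    # Note that the value "A" appears in two separate (consecutive) groups.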

    def _calculate_group_by_consecutive(  # type: ignore[no-untyped-def]
        self,
        data: Union[pd.Series, pd.DataFrame, List[Union[pd.Series, pd.DataFrame]]],
        group_by: str,
        return_df: bool = False,
        **calculate_kwargs,
    ) -> Union[List[pd.DataFrame], pd.DataFrame]:
        """Calculate features on each consecutive group of the data.

        Parameters
        ----------
        data : Union[pd.Series, pd.DataFrame, List[Union[pd.Series, pd.DataFrame]]]
            Must be time-indexed!
        group_by: str
            Name of column by which to group values.
        return_df: bool, optional
            Whether the output needs to be a DataFrame or a list thereof, by default
            False. If `True` the output dataframes will be merged to a DataFrame with an
            outer merge.
        **calculate_kwargs:
            Keyword arguments that will be passed to the `calculate` method.

        .. Note::
            Is comparable to the following pseudo-SQL code:
            ```sql
            SELECT func(x)
            FROM `data`
            GROUP BY `group_by`
            ```
            where `func` is the FeatureDescriptor function and `x` is the name
            on which the FeatureDescriptor operates. Note however that the grouping is
            done on consecutive values of `group_by` (i.e. `group_by` values that are
            the same and are next to each other are grouped together).
        """
        # 0. Transform to dataframe
        series_dict = FeatureCollection._data_to_series_dict(
            data, self.get_required_series() + [group_by]
        )
        df = pd.DataFrame(series_dict)
        # 1. Group by `group_by` column
        consecutive_grouped_by_df = self._group_by_consecutive(df, col_name=group_by)
        # 2. Get start and end idxs of consecutive groups
        start_segment_idxs = consecutive_grouped_by_df["start"]
        end_segment_idxs = start_segment_idxs.shift(-1)
        # fill the nan value with the last end idx
        end_segment_idxs.iloc[-1] = consecutive_grouped_by_df["end"].iloc[-1]
        # because segment end idxs are exclusive, we need to add an offset to the last
        # end idx so that all data gets used
        segment_vals = end_segment_idxs.values
        if is_datetime64_any_dtype(segment_vals):
            segment_vals[-1] += pd.Timedelta(days=1)
        else:
            segment_vals[-1] += 1
        # 3. Calculate features
        try:
            # Filter out the warnings that are raised when segment indices are passed
            # (since users expect irregular window sizes when grouping)
            with warnings.catch_warnings():
                warnings.filterwarnings(
                    "ignore", category=RuntimeWarning, message="^.*segment indexes.*$"
                )
                warnings.filterwarnings(
                    "ignore", category=RuntimeWarning, message="^.*gaps.*$"
                )
                # 3.1. Calculate features using the groupby segment idxs
                calc_results = self.calculate(
                    data=df,
                    segment_start_idxs=start_segment_idxs,
                    segment_end_idxs=end_segment_idxs,
                    **calculate_kwargs,
                )

            # 3.2 Concatenate results and add the group_by column as well as the
            # start and end idxs of the segments
            calc_result = pd.concat(calc_results, join="outer", copy=False, axis=1)
            calc_result.reset_index(inplace=True, drop=True)
            calc_result[group_by] = consecutive_grouped_by_df[group_by]
            calc_result["__start"] = consecutive_grouped_by_df["start"]
            calc_result["__end"] = consecutive_grouped_by_df["end"]

            if return_df:
                return calc_result
            else:
                return [calc_result[col] for col in calc_result.columns]

        except Exception as e:
            raise RuntimeError(
                f"An exception was raised during feature extraction:\n{e}"
            )

    @staticmethod
    def _process_njobs(n_jobs: Union[int, None], nb_funcs: int) -> int:
        """Process the number of jobs to run in parallel.

        On Windows no multiprocessing is supported, see
        https://github.com/predict-idlab/tsflex/issues/51

        Parameters
        ----------
        n_jobs : Union[int, None]
            The number of jobs to run in parallel.
        nb_funcs : int
            The number of feature functions.

        Returns
        -------
        int
            The number of jobs to run in parallel.

        """
        if os.name == "nt":  # On Windows no multiprocessing is supported
            n_jobs = 1
        else:
            n_jobs = parse_n_jobs(n_jobs)
        return min(n_jobs, nb_funcs)
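
    # Worked illustration (hypothetical values): on a non-Windows machine where
    # os.cpu_count() returns 8, `_process_njobs(None, 3)` resolves to min(8, 3) = 3
    # processes; on Windows it always resolves to min(1, 3) = 1 (sequential execution).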

    def _calculate_feature_list(
        self,
        executor: Callable[[int], pd.DataFrame],
        n_jobs: Union[int, None],
        show_progress: bool,
        return_df: bool,
        sort_output_index: bool,
        f_handler: Optional[logging.FileHandler],
    ) -> Union[List[pd.DataFrame], pd.DataFrame]:
        """Calculate the features for the given executor.

        Parameters
        ----------
        executor : Callable[[int], pd.DataFrame]
            The executor function that will be used to calculate the features.
        n_jobs : Union[int, None]
            The number of jobs to run in parallel.
        show_progress : bool
            Whether to show a progress bar.
        return_df : bool
            Whether to return a DataFrame or a list of DataFrames.
        sort_output_index : bool
            Whether to sort the output index. Note that this is only relevant when
            `return_df` is set to `True`.
        f_handler : logging.FileHandler
            The file handler that is used to log the function execution times.

        Returns
        -------
        Union[List[pd.DataFrame], pd.DataFrame]
            The calculated features.

        """
        nb_feat_funcs = self._get_nb_feat_funcs()
        n_jobs = FeatureCollection._process_njobs(n_jobs, nb_feat_funcs)

        calculated_feature_list: Optional[List[pd.DataFrame]] = None

        if n_jobs in [0, 1]:
            # No multiprocessing
            idxs = range(nb_feat_funcs)
            if show_progress:
                idxs = tqdm(idxs)
            try:
                calculated_feature_list = [executor(idx) for idx in idxs]
            except Exception:
                traceback.print_exc()
        else:
            # Multiprocessing
            with Pool(processes=n_jobs) as pool:
                results = pool.imap_unordered(executor, range(nb_feat_funcs))
                if show_progress:
                    results = tqdm(results, total=nb_feat_funcs)
                try:
                    calculated_feature_list = [f for f in results]
                except Exception:
                    traceback.print_exc()
                    pool.terminate()
                finally:
                    # Close & join because: https://github.com/uqfoundation/pathos/issues/131
                    pool.close()
                    pool.terminate()
                    pool.join()

        # Close the file handler (this avoids PermissionError: [WinError 32])
        if f_handler is not None:
            f_handler.close()
            logger.removeHandler(f_handler)

        if calculated_feature_list is None:
            raise RuntimeError(
                "Feature Extraction halted due to error while extracting one "
                + "(or multiple) feature(s)! See stack trace above."
            )

        if return_df:
            # Concatenate & sort the columns
            df = pd.concat(
                calculated_feature_list,
                axis=1,
                join="outer",
                copy=False,
                sort=sort_output_index,
            )
            return df.reindex(sorted(df.columns), axis=1)
        else:
            return calculated_feature_list

    def calculate(
        self,
        data: Union[
            pd.Series,
            pd.DataFrame,
            List[Union[pd.Series, pd.DataFrame]],
            pd.core.groupby.DataFrameGroupBy,
        ],
        stride: Optional[Union[float, str, pd.Timedelta, List, None]] = None,
        segment_start_idxs: Optional[
            Union[list, np.ndarray, pd.Series, pd.Index]
        ] = None,
        segment_end_idxs: Optional[Union[list, np.ndarray, pd.Series, pd.Index]] = None,
        return_df: bool = False,
        window_idx: str = "end",
        include_final_window: bool = False,
        group_by_all: Optional[str] = None,  # TODO: support multiple columns
        group_by_consecutive: Optional[str] = None,  # TODO: support multiple columns
        bound_method: str = "inner",
        approve_sparsity: bool = False,
        show_progress: bool = False,
        logging_file_path: Optional[Union[str, Path]] = None,
        n_jobs: Optional[int] = None,
    ) -> Union[List[pd.DataFrame], pd.DataFrame]:
        """Calculate features on the passed data.

        Parameters
        ----------
        data : Union[pd.Series, pd.DataFrame, List[Union[pd.Series, pd.DataFrame]], pd.core.groupby.DataFrameGroupBy]
            Dataframe or Series or list thereof, with all the required data for the
            feature calculation. \n
            **Assumptions**: \n
            * each Series / DataFrame must have a sortable index. This index represents
            the sequence position of the corresponding values, the index can be either
            numeric or a ``pd.DatetimeIndex``.
            * each Series / DataFrame index must be comparable with all others
            * we assume that each series-name / dataframe-column-name is unique.
            Can also be a `DataFrameGroupBy` object, in which case the expected
            behaviour is similar to grouping by all values in `group_by_all` (i.e.,
            for each group, the features are calculated on the group's data).
        stride: Union[float, str, pd.Timedelta, List[Union[float, str, pd.Timedelta]], None], optional
            The stride size. By default None. This argument supports multiple types: \n
            * If None, the stride of the `FeatureDescriptor` objects will be used.
            * If the type is a `float` or an `int`, its value represents the series:\n
                - its stride **range** when a **non time-indexed** series is passed.
                - the stride in **number of samples**, when a **time-indexed** series
                is passed (must then be an `int`)
            * If the stride's type is a `pd.Timedelta`, the stride size represents
            the stride-time delta. The passed data **must have a time-index**.
            * If a `str`, it must represent a stride-time-delta-string. Hence, the
            **passed data must have a time-index**. \n
            .. Note::
                When set, this stride argument takes precedence over the stride property
                of the `FeatureDescriptor`s in this `FeatureCollection` (i.e., when a
                non-None value for `stride` is passed to this method).
        segment_start_idxs: Union[list, np.ndarray, pd.Series, pd.Index], optional
            The start indices of the segments. If None, the start indices will be
            computed from the data using either:\n
            - the `segment_end_idxs` - the `window` size property of the
                `FeatureDescriptor` in this `FeatureCollection` (if `segment_end_idxs`
                is not None)
            - strided-window rolling on the data using `window` and `stride` of the
                `FeatureDescriptor` in this `FeatureCollection` (if `segment_end_idxs`
                 is also None). (Note that the `stride` argument of this method takes
                 precedence over the `stride` property of the `FeatureDescriptor`s).
            By default None.
        segment_end_idxs: Union[list, np.ndarray, pd.Series, pd.Index], optional
            The end indices for the segmented windows. If None, the end indices will be
            computed from the data using either:\n
            - the `segment_start_idxs` + the `window` size property of the
                `FeatureDescriptor` in this `FeatureCollection` (if `segment_start_idxs`
                is not None)
            - strided-window rolling on the data using `window` and `stride` of the
                `FeatureDescriptor` in this `FeatureCollection` (if `segment_start_idxs`
                 is also None). (Note that the `stride` argument of this method takes
                 precedence over the `stride` property of the `FeatureDescriptor`s).
            By default None.

            .. Note::
                When passing both `segment_start_idxs` and `segment_end_idxs`, these two
                arguments must have the same length and every start index must be <=
                the corresponding end index.
                Note that passing both arguments, discards any meaning of the `window`
                and `stride` values (as these segment indices define the segmented data,
                and thus no strided-window rolling index calculation has to be executed).
                As such, the user can create variable-length segmented windows. However,
                in such cases, the user should be wary that the feature functions are
                invariant to these (potentially variable-length) windows.
        return_df : bool, optional
            Whether the output needs to be a DataFrame or a list thereof, by default
            False. If `True` the output dataframes will be merged to a DataFrame with an
            outer merge.
        window_idx : str, optional
            The window's index position which will be used as index for the
            feature_window aggregation. Must be either of: `["begin", "middle", "end"]`,
            by default **"end"**. All features in this collection will use the same
            window_idx.

            .. Note::
                `window_idx`="end" uses the window's end (= right open bound) as
                output index. \n
                `window_idx`="begin" uses the window's start idx (= left closed bound)
                as output index.
        include_final_window : bool, optional
            Whether the final (possibly incomplete) window should be included in the
            strided-window segmentation, by default False.

            .. Note::
                The remarks below apply when `include_final_window` is set to True.
                The user should be aware that the last window *might* be incomplete,
                i.e.;

                - when equally sampled, the last window *might* be smaller than the
                  other windows.
                - when not equally sampled, the last window *might* not include all the
                  data points (as the begin-time + window-size comes after the last data
                  point).

                Note, that when equally sampled, the last window *will* be a full window
                when:

                - the stride is the sampling rate of the data (or stride = 1 for
                sample-based configurations).<br>
                **Remark**: when `include_final_window` is set to False, the last
                window (which is a full window) will not be included!
                - *(len * sampling_rate - window_size) % stride = 0*. Remark that the
                  above case is a base case of this.
        group_by_all : str, optional
            The name of the column by which to perform grouping. For each group, the
            features will be calculated. The output that is returned contains this
            `group_by` column as index to allow identifying the groups.
            If this parameter is used, the parameters `stride`, `segment_start_idxs`,
            `segment_end_idxs`, `window_idx` and `include_final_window` will be ignored.
            Rows with NaN values for this column will not be considered (as pandas
            ignores these rows when grouping).
            .. note::
                This is similar to passing a `DataFrameGroupBy` object as `data`
                argument to the `calculate` method, where the `DataFrameGroupBy` object
                is created by calling `data.groupby(group_by_all)`.
        group_by_consecutive: str, optional
            The name of the column by which to perform consecutive grouping. A
            consecutive group is a group of values that are the same and are next to
            each other. For each consecutive group, the features will be calculated.
            The output that is returned contains this `group_by` column to allow
            identifying the groups, and also contains the fields [`__start`, `__end`],
            which contain the start and end of the time range for each result row.
            If this parameter is used, the parameters `stride`, `segment_start_idxs`,
            `segment_end_idxs`, `window_idx` and `include_final_window` will be ignored.
            Rows with NaN values for this column will not be considered (as we do not
            deem NaN a groupable value).
            Note that for consecutive grouping, groups can appear multiple times if they
            appear in different time-gaps.

            Example output:

            ```python
               number_sold__sum__w=manual  store    __start      __end
            0                         845      0 2019-01-01 2019-01-01
            1                         357      3 2019-01-02 2019-01-02
            2                         904      6 2019-01-03 2019-01-03
            3                         599      3 2019-01-04 2019-01-05
            4                         871      0 2019-01-06 2019-01-06
            ...                       ...    ...        ...        ...
            ```
        bound_method: str, optional
            The start-end bound methodology which is used to generate the slice ranges
            when ``data`` consists of multiple series / columns.
            Must be either of: `["inner", "inner-outer", "outer"]`, by default "inner".

            * if ``inner``, the inner-bounds of the series are returned.
            * if ``inner-outer``, the left-inner and right-outer bounds of the series
              are returned.
            * if ``outer``, the outer-bounds of the series are returned.
        approve_sparsity: bool, optional
            Bool indicating whether the user acknowledges that there may be sparsity
            (i.e., irregularly sampled data), by default False.
            If False and sparsity is observed, a warning is raised.
        show_progress: bool, optional
            If True, the progress will be shown with a progressbar, by default False.
        logging_file_path : Union[str, Path], optional
            The file path where the logged messages are stored. If `None`, then no
            logging `FileHandler` will be used and the logging messages are only pushed
            to stdout. Otherwise, a logging `FileHandler` will write the logged messages
            to the given file path. See also the `tsflex.features.logger` module.
        n_jobs : int, optional
            The number of processes used for the feature calculation. If `None`, then
            the number returned by _os.cpu_count()_ is used, by default None. \n
            If n_jobs is either 0 or 1, the code will be executed sequentially without
            creating a process pool. This is very useful when debugging, as the stack
            trace will be more comprehensible.
            .. note::
                Multiprocessed execution is not supported on Windows. Even when
                `n_jobs` is set > 1, the feature extraction will still be executed
                sequentially.
                For the reason why we do not support multiprocessing on Windows, see
                this issue: https://github.com/predict-idlab/tsflex/issues/51

            .. tip::
                It takes on avg. _300ms_ to schedule everything with
                multiprocessing. So if your sequential feature extraction code runs
                faster than ~1s, it might not be worth it to parallelize the process
                (and it is thus better to leave `n_jobs` at 0 or 1).

        Returns
        -------
        Union[List[pd.DataFrame], pd.DataFrame]
            The calculated features.

        Raises
        ------
        KeyError
            Raised when a required key is not found in `data`.

        Notes
        ------
        * The (column-)names of the series in `data` represent the `series_names`.
        * If a `logging_file_path` is provided, the execution (time) info can be
          retrieved by calling `logger.get_feature_logs(logging_file_path)`.
          Be aware that the `logging_file_path` gets cleared before the logger pushes
          logged messages. Hence, one should use a separate logging file for each
          constructed processing and feature instance with this library.


        """

        # Check valid data
        if isinstance(data, list):
            assert all(
                isinstance(d, (pd.Series, pd.DataFrame)) for d in data
            ), "All elements of the data list must be either a Series or a DataFrame!"
        else:
            assert isinstance(
                data, (pd.Series, pd.DataFrame, pd.core.groupby.DataFrameGroupBy)
            ), "The data must be either a Series, a DataFrame or a DataFrameGroupBy!"

        # check valid group_by
        assert group_by_all is None or group_by_consecutive is None, (
            "Only max one of the following parameters can be set: "
            + "`group_by_all` or `group_by_consecutive`"
        )
        assert not (
            (group_by_all is not None or group_by_consecutive is not None)
            and isinstance(data, pd.core.groupby.DataFrameGroupBy)
        ), (
            "Cannot use `group_by_all` or `group_by_consecutive` when `data` is"
            + " already a grouped DataFrame!"
        )

        # Delete other logging handlers
        delete_logging_handlers(logger)
        # Add logging handler (if path provided)
        f_handler = None
        if logging_file_path:
            f_handler = add_logging_handler(logger, logging_file_path)

        if (
            group_by_all
            or group_by_consecutive
            or isinstance(data, pd.core.groupby.DataFrameGroupBy)
        ):
            self._check_no_multiple_windows(
                error_case="When using the groupby behavior"
            )

            # The grouping column must be part of the required series
            if group_by_all:
                # group_by_consecutive should be None (checked by asserts above)
                # data should not be a grouped DataFrame (checked by asserts above)
                assert (
                    group_by_all not in self.get_required_series()
                ), "The `group_by_all` column cannot be part of the required series!"
            elif group_by_consecutive:
                # group_by_all should be None (checked by asserts above)
                # data should not be a grouped DataFrame (checked by asserts above)
                assert group_by_consecutive not in self.get_required_series(), (
                    "The `group_by_consecutive` column cannot be part of the required "
                    + "series!"
                )
                # __start and __end should not be part of the required series
                assert "__start" not in self.get_required_series()
                assert "__end" not in self.get_required_series()

            # if any of the following params differ from their default, warn that they
            # in the grouped calculation
            ignored_params = [
                ("stride", None),
                ("segment_start_idxs", None),
                ("segment_end_idxs", None),
                ("window_idx", "end"),
                ("include_final_window", False),
            ]
            local_params = locals()

            for ip, default_value in ignored_params:
                if local_params[ip] is not default_value:
                    warnings.warn(
                        f"Parameter `{ip}` will be ignored in case of GroupBy feature"
                        + " calculation."
                    )

            if group_by_consecutive:
                # Strided rolling feature extraction will take place
                return self._calculate_group_by_consecutive(
                    data,
                    group_by_consecutive,
                    return_df,
                    bound_method=bound_method,
                    approve_sparsity=approve_sparsity,
                    show_progress=show_progress,
                    logging_file_path=logging_file_path,
                    n_jobs=n_jobs,
                )
            else:
                # Grouped feature extraction will take place
                if not isinstance(data, pd.core.groupby.generic.DataFrameGroupBy):
                    # group_by_all should not be None (checked by asserts above)
                    assert group_by_all is not None
                    # 0. Transform to dataframe
                    series_dict = FeatureCollection._data_to_series_dict(
                        data, self.get_required_series() + [group_by_all]
                    )
                    # 1. Group by `group_by_all` column
                    data = self._group_by_all(series_dict, col_name=group_by_all)

                return self._calculate_group_by_all(
                    data,  # should be a DataFrameGroupBy
                    return_df,
                    show_progress=show_progress,
                    n_jobs=n_jobs,
                    f_handler=f_handler,
                )

        # Sort output index if segment indices are not provided
        sort_output_index = segment_start_idxs is None and segment_end_idxs is None

        # Convert to numpy array (if necessary)
        if segment_start_idxs is not None:
            segment_start_idxs = FeatureCollection._process_segment_idxs(
                segment_start_idxs
            )
        if segment_end_idxs is not None:
            segment_end_idxs = FeatureCollection._process_segment_idxs(segment_end_idxs)

        if segment_start_idxs is not None and segment_end_idxs is not None:
            # Check if segment indices have same length and whether every start idx
            # <= end idx
            _check_start_end_array(segment_start_idxs, segment_end_idxs)
            # Check if there is either 1 or No(ne) window value for every output name -
            # input_series combination
            self._check_no_multiple_windows(
                error_case="When using both `segment_start_idxs` and `segment_end_idxs`"
            )

        if segment_start_idxs is None or segment_end_idxs is None:
            assert all(
                fd.window is not None
                for fd in flatten(self._feature_desc_dict.values())
            ), (
                "Each feature descriptor must have a window when not both "
                + "segment_start_idxs and segment_end_idxs are provided"
            )

        if stride is None and segment_start_idxs is None and segment_end_idxs is None:
            assert all(
                fd.stride is not None
                for fd in flatten(self._feature_desc_dict.values())
            ), (
                "Each feature descriptor must have a stride when no stride or "
                + "segment indices are passed to this method!"
            )
        elif stride is not None and (
            segment_start_idxs is not None or segment_end_idxs is not None
        ):
            raise ValueError(
                "The stride and any segment index argument cannot be set together! "
                + "At least one of both should be None."
            )

        if stride is not None:
            # Verify whether the stride complies with the input data dtype
            stride = [
                parse_time_arg(s) if isinstance(s, str) else s for s in to_list(stride)
            ]
            self._check_feature_descriptors(skip_none=False, calc_stride=stride)

        # Convert the data to a series_dict
        series_dict = FeatureCollection._data_to_series_dict(
            data, self.get_required_series()
        )

        # Determine the bounds of the series dict items and slice on them
        # TODO: is this really needed here? We already do this in the strided rolling
        start, end = _determine_bounds(bound_method, list(series_dict.values()))
        series_dict = {
            n: s.loc[
                s.index.dtype.type(start) : s.index.dtype.type(end)
            ]  # TODO: check memory efficiency of this
            for n, s in series_dict.items()
        }

        # Note: this variable has a global scope so this is shared in multiprocessing
        # TODO: try to make this more efficient (but is not really the bottleneck)
        global get_stroll_func
        get_stroll_func = self._stroll_feat_generator(
            series_dict,
            calc_stride=stride,
            segment_start_idxs=segment_start_idxs,
            segment_end_idxs=segment_end_idxs,
            start_idx=start,
            end_idx=end,
            window_idx=window_idx,
            include_final_window=include_final_window,
            approve_sparsity=approve_sparsity,
        )

        return self._calculate_feature_list(
            self._executor_stroll,
            n_jobs,
            show_progress,
            return_df,
            sort_output_index,
            f_handler,
        )

    def serialize(self, file_path: Union[str, Path]) -> None:
        """Serialize this FeatureCollection instance.

        Parameters
        ----------
        file_path : Union[str, Path]
            The path where the `FeatureCollection` will be serialized.

        Note
        -----
        As we use [Dill](https://github.com/uqfoundation/dill){:target="_blank"} to
        serialize the files, we can **also serialize functions which are defined in
        the local scope, like lambdas.**

        """
        with open(file_path, "wb") as f:
            dill.dump(self, f, recurse=True)

    def reduce(self, feat_cols_to_keep: List[str]) -> FeatureCollection:
        """Create a reduced FeatureCollection instance based on `feat_cols_to_keep`.

        For example, this is useful to optimize feature-extraction inference
        (for your selected features) after performing a feature-selection procedure.

        Parameters
        ----------
        feat_cols_to_keep: List[str]
            A subset of this feature collection instance's column names.
            This corresponds to the column names of the output of the `calculate`
            method that you want to keep.

        Returns
        -------
        FeatureCollection
            A new FeatureCollection object, which only retains the FeatureDescriptors
            that constitute the `feat_cols_to_keep` output.

        Note
        ----
        Some FeatureDescriptor objects may have multiple **output-names**.<br>
        Hence, if you only want to retain _a subset_ of that FeatureDescriptor's
        feature outputs, you will still get **all its features**, as the new
        FeatureCollection is constructed by applying a filter on the FeatureDescriptor
        list and we thus do not alter these FeatureDescriptor objects themselves.

        """
        # dict in which we store all the { output_col_name : (UUID, FeatureDescriptor) }
        # items of our current FeatureCollection object
        manual_window = False
        if any(c.endswith("w=manual") for c in feat_cols_to_keep):
            assert all(c.endswith("w=manual") for c in feat_cols_to_keep)
            # As the windows are created manually, the FeatureCollection cannot contain
            # multiple windows for the same output name - input_series combination
            self._check_no_multiple_windows(
                error_case="When reducing a FeatureCollection with manual windows"
            )
            manual_window = True
        feat_col_fd_mapping: Dict[str, Tuple[str, FeatureDescriptor]] = {}
        for (s_names, window), fd_list in self._feature_desc_dict.items():
            window = "manual" if manual_window else self._ws_to_str(window)
            for fd in fd_list:
                # As a single FeatureDescriptor can have multiple output col names, we
                # create a unique identifier for each FeatureDescriptor (on which we
                # will apply set-like operations later on to only retain all the unique
                # FeatureDescriptors)
                uuid_str = str(uuid.uuid4())
                for output_name in fd.function.output_names:
                    # Reconstruct the feature column name
                    feat_col_name = StridedRolling.construct_output_index(
                        series_keys=s_names, feat_name=output_name, win_str=window
                    )
                    feat_col_fd_mapping[feat_col_name] = (uuid_str, fd)

        assert all(fc in feat_col_fd_mapping for fc in feat_cols_to_keep)

        # Collect (uuid, FeatureDescriptor) for the feat_cols_to_keep
        fd_subset: List[Tuple[str, FeatureDescriptor]] = [
            feat_col_fd_mapping[fc] for fc in feat_cols_to_keep
        ]

        # Reduce to unique feature descriptor objects (based on uuid) and create a new
        # FeatureCollection from their deepcopies.
        seen_uuids = set()
        fds = []
        for uuid_str, fd in fd_subset:
            if uuid_str not in seen_uuids:
                seen_uuids.add(uuid_str)
                fds.append(deepcopy(fd))

        return FeatureCollection(feature_descriptors=fds)

    @staticmethod
    def _ws_to_str(window_or_stride: Any) -> str:
        """Convert the window/stride value to a (shortend) string representation."""
        if isinstance(window_or_stride, pd.Timedelta):
            return timedelta_to_str(window_or_stride)
        else:
            return str(window_or_stride)

    def __repr__(self) -> str:
        """Representation string of a FeatureCollection."""
        feature_keys = sorted(set(k[0] for k in self._feature_desc_dict.keys()))
        output_str = ""
        for feature_key in feature_keys:
            output_str += f"{'|'.join(feature_key)}: ("
            keys = (x for x in self._feature_desc_dict.keys() if x[0] == feature_key)
            for _, win_size in keys:
                output_str += "\n\twin: "
                win_str = self._ws_to_str(win_size)
                output_str += f"{win_str:<6}: ["
                for feat_desc in self._feature_desc_dict[feature_key, win_size]:
                    stride_str = feat_desc.stride
                    if stride_str is not None:
                        stride_str = [self._ws_to_str(s) for s in stride_str]
                    output_str += f"\n\t\t{feat_desc._func_str}"
                    output_str += f"    stride: {stride_str},"
                output_str += "\n\t]"
            output_str += "\n)\n"
        return output_str

Create a FeatureCollection.

Parameters

feature_descriptors : Union[FeatureDescriptor, MultipleFeatureDescriptors, FeatureCollection, List[Union[FeatureDescriptor, MultipleFeatureDescriptors, FeatureCollection]]], optional
Initial (list of) feature(s) to add to collection, by default None

Notes

  • The series_name property of the FeatureDescriptors should not withhold a "|" character, since "|" is used to join the series names of features which use multiple series as input.
    e.g.
    • ACC|x is not allowed as series name, as this is ambiguous and could represent that this feature is constructed with a combination of the ACC and x signal.
      Note that max|feat is allowed as feature output name.
  • Both the series_name and output_name property of the FeatureDescriptors should not withhold "__" in its string representations. This constraint is mainly made for readability purposes.

The two statements above will be asserted
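
For orientation, the sketch below constructs a small collection. The series name "temperature" and the 5-minute window / 2-minute stride are purely illustrative choices, not defaults of this module.

```python
import numpy as np

from tsflex.features import (
    FeatureCollection,
    FeatureDescriptor,
    MultipleFeatureDescriptors,
)

# Hypothetical example: "temperature" and the window/stride sizes are made up.
fc = FeatureCollection(
    feature_descriptors=[
        FeatureDescriptor(np.mean, series_name="temperature", window="5min", stride="2min"),
        MultipleFeatureDescriptors(
            functions=[np.min, np.max],
            series_names="temperature",
            windows="5min",
            strides="2min",
        ),
    ]
)
```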

Methods

def get_required_series(self)
Expand source code
def get_required_series(self) -> List[str]:
    """Return all required series names for this feature collection.

    Return the list of series names that are required in order to calculate all the
    features (defined by the `FeatureDescriptor` objects) of this feature
    collection.

    Returns
    -------
    List[str]
        List of all the required series names.

    """
    return list(
        set(flatten([fr_key[0] for fr_key in self._feature_desc_dict.keys()]))
    )

Return all required series names for this feature collection.

Return the list of series names that are required in order to calculate all the features (defined by the FeatureDescriptor objects) of this feature collection.

Returns

List[str]
List of all the required series names.
def get_nb_output_features(self)
Expand source code
def get_nb_output_features(self) -> int:
    """Return the number of output features in this feature collection.

    Returns
    -------
    int
        The number of output features in this feature collection.

    """
    fd_list: Iterable[FeatureDescriptor] = flatten(self._feature_desc_dict.values())
    return sum(fd.get_nb_output_features() for fd in fd_list)

Return the number of output features in this feature collection.

Returns

int
The number of output features in this feature collection.
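
A brief usage sketch of both getters, assuming the hypothetical `fc` collection from the constructor example above:

```python
# Inspect which input series the collection needs and how many output
# features it will produce.
print(fc.get_required_series())     # e.g. ['temperature']
print(fc.get_nb_output_features())  # e.g. 3 (mean, min, max)
```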
def add(self, features)
Expand source code
def add(
    self,
    features: Union[
        FeatureDescriptor,
        MultipleFeatureDescriptors,
        FeatureCollection,
        Sequence[
            Union[FeatureDescriptor, MultipleFeatureDescriptors, FeatureCollection]
        ],
    ],
) -> None:
    """Add feature(s) to the FeatureCollection.

    Parameters
    ----------
    features : Union[FeatureDescriptor, MultipleFeatureDescriptors, FeatureCollection, List[Union[FeatureDescriptor, MultipleFeatureDescriptors, FeatureCollection]]]
        Feature(s) (containers) whose contained features will be added.

    Raises
    ------
    TypeError
        Raised when an item within `features` is not an instance of
        [`MultipleFeatureDescriptors`, `FeatureDescriptors`, `FeatureCollection`].

    """
    # Convert to list if necessary
    features = to_list(features)

    for feature in features:
        if isinstance(feature, MultipleFeatureDescriptors):
            self.add(feature.feature_descriptions)
        elif isinstance(feature, FeatureDescriptor):
            self._add_feature(feature)
        elif isinstance(feature, FeatureCollection):
            # List needs to be flattened
            self.add(list(flatten(feature._feature_desc_dict.values())))
        else:
            raise TypeError(f"type: {type(feature)} is not supported - {feature}")

    # After adding the features, check whether the descriptors are compatible
    self._check_feature_descriptors(skip_none=True)

Add feature(s) to the FeatureCollection.

Parameters

features : Union[FeatureDescriptor, MultipleFeatureDescriptors, FeatureCollection, List[Union[FeatureDescriptor, MultipleFeatureDescriptors, FeatureCollection]]]
Feature(s) (containers) whose contained features will be added.

Raises

TypeError
Raised when an item within features is not an instance of [MultipleFeatureDescriptors, FeatureDescriptors, FeatureCollection].
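
A minimal sketch of extending an existing collection, again using the hypothetical `fc` and "temperature" series from the examples above:

```python
import numpy as np

from tsflex.features import FeatureDescriptor

# Append one extra descriptor; the compatibility check on the descriptors
# (mentioned above) runs after adding.
fc.add(FeatureDescriptor(np.std, series_name="temperature", window="5min", stride="2min"))
```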
def calculate(self, data, stride=None, segment_start_idxs=None, segment_end_idxs=None, return_df=False, window_idx='end', include_final_window=False, group_by_all=None, group_by_consecutive=None, bound_method='inner', approve_sparsity=False, show_progress=False, logging_file_path=None, n_jobs=None)
Expand source code
def calculate(
    self,
    data: Union[
        pd.Series,
        pd.DataFrame,
        List[Union[pd.Series, pd.DataFrame]],
        pd.core.groupby.DataFrameGroupBy,
    ],
    stride: Optional[Union[float, str, pd.Timedelta, List, None]] = None,
    segment_start_idxs: Optional[
        Union[list, np.ndarray, pd.Series, pd.Index]
    ] = None,
    segment_end_idxs: Optional[Union[list, np.ndarray, pd.Series, pd.Index]] = None,
    return_df: bool = False,
    window_idx: str = "end",
    include_final_window: bool = False,
    group_by_all: Optional[str] = None,  # TODO: support multiple columns
    group_by_consecutive: Optional[str] = None,  # TODO: support multiple columns
    bound_method: str = "inner",
    approve_sparsity: bool = False,
    show_progress: bool = False,
    logging_file_path: Optional[Union[str, Path]] = None,
    n_jobs: Optional[int] = None,
) -> Union[List[pd.DataFrame], pd.DataFrame]:
    """Calculate features on the passed data.

    Parameters
    ----------
    data : Union[pd.Series, pd.DataFrame, List[Union[pd.Series, pd.DataFrame]], pd.core.groupby.DataFrameGroupBy]
        Dataframe or Series or list thereof, with all the required data for the
        feature calculation. \n
        **Assumptions**: \n
        * each Series / DataFrame must have a sortable index. This index represents
        the sequence position of the corresponding values, the index can be either
        numeric or a ``pd.DatetimeIndex``.
        * each Series / DataFrame index must be comparable with all others
        * we assume that each series-name / dataframe-column-name is unique.
        Can also be a `DataFrameGroupBy` object, in which case the expected
        behaviour is similar to grouping by all values in `group_by_all` (i.e.,
        for each group, the features are calculated on the group's data).
    stride: Union[float, str, pd.Timedelta, List[Union[float, str, pd.Timedelta]], None], optional
        The stride size. By default None. This argument supports multiple types: \n
        * If None, the stride of the `FeatureDescriptor` objects will be used.
        * If the type is a `float` or an `int`, its value represents the series:\n
            - its stride **range** when a **non time-indexed** series is passed.
            - the stride in **number of samples**, when a **time-indexed** series
            is passed (must then be an `int`)
        * If the stride's type is a `pd.Timedelta`, the stride size represents
        the stride-time delta. The passed data **must have a time-index**.
        * If a `str`, it must represent a stride-time-delta-string. Hence, the
        **passed data must have a time-index**. \n
        .. Note::
            When set, this stride argument takes precedence over the stride property
            of the `FeatureDescriptor`s in this `FeatureCollection` (i.e., when a
            non-None value for `stride` is passed to this method).
    segment_start_idxs: Union[list, np.ndarray, pd.Series, pd.Index], optional
        The start indices of the segments. If None, the start indices will be
        computed from the data using either:\n
        - the `segment_end_idxs` - the `window` size property of the
            `FeatureDescriptor` in this `FeatureCollection` (if `segment_end_idxs`
            is not None)
        - strided-window rolling on the data using `window` and `stride` of the
            `FeatureDescriptor` in this `FeatureCollection` (if `segment_end_idxs`
             is also None). (Note that the `stride` argument of this method takes
             precedence over the `stride` property of the `FeatureDescriptor`s).
        By default None.
    segment_end_idxs: Union[list, np.ndarray, pd.Series, pd.Index], optional
        The end indices for the segmented windows. If None, the end indices will be
        computed from the data using either:\n
        - the `segment_start_idxs` + the `window` size property of the
            `FeatureDescriptor` in this `FeatureCollection` (if `segment_start_idxs`
            is not None)
        - strided-window rolling on the data using `window` and `stride` of the
            `FeatureDescriptor` in this `FeatureCollection` (if `segment_start_idxs`
             is also None). (Note that the `stride` argument of this method takes
             precedence over the `stride` property of the `FeatureDescriptor`s).
        By default None.

        ..Note::
            When passing both `segment_start_idxs` and `segment_end_idxs`, these two
            arguments must have the same length and every start index must be <=
            the corresponding end index.
            Note that passing both arguments discards any meaning of the `window`
            and `stride` values (as these segment indices define the segmented data,
            and thus no strided-window rolling index calculation has to be executed).
            As such, the user can create variable-length segmented windows. However,
            in such cases, the user should make sure that the feature functions are
            invariant to these (potentially variable-length) windows.
    return_df : bool, optional
        Whether the output needs to be a DataFrame or a list thereof, by default
        False. If `True` the output dataframes will be merged to a DataFrame with an
        outer merge.
    window_idx : str, optional
        The window's index position which will be used as index for the
        feature_window aggregation. Must be either of: `["begin", "middle", "end"]`,
        by **default "end"**. All features in this collection will use the same
        window_idx.

        ..Note::
            `window_idx`="end" uses the window's end (= right open bound) as
            output index. \n
            `window_idx`="begin" uses the window's start idx (= left closed bound)
            as output index.
    include_final_window : bool, optional
        Whether the final (possibly incomplete) window should be included in the
        strided-window segmentation, by default False.

        .. Note::
            The remarks below apply when `include_final_window` is set to True.
            The user should be aware that the last window *might* be incomplete,
            i.e.:

            - when equally sampled, the last window *might* be smaller than
              the other windows.
            - when not equally sampled, the last window *might* not include all the
              data points (as the begin-time + window-size comes after the last data
              point).

            Note, that when equally sampled, the last window *will* be a full window
            when:

            - the stride is the sampling rate of the data (or stride = 1 for
            sample-based configurations).<br>
            **Remark**: when `include_final_window` is set to False, this last
            window (which is a full window) will not be included!
            - *(len * sampling_rate - window_size) % stride = 0*. Remark that the
              above case is a base case of this.
    group_by_all : str, optional
        The name of the column by which to perform grouping. For each group, the
        features will be calculated. The output that is returned contains this
        `group_by` column as index to allow identifying the groups.
        If this parameter is used, the parameters `stride`, `segment_start_idxs`,
        `segment_end_idxs`, `window_idx` and `include_final_window` will be ignored.
        Rows with NaN values for this column will not be considered (as pandas
        ignores these rows when grouping).
        .. note::
            This is similar to passing a `DataFrameGroupBy` object as `data`
            argument to the `calculate` method, where the `DataFrameGroupBy` object
            is created by calling `data.groupby(group_by_all)`.
    group_by_consecutive: str, optional
        The name of the column by which to perform consecutive grouping. A
        consecutive group is a group of values that are the same and are next to
        each other. For each consecutive group, the features will be calculated.
        The output that is returned contains this `group_by` column to allow
        identifying the groups, and also contains the fields [`__start`, `__end`], which
        contain the start and end time range for each result row.
        If this parameter is used, the parameters `stride`, `segment_start_idxs`,
        `segment_end_idxs`, `window_idx` and `include_final_window` will be ignored.
        Rows with NaN values for this column will not be considered (as we do not
        deem NaN a groupable value).
        Note that for consecutive grouping, groups can appear multiple times if they
        appear in different time-gaps.

        Example output:
        .. example::
    ```python
                number_sold__sum__w=manual  store    __start      __end
            0          845                  0     2019-01-01 2019-01-01
            1          357                  3     2019-01-02 2019-01-02
            2          904                  6     2019-01-03 2019-01-03
            3          599                  3     2019-01-04 2019-01-05
            4          871                  0     2019-01-06 2019-01-06
            ...                           ...    ...        ...        ...
    ```
    bound_method: str, optional
        The start-end bound methodology which is used to generate the slice ranges
        when ``data`` consists of multiple series / columns.
        Must be either of: `["inner", "inner-outer", "outer"]`, by default "inner".

        * if ``inner``, the inner-bounds of the series are returned.
        * if ``inner-outer``, the left-inner and right-outer bounds of the series
          are returned.
        * if ``outer``, the outer-bounds of the series are returned.
    approve_sparsity: bool, optional
        Bool indicating whether the user acknowledges that there may be sparsity
        (i.e., irregularly sampled data), by default False.
        If False and sparsity is observed, a warning is raised.
    show_progress: bool, optional
        If True, the progress will be shown with a progressbar, by default False.
    logging_file_path : Union[str, Path], optional
        The file path where the logged messages are stored. If `None`, then no
        logging `FileHandler` will be used and the logging messages are only pushed
        to stdout. Otherwise, a logging `FileHandler` will write the logged messages
        to the given file path. See also the `tsflex.features.logger` module.
    n_jobs : int, optional
        The number of processes used for the feature calculation. If `None`, then
        the number returned by _os.cpu_count()_ is used, by default None. \n
        If n_jobs is either 0 or 1, the code will be executed sequentially without
        creating a process pool. This is very useful when debugging, as the stack
        trace will be more comprehensible.
        .. note::
            Multiprocessed execution is not supported on Windows. Even when
            `n_jobs` is set > 1, the feature extraction will still be executed
            sequentially.
            For why we do not support multiprocessing on Windows, see this issue:
            https://github.com/predict-idlab/tsflex/issues/51

        .. tip::
            It takes on avg. _300ms_ to schedule everything with
            multiprocessing. So if your sequential feature extraction code runs
            faster than ~1s, it might not be worth it to parallelize the process
            (and thus better leave `n_jobs` to 0 or 1).

    Returns
    -------
    Union[List[pd.DataFrame], pd.DataFrame]
        The calculated features.

    Raises
    ------
    KeyError
        Raised when a required key is not found in `data`.

    Notes
    ------
    * The (column-)names of the series in `data` represent the `series_names`.
    * If a `logging_file_path` is provided, the execution (time) info can be
      retrieved by calling `logger.get_feature_logs(logging_file_path)`.
      Be aware that the `logging_file_path` gets cleared before the logger pushes
      logged messages. Hence, one should use a separate logging file for each
      constructed processing and feature instance with this library.


    """

    # Check valid data
    if isinstance(data, list):
        assert all(
            isinstance(d, (pd.Series, pd.DataFrame)) for d in data
        ), "All elements of the data list must be either a Series or a DataFrame!"
    else:
        assert isinstance(
            data, (pd.Series, pd.DataFrame, pd.core.groupby.DataFrameGroupBy)
        ), "The data must be either a Series, a DataFrame or a DataFrameGroupBy!"

    # check valid group_by
    assert group_by_all is None or group_by_consecutive is None, (
        "Only max one of the following parameters can be set: "
        + "`group_by_all` or `group_by_consecutive`"
    )
    assert not (
        (group_by_all is not None or group_by_consecutive is not None)
        and isinstance(data, pd.core.groupby.DataFrameGroupBy)
    ), (
        "Cannot use `group_by_all` or `group_by_consecutive` when `data` is"
        + " already a grouped DataFrame!"
    )

    # Delete other logging handlers
    delete_logging_handlers(logger)
    # Add logging handler (if path provided)
    f_handler = None
    if logging_file_path:
        f_handler = add_logging_handler(logger, logging_file_path)

    if (
        group_by_all
        or group_by_consecutive
        or isinstance(data, pd.core.groupby.DataFrameGroupBy)
    ):
        self._check_no_multiple_windows(
            error_case="When using the groupby behavior"
        )

        # The grouping column must be part of the required series
        if group_by_all:
            # group_by_consecutive should be None (checked by asserts above)
            # data should not be a grouped DataFrame (checked by asserts above)
            assert (
                group_by_all not in self.get_required_series()
            ), "The `group_by_all` column cannot be part of the required series!"
        elif group_by_consecutive:
            # group_by_all should be None (checked by asserts above)
            # data should not be a grouped DataFrame (checked by asserts above)
            assert group_by_consecutive not in self.get_required_series(), (
                "The `group_by_consecutive` column cannot be part of the required "
                + "series!"
            )
            # __start and __end should not be part of the output names
            assert "__start" not in self.get_required_series()
            assert "__end" not in self.get_required_series()

        # if any of the following params are not None, warn that they won't be of use
        # in the grouped calculation
        ignored_params = [
            ("stride", None),
            ("segment_start_idxs", None),
            ("segment_end_idxs", None),
            ("window_idx", "end"),
            ("include_final_window", False),
        ]
        local_params = locals()

        for ip, default_value in ignored_params:
            if local_params[ip] is not default_value:
                warnings.warn(
                    f"Parameter `{ip}` will be ignored in case of GroupBy feature"
                    + " calculation."
                )

        if group_by_consecutive:
            # Strided rolling feature extraction will take place
            return self._calculate_group_by_consecutive(
                data,
                group_by_consecutive,
                return_df,
                bound_method=bound_method,
                approve_sparsity=approve_sparsity,
                show_progress=show_progress,
                logging_file_path=logging_file_path,
                n_jobs=n_jobs,
            )
        else:
            # Grouped feature extraction will take place
            if not isinstance(data, pd.core.groupby.generic.DataFrameGroupBy):
                # group_by_all should not be None (checked by asserts above)
                assert group_by_all is not None
                # 0. Transform to dataframe
                series_dict = FeatureCollection._data_to_series_dict(
                    data, self.get_required_series() + [group_by_all]
                )
                # 1. Group by `group_by_all` column
                data = self._group_by_all(series_dict, col_name=group_by_all)

            return self._calculate_group_by_all(
                data,  # should be a DataFrameGroupBy
                return_df,
                show_progress=show_progress,
                n_jobs=n_jobs,
                f_handler=f_handler,
            )

    # Sort output index if segment indices are not provided
    sort_output_index = segment_start_idxs is None and segment_end_idxs is None

    # Convert to numpy array (if necessary)
    if segment_start_idxs is not None:
        segment_start_idxs = FeatureCollection._process_segment_idxs(
            segment_start_idxs
        )
    if segment_end_idxs is not None:
        segment_end_idxs = FeatureCollection._process_segment_idxs(segment_end_idxs)

    if segment_start_idxs is not None and segment_end_idxs is not None:
        # Check if segment indices have same length and whether every start idx
        # <= end idx
        _check_start_end_array(segment_start_idxs, segment_end_idxs)
        # Check if there is either 1 or No(ne) window value for every output name -
        # input_series combination
        self._check_no_multiple_windows(
            error_case="When using both `segment_start_idxs` and `segment_end_idxs`"
        )

    if segment_start_idxs is None or segment_end_idxs is None:
        assert all(
            fd.window is not None
            for fd in flatten(self._feature_desc_dict.values())
        ), (
            "Each feature descriptor must have a window when not both "
            + "segment_start_idxs and segment_end_idxs are provided"
        )

    if stride is None and segment_start_idxs is None and segment_end_idxs is None:
        assert all(
            fd.stride is not None
            for fd in flatten(self._feature_desc_dict.values())
        ), (
            "Each feature descriptor must have a stride when no stride or "
            + "segment indices are passed to this method!"
        )
    elif stride is not None and (
        segment_start_idxs is not None or segment_end_idxs is not None
    ):
        raise ValueError(
            "The stride and any segment index argument cannot be set together! "
            + "At least one of both should be None."
        )

    if stride is not None:
        # Verify whether the stride complies with the input data dtype
        stride = [
            parse_time_arg(s) if isinstance(s, str) else s for s in to_list(stride)
        ]
        self._check_feature_descriptors(skip_none=False, calc_stride=stride)

    # Convert the data to a series_dict
    series_dict = FeatureCollection._data_to_series_dict(
        data, self.get_required_series()
    )

    # Determine the bounds of the series dict items and slice on them
    # TODO: is this really needed here? We also do this in the strided rolling
    start, end = _determine_bounds(bound_method, list(series_dict.values()))
    series_dict = {
        n: s.loc[
            s.index.dtype.type(start) : s.index.dtype.type(end)
        ]  # TODO: check memory efficiency of this
        for n, s in series_dict.items()
    }

    # Note: this variable has a global scope so this is shared in multiprocessing
    # TODO: try to make this more efficient (but is not really the bottleneck)
    global get_stroll_func
    get_stroll_func = self._stroll_feat_generator(
        series_dict,
        calc_stride=stride,
        segment_start_idxs=segment_start_idxs,
        segment_end_idxs=segment_end_idxs,
        start_idx=start,
        end_idx=end,
        window_idx=window_idx,
        include_final_window=include_final_window,
        approve_sparsity=approve_sparsity,
    )

    return self._calculate_feature_list(
        self._executor_stroll,
        n_jobs,
        show_progress,
        return_df,
        sort_output_index,
        f_handler,
    )

Calculate features on the passed data.

Parameters

data : Union[pd.Series, pd.DataFrame, List[Union[pd.Series, pd.DataFrame]], pd.core.groupby.DataFrameGroupBy]

Dataframe or Series or list thereof, with all the required data for the feature calculation.

Assumptions:

  • each Series / DataFrame must have a sortable index. This index represents the sequence position of the corresponding values, the index can be either numeric or a pd.DatetimeIndex.
  • each Series / DataFrame index must be comparable with all others
  • we assume that each series-name / dataframe-column-name is unique.

The data can also be a DataFrameGroupBy object, in which case the expected behaviour is similar to grouping by all values in group_by_all (i.e., for each group, the features are calculated on the group's data).
stride : Union[float, str, pd.Timedelta, List[Union[float, str, pd.Timedelta]], None], optional

The stride size. By default None. This argument supports multiple types:

  • If None, the stride of the FeatureDescriptor objects will be used.
  • If the type is a float or an int, its value represents the series:

    • its stride range when a non time-indexed series is passed.
    • the stride in number of samples, when a time-indexed series is passed (must then be an int)

  • If the stride's type is a pd.Timedelta, the stride size represents the stride-time delta. The passed data must have a time-index.
  • If a str, it must represent a stride-time-delta-string. Hence, the passed data must have a time-index.

Note

When set, this stride argument takes precedence over the stride property of the FeatureDescriptors in this FeatureCollection (i.e., when a non-None value for stride is passed to this method).

segment_start_idxs : Union[list, np.ndarray, pd.Series, pd.Index], optional

The start indices of the segments. If None, the start indices will be computed from the data using either:

  • the segment_end_idxs - the window size property of the FeatureDescriptor in this FeatureCollection (if segment_end_idxs is not None)
  • strided-window rolling on the data using window and stride of the FeatureDescriptor in this FeatureCollection (if segment_end_idxs is also None). (Note that the stride argument of this method takes precedence over the stride property of the FeatureDescriptors). By default None.
segment_end_idxs : Union[list, np.ndarray, pd.Series, pd.Index], optional

The end indices for the segmented windows. If None, the end indices will be computed from the data using either:

  • the segment_start_idxs + the window size property of the FeatureDescriptor in this FeatureCollection (if segment_start_idxs is not None)
  • strided-window rolling on the data using window and stride of the FeatureDescriptor in this FeatureCollection (if segment_start_idxs is also None). (Note that the stride argument of this method takes precedence over the stride property of the FeatureDescriptors). By default None.

Note

When passing both segment_start_idxs and segment_end_idxs, these two arguments must have the same length and every start index must be <= the corresponding end index. Note that passing both arguments discards any meaning of the window and stride values (as these segment indices define the segmented data, and thus no strided-window rolling index calculation has to be executed). As such, the user can create variable-length segmented windows. However, in such cases, the user should make sure that the feature functions are invariant to these (potentially variable-length) windows.

return_df : bool, optional
Whether the output needs to be a DataFrame or a list thereof, by default False. If True the output dataframes will be merged to a DataFrame with an outer merge.
window_idx : str, optional

The window's index position which will be used as index for the feature_window aggregation. Must be either of: ["begin", "middle", "end"], by default "end". All features in this collection will use the same window_idx.

Note

window_idx="end" uses the window's end (= right open bound) as output index.

window_idx="begin" uses the window's start idx (= left closed bound) as output index.

include_final_window : bool, optional

Whether the final (possibly incomplete) window should be included in the strided-window segmentation, by default False.

Note

The remarks below apply when include_final_window is set to True. The user should be aware that the last window might be incomplete, i.e.;

  • when equally sampled, the last window might be smaller than the other windows.
  • when not equally sampled, the last window might not include all the data points (as the begin-time + window-size comes after the last data point).

Note, that when equally sampled, the last window will be a full window when:

  • the stride is the sampling rate of the data (or stride = 1 for sample-based configurations).
    Remark: when include_final_window is set to False, this last window (which is a full window) will not be included!
  • (len * sampling_rate - window_size) % stride = 0. Remark that the above case is a base case of this.
group_by_all : str, optional
The name of the column by which to perform grouping. For each group, the features will be calculated. The output that is returned contains this group_by column as index to allow identifying the groups. If this parameter is used, the parameters stride, segment_start_idxs, segment_end_idxs, window_idx and include_final_window will be ignored. Rows with NaN values for this column will not be considered (as pandas ignores these rows when grouping).

Note

This is similar to passing a DataFrameGroupBy object as data argument to the calculate method, where the DataFrameGroupBy object is created by calling data.groupby(group_by_all).
group_by_consecutive : str, optional

The name of the column by which to perform consecutive grouping. A consecutive group is a group of values that are the same and are next to each other. For each consecutive group, the features will be calculated. The output that is returned contains this group_by column to allow identifying the groups, and also contains the fields [__start, __end], which contain the start and end time range for each result row. If this parameter is used, the parameters stride, segment_start_idxs, segment_end_idxs, window_idx and include_final_window will be ignored. Rows with NaN values for this column will not be considered (as we do not deem NaN a groupable value). Note that for consecutive grouping, groups can appear multiple times if they appear in different time-gaps.

Example output:

            number_sold__sum__w=manual  store    __start      __end
        0          845                  0     2019-01-01 2019-01-01
        1          357                  3     2019-01-02 2019-01-02
        2          904                  6     2019-01-03 2019-01-03
        3          599                  3     2019-01-04 2019-01-05
        4          871                  0     2019-01-06 2019-01-06
        ...                           ...    ...        ...        ...
bound_method : str, optional

The start-end bound methodology which is used to generate the slice ranges when data consists of multiple series / columns. Must be either of: ["inner", "inner-outer", "outer"], by default "inner".

  • if inner, the inner-bounds of the series are returned.
  • if inner-outer, the left-inner and right-outer bounds of the series are returned.
  • if outer, the outer-bounds of the series are returned.
approve_sparsity : bool, optional
Bool indicating whether the user acknowledges that there may be sparsity (i.e., irregularly sampled data), by default False. If False and sparsity is observed, a warning is raised.
show_progress : bool, optional
If True, the progress will be shown with a progressbar, by default False.
logging_file_path : Union[str, Path], optional
The file path where the logged messages are stored. If None, then no logging FileHandler will be used and the logging messages are only pushed to stdout. Otherwise, a logging FileHandler will write the logged messages to the given file path. See also the tsflex.features.logger module.
n_jobs : int, optional

The number of processes used for the feature calculation. If None, then the number returned by os.cpu_count() is used, by default None.

If n_jobs is either 0 or 1, the code will be executed sequentially without creating a process pool. This is very useful when debugging, as the stack trace will be more comprehensible.

Note

Multiprocessed execution is not supported on Windows. Even when n_jobs is set > 1, the feature extraction will still be executed sequentially. For why we do not support multiprocessing on Windows, see this issue: https://github.com/predict-idlab/tsflex/issues/51

Tip

It takes on avg. 300ms to schedule everything with multiprocessing. So if your sequential feature extraction code runs faster than ~1s, it might not be worth it to parallelize the process (and thus better leave n_jobs to 0 or 1).

Returns

Union[List[pd.DataFrame], pd.DataFrame]
The calculated features.

Raises

KeyError
Raised when a required key is not found in data.

Notes

  • The (column-)names of the series in data represent the series_names.
  • If a logging_file_path is provided, the execution (time) info can be retrieved by calling logger.get_feature_logs(logging_file_path). Be aware that the logging_file_path gets cleared before the logger pushes logged messages. Hence, one should use a separate logging file for each constructed processing and feature instance with this library.
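
A minimal, end-to-end sketch of calculate, assuming the hypothetical `fc` collection from the examples above and some made-up, time-indexed data named "temperature":

```python
import numpy as np
import pandas as pd

# Dummy time-indexed series whose name matches the collection's required series.
idx = pd.date_range("2023-01-01", periods=3_600, freq="1s")
temperature = pd.Series(20 + np.random.randn(len(idx)), index=idx, name="temperature")

# Sequential execution (n_jobs=1) is easiest to debug; return_df=True merges
# the per-feature outputs into a single DataFrame via an outer merge.
out = fc.calculate(temperature, return_df=True, n_jobs=1)
print(out.head())
```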
def serialize(self, file_path)
Expand source code
def serialize(self, file_path: Union[str, Path]) -> None:
    """Serialize this FeatureCollection instance.

    Parameters
    ----------
    file_path : Union[str, Path]
        The path where the `FeatureCollection` will be serialized.

    Note
    -----
    As we use [Dill](https://github.com/uqfoundation/dill){:target="_blank"} to
    serialize the files, we can **also serialize functions which are defined in
    the local scope, like lambdas.**

    """
    with open(file_path, "wb") as f:
        dill.dump(self, f, recurse=True)

Serialize this FeatureCollection instance.

Parameters

file_path : Union[str, Path]
The path where the FeatureCollection will be serialized.

Note

As we use Dill to serialize the files, we can also serialize functions which are defined in the local scope, like lambdas.
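
A round-trip sketch; the file name is illustrative and, since this class only exposes serialization, loading is done with dill directly:

```python
import dill

# Persist the hypothetical `fc` collection to disk ...
fc.serialize("feature_collection.pkl")

# ... and restore it later with dill.
with open("feature_collection.pkl", "rb") as f:
    fc_restored = dill.load(f)
```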

def reduce(self, feat_cols_to_keep)
Expand source code
def reduce(self, feat_cols_to_keep: List[str]) -> FeatureCollection:
    """Create a reduced FeatureCollection instance based on `feat_cols_to_keep`.

    For example, this is useful to optimize feature-extraction inference
    (for your selected features) after performing a feature-selection procedure.

    Parameters
    ----------
    feat_cols_to_keep: List[str]
        A subset of this feature collection instance's column names.
        This corresponds to the column names of the output of the `calculate`
        method that you want to keep.

    Returns
    -------
    FeatureCollection
        A new FeatureCollection object, which only retains the FeatureDescriptors
        that constitute the `feat_cols_to_keep` output.

    Note
    ----
    Some FeatureDescriptor objects may have multiple **output-names**.<br>
    Hence, if you only want to retain _a subset_ of that FeatureDescriptor's
    feature outputs, you will still get **all its features**, as the new
    FeatureCollection is constructed by applying a filter on the FeatureDescriptor
    list and we thus do not alter these FeatureDescriptor objects themselves.

    """
    # dict in which we store all the { output_col_name : (UUID, FeatureDescriptor) }
    # items of our current FeatureCollection object
    manual_window = False
    if any(c.endswith("w=manual") for c in feat_cols_to_keep):
        assert all(c.endswith("w=manual") for c in feat_cols_to_keep)
        # As the windows are created manually, the FeatureCollection cannot contain
        # multiple windows for the same output name - input_series combination
        self._check_no_multiple_windows(
            error_case="When reducing a FeatureCollection with manual windows"
        )
        manual_window = True
    feat_col_fd_mapping: Dict[str, Tuple[str, FeatureDescriptor]] = {}
    for (s_names, window), fd_list in self._feature_desc_dict.items():
        window = "manual" if manual_window else self._ws_to_str(window)
        for fd in fd_list:
            # As a single FeatureDescriptor can have multiple output col names, we
            # create a unique identifier for each FeatureDescriptor (on which we
            # will apply set-like operations later on to only retain all the unique
            # FeatureDescriptors)
            uuid_str = str(uuid.uuid4())
            for output_name in fd.function.output_names:
                # Reconstruct the feature column name
                feat_col_name = StridedRolling.construct_output_index(
                    series_keys=s_names, feat_name=output_name, win_str=window
                )
                feat_col_fd_mapping[feat_col_name] = (uuid_str, fd)

    assert all(fc in feat_col_fd_mapping for fc in feat_cols_to_keep)

    # Collect (uuid, FeatureDescriptor) for the feat_cols_to_keep
    fd_subset: List[Tuple[str, FeatureDescriptor]] = [
        feat_col_fd_mapping[fc] for fc in feat_cols_to_keep
    ]

    # Reduce to unique feature descriptor objects (based on uuid) and create a new
    # FeatureCollection from their deepcopies.
    seen_uuids = set()
    fds = []
    for uuid_str, fd in fd_subset:
        if uuid_str not in seen_uuids:
            seen_uuids.add(uuid_str)
            fds.append(deepcopy(fd))

    return FeatureCollection(feature_descriptors=fds)

Create a reduced FeatureCollection instance based on feat_cols_to_keep.

For example, this is useful to optimize feature-extraction inference (for your selected features) after performing a feature-selection procedure.

Parameters

feat_cols_to_keep : List[str]
A subset of this feature collection instance's column names. This corresponds to the column names of the output of the calculate method that you want to keep.

Returns

FeatureCollection
A new FeatureCollection object, which only retains the FeatureDescriptors that constitute the feat_cols_to_keep output.

Note

Some FeatureDescriptor objects may have multiple output-names.
Hence, if you only want to retain a subset of that FeatureDescriptor's feature outputs, you will still get all its features, as the new FeatureCollection is constructed by applying a filter on the FeatureDescriptor list and we thus do not alter these FeatureDescriptor objects themselves.
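
A reduction sketch, assuming the hypothetical `fc`, `temperature` data, and `out` DataFrame from the calculate example; the column name below is illustrative and should be taken from out.columns of your own run:

```python
# Keep only the feature(s) that survived feature selection (illustrative name;
# the exact format depends on your descriptors and windows).
selected_cols = ["temperature__mean__w=5m"]

fc_reduced = fc.reduce(feat_cols_to_keep=selected_cols)

# Re-running calculate on the reduced collection now only computes that feature.
out_reduced = fc_reduced.calculate(temperature, return_df=True, n_jobs=1)
```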