Module tsflex.chunking

Utilities for chunking time-series data before feeding it to the operators.

Expand source code
"""Utilities for chunking time-series data before feeding it to the operators.
"""

__author__ = "Jonas Van Der Donckt"

from .chunking import chunk_data

__all__ = ["chunk_data"]

API reference of tsflex.chunking

.chunking

(Advanced) tsflex utilities for chunking sequence data.

Functions

def chunk_data(data, fs_dict=None, chunk_range_margin=None, min_chunk_dur=None, max_chunk_dur=None, sub_chunk_overlap=None, copy=True, verbose=False)
Expand source code
def chunk_data(
    data: Union[
        pd.Series,
        pd.DataFrame,
        List[Union[pd.Series, pd.DataFrame]],
        Dict[str, pd.DataFrame],
    ],
    fs_dict: Optional[Dict[str, float]] = None,
    chunk_range_margin: Optional[Union[float, str, pd.Timedelta]] = None,
    min_chunk_dur: Optional[Union[float, str, pd.Timedelta]] = None,
    max_chunk_dur: Optional[Union[float, str, pd.Timedelta]] = None,
    sub_chunk_overlap: Optional[Union[float, str, pd.Timedelta]] = None,
    copy: bool = True,
    verbose: bool = False,
) -> List[List[pd.Series]]:
    """Divide the time-series `data` in same time/sequence-range chunks.

    Does 2 things:

    1. Detecting gaps in the `data`(-list) sequence series.
    2. Divides the `data` into chunks, according to the parameter
       configuration and the detected gaps.

    Notes
    -----
    * When you set `fs_dict`, the assumption is made that **each item** in `data`
      has a **fixed sample frequency**. If you do not set `fs_dict`, this variable
      will use the 1 / max time-diff of the corresponding series as key-value pair.
    * All subsequent series-chunks are matched against the time-ranges of the first
      series. This implies that **the first item in `data` serves as a reference**
      for gap-matching.
    * The term `sub-chunk` refers to the chunks who exceed the `max_chunk_duration`
      parameter and are therefore further divided into sub-chunks.

    Example
    -------
    ```python
    df_acc  # cols ['ACC_x', 'ACC_y`, 'ACC_z`, 'ACC_SMV`] - 32 Hz
    df_gyro # cols ['gyro_x', 'gyro_y`, 'gyro_z`, 'gyro_area`] - 100 Hz
    chunk_data({'acc': df_acc, 'g': df_gyro}, fs_dict={'acc': 32, 'g': 100})
    ```
    <br>

    .. Note::
        If `chunk_range_margin` / `min_chunk_dur` / `max_chunk_dur` /
        `sub_chunk_overlap` is a int/float, it will be interpreted as numerical
        sequence range  and a numerical-indexed `data` will be assumed.
        **These attributes must be all either time-based or numerical and match
        the data its index dtype**

    Parameters
    -----------
    data: Union[pd.Series, pd.DataFrame, List[Union[pd.Series, pd.DataFrame]], Dict[str, pd.DataFrame]]
        The sequence data which will be chunked. Each item in `data` must have a
        monotonically increasing index. We assume that each `item` in data
        has a _nearly-constant_ sample frequency (when there are no gaps) and all
        indices have the same dtype.
    fs_dict: Dict[str, int], optional
        The sample frequency dict. If set, this dict must at least withhold all the keys
        from the items in `data`.
        .. note::
            if you passed a **_DataFrame-dict_** (i.e., a dict with key=str;
            value=DataFrame) to `data`, then you can **use** the **corresponding
            dataframe str-key** to describe the `fs` for all the DataFrame its columns
            with the `fs_dict` attribute. See also the example above
    chunk_range_margin: Union[float, str, pd.Timedelta], optional
        The allowed margin for each `ts` chunk their start and end time to be seen as
        same time-range chunks with other `ts`. If `None` the margin will be set as:

            2 / min(fs_dict.intersection(data.names).values())

        Which is equivalent to twice the min-fs (= max-period) of the passed `data`,
        by default None.\n
        * if `pd.Timedelta`, it will be interpreted as a time-range margin
        * if `int` or `float`, it will be interpreted as a numerical range margin
    min_chunk_dur : Union[float, str, pd.Timedelta], optional
        The minimum duration of a chunk, by default None.
        Chunks with durations smaller than this will be discarded (and not returned).\n
        * if `pd.Timedelta`, it will be interpreted as a time-range margin
        * if `int` or `float`, it will be interpreted as a numerical range margin
    max_chunk_dur : Union[float, str, pd.Timedelta], optional
        The maximum duration of a chunk, by default None.
        Chunks with durations larger than this will be chunked in smaller `sub_chunks`
        where each sub-chunk has a maximum duration of `max_chunk_dur`.\n
        * if `pd.Timedelta`, it will be interpreted as a time-range margin
        * if `int` or `float`, it will be interpreted as a numerical range margin
    sub_chunk_overlap: Union[float, str, pd.Timedelta], optional
        The sub-chunk boundary overlap. If available, **this
        margin / 2 will be added to either side of the `sub_chunk`**.
        This is especially useful to not lose inter-`sub_chunk` data (as each
        `sub_chunk` is in fact a continuous chunk) when window-based aggregations
        are performed on these same time range output (sub_)chunks.
        This argument is only relevant if `max_chunk_dur` is set.\n
        * if `pd.Timedelta`, it will be interpreted as a time-range margin
        * if `int` or `float`, it will be interpreted as a numerical range margin
    copy: boolean, optional
        If set True will return a new view (on which you won't get a
        `SettingWithCopyWarning` if you change the content), by default False.
    verbose : bool, optional
        If set, will print more verbose output, by default False

    Returns
    -------
    List[List[pd.Series]]
        A list of same time range chunks.

    """
    if isinstance(data, dict):
        if isinstance(fs_dict, dict):
            out_dict = {}
            for k, fs in fs_dict.items():
                if k in data and isinstance(data[k], pd.DataFrame):
                    out_dict.update({c_name: fs for c_name in data[k].columns})
            fs_dict.update(out_dict)

        # make `data` `to_series_list` convertable()
        data = list(data.values())

    # Convert the input data
    series_list = to_series_list(data)

    # Assert that there are no duplicate series names
    assert len(series_list) == len(set([s.name for s in series_list]))

    # Assert that the index increases monotonically
    assert all(s.index.is_monotonic_increasing for s in series_list)

    return _dtype_to_chunk_method[AttributeParser.determine_type(data)](
        series_list,
        fs_dict,
        chunk_range_margin,  # type: ignore[arg-type]
        min_chunk_dur,  # type: ignore[arg-type]
        max_chunk_dur,  # type: ignore[arg-type]
        sub_chunk_overlap,  # type: ignore[arg-type]
        copy,
        verbose,
    )

Divide the time-series data in same time/sequence-range chunks.

Does 2 things:

  1. Detecting gaps in the data(-list) sequence series.
  2. Divides the data into chunks, according to the parameter configuration and the detected gaps.

Notes

  • When you set fs_dict, the assumption is made that each item in data has a fixed sample frequency. If you do not set fs_dict, this variable will use the 1 / max time-diff of the corresponding series as key-value pair.
  • All subsequent series-chunks are matched against the time-ranges of the first series. This implies that the first item in data serves as a reference for gap-matching.
  • The term sub-chunk refers to the chunks who exceed the max_chunk_duration parameter and are therefore further divided into sub-chunks.

Example

df_acc  # cols ['ACC_x', 'ACC_y`, 'ACC_z`, 'ACC_SMV`] - 32 Hz
df_gyro # cols ['gyro_x', 'gyro_y`, 'gyro_z`, 'gyro_area`] - 100 Hz
chunk_data({'acc': df_acc, 'g': df_gyro}, fs_dict={'acc': 32, 'g': 100})


Note

If chunk_range_margin / min_chunk_dur / max_chunk_dur / sub_chunk_overlap is a int/float, it will be interpreted as numerical sequence range and a numerical-indexed data will be assumed. These attributes must be all either time-based or numerical and match the data its index dtype

Parameters

data : Union[pd.Series, pd.DataFrame, List[Union[pd.Series, pd.DataFrame]], Dict[str, pd.DataFrame]]
The sequence data which will be chunked. Each item in data must have a monotonically increasing index. We assume that each item in data has a nearly-constant sample frequency (when there are no gaps) and all indices have the same dtype.
fs_dict : Dict[str, int], optional
The sample frequency dict. If set, this dict must at least withhold all the keys from the items in data.

Note

if you passed a DataFrame-dict (i.e., a dict with key=str; value=DataFrame) to data, then you can use the corresponding dataframe str-key to describe the fs for all the DataFrame its columns with the fs_dict attribute. See also the example above
chunk_range_margin : Union[float, str, pd.Timedelta], optional

The allowed margin for each ts chunk their start and end time to be seen as same time-range chunks with other ts. If None the margin will be set as:

2 / min(fs_dict.intersection(data.names).values())

Which is equivalent to twice the min-fs (= max-period) of the passed data, by default None.

  • if pd.Timedelta, it will be interpreted as a time-range margin
  • if int or float, it will be interpreted as a numerical range margin
min_chunk_dur : Union[float, str, pd.Timedelta], optional

The minimum duration of a chunk, by default None. Chunks with durations smaller than this will be discarded (and not returned).

  • if pd.Timedelta, it will be interpreted as a time-range margin
  • if int or float, it will be interpreted as a numerical range margin
max_chunk_dur : Union[float, str, pd.Timedelta], optional

The maximum duration of a chunk, by default None. Chunks with durations larger than this will be chunked in smaller sub_chunks where each sub-chunk has a maximum duration of max_chunk_dur.

  • if pd.Timedelta, it will be interpreted as a time-range margin
  • if int or float, it will be interpreted as a numerical range margin
sub_chunk_overlap : Union[float, str, pd.Timedelta], optional

The sub-chunk boundary overlap. If available, this margin / 2 will be added to either side of the sub_chunk. This is especially useful to not lose inter-sub_chunk data (as each sub_chunk is in fact a continuous chunk) when window-based aggregations are performed on these same time range output (sub_)chunks. This argument is only relevant if max_chunk_dur is set.

  • if pd.Timedelta, it will be interpreted as a time-range margin
  • if int or float, it will be interpreted as a numerical range margin
copy : boolean, optional
If set True will return a new view (on which you won't get a SettingWithCopyWarning if you change the content), by default False.
verbose : bool, optional
If set, will print more verbose output, by default False

Returns

List[List[pd.Series]]
A list of same time range chunks.