Module tsflex.chunking.chunking
(Advanced) tsflex utilities for chunking sequence data.
Expand source code
"""(Advanced) tsflex utilities for chunking sequence data."""
__author__ = "Jonas Van Der Donckt"
from datetime import timedelta
from typing import Dict, List, Optional, Tuple, Union
import pandas as pd
from ..utils.argument_parsing import parse_time_arg
from ..utils.attribute_parsing import AttributeParser, DataType
from ..utils.data import to_series_list
def _chunk_time_data(
series_list: List[pd.Series],
fs_dict: Optional[Dict[str, float]] = None,
chunk_range_margin: Optional[Union[str, pd.Timedelta]] = None,
min_chunk_dur: Optional[Union[str, pd.Timedelta]] = None,
max_chunk_dur: Optional[Union[str, pd.Timedelta]] = None,
sub_chunk_overlap: Optional[Union[str, pd.Timedelta]] = None,
copy: bool = True,
verbose: bool = False,
) -> List[List[pd.Series]]:
if min_chunk_dur is not None:
min_chunk_dur = parse_time_arg(min_chunk_dur)
if max_chunk_dur is not None:
max_chunk_dur = parse_time_arg(max_chunk_dur)
if sub_chunk_overlap is None:
sub_chunk_overlap = pd.Timedelta(0)
sub_chunk_overlap = parse_time_arg(sub_chunk_overlap)
# Default arg -> set the chunk range margin to 2x the min-freq its period
if chunk_range_margin is None:
if fs_dict is not None:
chunk_range_margin = pd.Timedelta(
seconds=2 / min([fs_dict[str(s.name)] for s in series_list])
)
else:
raise ValueError("Chunk range margin must be set if fs_dict is not set!")
chunk_range_margin = parse_time_arg(chunk_range_margin)
assert chunk_range_margin.total_seconds() > 0, "chunk_range_margin must be > 0"
# if fs_dict is not set -> set it to the max time-diff for the corresponding series
if fs_dict is None:
if verbose:
print("fs is none -> using 1 / max time diff for each series as fs")
fs_dict = {
s.name: (1 / s.index.to_series().diff().max().total_seconds())
for s in series_list
}
# Assert the names reside in fs_dict
assert all([str(s.name) in fs_dict for s in series_list])
# Some range asserts
assert sub_chunk_overlap.total_seconds() >= 0, "sub_chunk_overlap_s must be > 0"
if max_chunk_dur is not None:
assert max_chunk_dur.total_seconds() > 0, "max_chunk_dur_ must be > 0"
# Variable in which the same time-range chunks are stored
# Each list item can be seen as (t_start_chunk, t_end_chunk, chunk_list)
same_range_chunks: List[Tuple[pd.Timestamp, pd.Timestamp, List[pd.Series]]] = []
def print_verbose_time(
sig: pd.Series, t_begin: pd.Timestamp, t_end: pd.Timestamp, msg: str = ""
) -> None:
fmt = "%Y-%m-%d %H:%M"
if not verbose:
return
print(
f"slice {sig.name} {t_begin.strftime(fmt):<10} - {t_end.strftime(fmt):<10} "
f"- shape: {sig[t_begin:t_end].shape}"
)
if len(msg):
print(f"\tâââ> {msg}")
def slice_time(
sig: pd.Series, t_begin: pd.Timestamp, t_end: pd.Timestamp
) -> pd.Series:
"""Slice the sig Series."""
if copy:
return sig[t_begin:t_end].copy()
else:
return sig[t_begin:t_end]
def insert_chunk(chunk: pd.Series) -> None:
"""Insert the chunk into `same_range_chunks`."""
t_chunk_start, t_chunk_end = chunk.index[[0, -1]]
# Iterate over the same-(time)range-chunk (src) collection
for src_start, src_end, src_chunks in same_range_chunks:
# Check for overlap
if (
abs(src_start - t_chunk_start) <= chunk_range_margin
and abs(src_end - t_chunk_end) <= chunk_range_margin
):
# Check series name not in src_chunks
if chunk.name not in [src_c.name for src_c in src_chunks]:
src_chunks.append(chunk)
print_verbose_time(
chunk, t_chunk_start, t_chunk_end, "INSERT chunk"
)
return
else:
# There already exists a sub_chunk of this series name
raise ValueError(
"There already exists a chunk with this series"
f" name - {chunk.name}"
)
same_range_chunks.append((t_chunk_start, t_chunk_end, [chunk]))
print_verbose_time(chunk, t_chunk_start, t_chunk_end, "APPEND sub chunk")
for series in series_list:
# 1. Some checks
if len(series) < 2:
if verbose:
print(f"Too small series: {series.name} - shape: {series.shape} ")
continue
# Allowed offset (in seconds) is sample_period + 0.5*sample_period
fs_sig = fs_dict[str(series.name)]
gaps_mask = series.index.to_series().diff() > timedelta(
seconds=(1 + 0.5) / fs_sig
)
# Set the first and last timestamp to True
gaps_mask.iloc[[0, -1]] = True
gaps: List[pd.Timestamp] = series[gaps_mask].index.to_list()
if verbose:
print("-" * 10, " detected gaps", "-" * 10)
print(*gaps, sep="\n")
for t_begin_c, t_end_c in zip(gaps, gaps[1:]):
# The t_end is the t_start of the new time range -> hence [:-1]
# => cut on [t_start_c(hunk), t_end_c(hunk)[
sig_chunk = series[t_begin_c:t_end_c]
if t_end_c < gaps[-1]:
# Note: we doe not adjust when t_end_c = gaps[-1]
# (as gaps-[-1] is not really a gap)
sig_chunk = sig_chunk[:-1]
if len(sig_chunk) > 2: # re-adjust the t_end
t_end_c = sig_chunk.index[-1]
else:
print_verbose_time(series, t_begin_c, t_end_c, "too small df_chunk")
continue
# Check for min duration
chunk_range_s = int(len(sig_chunk) / fs_sig)
if (
min_chunk_dur is not None
and chunk_range_s < min_chunk_dur.total_seconds()
):
print_verbose_time(
sig_chunk,
t_begin_c,
t_end_c,
f"Too small chunk min_dur {min_chunk_dur} > {sig_chunk.shape}",
)
continue
# Divide the chunk into sub_chunks (sc's)
if (
max_chunk_dur is not None
and chunk_range_s > max_chunk_dur.total_seconds()
):
print_verbose_time(
sig_chunk, t_begin_c, t_end_c, "Dividing in sub-chunks"
)
t_begin_sc = t_begin_c
while t_begin_sc < t_end_c:
# Calculate the end sub-chunk time
t_end_sc = t_begin_sc + max_chunk_dur
# Get the end and begin sub-chunk margin
t_end_sc_m = t_end_sc + sub_chunk_overlap / 2
t_end_sc_m = min(t_end_c, t_end_sc_m)
t_begin_sc_m = t_begin_sc - sub_chunk_overlap / 2
t_begin_sc_m = max(t_begin_c, t_begin_sc_m)
# Slice & add the sub-chunk to the list
insert_chunk(chunk=slice_time(series, t_begin_sc_m, t_end_sc_m))
# Update the condition's variable
t_begin_sc = t_end_sc
else:
insert_chunk(chunk=slice_time(series, t_begin_c, t_end_c))
return [chunk_range[-1] for chunk_range in same_range_chunks]
def _chunk_sequence_data(
series_list: List[pd.Series],
fs_dict: Optional[Dict[str, float]] = None,
chunk_range_margin: Optional[float] = None,
min_chunk_dur: Optional[float] = None,
max_chunk_dur: Optional[float] = None,
sub_chunk_overlap: Optional[float] = None,
copy: bool = True,
verbose: bool = False,
) -> List[List[pd.Series]]:
raise NotImplementedError("Not implemented yet")
_dtype_to_chunk_method = {
DataType.TIME: _chunk_time_data,
DataType.SEQUENCE: _chunk_sequence_data,
}
def chunk_data(
data: Union[
pd.Series,
pd.DataFrame,
List[Union[pd.Series, pd.DataFrame]],
Dict[str, pd.DataFrame],
],
fs_dict: Optional[Dict[str, float]] = None,
chunk_range_margin: Optional[Union[float, str, pd.Timedelta]] = None,
min_chunk_dur: Optional[Union[float, str, pd.Timedelta]] = None,
max_chunk_dur: Optional[Union[float, str, pd.Timedelta]] = None,
sub_chunk_overlap: Optional[Union[float, str, pd.Timedelta]] = None,
copy: bool = True,
verbose: bool = False,
) -> List[List[pd.Series]]:
"""Divide the time-series `data` in same time/sequence-range chunks.
Does 2 things:
1. Detecting gaps in the `data`(-list) sequence series.
2. Divides the `data` into chunks, according to the parameter
configuration and the detected gaps.
Notes
-----
* When you set `fs_dict`, the assumption is made that **each item** in `data`
has a **fixed sample frequency**. If you do not set `fs_dict`, this variable
will use the 1 / max time-diff of the corresponding series as key-value pair.
* All subsequent series-chunks are matched against the time-ranges of the first
series. This implies that **the first item in `data` serves as a reference**
for gap-matching.
* The term `sub-chunk` refers to the chunks who exceed the `max_chunk_duration`
parameter and are therefore further divided into sub-chunks.
Example
-------
```python
df_acc # cols ['ACC_x', 'ACC_y`, 'ACC_z`, 'ACC_SMV`] - 32 Hz
df_gyro # cols ['gyro_x', 'gyro_y`, 'gyro_z`, 'gyro_area`] - 100 Hz
chunk_data({'acc': df_acc, 'g': df_gyro}, fs_dict={'acc': 32, 'g': 100})
```
<br>
.. Note::
If `chunk_range_margin` / `min_chunk_dur` / `max_chunk_dur` /
`sub_chunk_overlap` is a int/float, it will be interpreted as numerical
sequence range and a numerical-indexed `data` will be assumed.
**These attributes must be all either time-based or numerical and match
the data its index dtype**
Parameters
-----------
data: Union[pd.Series, pd.DataFrame, List[Union[pd.Series, pd.DataFrame]], Dict[str, pd.DataFrame]]
The sequence data which will be chunked. Each item in `data` must have a
monotonically increasing index. We assume that each `item` in data
has a _nearly-constant_ sample frequency (when there are no gaps) and all
indices have the same dtype.
fs_dict: Dict[str, int], optional
The sample frequency dict. If set, this dict must at least withhold all the keys
from the items in `data`.
.. note::
if you passed a **_DataFrame-dict_** (i.e., a dict with key=str;
value=DataFrame) to `data`, then you can **use** the **corresponding
dataframe str-key** to describe the `fs` for all the DataFrame its columns
with the `fs_dict` attribute. See also the example above
chunk_range_margin: Union[float, str, pd.Timedelta], optional
The allowed margin for each `ts` chunk their start and end time to be seen as
same time-range chunks with other `ts`. If `None` the margin will be set as:
2 / min(fs_dict.intersection(data.names).values())
Which is equivalent to twice the min-fs (= max-period) of the passed `data`,
by default None.\n
* if `pd.Timedelta`, it will be interpreted as a time-range margin
* if `int` or `float`, it will be interpreted as a numerical range margin
min_chunk_dur : Union[float, str, pd.Timedelta], optional
The minimum duration of a chunk, by default None.
Chunks with durations smaller than this will be discarded (and not returned).\n
* if `pd.Timedelta`, it will be interpreted as a time-range margin
* if `int` or `float`, it will be interpreted as a numerical range margin
max_chunk_dur : Union[float, str, pd.Timedelta], optional
The maximum duration of a chunk, by default None.
Chunks with durations larger than this will be chunked in smaller `sub_chunks`
where each sub-chunk has a maximum duration of `max_chunk_dur`.\n
* if `pd.Timedelta`, it will be interpreted as a time-range margin
* if `int` or `float`, it will be interpreted as a numerical range margin
sub_chunk_overlap: Union[float, str, pd.Timedelta], optional
The sub-chunk boundary overlap. If available, **this
margin / 2 will be added to either side of the `sub_chunk`**.
This is especially useful to not lose inter-`sub_chunk` data (as each
`sub_chunk` is in fact a continuous chunk) when window-based aggregations
are performed on these same time range output (sub_)chunks.
This argument is only relevant if `max_chunk_dur` is set.\n
* if `pd.Timedelta`, it will be interpreted as a time-range margin
* if `int` or `float`, it will be interpreted as a numerical range margin
copy: boolean, optional
If set True will return a new view (on which you won't get a
`SettingWithCopyWarning` if you change the content), by default False.
verbose : bool, optional
If set, will print more verbose output, by default False
Returns
-------
List[List[pd.Series]]
A list of same time range chunks.
"""
if isinstance(data, dict):
if isinstance(fs_dict, dict):
out_dict = {}
for k, fs in fs_dict.items():
if k in data and isinstance(data[k], pd.DataFrame):
out_dict.update({c_name: fs for c_name in data[k].columns})
fs_dict.update(out_dict)
# make `data` `to_series_list` convertable()
data = list(data.values())
# Convert the input data
series_list = to_series_list(data)
# Assert that there are no duplicate series names
assert len(series_list) == len(set([s.name for s in series_list]))
# Assert that the index increases monotonically
assert all(s.index.is_monotonic_increasing for s in series_list)
return _dtype_to_chunk_method[AttributeParser.determine_type(data)](
series_list,
fs_dict,
chunk_range_margin, # type: ignore[arg-type]
min_chunk_dur, # type: ignore[arg-type]
max_chunk_dur, # type: ignore[arg-type]
sub_chunk_overlap, # type: ignore[arg-type]
copy,
verbose,
)
Functions
def chunk_data(data, fs_dict=None, chunk_range_margin=None, min_chunk_dur=None, max_chunk_dur=None, sub_chunk_overlap=None, copy=True, verbose=False)
-
Expand source code
def chunk_data( data: Union[ pd.Series, pd.DataFrame, List[Union[pd.Series, pd.DataFrame]], Dict[str, pd.DataFrame], ], fs_dict: Optional[Dict[str, float]] = None, chunk_range_margin: Optional[Union[float, str, pd.Timedelta]] = None, min_chunk_dur: Optional[Union[float, str, pd.Timedelta]] = None, max_chunk_dur: Optional[Union[float, str, pd.Timedelta]] = None, sub_chunk_overlap: Optional[Union[float, str, pd.Timedelta]] = None, copy: bool = True, verbose: bool = False, ) -> List[List[pd.Series]]: """Divide the time-series `data` in same time/sequence-range chunks. Does 2 things: 1. Detecting gaps in the `data`(-list) sequence series. 2. Divides the `data` into chunks, according to the parameter configuration and the detected gaps. Notes ----- * When you set `fs_dict`, the assumption is made that **each item** in `data` has a **fixed sample frequency**. If you do not set `fs_dict`, this variable will use the 1 / max time-diff of the corresponding series as key-value pair. * All subsequent series-chunks are matched against the time-ranges of the first series. This implies that **the first item in `data` serves as a reference** for gap-matching. * The term `sub-chunk` refers to the chunks who exceed the `max_chunk_duration` parameter and are therefore further divided into sub-chunks. Example ------- ```python df_acc # cols ['ACC_x', 'ACC_y`, 'ACC_z`, 'ACC_SMV`] - 32 Hz df_gyro # cols ['gyro_x', 'gyro_y`, 'gyro_z`, 'gyro_area`] - 100 Hz chunk_data({'acc': df_acc, 'g': df_gyro}, fs_dict={'acc': 32, 'g': 100}) ``` <br> .. Note:: If `chunk_range_margin` / `min_chunk_dur` / `max_chunk_dur` / `sub_chunk_overlap` is a int/float, it will be interpreted as numerical sequence range and a numerical-indexed `data` will be assumed. **These attributes must be all either time-based or numerical and match the data its index dtype** Parameters ----------- data: Union[pd.Series, pd.DataFrame, List[Union[pd.Series, pd.DataFrame]], Dict[str, pd.DataFrame]] The sequence data which will be chunked. Each item in `data` must have a monotonically increasing index. We assume that each `item` in data has a _nearly-constant_ sample frequency (when there are no gaps) and all indices have the same dtype. fs_dict: Dict[str, int], optional The sample frequency dict. If set, this dict must at least withhold all the keys from the items in `data`. .. note:: if you passed a **_DataFrame-dict_** (i.e., a dict with key=str; value=DataFrame) to `data`, then you can **use** the **corresponding dataframe str-key** to describe the `fs` for all the DataFrame its columns with the `fs_dict` attribute. See also the example above chunk_range_margin: Union[float, str, pd.Timedelta], optional The allowed margin for each `ts` chunk their start and end time to be seen as same time-range chunks with other `ts`. If `None` the margin will be set as: 2 / min(fs_dict.intersection(data.names).values()) Which is equivalent to twice the min-fs (= max-period) of the passed `data`, by default None.\n * if `pd.Timedelta`, it will be interpreted as a time-range margin * if `int` or `float`, it will be interpreted as a numerical range margin min_chunk_dur : Union[float, str, pd.Timedelta], optional The minimum duration of a chunk, by default None. Chunks with durations smaller than this will be discarded (and not returned).\n * if `pd.Timedelta`, it will be interpreted as a time-range margin * if `int` or `float`, it will be interpreted as a numerical range margin max_chunk_dur : Union[float, str, pd.Timedelta], optional The maximum duration of a chunk, by default None. Chunks with durations larger than this will be chunked in smaller `sub_chunks` where each sub-chunk has a maximum duration of `max_chunk_dur`.\n * if `pd.Timedelta`, it will be interpreted as a time-range margin * if `int` or `float`, it will be interpreted as a numerical range margin sub_chunk_overlap: Union[float, str, pd.Timedelta], optional The sub-chunk boundary overlap. If available, **this margin / 2 will be added to either side of the `sub_chunk`**. This is especially useful to not lose inter-`sub_chunk` data (as each `sub_chunk` is in fact a continuous chunk) when window-based aggregations are performed on these same time range output (sub_)chunks. This argument is only relevant if `max_chunk_dur` is set.\n * if `pd.Timedelta`, it will be interpreted as a time-range margin * if `int` or `float`, it will be interpreted as a numerical range margin copy: boolean, optional If set True will return a new view (on which you won't get a `SettingWithCopyWarning` if you change the content), by default False. verbose : bool, optional If set, will print more verbose output, by default False Returns ------- List[List[pd.Series]] A list of same time range chunks. """ if isinstance(data, dict): if isinstance(fs_dict, dict): out_dict = {} for k, fs in fs_dict.items(): if k in data and isinstance(data[k], pd.DataFrame): out_dict.update({c_name: fs for c_name in data[k].columns}) fs_dict.update(out_dict) # make `data` `to_series_list` convertable() data = list(data.values()) # Convert the input data series_list = to_series_list(data) # Assert that there are no duplicate series names assert len(series_list) == len(set([s.name for s in series_list])) # Assert that the index increases monotonically assert all(s.index.is_monotonic_increasing for s in series_list) return _dtype_to_chunk_method[AttributeParser.determine_type(data)]( series_list, fs_dict, chunk_range_margin, # type: ignore[arg-type] min_chunk_dur, # type: ignore[arg-type] max_chunk_dur, # type: ignore[arg-type] sub_chunk_overlap, # type: ignore[arg-type] copy, verbose, )
Divide the time-series
data
in same time/sequence-range chunks.Does 2 things:
- Detecting gaps in the
data
(-list) sequence series. - Divides the
data
into chunks, according to the parameter configuration and the detected gaps.
Notes
- When you set
fs_dict
, the assumption is made that each item indata
has a fixed sample frequency. If you do not setfs_dict
, this variable will use the 1 / max time-diff of the corresponding series as key-value pair. - All subsequent series-chunks are matched against the time-ranges of the first
series. This implies that the first item in
data
serves as a reference for gap-matching. - The term
sub-chunk
refers to the chunks who exceed themax_chunk_duration
parameter and are therefore further divided into sub-chunks.
Example
df_acc # cols ['ACC_x', 'ACC_y`, 'ACC_z`, 'ACC_SMV`] - 32 Hz df_gyro # cols ['gyro_x', 'gyro_y`, 'gyro_z`, 'gyro_area`] - 100 Hz chunk_data({'acc': df_acc, 'g': df_gyro}, fs_dict={'acc': 32, 'g': 100})
Note
If
chunk_range_margin
/min_chunk_dur
/max_chunk_dur
/sub_chunk_overlap
is a int/float, it will be interpreted as numerical sequence range and a numerical-indexeddata
will be assumed. These attributes must be all either time-based or numerical and match the data its index dtypeParameters
data
:Union[pd.Series, pd.DataFrame, List[Union[pd.Series, pd.DataFrame]], Dict[str, pd.DataFrame]]
- The sequence data which will be chunked. Each item in
data
must have a monotonically increasing index. We assume that eachitem
in data has a nearly-constant sample frequency (when there are no gaps) and all indices have the same dtype. fs_dict
:Dict[str, int]
, optional- The sample frequency dict. If set, this dict must at least withhold all the keys
from the items in
data
.Note
if you passed a DataFrame-dict (i.e., a dict with key=str; value=DataFrame) todata
, then you can use the corresponding dataframe str-key to describe thefs
for all the DataFrame its columns with thefs_dict
attribute. See also the example above chunk_range_margin
:Union[float, str, pd.Timedelta]
, optional-
The allowed margin for each
ts
chunk their start and end time to be seen as same time-range chunks with otherts
. IfNone
the margin will be set as:2 / min(fs_dict.intersection(data.names).values())
Which is equivalent to twice the min-fs (= max-period) of the passed
data
, by default None.- if
pd.Timedelta
, it will be interpreted as a time-range margin - if
int
orfloat
, it will be interpreted as a numerical range margin
- if
min_chunk_dur
:Union[float, str, pd.Timedelta]
, optional-
The minimum duration of a chunk, by default None. Chunks with durations smaller than this will be discarded (and not returned).
- if
pd.Timedelta
, it will be interpreted as a time-range margin - if
int
orfloat
, it will be interpreted as a numerical range margin
- if
max_chunk_dur
:Union[float, str, pd.Timedelta]
, optional-
The maximum duration of a chunk, by default None. Chunks with durations larger than this will be chunked in smaller
sub_chunks
where each sub-chunk has a maximum duration ofmax_chunk_dur
.- if
pd.Timedelta
, it will be interpreted as a time-range margin - if
int
orfloat
, it will be interpreted as a numerical range margin
- if
sub_chunk_overlap
:Union[float, str, pd.Timedelta]
, optional-
The sub-chunk boundary overlap. If available, this margin / 2 will be added to either side of the
sub_chunk
. This is especially useful to not lose inter-sub_chunk
data (as eachsub_chunk
is in fact a continuous chunk) when window-based aggregations are performed on these same time range output (sub_)chunks. This argument is only relevant ifmax_chunk_dur
is set.- if
pd.Timedelta
, it will be interpreted as a time-range margin - if
int
orfloat
, it will be interpreted as a numerical range margin
- if
copy
:boolean
, optional- If set True will return a new view (on which you won't get a
SettingWithCopyWarning
if you change the content), by default False. verbose
:bool
, optional- If set, will print more verbose output, by default False
Returns
List[List[pd.Series]]
- A list of same time range chunks.
- Detecting gaps in the