"""Module defining the TSLong abstract wrapper."""
from typing import TYPE_CHECKING, Optional, Union
from tstore.backend import (
Backend,
DaskDataFrame,
DataFrame,
GeoPandasDataFrame,
PandasDataFrame,
PolarsDataFrame,
PyArrowDataFrame,
cast_column_to_large_string,
change_backend,
get_column_names,
get_dataframe_index,
)
from tstore.tswrapper.tswrapper import TSWrapper
if TYPE_CHECKING:
# To avoid circular imports
from tstore.tsdf.tsdf import TSDF
from tstore.tswide.tswide import TSWide
[docs]
class TSLong(TSWrapper):
"""Abstract wrapper for a long-form timeseries DataFrame."""
def __init__(
self,
df: DataFrame,
id_var: str,
time_var: str = "time",
ts_vars: Union[dict[str, list[str]], list[str], None] = None,
static_vars: Optional[list[str]] = None,
geometry: Optional[GeoPandasDataFrame] = None,
) -> None:
"""Wrap a long-form timeseries DataFrame as a TSLong object.
Args:
df (DataFrame): DataFrame to wrap.
id_var (str): Name of the column containing the identifier variable.
time_var (str): Name of the column containing the time variable. Defaults to "time".
ts_vars (Union[dict[str, list[str]], list[str], None]): Dictionary of named groups of column names or list
of column names (which will create one group per entry).
Defaults to None, which will group all columns not in `static_vars` together under a group called
"ts_variable".
static_vars (Optional[list[str]]): List of column names that are static across time. Defaults to None.
geometry (Optional[GeoPandasDataFrame]): GeoPandas DataFrame containing geometry information for each id.
Defaults to None.
"""
_check_id_var(id_var=id_var, df=df)
_check_time_var(time_var=time_var, df=df, id_var=id_var)
if static_vars is None:
static_vars = []
else:
_check_static_vars(
static_vars=static_vars,
df=df,
id_var=id_var,
time_var=time_var,
)
ts_vars = _ts_vars_as_checked_dict(
ts_vars=ts_vars,
df=df,
id_var=id_var,
time_var=time_var,
static_vars=static_vars,
)
_check_geometry(geometry=geometry, df=df, id_var=id_var)
df = cast_column_to_large_string(df, id_var)
if geometry is not None:
geometry = cast_column_to_large_string(geometry, id_var)
# if ensure_time_index:
# df = re_set_dataframe_index(df, index_var=time_var)
super().__init__(df)
# Set attributes using __dict__ to not trigger __setattr__
self.__dict__.update(
{
"_tstore_id_var": id_var,
"_tstore_time_var": time_var,
"_tstore_ts_vars": ts_vars,
"_tstore_static_vars": static_vars,
"_tstore_geometry": geometry,
},
)
def __new__(cls, *args, **kwargs) -> "TSLong":
"""When calling TSLong() directly, return the appropriate subclass."""
if cls is TSLong:
return TSLong.wrap(*args, **kwargs)
return super().__new__(cls)
[docs]
def change_backend(self, new_backend: Backend) -> "TSLong":
"""Return a new wrapper with the dataframe converted to a different backend."""
# new_df = change_backend(self._obj, new_backend, index_var=self._tstore_time_var)
new_df = change_backend(self._obj, new_backend)
return self._rewrap(new_df)
[docs]
@staticmethod
@TSWrapper.copy_signature(__init__)
def wrap(df: DataFrame, *args, **kwargs) -> "TSLong":
"""Wrap a DataFrame in the appropriate TSLong subclass.
Takes the same arguments as the TSLong constructor.
"""
# Lazy import to avoid circular imports
from tstore.tslong.dask import TSLongDask
from tstore.tslong.pandas import TSLongPandas
from tstore.tslong.polars import TSLongPolars
from tstore.tslong.pyarrow import TSLongPyArrow
if isinstance(df, DaskDataFrame):
return TSLongDask(df, *args, **kwargs)
if isinstance(df, PandasDataFrame):
return TSLongPandas(df, *args, **kwargs)
if isinstance(df, PolarsDataFrame):
return TSLongPolars(df, *args, **kwargs)
if isinstance(df, PyArrowDataFrame):
return TSLongPyArrow(df, *args, **kwargs)
type_path = f"{type(df).__module__}.{type(df).__qualname__}"
raise TypeError(f"Cannot wrap type {type_path} as a TSLong object.")
[docs]
def to_tsdf(self) -> "TSDF":
"""Convert the wrapper into a TSDF object."""
dask_tslong = self.change_backend(new_backend="dask")
dask_tsdf = dask_tslong.to_tsdf()
tsdf = dask_tsdf.change_ts_backend(new_backend=self.current_backend)
return tsdf
[docs]
def to_tswide(self) -> "TSWide":
"""Convert the wrapper into a TSWide object."""
dask_tslong = self.change_backend(new_backend="dask")
dask_tswide = dask_tslong.to_tswide()
tswide = dask_tswide.change_backend(new_backend=self.current_backend)
return tswide
def _check_id_var(id_var: str, df: DataFrame) -> None:
"""Check that the `id_var` argument is a column in the DataFrame.
Raises
------
ValueError: If the `id_var` argument is not a column in the DataFrame.
"""
cols = get_column_names(df)
if id_var not in cols:
raise ValueError(f"Column name {id_var} is not available in the DataFrame.")
def _check_time_var(
time_var: str,
df: DataFrame,
id_var: str,
) -> None:
"""Check that the `time_var` argument is a column in the DataFrame or the index.
Raises
------
ValueError: If the `time_var` argument is not an available column or the index in the DataFrame.
"""
available_cols = set(get_column_names(df)) | {get_dataframe_index(df)} - {id_var}
if time_var not in available_cols:
raise ValueError(f"Column name {time_var} is not available in the DataFrame.")
def _check_static_vars(
static_vars: list[str],
df: DataFrame,
id_var: str,
time_var: str,
) -> None:
"""Check that the `static_vars` contains only columns available in the DataFrame, excluding `id_var` and `time_var`.
Raises
------
ValueError: If the `static_vars` argument contains column names not available in the DataFrame.
"""
available_cols = set(get_column_names(df)) - {id_var, time_var}
if set(static_vars) - available_cols:
raise ValueError(f"Column names {set(static_vars) - available_cols} are not available in the DataFrame.")
def _ts_vars_as_checked_dict(
ts_vars: Union[dict[str, list[str]], list[str], None],
df: DataFrame,
id_var: str,
time_var: str,
static_vars: list[str],
) -> dict[str, list[str]]:
"""Convert the `ts_vars` argument to a dictionary if it is not already and check column names."""
if ts_vars is None:
return {
"ts_variable": [
col for col in get_column_names(df) if col != id_var and col != time_var and col not in static_vars
],
}
if isinstance(ts_vars, list):
ts_vars = {col: [col] for col in ts_vars}
_check_ts_vars(
ts_vars=ts_vars,
df=df,
id_var=id_var,
time_var=time_var,
static_vars=static_vars,
)
return ts_vars
def _check_ts_vars(
ts_vars: dict[str, list[str]],
df: DataFrame,
id_var: str,
time_var: str,
static_vars: list[str],
) -> None:
"""Check that the `ts_vars` argument does not contain repeated or unavailable column names.
Raises
------
ValueError: If the `ts_vars` argument contains repeated or unavailable column names.
"""
available_cols = set(get_column_names(df)) - {id_var, time_var} - set(static_vars)
requested_cols = set()
for cols in ts_vars.values():
new_cols = set(cols)
if new_cols & requested_cols:
raise ValueError(f"Column names {new_cols & available_cols} is duplicated in the `ts_vars` argument.")
requested_cols.update(new_cols)
if requested_cols - available_cols:
raise ValueError(f"Column names {requested_cols - available_cols} are not available in the DataFrame.")
if available_cols - requested_cols:
raise ValueError(f"Column names {available_cols - requested_cols} are not specified in the arguments.")
def _check_geometry(
geometry: GeoPandasDataFrame,
df: DataFrame,
id_var: str,
) -> None:
"""Check that the `geometry` has the same `id_var` as the DataFrame.
Raises
------
TypeError: If the `geometry` argument is not a GeoPandas DataFrame.
ValueError: If the `geometry` argument has a different `id_var` than the DataFrame.
"""
if geometry is None:
return
if isinstance(df, PyArrowDataFrame):
ids_df = set(change_backend(df[id_var], "pandas").unique())
else:
ids_df = set(df[id_var].unique())
ids_geo = set(geometry[id_var].unique())
if not isinstance(geometry, GeoPandasDataFrame):
raise TypeError("The `geometry` argument must be a GeoPandas DataFrame.")
if ids_df != ids_geo:
raise ValueError("The `geometry` argument does not have the same identifiers as the DataFrame.")
if len(geometry) != len(ids_geo):
raise ValueError("The `geometry` argument has duplicated identifiers.")