"""Module defining the TSDF abstract wrapper for a dataframe of TSArray objects."""
from typing import TYPE_CHECKING, Optional
import dask.dataframe as dd
import geopandas as gpd
from tstore.archive.attributes.geopandas import read_geometry
from tstore.archive.metadata.readers import read_tstore_metadata
from tstore.backend import (
Backend,
DataFrame,
GeoPandasDataFrame,
PandasDataFrame,
dataframe_types,
remove_dataframe_index,
)
from tstore.tsdf.reader import _read_tsarrays
from tstore.tsdf.ts_dtype import TSDtype
from tstore.tsdf.writer import write_tstore
from tstore.tswrapper.tswrapper import TSWrapper
if TYPE_CHECKING:
# To avoid circular imports
from tstore.tslong.dask import TSLongDask
from tstore.tslong.tslong import TSLong
from tstore.tswide.tswide import TSWide
[docs]
class TSDF(TSWrapper):
    """Wrapper for a DataFrame of TSArray objects.

    The wrapped frame has an identifier column (``id_var``), columns of ``TSDtype`` whose
    cells are TS objects, and any other (static) attribute columns.
    """

    def __init__(
        self,
        df: DataFrame,
        id_var: str,
    ) -> None:
        """Wrap a DataFrame of TSArrays as a TSDF object.

        Args:
            df (pd.DataFrame): DataFrame to wrap.
            id_var (str): Name of the column containing the identifier variable.

        Raises:
            TypeError: If ``df`` is not a Pandas DataFrame.
        """
        if not isinstance(df, PandasDataFrame):
            raise TypeError(
                "The input dataframe must be a Pandas DataFrame. Inner TS objects can contain dataframes of any type.",
            )

        df = remove_dataframe_index(df)
        super().__init__(df)

        # Set attributes using __dict__ to not trigger __setattr__
        self.__dict__.update(
            {
                "_tstore_id_var": id_var,
            },
        )

    def __new__(cls, *args, **kwargs) -> "TSDF":
        """When calling TSDF() directly, return the appropriate subclass.

        Instantiating a subclass directly falls through to the default allocation.
        """
        # NOTE(review): ``wrap`` returns an already-initialised instance of a TSDF subclass.
        # Python then calls ``__init__`` on it a second time with the same arguments, so
        # ``TSDF(...)`` initialises twice. This looks harmless (same df and id_var), but
        # confirm that subclass ``__init__`` methods are idempotent.
        if cls is TSDF:
            return TSDF.wrap(*args, **kwargs)
        return super().__new__(cls)

    def change_ts_backend(self, new_backend: Backend, ts_cols: Optional[list[str]] = None) -> "TSDF":
        """Return a new TSDF object with dataframes wrapped in internal TS objects converted to a different backend.

        Args:
            new_backend (Backend): New backend to use for the TS objects.
            ts_cols (Optional[list[str]]): List of columns to convert. If None, convert all TS columns.

        Returns:
            TSDF: A new wrapper around a copy of the underlying frame. ``self`` is not modified.

        Raises:
            ValueError: If any requested column is not a TS column.
        """
        df = self._obj.copy()
        all_ts_cols = get_ts_columns(df)
        if ts_cols is None:
            ts_cols = all_ts_cols

        inexistent_cols = set(ts_cols) - set(all_ts_cols)
        if inexistent_cols:
            raise ValueError(f"TS columns {inexistent_cols} do not exist in the TSDF object.")

        df[ts_cols] = df[ts_cols].map(lambda ts_obj: ts_obj.change_backend(new_backend))
        # Keep the column dtype in sync with the backend of the TS objects it now holds.
        df[ts_cols] = df[ts_cols].astype(TSDtype(dataframe_types[new_backend]))
        return self.wrap(df, self._tstore_id_var)

    def get_ts_backend(self, ts_col: str) -> Backend:
        """Return the current backend of the TS objects stored in ``ts_col``.

        The backend is read from the first TS object of the column.

        Args:
            ts_col (str): Name of a TS column.

        Returns:
            Backend: The ``current_backend`` of the first TS object in ``ts_col``.
        """
        # Reading a single cell does not require copying the whole DataFrame.
        ts_object = self._obj[ts_col].iloc[0]
        return ts_object.current_backend

    @staticmethod
    @TSWrapper.copy_signature(__init__)
    def wrap(df: DataFrame, *args, **kwargs) -> "TSDF":
        """Wrap a DataFrame in the appropriate TSDF subclass.

        Raises:
            TypeError: If ``df`` is neither a GeoPandas nor a Pandas DataFrame.
        """
        # Lazy import to avoid circular imports
        from tstore.tsdf.geopandas import TSDFGeoPandas
        from tstore.tsdf.pandas import TSDFPandas

        # GeoPandas is checked first so GeoDataFrames get the geo-aware subclass.
        if isinstance(df, GeoPandasDataFrame):
            return TSDFGeoPandas(df, *args, **kwargs)

        if isinstance(df, PandasDataFrame):
            return TSDFPandas(df, *args, **kwargs)

        type_path = f"{type(df).__module__}.{type(df).__qualname__}"
        raise TypeError(f"Cannot wrap type {type_path} as a TSDF object.")

    def to_tstore(
        self,
        base_dir,
        partitioning=None,
        tstore_structure="id-var",
        overwrite=True,  # append functionality?
        # geometry
    ):
        """Write TStore from TSDF object.

        The TS objects are converted to the Dask backend before writing.

        Args:
            base_dir: Target TStore directory.
            partitioning: Forwarded to ``write_tstore``.
            tstore_structure: Forwarded to ``write_tstore`` (default ``"id-var"``).
            overwrite: Forwarded to ``write_tstore`` (presumably whether an existing
                TStore is replaced; confirm in ``write_tstore``).
        """
        tsdf = self.change_ts_backend(new_backend="dask")
        tsdf._to_tstore_dask(
            base_dir=base_dir,
            partitioning=partitioning,
            tstore_structure=tstore_structure,
            overwrite=overwrite,
        )

    def _to_tstore_dask(
        self,
        base_dir,
        partitioning=None,
        tstore_structure="id-var",
        overwrite=True,  # append functionality?
        # geometry
    ):
        """Write TStore from TSDF object with Dask backend for TS objects."""
        write_tstore(
            self._obj,
            base_dir=base_dir,
            id_var=self._tstore_id_var,
            partitioning=partitioning,
            tstore_structure=tstore_structure,
            overwrite=overwrite,
        )

    @staticmethod
    def from_tstore(base_dir: str, backend: Backend = "dask") -> "TSDF":
        """Read TStore into TSDF object.

        Args:
            base_dir (str): TStore directory to read.
            backend (Backend): Backend for the TS objects of the returned TSDF (default ``"dask"``).

        Returns:
            TSDF: The loaded TSDF. The data is read with Dask and converted afterwards
            when another backend is requested.
        """
        dask_tsdf = TSDF._from_tstore_dask(base_dir)

        if backend == "dask":
            return dask_tsdf

        return dask_tsdf.change_ts_backend(new_backend=backend)

    @staticmethod
    def _from_tstore_dask(base_dir: str) -> "TSDF":
        """Read TStore into TSDF object with Dask backend for TS objects."""
        # TODO: enable specify subset of TSArrays, attribute columns and rows to load
        # TODO: read_attributes using geopandas --> geoparquet
        # TODO: separate TSDF class if geoparquet (TSDF inherit from geopandas.GeoDataFrame ?)
        from tstore.archive.attributes.pandas import read_attributes

        # Read TStore metadata
        metadata = read_tstore_metadata(base_dir=base_dir)
        id_var = metadata["id_var"]

        # Read TStore attributes, indexed by identifier so TS series can be joined on it.
        df = read_attributes(base_dir).set_index(id_var)

        # Get list of TSArrays
        list_ts_series = _read_tsarrays(base_dir, metadata)

        # Join TSArrays to dataframe
        for ts_series in list_ts_series:
            df = df.join(ts_series, how="left")

        # Read geometry data
        geometry = read_geometry(
            base_dir=base_dir,
            id_var=id_var,
        )
        if geometry is not None:
            # Replace the geometry column from the attributes with the one from the geometry reader.
            df = df.drop(columns=geometry.geometry.name)
            # id_var is an index level of df here; pandas accepts index level names in ``on=``.
            df = df.merge(geometry, on=id_var, how="left")
            df = gpd.GeoDataFrame(df, geometry=geometry.geometry.name)

        # Return the TSDF
        return TSDF(
            df,
            id_var=id_var,
        )

    # TODO: add a method that returns the identifier column
    # TODO: add a method that returns the time series columns (TSArrays)
    # TODO: add a compute method
    # TODO: wrap iloc, loc and join so that they return a TSDF
    # TODO: remove methods that are not supported by TSArray (e.g. min, ...)

    def to_tslong(self, backend: Backend = "dask") -> "TSLong":
        """Convert the wrapper into a TSLong object.

        Args:
            backend (Backend): Backend of the returned TSLong (default ``"dask"``).

        Returns:
            TSLong: Long-form representation. It is built with Dask, then converted.
        """
        dask_tsdf = self.change_ts_backend(new_backend="dask")
        dask_tslong = dask_tsdf._to_tslong_dask()
        tslong = dask_tslong.change_backend(new_backend=backend)
        return tslong

    def _to_tslong_dask(self) -> "TSLongDask":
        """Convert the wrapper into a TSLongDask object (TS objects must use the Dask backend)."""
        from tstore.tslong.dask import TSLongDask

        id_var = self._tstore_id_var
        # One row per identifier, in order of first appearance. This is the same row the
        # per-id lookup (``unique()`` then ``iloc[0]``) selected, without rescanning the
        # whole frame once per identifier.
        first_rows = self._obj.drop_duplicates(subset=id_var, keep="first")
        ids = first_rows[id_var]

        # These column lists are the same for every identifier: compute them once.
        ts_cols = get_ts_columns(first_rows)
        static_vars = self._tstore_static_vars

        # Positional ``iloc`` access keeps the original scalar types (e.g. numpy scalars).
        long_rows = [
            self._build_long_rows(first_rows.iloc[[position]], ids.iloc[position], ts_cols, static_vars)
            for position in range(len(first_rows))
        ]
        df = dd.concat(long_rows)
        time_var = df.index.name

        return TSLongDask(
            df.reset_index(),
            id_var=id_var,
            time_var=time_var,
            ts_vars=self._tstore_ts_vars,
            static_vars=static_vars,
        )

    def _get_long_rows(self, tstore_id: str) -> dd.DataFrame:
        """Return a long form DataFrame for a single tstore_id.

        Only the first row whose identifier equals ``tstore_id`` is used.
        """
        ts_df = self._obj
        ts_df = ts_df[ts_df[self._tstore_id_var] == tstore_id]
        return self._build_long_rows(ts_df, tstore_id, get_ts_columns(ts_df), self._tstore_static_vars)

    def _build_long_rows(
        self,
        ts_df: PandasDataFrame,
        tstore_id,
        ts_cols: list[str],
        static_vars: list[str],
    ) -> dd.DataFrame:
        """Build the long-form DataFrame from the first row of ``ts_df``.

        The time series of all TS columns are outer-joined on their index. The identifier
        and the static values are then broadcast as constant columns.

        Raises:
            ValueError: If there are no TS columns to build the long form from.
        """
        if not ts_cols:
            raise ValueError("Cannot convert a TSDF without TS columns to long format.")

        # Add time series
        df = None
        for ts_col in ts_cols:
            new_df = ts_df[ts_col].iloc[0]._obj
            df = new_df if df is None else df.join(new_df, how="outer")

        # Add identifier and static variables (id first, then statics in column order).
        constant_columns = {self._tstore_id_var: tstore_id}
        constant_columns.update({static_var: ts_df[static_var].iloc[0] for static_var in static_vars})
        return df.assign(**constant_columns)

    def to_tswide(self, backend: Backend = "dask") -> "TSWide":
        """Convert the wrapper into a TSWide object (via the long format)."""
        return self.to_tslong(backend=backend).to_tswide()

    @property
    def _tstore_ts_vars(self) -> dict[str, list[str]]:
        """Return the dictionary of time-series column names.

        Maps each TS column to the variables of its TS object, excluding the time variable.
        The variables are read from the TS object in the first row.
        """
        df = self._obj
        ts_cols = get_ts_columns(df)
        ts_objects = {col: df[col].iloc[0] for col in ts_cols}
        return {
            col: [var for var in ts_obj._tstore_columns if var != ts_obj._tstore_time_var]
            for col, ts_obj in ts_objects.items()
        }

    @property
    def _tstore_static_vars(self) -> list[str]:
        """Return the list of static column names (neither the id column nor a TSDtype column)."""
        df = self._obj
        return [col for col in df.columns if col != self._tstore_id_var and not isinstance(df[col].dtype, TSDtype)]
[docs]
def get_ts_columns(df: PandasDataFrame) -> list[str]:
    """Return the list of columns containing TS objects.

    A column counts as a TS column when its dtype is a ``TSDtype``.

    Args:
        df (pd.DataFrame): DataFrame to inspect.

    Returns:
        list[str]: Labels of the TS columns, in column order.
    """
    # ``df.dtypes`` yields one (label, dtype) pair per column. This also works with
    # duplicated column labels, where ``df[label]`` would return a DataFrame that has no
    # ``.dtype``. It also avoids building a Series for every column.
    return [col for col, dtype in df.dtypes.items() if isinstance(dtype, TSDtype)]