Source code for tstore.tslong.dask

"""Module defining the TSLongDask wrapper."""

from typing import TYPE_CHECKING

import geopandas as gpd
import pandas as pd

from tstore.tsdf.ts_class import TS
from tstore.tsdf.tsarray import TSArray
from tstore.tslong.pyarrow import TSLongPyArrow
from tstore.tslong.tslong import TSLong

if TYPE_CHECKING:
    # To avoid circular imports
    from tstore.tsdf.tsdf import TSDF
    from tstore.tswide.dask import TSWideDask


class TSLongDask(TSLong):
    """Wrapper for a long-form Dask timeseries dataframe."""

    def to_tstore(
        self,
        base_dir,
        partitioning=None,
        tstore_structure="id-var",
        overwrite=True,
    ):
        """Write the wrapped dataframe as a TStore structure.

        The Dask dataframe is converted to the pandas backend first, and the
        write is delegated to that wrapper's ``to_tstore``.

        Parameters
        ----------
        base_dir
            Root directory of the TStore to write.
        partitioning, tstore_structure, overwrite
            Forwarded unchanged to the pandas wrapper's ``to_tstore``.
        """
        pandas_tslong = self.change_backend(new_backend="pandas")
        pandas_tslong.to_tstore(
            base_dir=base_dir,
            partitioning=partitioning,
            tstore_structure=tstore_structure,
            overwrite=overwrite,
        )

    @staticmethod
    def from_tstore(
        base_dir,
        ts_variables=None,
        start_time=None,
        end_time=None,
        inclusive=None,
        tstore_ids=None,
        columns=None,
        filesystem=None,
        use_threads=True,
    ) -> "TSLongDask":
        """Open a TStore file structure as a TSLongDask wrapper around a Dask long dataframe.

        The data is read with the PyArrow backend (all arguments are forwarded
        to ``TSLongPyArrow.from_tstore``) and then converted to the Dask backend.
        """
        # Read exploiting pyarrow
        tslong_pyarrow = TSLongPyArrow.from_tstore(
            base_dir,
            ts_variables=ts_variables,
            start_time=start_time,
            end_time=end_time,
            inclusive=inclusive,
            tstore_ids=tstore_ids,
            columns=columns,
            filesystem=filesystem,
            use_threads=use_threads,
        )
        # Conversion to dask
        return tslong_pyarrow.change_backend(new_backend="dask")

    def to_tsdf(self) -> "TSDF":
        """Convert the wrapper into a TSDF object with Dask TS objects.

        Builds one row per tstore id (sorted), with one TSArray column per
        timeseries variable, one column per static variable and, when a
        geometry is attached, a left-merged geometry column.
        """
        from tstore.tsdf.tsdf import TSDF

        # Compute the ids once and share them with every helper so the Dask
        # graph is not re-executed for each timeseries / static variable.
        tstore_ids = self._get_tstore_ids()
        ts_arrays = self._get_ts_arrays(tstore_ids=tstore_ids)
        pd_series = {
            ts_variable: pd.Series(ts_array, index=tstore_ids) for ts_variable, ts_array in ts_arrays.items()
        }
        static_values = self._get_static_values(tstore_ids=tstore_ids)
        data = {**pd_series, **static_values, self._tstore_id_var: tstore_ids}
        df = pd.DataFrame(data)

        if self._tstore_geometry is not None:
            df = df.merge(self._tstore_geometry, on=self._tstore_id_var, how="left")
            df = gpd.GeoDataFrame(df, geometry=self._tstore_geometry.geometry.name)

        return TSDF(
            df,
            id_var=self._tstore_id_var,
        )

    def _get_tstore_ids(self) -> list:
        """Retrieve the sorted list of distinct tstore ids (triggers a Dask compute)."""
        unique_ids = self._obj[self._tstore_id_var].unique().compute()
        return sorted(unique_ids)

    def _get_ts_arrays(self, tstore_ids: "list | None" = None) -> dict[str, TSArray]:
        """Create a dictionary of TSArrays, one per timeseries variable.

        ``tstore_ids`` may be passed to reuse an already computed id list;
        otherwise it is computed here.
        """
        if tstore_ids is None:
            tstore_ids = self._get_tstore_ids()
        return {
            ts_variable: self._get_ts_array(variables, tstore_ids=tstore_ids)
            for ts_variable, variables in self._tstore_ts_vars.items()
        }

    def _get_ts_array(self, variables: list[str], tstore_ids: "list | None" = None) -> TSArray:
        """Create a TSArray for a set of variables, with one TS per tstore id."""
        if tstore_ids is None:
            tstore_ids = self._get_tstore_ids()
        ts_list = [self._get_ts(tstore_id, variables) for tstore_id in tstore_ids]
        return TSArray(ts_list)

    def _get_ts(self, tstore_id: str, variables: list[str]) -> TS:
        """Create a TS object for a given tstore_id and a set of variables."""
        df = self._obj
        # filter df by id and select only provided variables (and time)
        return TS(df[df[self._tstore_id_var] == tstore_id][[*variables, self._tstore_time_var]])

    def _get_static_values(self, tstore_ids: "list | None" = None) -> dict[str, list]:
        """Retrieve the static values, one list per static variable aligned with ``tstore_ids``.

        Raises
        ------
        ValueError
            If a static variable takes more than one value for some tstore_id.
        """
        if tstore_ids is None:
            tstore_ids = self._get_tstore_ids()
        static_vars = list(self._tstore_static_vars)
        if not static_vars:
            return {}

        id_var = self._tstore_id_var
        # A single Dask compute for all static variables: only the distinct
        # (id, static...) rows are materialised, instead of running one Dask
        # compute per (static variable, tstore_id) pair.
        distinct_rows = self._obj[[id_var, *static_vars]].drop_duplicates().compute()

        static_values = {}
        for static_var in static_vars:
            pairs = distinct_rows[[id_var, static_var]].drop_duplicates()
            var_static_values = []
            for tstore_id in tstore_ids:
                values = pairs.loc[pairs[id_var] == tstore_id, static_var]
                if len(values) > 1:
                    raise ValueError(
                        f"Static variables should be unique per tstore_id. Found {list(values)} for {static_var}.",
                    )
                var_static_values.append(values.iloc[0])
            static_values[static_var] = var_static_values
        return static_values

    def to_tswide(self) -> "TSWideDask":
        """Convert the wrapper into a TSWideDask object.

        Pivots the long dataframe so that rows are indexed by the time variable
        and columns are split by tstore id (``aggfunc="first"``).
        """
        from tstore.tswide.dask import TSWideDask

        df = self._obj
        df = df.reset_index()
        # Dask's pivot_table requires the ``columns`` variable to be categorical
        # with *known* categories. ``cat.as_known()`` computes only the
        # categories, instead of materialising the whole column and assigning a
        # pandas Series back (Dask's reset_index restarts at 0 in each
        # partition, so such an assignment can misalign rows).
        df[self._tstore_id_var] = df[self._tstore_id_var].astype("category").cat.as_known()
        df = df.pivot_table(
            index=self._tstore_time_var,
            columns=self._tstore_id_var,
            values=df.columns.difference([self._tstore_id_var]),
            aggfunc="first",
        )

        return TSWideDask(
            df,
            id_var=self._tstore_id_var,
            time_var=self._tstore_time_var,
            ts_vars=self._tstore_ts_vars,
            static_vars=self._tstore_static_vars,
            geometry=self._tstore_geometry,
        )