Source code for tstore.tsdf.tsarray

#!/usr/bin/env python3
"""
Created on Sun Jun 11 22:47:54 2023.

@author: ghiggi
"""

import dask.dataframe as dd
import numpy as np
import pandas as pd
from pandas.api.extensions import ExtensionArray

from tstore.tsdf.ts_class import TS
from tstore.tsdf.ts_dtype import TSDtype

#### Notes
# https://pandas.pydata.org/pandas-docs/stable/reference/extensions.html
# https://pandas.pydata.org/pandas-docs/stable/development/extending.html#extensionarray
# https://pandas.pydata.org/pandas-docs/stable/development/extending.html#extension-types
# https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.api.extensions.ExtensionArray.html
# https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.api.extensions.ExtensionDtype.html

# https://github.com/geopandas/geopandas/blob/83ab5c63890a2575b95ede6b8a8ef469753c9605/geopandas/array.py#L33
# https://github.com/geopandas/geopandas/blob/83ab5c63890a2575b95ede6b8a8ef469753c9605/geopandas/array.py#L261
# https://stackoverflow.com/questions/68893521/simple-example-of-pandas-extensionarray

# https://itnext.io/guide-to-pandas-extension-types-and-how-to-create-your-own-3b213d689c86


####--------------------------------------------------------------------------.
def get_tabular_object_type(obj):
    """Get inner class of the TS object.

    Parameters
    ----------
    obj : object
        The tabular object wrapped by a TS object.

    Returns
    -------
    str
        ``"dask.DataFrame"``, ``"dask.Series"``, ``"pandas.DataFrame"`` or
        ``"pandas.Series"`` for the recognised tabular types; otherwise the
        class name of ``obj``.
    """
    if isinstance(obj, dd.DataFrame):
        return "dask.DataFrame"
    # This branch previously tested ``pd.DataFrame``, which mislabelled pandas
    # DataFrames as "dask.Series" and made the pandas.DataFrame branch unreachable.
    if isinstance(obj, dd.Series):
        return "dask.Series"
    if isinstance(obj, pd.DataFrame):
        return "pandas.DataFrame"
    if isinstance(obj, pd.Series):
        return "pandas.Series"
    return type(obj).__name__
class TSArray(ExtensionArray):
    """An ExtensionArray for TS objects, holding the array-based implementations.

    The elements are stored in a NumPy array (``self._data``). The dtype
    reported to pandas is a ``TSDtype`` parametrised by the class of the
    tabular object wrapped by the TS elements (see ``_class``).
    """

    _dtype = TSDtype()

    def __init__(self, data, copy: bool = False):
        """Store ``data`` as a NumPy array.

        Parameters
        ----------
        data : array-like
            Sequence of TS objects (or missing-value placeholders).
        copy : bool, default False
            If True, always copy ``data``. If False, copy only when needed.
        """
        if isinstance(data, TSArray):
            # Reuse the underlying buffer instead of iterating the wrapper.
            data = data._data
        # ``np.array(..., copy=False)`` raises on NumPy >= 2 whenever a copy
        # cannot be avoided (e.g. ``data`` is a list). ``np.asarray`` keeps
        # the "copy only if needed" semantics on every NumPy version.
        self._data = np.array(data) if copy else np.asarray(data)

    @property
    def _class(self):
        """Define inner TS class.

        The class is inferred from the first element exposing an ``_obj``
        attribute. When no element has one (empty array, or only missing
        values as can happen after a join without matching index), an empty
        pandas Series is used as the reference object.
        """
        # TODO: Only the first valid element is inspected.
        #       In future loop over all elements and check uniqueness.
        tabular_object = None
        for ts_object in self._data:
            candidate = getattr(ts_object, "_obj", None)
            if candidate is not None:
                tabular_object = candidate
                break
        if tabular_object is None:
            tabular_object = pd.Series(dtype=object)
        return get_tabular_object_type(tabular_object)

    def __str__(self):
        """String representation."""
        # TODO print
        return str(self._data)

    def __repr__(self):
        """Repr representation."""
        n = len(self._data)
        return f"TSArray composed of {n} TS objects."

    # Required for all ExtensionArray subclasses
    def __getitem__(self, index):
        """Select a subset of self.

        An integer index returns the stored element; any other indexer
        (slice, boolean mask, integer array) returns a new TSArray.
        """
        # pandas frequently passes NumPy integer scalars, which are not
        # ``int`` instances; treat them as scalar lookups as well.
        if isinstance(index, (int, np.integer)):
            return self._data[index]
        # Check index for TestGetitemTests
        index = pd.api.indexers.check_array_indexer(self, index)
        return type(self)(self._data[index])

    # TestSetitemTests
    def __setitem__(self, index, value: TS) -> None:
        """Set one or more values in-place."""
        # Check index for TestSetitemTests
        index = pd.api.indexers.check_array_indexer(self, index)

        # Upcast to value's type (if needed) for TestMethodsTests
        # NOTE(review): NumPy converts ``type(value)`` to a dtype for this
        # comparison; presumably this promotes a non-object buffer (e.g. an
        # all-NaN float array) to object before storing a TS - confirm.
        if self._data.dtype < type(value):
            self._data = self._data.astype(type(value))

        # TODO: Validate value for TestSetitemTests
        # value = self._validate_setitem_value(value)

        self._data[index] = value

    # Required for all ExtensionArray subclasses
    def __len__(self) -> int:
        """Length of this array."""
        return len(self._data)

    # Required for all ExtensionArray subclasses
    @pd.core.ops.unpack_zerodim_and_defer("__eq__")
    def __eq__(self, other):
        """Equality behaviour."""
        # TODO: how to compare list of TS objects
        # NOTE(review): this returns a scalar rather than an element-wise
        # boolean array; kept as-is until TS comparison is defined.
        return False

    # Required for all ExtensionArray subclasses
    @classmethod
    def _from_sequence(cls, data, dtype=None, copy: bool = False):
        """Construct a new TSArray from a sequence of TS.

        Raises
        ------
        ValueError
            If ``dtype`` is given and is not a ``TSDtype`` instance.
        """
        if dtype is not None and not isinstance(dtype, TSDtype):
            msg = f"'{cls.__name__}' only supports 'TSDtype' dtype"
            raise ValueError(msg)
        return cls(data, copy=copy)

    # Required for all ExtensionArray subclasses
    @classmethod
    def _concat_same_type(cls, to_concat):
        """Concatenate multiple TSArrays.

        Raises
        ------
        ValueError
            If the arrays do not all share the same TS inner class.
        """
        # Ensure same TS class
        ts_classes = {array.dtype.ts_class for array in to_concat}
        if len(ts_classes) > 1:
            raise ValueError("The TS objects must all be of the same type.")
        # Concatenate the underlying buffers rather than the wrappers.
        return cls(np.concatenate([array._data for array in to_concat]))

    # Required for all ExtensionArray subclasses
    @property
    def dtype(self):
        """An instance of TSDtype."""
        return TSDtype(self._class)

    # Required for all ExtensionArray subclasses
    @property
    def nbytes(self) -> int:
        """The number of bytes needed to store this object in memory."""
        return self._data.nbytes

    @property
    def ts_class(self):
        """TS inner class."""
        return self.dtype.ts_class

    # Required for all ExtensionArray subclasses
    def isna(self):
        """A 1-D array indicating if the TS is missing."""
        return pd.isna(self._data)

    # Required for all ExtensionArray subclasses
    def copy(self):
        """Return a copy of the array."""
        copied = self._data.copy()
        return type(self)(copied)

    # Required for all ExtensionArray subclasses
    def take(self, indices, allow_fill=False, fill_value=None):
        """Take elements from an array.

        When ``allow_fill`` is True and no ``fill_value`` is given, the
        dtype's ``na_value`` is used for the filled positions.
        """
        if allow_fill and fill_value is None:
            fill_value = self.dtype.na_value

        result = pd.api.extensions.take(
            self._data,
            indices,
            allow_fill=allow_fill,
            fill_value=fill_value,
        )
        return self._from_sequence(result)