Source code for tstore.archive.ts.readers.dask

#!/usr/bin/env python3
"""
Created on Mon Jun 12 22:24:07 2023.

@author: ghiggi
"""

import dask.dataframe as dd
import pandas as pd

from tstore.archive.ts.utility import get_time_filters


def open_ts(
    fpath,
    partitions,
    columns=None,
    start_time=None,
    end_time=None,
    inclusive=None,
    split_row_groups=False,
    # Dask-specific
    calculate_divisions=True,
    ignore_metadata_file=False,
    **kwargs,
):
    """Lazily read a Parquet time series as a Dask DataFrame.

    Parameters
    ----------
    fpath
        Location of the Parquet data, passed unchanged to
        ``dask.dataframe.read_parquet``.
    partitions
        Column label(s) dropped from the DataFrame after reading
        (presumably the partitioning columns of the dataset).
    columns
        Columns to load; forwarded as ``columns`` to ``read_parquet``.
    start_time, end_time, inclusive
        Forwarded to ``get_time_filters``, whose result is used as the
        ``filters`` argument of ``read_parquet`` (row filtering at read time).
    split_row_groups
        Forwarded to ``read_parquet``.
    calculate_divisions
        Forwarded to ``read_parquet``.
    ignore_metadata_file
        Forwarded to ``read_parquet``.
    **kwargs
        Any additional keyword arguments for ``read_parquet``.

    Returns
    -------
    The object returned by ``dd.read_parquet`` with the ``partitions``
    columns dropped.
    """
    # Translate the requested time window into read-time row filters.
    time_filters = get_time_filters(
        start_time=start_time,
        end_time=end_time,
        inclusive=inclusive,
    )

    # Options controlling the Arrow --> pandas conversion.
    to_pandas_options = {
        "zero_copy_only": False,  # do not require a zero-copy conversion
        "strings_to_categorical": False,
        "date_as_object": False,
        "timestamp_as_object": False,
        "use_threads": True,  # convert using multiple threads
        "safe": True,
        "split_blocks": False,
        "ignore_metadata": False,  # keep the stored 'pandas' metadata
        "types_mapper": pd.ArrowDtype,  # build pandas columns with Arrow dtypes
    }

    ddf = dd.read_parquet(
        fpath,
        engine="pyarrow",
        dtype_backend="pyarrow",
        index=None,  # None --> read back the original time index
        # Column selection and row filtering
        columns=columns,
        filters=time_filters,
        # Metadata handling
        calculate_divisions=calculate_divisions,
        ignore_metadata_file=ignore_metadata_file,
        # Partitioning of the resulting collection
        split_row_groups=split_row_groups,
        # Arrow conversion
        arrow_to_pandas=to_pandas_options,
        # Everything else supplied by the caller
        **kwargs,
    )

    # The partitioning columns are not part of the time series itself.
    return ddf.drop(columns=partitions)