Source code for tstore.archive.ts.writers.pyarrow

#!/usr/bin/env python3
"""
Created on Mon Apr  8 17:26:02 2024.

@author: ghiggi
"""

import math
import os

import pandas as pd
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq


def write_partitioned_dataset(
    base_dir,
    table,
    partitioning=None,
    row_group_size="400MB",
    max_file_size="2GB",
    compression="snappy",
    compression_level=None,
    # Computing options
    max_open_files=0,
    use_threads=True,
):
    """Write a hive-partitioned Parquet dataset and its metadata sidecar files.

    The data are written with ``pyarrow.dataset.write_dataset``. Then a
    ``_common_metadata`` file (schema only) and a ``_metadata`` file (schema
    plus the row-group statistics collected from every written file) are
    written into ``base_dir``.

    See https://arrow.apache.org/docs/python/generated/pyarrow.dataset.write_dataset.html

    Parameters
    ----------
    base_dir : str
        Root directory of the dataset.
    table : pyarrow.Table
        Table to write.
    partitioning : list of str, optional
        Names of the columns used for hive-style partitioning.
        The default ``None`` means no partitioning columns.
    row_group_size : str or int, optional
        Target size of each Parquet row group. Either a human-readable memory
        size (e.g. ``"400MB"``), which is converted to a number of rows, or
        directly a number of rows.
    max_file_size : str or int, optional
        Target maximum file size, as a human-readable memory size
        (e.g. ``"2GB"``) or a number of bytes. It is converted to a maximum
        number of rows per file.
    compression : str, optional
        Parquet compression codec.
    compression_level : int, optional
        Codec-specific compression level.
    max_open_files : int, optional
        Forwarded unchanged to ``pyarrow.dataset.write_dataset``.
    use_threads : bool, optional
        Forwarded unchanged to ``pyarrow.dataset.write_dataset``.

    Notes
    -----
    Row counts are clamped to at least 1. The row-group size is also clamped
    to the maximum number of rows per file, because a row group cannot span
    several files. pyarrow rejects ``max_rows_per_group`` values that are not
    positive or that exceed ``max_rows_per_file``, and it interprets
    ``max_rows_per_file=0`` as "no limit".
    """
    if partitioning is None:
        partitioning = []

    # Maximum number of rows per file, estimated from the table memory footprint.
    # Clamp to >= 1: a value of 0 would silently disable the file size limit.
    max_rows_per_file = max(estimate_row_group_size(table, size=max_file_size), 1)

    # Row group size in number of rows
    if isinstance(row_group_size, str):  # e.g. "200 MB"
        row_group_size = estimate_row_group_size(table, size=row_group_size)
    row_group_size = max(min(row_group_size, max_rows_per_file), 1)
    # min == max: rows are accumulated until a full row group is available,
    # so that small incoming batches do not produce small row groups.
    min_rows_per_group = row_group_size
    max_rows_per_group = row_group_size

    # Schema of the written files: the partitioning columns are dropped
    # because hive partitioning encodes them in the directory names.
    table_schema = table.drop_columns(partitioning).schema

    # Collect the FileMetaData of every written file (used for ``_metadata``)
    metadata_collector = []

    def file_visitor(written_file):
        metadata_collector.append(written_file.metadata)

    # Parquet writing options
    parquet_format = ds.ParquetFileFormat()
    file_options = parquet_format.make_write_options(
        compression=compression,
        compression_level=compression_level,
        write_statistics=True,
    )

    # Write the dataset
    ds.write_dataset(
        table,
        base_dir=base_dir,
        format="parquet",
        partitioning=partitioning,
        partitioning_flavor="hive",  # TODO: maybe enable DirectoryPartitioning and store info on metadata.yaml file
        # Directory options
        create_dir=True,
        existing_data_behavior="overwrite_or_ignore",
        # Options
        use_threads=use_threads,
        file_options=file_options,
        file_visitor=file_visitor,
        # Options for files size/rows
        max_rows_per_file=max_rows_per_file,
        min_rows_per_group=min_rows_per_group,
        max_rows_per_group=max_rows_per_group,
        # Options to control open connections
        max_open_files=max_open_files,
    )

    # TODO: maybe only write these files when partitioning is not empty?
    # Write the ``_common_metadata`` parquet file without row groups statistics
    pq.write_metadata(table_schema, os.path.join(base_dir, "_common_metadata"))
    # Write the ``_metadata`` parquet file with row groups statistics of all files
    pq.write_metadata(
        table_schema,
        os.path.join(base_dir, "_metadata"),
        metadata_collector=metadata_collector,
    )
def convert_size_to_bytes(size):
    """Convert a memory size to a number of bytes.

    Parameters
    ----------
    size : str or int
        Either a human-readable size such as ``"200MB"`` or ``"5.2 mb"``,
        or an integer number of bytes, which is returned unchanged.

    Returns
    -------
    int
        The number of bytes.

    Raises
    ------
    TypeError
        If ``size`` is neither a string nor an integer.
    ValueError
        If the string cannot be parsed into a number of bytes.
    """
    if not isinstance(size, (str, int)):
        raise TypeError("Expecting a string (i.e. 200MB) or the integer number of bytes.")
    if isinstance(size, int):
        return size
    try:
        return _convert_size_to_bytes(size)
    except Exception as err:
        # Report the offending value and keep the parsing error as the cause.
        raise ValueError(f"Impossible to parse {size!r} to the number of bytes.") from err
def estimate_row_group_size(df, size="200MB"):
    """Estimate how many rows of ``df`` fit in the given memory size.

    This is used to derive the Parquet ``row_group_size`` parameter, i.e.
    the number of rows in each Apache Parquet file row group, from a desired
    row group memory size. The memory per row is the average over the whole
    table.

    Parameters
    ----------
    df : pyarrow.Table or pandas.DataFrame
        Table whose memory footprint is used for the estimate.
    size : str or int, optional
        Target memory size, e.g. ``"200MB"``, or a number of bytes.

    Returns
    -------
    int
        Estimated number of rows, always at least 1. For an empty table, or a
        table with no measurable memory footprint, all rows fit, so
        ``max(len(df), 1)`` is returned.

    Raises
    ------
    NotImplementedError
        If ``df`` is neither a pyarrow Table nor a pandas DataFrame.
    """
    if isinstance(df, pa.Table):
        memory_used = df.nbytes
    elif isinstance(df, pd.DataFrame):
        # deep=True measures the content of object (e.g. string) columns;
        # the shallow default only counts the object references.
        memory_used = df.memory_usage(deep=True).sum()
    else:
        raise NotImplementedError("Unrecognized dataframe type")
    size_bytes = convert_size_to_bytes(size)
    n_rows = len(df)
    # Avoid a division by zero: with no rows or no memory, every row fits.
    if n_rows == 0 or memory_used == 0:
        return max(n_rows, 1)
    memory_per_row = memory_used / n_rows
    # At least one row, even when ``size`` is smaller than a single row.
    return max(math.floor(size_bytes / memory_per_row), 1)
def _convert_size_to_bytes(size_str):
    """Convert a human-readable file size to a number of bytes.

    Units are binary multiples (1 kb = 1024 bytes) and are matched
    case-insensitively. Handled cases:

    - singular and plural units, e.g., "1 byte" and "117 bytes"
    - byte vs b
    - units up to yottabytes (both "zettabyte" and "zetabyte" spellings)
    - with & without spaces between & around units
    - floats ("5.2 mb"); the result is truncated to an integer

    A value without a multiplier unit (plain bytes) must be an integer.

    To reverse this, see hurry.filesize or the Django filesizeformat template filter.

    :param size_str: A human-readable string representing a file size, e.g., "22 megabytes".
    :return: The number of bytes represented by the string.
    :raises ValueError: If the numeric part cannot be parsed.

    Examples:

    >>> _convert_size_to_bytes("58 kb")
    59392
    >>> _convert_size_to_bytes(" 117 bytes ")
    117
    >>> _convert_size_to_bytes("117 kilobytes")
    119808
    >>> _convert_size_to_bytes("5.2 mb")
    5452595
    >>> _convert_size_to_bytes("2GB")
    2147483648
    """
    multipliers = {
        "kilobyte": 1024,
        "megabyte": 1024**2,
        "gigabyte": 1024**3,
        "terabyte": 1024**4,
        "petabyte": 1024**5,
        "exabyte": 1024**6,
        "zettabyte": 1024**7,
        "zetabyte": 1024**7,
        "yottabyte": 1024**8,
        "kb": 1024,
        "mb": 1024**2,
        "gb": 1024**3,
        "tb": 1024**4,
        "pb": 1024**5,
        "eb": 1024**6,
        "zb": 1024**7,
        "yb": 1024**8,
    }
    # Normalize once: lowercase, no surrounding blanks, plural "s" removed.
    normalized = size_str.lower().strip().strip("s")
    for suffix, multiplier in multipliers.items():
        if normalized.endswith(suffix):
            return int(float(normalized[: -len(suffix)]) * multiplier)
    # Plain bytes: drop the optional "b" / "byte" unit.
    if normalized.endswith("b"):
        normalized = normalized[:-1]
    elif normalized.endswith("byte"):
        normalized = normalized[:-4]
    return int(normalized)