Source code for tstore.archive.partitions

#!/usr/bin/env python3
"""
Created on Mon Apr  8 17:05:26 2024.

@author: ghiggi
"""


def get_datetime_properties(df, time_var):
    """Return the object exposing the datetime components of the time variable.

    Parameters
    ----------
    df : dataframe
        Dataframe holding the time information.
    time_var : str
        Name of the time column.

    Returns
    -------
    object
        The ``.dt`` accessor of ``df[time_var]`` when ``time_var`` is one of
        the dataframe columns, otherwise the dataframe index.
    """
    # No column with that name: fall back on the index.
    if time_var not in df.columns:
        return df.index
    return df[time_var].dt
def get_partitioning_mapping_dict(time_var, backend="pandas"):
    """Build the mapping from partitioning component to extractor function.

    Parameters
    ----------
    time_var : str
        Name of the time variable handed to ``get_datetime_properties``.
    backend : str, optional
        Either ``"pandas"`` (default) or ``"polars"``.

    Returns
    -------
    dict
        Keys are the partitioning components (``year``, ``month``, ``day``,
        ``doy``, ``dow``, ``hh``, ``mm``, ``ss``). Each value is a callable
        taking a dataframe and returning the corresponding datetime property.

    Raises
    ------
    NotImplementedError
        If ``backend`` is neither ``"pandas"`` nor ``"polars"``.
    """
    # Partitioning component -> name of the member to read on the object
    # returned by get_datetime_properties.
    if backend == "pandas":
        # The components are accessed as plain attributes.
        call_member = False
        members = {
            "year": "year",
            "month": "month",
            "day": "day",
            "doy": "dayofyear",
            "dow": "dayofweek",
            # week TODO
            "hh": "hour",
            "mm": "minute",
            "ss": "second",
        }
    elif backend == "polars":
        # The components are accessed through method calls.
        call_member = True
        members = {
            "year": "year",
            "month": "month",
            "day": "day",
            "doy": "ordinal_day",
            # NOTE(review): pandas `dayofweek` is documented as Monday=0 while
            # polars `weekday()` is documented as Monday=1, so 'dow' values
            # appear to differ between backends - confirm this is intended.
            "dow": "weekday",
            # week TODO
            "hh": "hour",
            "mm": "minute",
            "ss": "second",
        }
    else:
        raise NotImplementedError(f"Backend {backend}")

    def _make_extractor(member):
        # A factory is used so that each extractor keeps its own member name.
        def _extract(df):
            value = getattr(get_datetime_properties(df, time_var), member)
            return value() if call_member else value

        return _extract

    # TODO: add quarter, daysinmonth, month_name and relevant checks
    # TODO: partitioning_str: (YYYY/MM/DD) or (YYYY/DOY/HH). Or list ?
    # TODO: provide proxy for year(YYYY) and month (MM) ? But month conflicts with minutes ?
    # TODO: for polars
    return {component: _make_extractor(member) for component, member in members.items()}
def get_valid_partitions():
    """Return the list of valid partitioning components.

    Returns
    -------
    list of str
        The keys of the dictionary returned by
        ``get_partitioning_mapping_dict`` for its default backend.
    """
    # Only the keys are needed here, so a placeholder time variable is passed.
    mapping = get_partitioning_mapping_dict(time_var="dummy")
    return list(mapping.keys())
def check_partitions(partitioning_str):
    """Validate a partitioning string and return its components.

    Parameters
    ----------
    partitioning_str : str or None
        Partitioning components separated by ``/``.
        Components are matched case-insensitively.

    Returns
    -------
    list of str or None
        The lower-cased components in the order given, or ``None`` when
        ``partitioning_str`` is ``None``.

    Raises
    ------
    ValueError
        If a component is not a valid partitioning component, or if ``doy``
        is combined with ``month`` or ``day``.
    """
    if partitioning_str is None:
        return None

    # Split the string into its components and validate each one
    raw_components = partitioning_str.split("/")
    allowed = get_valid_partitions()
    partitions = []
    for raw_component in raw_components:
        normalized = raw_component.lower()
        if normalized not in allowed:
            raise ValueError(f"Invalid partitioning component '{raw_component}'")
        partitions.append(normalized)

    # 'doy' (day of year) cannot be combined with 'month' or 'day'
    if "doy" in partitions:
        if "month" in partitions:
            raise ValueError("Either specify 'month' or 'doy' (day of year).")
        if "day" in partitions:
            raise ValueError("Either specify 'day' or 'doy' (day of year).")
    return partitions
def check_partitioning(partitioning, ts_variables):
    """Check and normalize the ``to_tstore`` partitioning values.

    Parameters
    ----------
    partitioning : dict, str or None
        Either a dictionary mapping each time series variable to its
        partitioning string (or ``None``), or a single partitioning string
        (or ``None``) applied to every variable in ``ts_variables``.
    ts_variables : iterable of str
        Names of the time series variables. Only used when ``partitioning``
        is not a dictionary.

    Returns
    -------
    dict
        A new dictionary mapping each variable to its normalized
        partitioning string (lower-cased components joined by ``/``), or to
        ``None`` when no partitioning is requested for that variable.
        A dictionary passed as ``partitioning`` is not modified.

    Raises
    ------
    TypeError
        If ``partitioning`` is not a dictionary, a string or ``None``.
    ValueError
        If the partitioning of a variable is invalid. The original error is
        attached as the cause.
    """
    if not isinstance(partitioning, (dict, str, type(None))):
        raise TypeError(
            "'partitioning' must be a dictionary, a string or None, "
            f"not {type(partitioning).__name__}.",
        )

    if isinstance(partitioning, dict):
        items = partitioning.items()
    else:
        # Apply the same partitioning to every time series variable
        items = ((ts_variable, partitioning) for ts_variable in ts_variables)

    # Results go into a new dictionary so the caller's input is left untouched
    checked = {}
    for ts_variable, partitioning_str in items:
        try:
            partitions = check_partitions(partitioning_str)
        except Exception as e:
            # Any failure is reported as ValueError naming the variable
            raise ValueError(f"Invalid partitioning for {ts_variable}: {e}") from e
        checked[ts_variable] = partitioning_str if partitions is None else "/".join(partitions)
    return checked
def add_partitioning_columns(df, partitioning_str, time_var, backend):
    """Add one column per partitioning component to the dataframe.

    Parameters
    ----------
    df : dataframe
        Dataframe to which the partitioning columns are added.
        With the ``"pandas"`` backend the columns are assigned on ``df``
        itself.
    partitioning_str : str or None
        Partitioning components separated by ``/``. If ``None``, nothing is
        added.
    time_var : str
        Name of the time variable from which the components are derived.
    backend : str
        Either ``"pandas"`` or ``"polars"``.

    Returns
    -------
    tuple
        ``(df, partitions)`` where ``partitions`` is the list of added
        column names, or ``(df, None)`` when ``partitioning_str`` is ``None``.
    """
    # Nothing to do when no partitioning is requested
    if partitioning_str is None:
        return df, None

    partitions = check_partitions(partitioning_str)
    extractors = get_partitioning_mapping_dict(time_var=time_var, backend=backend)
    for name in partitions:
        extract = extractors[name]
        if backend == "pandas":
            df[name] = extract(df)
        elif backend == "polars":
            values = extract(df)
            df = df.with_columns(values.alias(name))
        else:
            raise NotImplementedError
    return df, partitions