Source code for tstore.tsdf.writer

#!/usr/bin/env python3
"""
Created on Mon Jun 12 15:49:54 2023.

@author: ghiggi
"""

import numpy as np

from tstore.archive.io import (
    check_tstore_directory,
    check_tstore_structure,
    define_tsarray_filepath,
)
from tstore.archive.partitions import check_partitioning
from tstore.tsdf.ts_dtype import TSDtype


def _get_ts_variables(df):
    """Get list of TSArray columns.

    Parameters
    ----------
    df : DataFrame-like
        Object exposing ``columns`` and per-column access through ``df[column]``.

    Returns
    -------
    list
        Labels of the columns whose dtype is a ``TSDtype`` instance, in the
        order in which they appear in ``df.columns``.
    """
    # Iterate the labels directly instead of going through a NumPy array:
    # an array round-trip coerces mixed-type labels to a common type
    # (e.g. 1 -> "1"), after which ``df[column]`` cannot find the column.
    return [column for column in df.columns if isinstance(df[column].dtype, TSDtype)]


def _get_static_columns(df):
    """Get list of non-TSArray columns.

    Parameters
    ----------
    df : DataFrame-like
        Object exposing ``columns`` and per-column access through ``df[column]``.

    Returns
    -------
    list
        Labels of the columns that are not reported by ``_get_ts_variables``,
        in the order in which they appear in ``df.columns``.
    """
    ts_variables = set(_get_ts_variables(df))
    # Filter in column order rather than taking a set difference, so the
    # result is deterministic instead of depending on set iteration order.
    return [column for column in df.columns if column not in ts_variables]


def _write_attributes(df, base_dir):
    """Write TSDF static attributes.

    Selects the non-TSArray columns of ``df``, names the index of that
    selection ``"tstore_id"`` and hands it to ``write_attributes``.

    Parameters
    ----------
    df : DataFrame-like
        TSDF object holding both static and TSArray columns. It is not
        modified by this function.
    base_dir : path-like
        Forwarded unchanged to ``write_attributes``.
    """
    from tstore.archive.attributes.pandas import write_attributes

    static_columns = _get_static_columns(df)
    # ``rename_axis`` returns a new object. Assigning ``index.name`` on the
    # column selection instead would rename the Index object that the
    # selection can share with ``df``, i.e. silently alter the caller's data.
    df_attributes = df[static_columns].rename_axis(index="tstore_id")
    write_attributes(df=df_attributes, base_dir=base_dir)


def _write_ts_series(ts_series, tstore_ids, base_dir, partitioning_str, tstore_structure):
    """Write the TSArray elements of one TSDF column to disk.

    Parameters
    ----------
    ts_series : Series-like
        Column of time series objects; its ``name`` is used as the variable name.
    tstore_ids : iterable
        Identifiers paired element-wise with the entries of ``ts_series``.
    base_dir : path-like
        Forwarded to ``define_tsarray_filepath``.
    partitioning_str
        Forwarded to the ``to_disk`` method of each time series object.
    tstore_structure
        Forwarded to ``define_tsarray_filepath``.
    """
    variable_name = ts_series.name
    for identifier, ts_object in zip(tstore_ids, ts_series):
        # Falsy entries are treated as empty time series and are not written.
        if not ts_object:
            continue
        destination = define_tsarray_filepath(
            base_dir=base_dir,
            tstore_id=identifier,
            ts_variable=variable_name,
            tstore_structure=tstore_structure,
        )
        ts_object.to_disk(destination, partitioning_str=partitioning_str)


def _write_tsarrays(df, id_var, base_dir, partitioning, tstore_structure):
    """Write every TSArray column of a TSDF object to disk.

    Parameters
    ----------
    df : DataFrame-like
        TSDF object; its TSArray columns are found with ``_get_ts_variables``.
    id_var
        Label of the column of ``df`` holding the identifiers.
    base_dir : path-like
        Forwarded to ``_write_ts_series``.
    partitioning : mapping
        Indexed by TSArray column label to obtain the partitioning string
        used for that column.
    tstore_structure
        Forwarded to ``_write_ts_series``.
    """
    for ts_variable in _get_ts_variables(df):
        # Look up the partitioning first, then the data, one column at a time.
        column_partitioning = partitioning[ts_variable]
        column_values = df[ts_variable]
        identifiers = df[id_var]
        _write_ts_series(
            ts_series=column_values,
            tstore_ids=identifiers,
            base_dir=base_dir,
            partitioning_str=column_partitioning,
            tstore_structure=tstore_structure,
        )


def _write_metadata(base_dir, tstore_structure, id_var, ts_variables, partitioning):
    """Write TStore metadata.

    Thin wrapper that forwards its arguments to ``write_tstore_metadata``;
    ``ts_variables`` is passed on under the keyword ``ts_vars``.
    """
    from tstore.archive.metadata.writers import write_tstore_metadata

    # Collect the keyword arguments expected by the metadata writer.
    metadata_kwargs = {
        "base_dir": base_dir,
        "id_var": id_var,
        "ts_vars": ts_variables,
        "tstore_structure": tstore_structure,
        "partitioning": partitioning,
    }
    write_tstore_metadata(**metadata_kwargs)


def write_tstore(
    df,
    base_dir,
    id_var,
    partitioning,
    tstore_structure="id-var",
    overwrite=True,
):
    """Write TStore from TSDF object.

    The static (non-TSArray) columns are written as attributes, each TSArray
    column is written to disk, and finally the TStore metadata is written.

    Parameters
    ----------
    df : DataFrame-like
        TSDF object mixing static columns and TSArray columns.
    base_dir : path-like
        Target directory; validated by ``check_tstore_directory``, whose
        return value is used for all subsequent writes.
    id_var
        Label of the column of ``df`` holding the identifiers.
    partitioning
        Partitioning specification; normalised by ``check_partitioning``
        against the TSArray columns and then looked up per column.
    tstore_structure : str, default "id-var"
        Layout of the store; validated by ``check_tstore_structure``.
    overwrite : bool, default True
        Forwarded to ``check_tstore_directory``.
        NOTE(review): presumably controls whether an existing directory may
        be replaced - confirm against ``check_tstore_directory``.
    """
    # Checks
    tstore_structure = check_tstore_structure(tstore_structure)
    base_dir = check_tstore_directory(base_dir, overwrite=overwrite)
    # Computed once: ``df`` is not modified below, so the same list serves
    # both the partitioning check and the metadata.
    ts_variables = _get_ts_variables(df)
    partitioning = check_partitioning(partitioning, ts_variables=ts_variables)

    # Write static attributes
    _write_attributes(df, base_dir=base_dir)

    # Write TSArrays
    _write_tsarrays(
        df,
        id_var,
        base_dir=base_dir,
        partitioning=partitioning,
        tstore_structure=tstore_structure,
    )

    # Write TSArrays metadata
    _write_metadata(
        base_dir=base_dir,
        tstore_structure=tstore_structure,
        ts_variables=ts_variables,
        id_var=id_var,
        partitioning=partitioning,
    )