import os

import clickhouse_connect
import polars as pd  # NOTE(review): original alias, kept in case other code still refers to `pd`
import polars as pl
import pyarrow
from clickhouse_connect.driver.client import Client
from sqlalchemy import create_engine, text
from sqlalchemy.engine import URL, Engine

from log import log


def load_to_clickhouse(
    client: Client,
    table_name: str,
    df: pl.DataFrame,
    *,
    chunk_size: int = 10000,
) -> None:
    """Load a Polars DataFrame into ClickHouse using Arrow.

    The frame is sent in consecutive row slices of at most ``chunk_size``
    rows; each slice is converted to an Arrow table and passed to
    ``client.insert_arrow``. An empty frame is skipped with a warning and
    nothing is inserted.

    Args:
        client: Object exposing ``insert_arrow(table=..., arrow_table=...)``.
        table_name: Name passed as the ``table`` argument of every insert.
        df: Data to insert.
        chunk_size: Maximum number of rows per insert call. Defaults to
            10000, the value that was previously hard-coded.

    Raises:
        ValueError: If ``chunk_size`` is not a positive integer.
    """
    if chunk_size <= 0:
        # A zero step makes range() fail with an unrelated message, and a
        # negative step would silently skip every insert yet still log success.
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")

    if df.is_empty():
        log.warning(f"{table_name}: DataFrame is empty. Skipping.")
        return

    total_rows = len(df)

    for start in range(0, total_rows, chunk_size):
        # slice() clamps at the end of the frame, so the last chunk may be short.
        arrow_table = df.slice(start, chunk_size).to_arrow()

        client.insert_arrow(
            table=table_name,
            arrow_table=arrow_table,
        )

        # Cap the reported upper bound so the final chunk does not overshoot.
        end = min(start + chunk_size, total_rows)
        log.info(
            f"Inserted rows {start:,} to {end:,}"
        )

    log.info(
        f"{table_name}: inserted {total_rows:,} rows into ClickHouse"
    )