Files
Ankit Malik 1d5ad2d793 1st commit
2026-06-25 11:20:22 +05:30

42 lines
939 B
Python

import os

import clickhouse_connect
import polars as pd
import polars as pl
import pyarrow
from clickhouse_connect.driver.client import Client
from sqlalchemy import create_engine, text
from sqlalchemy.engine import URL, Engine

from log import log
def load_to_clickhouse(
    client: "Client",
    table_name: str,
    df: "pl.DataFrame",
    *,
    chunk_size: int = 10_000,
) -> None:
    """
    Load a Polars DataFrame into ClickHouse using Arrow.

    The frame is sent in consecutive slices of at most ``chunk_size`` rows.
    Each slice is converted with ``to_arrow()`` and handed to
    ``client.insert_arrow``. An empty frame is logged as a warning and
    nothing is inserted.

    Args:
        client: ClickHouse client object exposing ``insert_arrow``.
        table_name: Name of the destination table.
        df: Rows to insert.
        chunk_size: Maximum number of rows per insert call (default 10,000).

    Raises:
        ValueError: If ``chunk_size`` is not positive.
    """
    # A zero step would make range() fail and a negative one would silently
    # insert nothing, so reject both up front with a clear message.
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")

    if df.is_empty():
        log.warning(f"{table_name}: DataFrame is empty. Skipping.")
        return

    total_rows = len(df)
    for start in range(0, total_rows, chunk_size):
        # The final slice may hold fewer than chunk_size rows; report the
        # real end offset rather than start + chunk_size.
        end = min(start + chunk_size, total_rows)
        arrow_table = df.slice(start, chunk_size).to_arrow()
        client.insert_arrow(
            table=table_name,
            arrow_table=arrow_table,
        )
        log.info(
            f"Inserted rows {start:,} to {end:,}"
        )

    log.info(
        f"{table_name}: inserted {total_rows:,} rows into ClickHouse"
    )