from __future__ import annotations

from time import sleep
import sys
from datetime import date, datetime, timedelta

import polars as pl
import yaml

from log import log
from clickhouse_task.create_table import create_clickhouse_table
from clickhouse_task.delete_task import (
    delete_existing_data,
    truncate_table,
)
from clickhouse_task.load_table import load_to_clickhouse
from db_con.connection import (
    build_sql_server_engine,
    build_clickhouse_engine,
    get_clickhouse_client,
)
from mids import (
    MID_TABLE_COV,
    MID_TABLE_COV1,
)
# NOTE(review): `registry` and `fetch_data`, used by elt() below, are not
# defined in this file; presumably they are provided by these star imports.
# Confirm, and prefer explicit imports once the source modules are known.
from src.bridge import *
from src.fact_updated import *
from src.dim import *


# `fetch_by` values for which elt() looks up a table-specific fetcher in
# `registry`; every other value goes through the generic `fetch_data`.
# Kept as a tuple (not a set) so membership is equality-based, exactly as
# with the list literal it replaces.
_REGISTRY_FETCH_MODES = ("mids", "run_date", "reason_id")


# ==========================================================
# Helpers
# ==========================================================
def get_dates_from_yaml(filename: str) -> tuple[date, date, str]:
    """Read the pipeline date window and flag from a YAML file.

    The file is expected to hold a top-level ``pipeline`` mapping with
    ``start_date``, ``end_date`` and ``flag`` entries.

    Args:
        filename: Path of the YAML file to read.

    Returns:
        A ``(start_date, end_date, flag)`` tuple. Both dates are parsed
        with ``date.fromisoformat`` from the string form of the YAML
        value; ``flag`` is the string form of the YAML value.

    Raises:
        KeyError: If ``pipeline`` or one of its three entries is missing.
        ValueError: If a date entry is not an ISO-format date.
    """
    # YAML is UTF-8 by convention; do not depend on the platform locale.
    with open(filename, "r", encoding="utf-8") as file:
        data = yaml.safe_load(file)

    start_date = date.fromisoformat(
        str(data["pipeline"]["start_date"])
    )
    end_date = date.fromisoformat(
        str(data["pipeline"]["end_date"])
    )
    flag = str(data["pipeline"]["flag"])

    return start_date, end_date, flag


def write_table_to_yaml(
    data: dict,
    run_date: date,
    filename: str | None = None,
):
    """Write table data to a YAML file.

    Args:
        data: Mapping to serialise; key order is preserved in the output.
        run_date: Used only to build the default file name.
        filename: Destination path. Defaults to
            ``elt_pipeline_<run_date>.yml`` in the current directory.
    """
    if filename is None:
        filename = f"elt_pipeline_{run_date}.yml"

    # Explicit encoding keeps the output identical across platforms.
    with open(filename, "w", encoding="utf-8") as file:
        yaml.dump(
            data,
            file,
            default_flow_style=False,
            sort_keys=False,
        )

    print(f"Table written to {filename}")


def table_exists(
    client,
    table_name: str,
) -> bool:
    """Return whether ``table_name`` exists according to ``client``.

    Runs ``EXISTS TABLE <table_name>`` through ``client.command`` and
    converts the result to ``bool``.

    NOTE(review): this relies on ``client.command`` returning a falsy
    value (e.g. integer 0) for a missing table; a string ``"0"`` would be
    truthy. Confirm against the client returned by
    ``get_clickhouse_client``.
    """
    return bool(
        client.command(
            f"EXISTS TABLE {table_name}"
        )
    )


# ==========================================================
# elt ()
# ==========================================================
def elt(run_date: date, config: dict):
    """Copy every configured table from SQL Server into ClickHouse.

    For each entry of ``config["tables"]`` the function fetches the data,
    skips the table when nothing was returned, creates the ClickHouse
    table when it does not exist yet, otherwise clears the existing data
    (truncate for the ``"DELETE+INSERT"`` operation, targeted delete for
    any other operation), and finally loads the fetched frame.

    Args:
        run_date: Date passed to the key queries, the table-specific
            fetchers and the targeted delete.
        config: Mapping with a ``"tables"`` list. Every entry must
            provide ``"name"``, ``"operation"``, ``"fetch_by"`` and
            ``"type"``.

    Raises:
        Exception: Any error raised while processing a table is logged
            and re-raised, so the first failing table aborts the run and
            the remaining tables are not processed.
    """
    log.info("=" * 80)
    log.info("Hello from data-move Python data pipeline!")

    # ------------------------------------------------------
    # Run Date
    # ------------------------------------------------------
    log.info(
        "Pipeline Run Date: %s",
        run_date,
    )

    # ------------------------------------------------------
    # Connections
    # ------------------------------------------------------
    log.info(
        "Connecting to databases..."
    )
    sql_engine = build_sql_server_engine()
    clickhouse_engine = build_clickhouse_engine()
    client = get_clickhouse_client()
    log.info(
        "Database connections established"
    )

    # ------------------------------------------------------
    # mids Keys
    # ------------------------------------------------------
    mids = MID_TABLE_COV(
        sql_engine,
        run_date,
    )
    emp_visit_df = MID_TABLE_COV1(
        sql_engine,
        run_date,
    )

    # ------------------------------------------------------
    # Process Tables
    # ------------------------------------------------------
    for table in config["tables"]:
        table_name = table["name"]
        operation = table["operation"]
        fetch_by = table["fetch_by"]
        table_type = table["type"]

        log.info(
            "Pipeline Run Date: %s",
            run_date,
        )
        log.info("=" * 80)
        log.info(f"Processing Table-:{table_name} | Table type -:{table_type} | fetcht by-:{fetch_by} | operation-:{operation}")

        try:
            # ------------------------------------------
            # Fetch Data
            # ------------------------------------------
            log.info(f"Fetching Data from sql server for table-: {table_name} ..............")
            if fetch_by in _REGISTRY_FETCH_MODES:
                # Table-specific fetcher, looked up by naming convention.
                df = registry[f"fetch_{table_name}"](
                    sql_engine, table_name, table_type, mids, run_date
                )
            else:
                df = fetch_data(sql_engine, table_name, table_type)
            log.info(f"Fetched total row -: {len(df)} from sql server for table-:{table_name} ...........!!!")

            # Nothing to load: leave the target table untouched.
            if df.is_empty():
                log.warning(
                    "%s returned no rows",
                    table_name,
                )
                continue

            log.info(
                "Fetched %s rows",
                len(df),
            )

            # ------------------------------------------
            # Create Table If Missing
            # ------------------------------------------
            exists = table_exists(
                client,
                table_name,
            )
            if not exists:
                log.info(
                    "Creating table %s",
                    table_name,
                )
                create_clickhouse_table(
                    df=df,
                    table_name=table_name,
                    clickhouse_engine=clickhouse_engine,
                )
            # ------------------------------------------
            # Existing Table Logic
            # ------------------------------------------
            elif operation == "DELETE+INSERT":
                truncate_table(
                    client,
                    table_name,
                )
            else:
                delete_existing_data(
                    client=client,
                    table_name=table_name,
                    run_date=run_date,
                    mids=mids,
                    emp_visit_df=emp_visit_df,
                )

            # ------------------------------------------
            # Load Data
            # ------------------------------------------
            # NOTE(review): existing rows are removed before the load; if
            # the load fails the table stays cleared until the next run.
            log.info("_ _ _ _Inserting data into clickhouse db from sql server_ _ _ _")
            load_to_clickhouse(
                client=client,
                table_name=table_name,
                df=df,
            )
            log.info(
                "%s loaded successfully (%s rows)",
                table_name,
                len(df),
            )
        except Exception:
            # Record the failing table with its traceback, then abort.
            log.exception(
                "Failed processing table %s",
                table_name,
            )
            raise

    log.info("=" * 80)
    log.info("Pipeline Completed Successfully")
    log.info("=" * 80)