# /// script
# requires-python = ">=3.11"
# dependencies = [
#     "polars>=0.20.0",
#     "pyarrow>=18.0.0",
#     "sqlalchemy>=2.0.0",
#     "pyodbc>=5.0.0",
#     "clickhouse-connect>=0.7.0",
#     "clickhouse-sqlalchemy>=0.3.2",
#     "pyyaml>=6.0.3",
#     "python-dotenv>=1.0.0",
# ]
# ///
"""Run the ELT pipeline for a range of dates, with retries and YAML run reports."""
from __future__ import annotations

import sys
from datetime import date, datetime, timedelta
# Imported explicitly because main() uses Path; kept ahead of the star imports
# below so that any `Path` they export still takes precedence, as before.
from pathlib import Path
from time import sleep

import polars as pl
import yaml

from log import log
from clickhouse_task.create_table import create_clickhouse_table
from clickhouse_task.delete_task import (
    delete_existing_data,
    truncate_table,
)
from clickhouse_task.load_table import load_to_clickhouse
from db_con.connection import (
    build_sql_server_engine,
    build_clickhouse_engine,
    get_clickhouse_client,
)
from mids import (
    MID_TABLE_COV,
    MID_TABLE_COV1,
)
# NOTE(review): star imports hide where `elt` and `get_dates_from_yaml` come
# from; consider replacing them with explicit imports once the origin of each
# name is confirmed.
from src.bridge import *
from src.fact_updated import *
from src.dim import *
from elt2 import *

# Number of times a single run date is attempted before the error is re-raised.
_MAX_ATTEMPTS = 3
# Pause between two attempts for the same run date, in seconds.
_RETRY_DELAY_SECONDS = 5

_TABLE_CONFIG_FILE = "y.yml"
_PIPELINE_CONFIG_FILE = "Pipeline_config.yml"
_CUSTOM_DATES_FILE = "elt_pipeline_custom_dates.yml"
_SUCCESS_REPORT_FILE = "successful_Pipeline_dates_config.yml"
_FAILED_REPORT_FILE = "failed_Pipeline_dates_config.yml"


def _ensure_pipeline_config(config_file: Path) -> object:
    """Create *config_file* with default contents if it is missing, then parse it.

    Returns:
        The parsed YAML document.

    Raises:
        OSError: If the file cannot be written or read.
        yaml.YAMLError: If the existing file is not valid YAML.
    """
    if not config_file.exists():
        default_config = {
            "pipeline": {
                "run_date": None,
                "status": None,
                "error_message": None,
            }
        }
        with open(config_file, "w") as f:
            yaml.safe_dump(default_config, f)

    with open(config_file, "r") as f:
        return yaml.safe_load(f)


def _resolve_date_range(argv: list[str]) -> tuple[date, date]:
    """Return the inclusive ``(start_date, end_date)`` range to process.

    Precedence: the custom-dates YAML file when its flag is ``"Y"``, then the
    first command-line argument (``YYYY-MM-DD``), then yesterday.

    Raises:
        ValueError: If the command-line argument is not in ``YYYY-MM-DD`` form.
    """
    p_start_date, p_end_date, flag = get_dates_from_yaml(_CUSTOM_DATES_FILE)
    if flag == "Y":
        return p_start_date, p_end_date

    if len(argv) > 1:
        start_date = datetime.strptime(argv[1], "%Y-%m-%d").date()
        # NOTE(review): the caller's loop is inclusive of end_date, so a date
        # given on the command line processes that day AND the next one, while
        # the default branch processes a single day. Preserved as-is; confirm
        # whether two days are intended.
        return start_date, start_date + timedelta(days=1)

    yesterday = date.today() - timedelta(days=1)
    return yesterday, yesterday


def _run_with_retries(
    run_date: date,
    table_config: object,
    successful_dates: list[dict[str, object]],
    failed_dates: list[dict[str, object]],
) -> None:
    """Run ``elt`` for *run_date*, retrying up to ``_MAX_ATTEMPTS`` times.

    Every failed attempt is appended to *failed_dates*; a successful run is
    appended to *successful_dates*.

    Raises:
        Exception: Whatever ``elt`` raised on the final attempt.
    """
    for attempt in range(_MAX_ATTEMPTS):
        try:
            elt(run_date, table_config)
        except Exception as e:
            failed_dates.append({
                'pipeline_trigeered_on_date': str(date.today()),
                'failed_run_date': run_date,
                "attempt": attempt,
            })
            if attempt == _MAX_ATTEMPTS - 1:
                raise
            log.warning(
                "Pipeline failed. Retry %d/%d. Error: %s",
                attempt + 1,
                _MAX_ATTEMPTS,
                e,
            )
            sleep(_RETRY_DELAY_SECONDS)
        else:
            successful_dates.append({
                'pipeline_trigeered_on_date': str(date.today()),
                'last_successful_run_date': run_date,
            })
            # Field names mirror the keys written to the YAML report; the two
            # fields are now separated by a space (they used to run together).
            log.info(
                "Pipeline completed successfully. "
                "pipeline_trigeered_on_date=%s "
                "last_successful_run_date=%s",
                date.today(),
                run_date,
            )
            return


def _write_run_reports(
    successful_dates: list[dict[str, object]],
    failed_dates: list[dict[str, object]],
) -> None:
    """Overwrite the success and failure YAML reports with this run's results.

    When nothing failed, the failure report holds a single placeholder entry
    whose run date and attempt are the string ``"none"``.
    """
    with open(_SUCCESS_REPORT_FILE, "w") as f:
        yaml.dump(
            successful_dates,
            f,
            default_flow_style=False,
            sort_keys=False,
        )

    failed_report = failed_dates or [{
        'pipeline_trigeered_on_date': str(date.today()),
        'failed_run_date': "none",
        "attempt": "none",
    }]
    with open(_FAILED_REPORT_FILE, "w") as f:
        yaml.dump(failed_report, f, default_flow_style=False, sort_keys=False)


def main() -> None:
    """Run the ELT pipeline for every date in the resolved range.

    Loads the table configuration, makes sure the pipeline config file exists,
    resolves the date range, then runs ``elt`` once per day with retries. The
    success and failure reports are written even when a date ultimately fails,
    so the attempts recorded up to that point are not lost.

    Raises:
        Exception: Re-raised from ``elt`` once a run date has failed
            ``_MAX_ATTEMPTS`` times; later dates are not processed.
    """
    # ------------------------------------------------------
    # Config
    # ------------------------------------------------------
    with open(_TABLE_CONFIG_FILE, "r") as file:
        table_config = yaml.safe_load(file)

    # The parsed contents are not consumed below; the call still creates the
    # file when missing and fails early if an existing file is malformed.
    _ensure_pipeline_config(Path(_PIPELINE_CONFIG_FILE))

    start_date, end_date = _resolve_date_range(sys.argv)
    log.info(
        "Pipeline Start Date: %s",
        start_date,
    )

    failed_dates: list[dict[str, object]] = []
    successful_dates: list[dict[str, object]] = []

    run_date = start_date
    try:
        while run_date <= end_date:
            _run_with_retries(
                run_date,
                table_config,
                successful_dates,
                failed_dates,
            )
            run_date = run_date + timedelta(days=1)
    finally:
        # Runs on success and on failure: without this, the final `raise`
        # would skip both reports on exactly the runs that need them.
        _write_run_reports(successful_dates, failed_dates)


if __name__ == "__main__":
    main()