diff --git a/clickhouse_task/load_table.py b/clickhouse_task/load_table.py index b7ace82..8585576 100644 --- a/clickhouse_task/load_table.py +++ b/clickhouse_task/load_table.py @@ -20,12 +20,22 @@ def load_to_clickhouse( log.warning(f"{table_name}: DataFrame is empty. Skipping.") return - arrow_table = df.to_arrow() + chunk_size = 10000 - client.insert_arrow( - table=table_name, - arrow_table=arrow_table, - ) + for start in range(0, len(df), chunk_size): + end = start + chunk_size + + chunk_df = df.slice(start, chunk_size) + arrow_table = chunk_df.to_arrow() + + client.insert_arrow( + table=table_name, + arrow_table=arrow_table, + ) + + log.info( + f"Inserted rows {start:,} to {min(end, len(df)):,}" + ) log.info( f"{table_name}: inserted {len(df):,} rows into ClickHouse" diff --git a/db_con/connection.py b/db_con/connection.py index 2c1a698..d71ad85 100644 --- a/db_con/connection.py +++ b/db_con/connection.py @@ -86,5 +86,7 @@ def get_clickhouse_client(): port=CH_PORT, username=CH_USER, password=CH_PASS, - database=CH_DB + database=CH_DB, + connect_timeout=30, + send_receive_timeout=600, ) \ No newline at end of file diff --git a/elt.py b/elt.py new file mode 100644 index 0000000..6c3d722 --- /dev/null +++ b/elt.py @@ -0,0 +1,266 @@ + +from __future__ import annotations +from time import sleep +import sys +from datetime import date, datetime, timedelta + +import polars as pl +import yaml + + +from log import log + +from clickhouse_task.create_table import create_clickhouse_table +from clickhouse_task.delete_task import ( + delete_existing_data, + truncate_table, +) + +from clickhouse_task.load_table import load_to_clickhouse + +from db_con.connection import ( + build_sql_server_engine, + build_clickhouse_engine, + get_clickhouse_client, +) + +from mids import ( + MID_TABLE_COV, + MID_TABLE_COV1, +) + +from src.bridge import * +from src.fact import * +from src.dim import * + + +# ========================================================== +# Helpers +# ========================================================== + +def get_dates_from_yaml(filename: str): + with open(filename, "r") as file: + data = yaml.safe_load(file) + + start_date = date.fromisoformat( + str(data["pipeline"]["start_date"]) + ) + + end_date = date.fromisoformat( + str(data["pipeline"]["end_date"]) + ) + flag=str(data["pipeline"]["flag"]) + + return start_date, end_date , flag + + +def write_table_to_yaml( + data: dict, + run_date: date, + filename: str | None = None +): + """Write table data to a YAML file.""" + + if filename is None: + filename = f"elt_pipeline_{run_date}.yml" + + with open(filename, "w") as file: + yaml.dump( + data, + file, + default_flow_style=False, + sort_keys=False + ) + + print(f"Table written to {filename}") + + + + +def table_exists( + client, + table_name: str, +) -> bool: + + return bool( + client.command( + f"EXISTS TABLE {table_name}" + ) + ) + + +# ========================================================== +# elt () +# ========================================================== + +def elt(run_date : date , config: dict): + + log.info("=" * 80) + log.info("Hello from data-move Python data pipeline!") + + # ------------------------------------------------------ + # Run Date + # ------------------------------------------------------ + + + + log.info( + "Pipeline Run Date: %s", + run_date, + ) + + # ------------------------------------------------------ + # Connections + # ------------------------------------------------------ + + log.info( + "Connecting to databases..." + ) + + sql_engine = build_sql_server_engine() + clickhouse_engine = build_clickhouse_engine() + client = get_clickhouse_client() + + log.info( + "Database connections established" + ) + + # ------------------------------------------------------ + # mids Keys + # ------------------------------------------------------ + + mids = MID_TABLE_COV( + sql_engine, + run_date, + ) + + emp_visit_df = MID_TABLE_COV1( + sql_engine, + run_date, + ) + + + + + # ------------------------------------------------------ + # Process Tables + # ------------------------------------------------------ + for table in config["tables"]: + table_name = table["name"] + operation = table["operation"] + fetch_by = table["fetch_by"] + table_type=table["type"] + log.info( + "Pipeline Run Date: %s", + run_date, + ) + + + log.info("=" * 80) + log.info(f"Processing Table-:{table_name} | Table type -:{table_type} | fetcht by-:{fetch_by} | operation-:{operation}" ) + + try: + + # ------------------------------------------ + # Fetch Data + # ------------------------------------------ + + + log.info(f"Fetching Data from sql server for table-: {table_name} ..............") + fetch_list=["mids" ,"run_date", "reason_id"] + if fetch_by in fetch_list : + fn_name = f"fetch_{table_name}" + fn = globals()[fn_name] + df=fn(sql_engine, table_name , table_type, mids, run_date) + else: + df = fetch_data(sql_engine ,table_name,table_type) + + log.info(f"Fetched total row -: {len(df)} from sql server for table-:{table_name} ...........!!!") + + if df.is_empty(): + + log.warning( + "%s returned no rows", + table_name, + ) + + continue + + log.info( + "Fetched %s rows", + len(df), + ) + + # ------------------------------------------ + # Create Table If Missing + # ------------------------------------------ + + exists = table_exists( + client, + table_name, + ) + + if not exists: + + log.info( + "Creating table %s", + table_name, + ) + + create_clickhouse_table( + df=df, + table_name=table_name, + clickhouse_engine=clickhouse_engine, + ) + + # ------------------------------------------ + # Existing Table Logic + # ------------------------------------------ + + else: + + if operation == "DELETE+INSERT": + + truncate_table( + client, + table_name, + ) + + else: + + delete_existing_data( + client=client, + table_name=table_name, + run_date=run_date, + mids=mids, + emp_visit_df=emp_visit_df, + ) + + # ------------------------------------------ + # Load Data + # ------------------------------------------ + log.info("_ _ _ _Inserting data into clickhouse db from sql server_ _ _ _") + load_to_clickhouse( + client=client, + table_name=table_name, + df=df, + ) + + log.info( + "%s loaded successfully (%s rows)", + table_name, + len(df), + ) + + except Exception: + + log.exception( + "Failed processing table %s", + table_name, + ) + + raise + + log.info("=" * 80) + log.info("Pipeline Completed Successfully") + log.info("=" * 80) \ No newline at end of file diff --git a/elt2.py b/elt2.py new file mode 100644 index 0000000..0b1f5be --- /dev/null +++ b/elt2.py @@ -0,0 +1,264 @@ + +from __future__ import annotations +from time import sleep +import sys +from datetime import date, datetime, timedelta + +import polars as pl +import yaml + + +from log import log + +from clickhouse_task.create_table import create_clickhouse_table +from clickhouse_task.delete_task import ( + delete_existing_data, + truncate_table, +) + +from clickhouse_task.load_table import load_to_clickhouse + +from db_con.connection import ( + build_sql_server_engine, + build_clickhouse_engine, + get_clickhouse_client, +) + +from mids import ( + MID_TABLE_COV, + MID_TABLE_COV1, +) + +from src.bridge import * +from src.fact_updated import * +from src.dim import * + + +# ========================================================== +# Helpers +# ========================================================== + +def get_dates_from_yaml(filename: str): + with open(filename, "r") as file: + data = yaml.safe_load(file) + + start_date = date.fromisoformat( + str(data["pipeline"]["start_date"]) + ) + + end_date = date.fromisoformat( + str(data["pipeline"]["end_date"]) + ) + flag=str(data["pipeline"]["flag"]) + + return start_date, end_date , flag + + +def write_table_to_yaml( + data: dict, + run_date: date, + filename: str | None = None +): + """Write table data to a YAML file.""" + + if filename is None: + filename = f"elt_pipeline_{run_date}.yml" + + with open(filename, "w") as file: + yaml.dump( + data, + file, + default_flow_style=False, + sort_keys=False + ) + + print(f"Table written to {filename}") + + + + +def table_exists( + client, + table_name: str, +) -> bool: + + return bool( + client.command( + f"EXISTS TABLE {table_name}" + ) + ) + + +# ========================================================== +# elt () +# ========================================================== + +def elt(run_date : date , config: dict): + + log.info("=" * 80) + log.info("Hello from data-move Python data pipeline!") + + # ------------------------------------------------------ + # Run Date + # ------------------------------------------------------ + + + + log.info( + "Pipeline Run Date: %s", + run_date, + ) + + # ------------------------------------------------------ + # Connections + # ------------------------------------------------------ + + log.info( + "Connecting to databases..." + ) + + sql_engine = build_sql_server_engine() + clickhouse_engine = build_clickhouse_engine() + client = get_clickhouse_client() + + log.info( + "Database connections established" + ) + + # ------------------------------------------------------ + # mids Keys + # ------------------------------------------------------ + + mids = MID_TABLE_COV( + sql_engine, + run_date, + ) + + emp_visit_df = MID_TABLE_COV1( + sql_engine, + run_date, + ) + + + + + # ------------------------------------------------------ + # Process Tables + # ------------------------------------------------------ + for table in config["tables"]: + table_name = table["name"] + operation = table["operation"] + fetch_by = table["fetch_by"] + table_type=table["type"] + log.info( + "Pipeline Run Date: %s", + run_date, + ) + + + log.info("=" * 80) + log.info(f"Processing Table-:{table_name} | Table type -:{table_type} | fetcht by-:{fetch_by} | operation-:{operation}" ) + + try: + + # ------------------------------------------ + # Fetch Data + # ------------------------------------------ + + + log.info(f"Fetching Data from sql server for table-: {table_name} ..............") + fetch_list=["mids" ,"run_date", "reason_id"] + if fetch_by in fetch_list : + df=registry[f"fetch_{table_name}"](sql_engine, table_name , table_type, mids, run_date) + else: + df = fetch_data(sql_engine ,table_name,table_type) + + log.info(f"Fetched total row -: {len(df)} from sql server for table-:{table_name} ...........!!!") + + if df.is_empty(): + + log.warning( + "%s returned no rows", + table_name, + ) + + continue + + log.info( + "Fetched %s rows", + len(df), + ) + + # ------------------------------------------ + # Create Table If Missing + # ------------------------------------------ + + exists = table_exists( + client, + table_name, + ) + + if not exists: + + log.info( + "Creating table %s", + table_name, + ) + + create_clickhouse_table( + df=df, + table_name=table_name, + clickhouse_engine=clickhouse_engine, + ) + + # ------------------------------------------ + # Existing Table Logic + # ------------------------------------------ + + else: + + if operation == "DELETE+INSERT": + + truncate_table( + client, + table_name, + ) + + else: + + delete_existing_data( + client=client, + table_name=table_name, + run_date=run_date, + mids=mids, + emp_visit_df=emp_visit_df, + ) + + # ------------------------------------------ + # Load Data + # ------------------------------------------ + log.info("_ _ _ _Inserting data into clickhouse db from sql server_ _ _ _") + load_to_clickhouse( + client=client, + table_name=table_name, + df=df, + ) + + log.info( + "%s loaded successfully (%s rows)", + table_name, + len(df), + ) + + except Exception: + + log.exception( + "Failed processing table %s", + table_name, + ) + + raise + + log.info("=" * 80) + log.info("Pipeline Completed Successfully") + log.info("=" * 80) \ No newline at end of file diff --git a/failed_Pipeline_dates_config.yml b/failed_Pipeline_dates_config.yml index a1bb2a3..d82f49b 100644 --- a/failed_Pipeline_dates_config.yml +++ b/failed_Pipeline_dates_config.yml @@ -1,3 +1,3 @@ -- pipeline_trigeered_on_date: '2026-06-23' +- pipeline_trigeered_on_date: '2026-06-25' failed_run_date: none attempt: none diff --git a/logs/etl_20260624.log b/logs/etl_20260624.log new file mode 100644 index 0000000..b42199a --- /dev/null +++ b/logs/etl_20260624.log @@ -0,0 +1,240 @@ +2026-06-24 15:55:46 | INFO | Pipeline Start Date: 2026-06-23 +2026-06-24 15:55:46 | INFO | ================================================================================ +2026-06-24 15:55:46 | INFO | Hello from data-move Python data pipeline! +2026-06-24 15:55:46 | INFO | Pipeline Run Date: 2026-06-23 +2026-06-24 15:55:46 | INFO | Connecting to databases... +2026-06-24 15:55:48 | INFO | +2026-06-24 15:55:52 | INFO | +2026-06-24 15:55:52 | INFO | Database connections established +2026-06-24 15:55:52 | INFO | Collecting MIDs for: 2026-06-23 +2026-06-24 15:55:52 | INFO | Found 863 MIDs +2026-06-24 15:55:52 | WARNING | Pipeline failed. Retry 1/3. Error: 'tables' +2026-06-24 15:55:57 | INFO | ================================================================================ +2026-06-24 15:55:57 | INFO | Hello from data-move Python data pipeline! +2026-06-24 15:55:57 | INFO | Pipeline Run Date: 2026-06-23 +2026-06-24 15:55:57 | INFO | Connecting to databases... +2026-06-24 15:55:58 | INFO | +2026-06-24 15:55:58 | INFO | +2026-06-24 15:55:59 | INFO | Database connections established +2026-06-24 15:55:59 | INFO | Collecting MIDs for: 2026-06-23 +2026-06-24 15:55:59 | INFO | Found 863 MIDs +2026-06-24 15:55:59 | WARNING | Pipeline failed. Retry 2/3. Error: 'tables' +2026-06-24 15:56:04 | INFO | ================================================================================ +2026-06-24 15:56:04 | INFO | Hello from data-move Python data pipeline! +2026-06-24 15:56:04 | INFO | Pipeline Run Date: 2026-06-23 +2026-06-24 15:56:04 | INFO | Connecting to databases... +2026-06-24 15:56:04 | INFO | +2026-06-24 15:56:05 | INFO | +2026-06-24 15:56:05 | INFO | Database connections established +2026-06-24 15:56:05 | INFO | Collecting MIDs for: 2026-06-23 +2026-06-24 15:56:05 | INFO | Found 863 MIDs +2026-06-24 16:10:30 | INFO | Pipeline Start Date: 2026-06-23 +2026-06-24 16:10:30 | INFO | ================================================================================ +2026-06-24 16:10:30 | INFO | Hello from data-move Python data pipeline! +2026-06-24 16:10:30 | INFO | Pipeline Run Date: 2026-06-23 +2026-06-24 16:10:30 | INFO | Connecting to databases... +2026-06-24 16:10:31 | INFO | +2026-06-24 16:10:32 | INFO | +2026-06-24 16:10:32 | INFO | Database connections established +2026-06-24 16:10:32 | INFO | Collecting MIDs for: 2026-06-23 +2026-06-24 16:10:33 | INFO | Found 863 MIDs +2026-06-24 16:10:33 | WARNING | Pipeline failed. Retry 1/3. Error: 'tables' +2026-06-24 16:10:38 | INFO | ================================================================================ +2026-06-24 16:10:38 | INFO | Hello from data-move Python data pipeline! +2026-06-24 16:10:38 | INFO | Pipeline Run Date: 2026-06-23 +2026-06-24 16:10:38 | INFO | Connecting to databases... +2026-06-24 16:10:38 | INFO | +2026-06-24 16:10:39 | INFO | +2026-06-24 16:10:40 | INFO | Database connections established +2026-06-24 16:10:40 | INFO | Collecting MIDs for: 2026-06-23 +2026-06-24 16:10:40 | INFO | Found 863 MIDs +2026-06-24 16:10:40 | WARNING | Pipeline failed. Retry 2/3. Error: 'tables' +2026-06-24 16:10:45 | INFO | ================================================================================ +2026-06-24 16:10:45 | INFO | Hello from data-move Python data pipeline! +2026-06-24 16:10:45 | INFO | Pipeline Run Date: 2026-06-23 +2026-06-24 16:10:45 | INFO | Connecting to databases... +2026-06-24 16:10:45 | INFO | +2026-06-24 16:10:46 | INFO | +2026-06-24 16:10:46 | INFO | Database connections established +2026-06-24 16:10:46 | INFO | Collecting MIDs for: 2026-06-23 +2026-06-24 16:10:47 | INFO | Found 863 MIDs +2026-06-24 16:12:02 | INFO | Pipeline Start Date: 2026-06-23 +2026-06-24 16:12:02 | INFO | ================================================================================ +2026-06-24 16:12:02 | INFO | Hello from data-move Python data pipeline! +2026-06-24 16:12:02 | INFO | Pipeline Run Date: 2026-06-23 +2026-06-24 16:12:02 | INFO | Connecting to databases... +2026-06-24 16:12:02 | INFO | +2026-06-24 16:12:03 | INFO | +2026-06-24 16:12:04 | INFO | Database connections established +2026-06-24 16:12:04 | INFO | Collecting MIDs for: 2026-06-23 +2026-06-24 16:12:04 | INFO | Found 863 MIDs +2026-06-24 16:12:05 | INFO | Pipeline Run Date: 2026-06-23 +2026-06-24 16:12:05 | INFO | ================================================================================ +2026-06-24 16:12:05 | INFO | Processing Table-:OQaD | Table type -:FACT | fetcht by-:run_date | operation-:INSERT +2026-06-24 16:12:05 | INFO | Fetching Data from sql server for table-: OQaD .............. +2026-06-24 16:12:05 | INFO | Exists: True +2026-06-24 16:12:05 | INFO | Path: D:\z\airflow-project\src\sql\fact\OQaD.sql +2026-06-24 16:12:05 | INFO | Fetching data for 863 EMPIDs +2026-06-24 16:12:05 | INFO | Fetching OQaD data for run_date=2026-06-23 +2026-06-24 16:12:11 | INFO | fn name is fetch_OQad ------Fetched 458 rows +2026-06-24 16:12:11 | INFO | Fetched total row -: 458 from sql server for table-:OQaD ...........!!! +2026-06-24 16:12:11 | INFO | Fetched 458 rows +2026-06-24 16:12:11 | INFO | _ _ _ _ Deleting Data from ClickHouse for OQaD _ _ _ _ +2026-06-24 16:12:11 | INFO | _ _ _ _Inserting data into clickhouse db from sql server_ _ _ _ +2026-06-24 16:12:11 | ERROR | Failed processing table OQaD +Traceback (most recent call last): + File "D:\z\airflow-project\elt.py", line 243, in elt + load_to_clickhouse( + ~~~~~~~~~~~~~~~~~~^ + client=client, + ^^^^^^^^^^^^^^ + table_name=table_name, + ^^^^^^^^^^^^^^^^^^^^^^ + df=df, + ^^^^^^ + ) + ^ + File "D:\z\airflow-project\clickhouse_task\load_table.py", line 28, in load_to_clickhouse + chunk_df = df.iloc[start:end] + ^^^^^^^ +AttributeError: 'DataFrame' object has no attribute 'iloc' +2026-06-24 16:12:11 | WARNING | Pipeline failed. Retry 1/3. Error: 'DataFrame' object has no attribute 'iloc' +2026-06-24 16:12:16 | INFO | ================================================================================ +2026-06-24 16:12:16 | INFO | Hello from data-move Python data pipeline! +2026-06-24 16:12:16 | INFO | Pipeline Run Date: 2026-06-23 +2026-06-24 16:12:16 | INFO | Connecting to databases... +2026-06-24 16:12:17 | INFO | +2026-06-24 16:12:17 | INFO | +2026-06-24 16:12:18 | INFO | Database connections established +2026-06-24 16:12:18 | INFO | Collecting MIDs for: 2026-06-23 +2026-06-24 16:12:18 | INFO | Found 863 MIDs +2026-06-24 16:12:18 | INFO | Pipeline Run Date: 2026-06-23 +2026-06-24 16:12:18 | INFO | ================================================================================ +2026-06-24 16:12:18 | INFO | Processing Table-:OQaD | Table type -:FACT | fetcht by-:run_date | operation-:INSERT +2026-06-24 16:12:18 | INFO | Fetching Data from sql server for table-: OQaD .............. +2026-06-24 16:12:18 | INFO | Exists: True +2026-06-24 16:12:18 | INFO | Path: D:\z\airflow-project\src\sql\fact\OQaD.sql +2026-06-24 16:12:18 | INFO | Fetching data for 863 EMPIDs +2026-06-24 16:12:18 | INFO | Fetching OQaD data for run_date=2026-06-23 +2026-06-24 16:12:24 | INFO | fn name is fetch_OQad ------Fetched 458 rows +2026-06-24 16:12:24 | INFO | Fetched total row -: 458 from sql server for table-:OQaD ...........!!! +2026-06-24 16:12:24 | INFO | Fetched 458 rows +2026-06-24 16:12:24 | INFO | _ _ _ _ Deleting Data from ClickHouse for OQaD _ _ _ _ +2026-06-24 16:12:24 | INFO | _ _ _ _Inserting data into clickhouse db from sql server_ _ _ _ +2026-06-24 16:12:24 | ERROR | Failed processing table OQaD +Traceback (most recent call last): + File "D:\z\airflow-project\elt.py", line 243, in elt + load_to_clickhouse( + ~~~~~~~~~~~~~~~~~~^ + client=client, + ^^^^^^^^^^^^^^ + table_name=table_name, + ^^^^^^^^^^^^^^^^^^^^^^ + df=df, + ^^^^^^ + ) + ^ + File "D:\z\airflow-project\clickhouse_task\load_table.py", line 28, in load_to_clickhouse + chunk_df = df.iloc[start:end] + ^^^^^^^ +AttributeError: 'DataFrame' object has no attribute 'iloc' +2026-06-24 16:12:24 | WARNING | Pipeline failed. Retry 2/3. Error: 'DataFrame' object has no attribute 'iloc' +2026-06-24 16:12:29 | INFO | ================================================================================ +2026-06-24 16:12:29 | INFO | Hello from data-move Python data pipeline! +2026-06-24 16:12:29 | INFO | Pipeline Run Date: 2026-06-23 +2026-06-24 16:12:29 | INFO | Connecting to databases... +2026-06-24 16:12:30 | INFO | +2026-06-24 16:12:30 | INFO | +2026-06-24 16:12:31 | INFO | Database connections established +2026-06-24 16:12:31 | INFO | Collecting MIDs for: 2026-06-23 +2026-06-24 16:12:31 | INFO | Found 863 MIDs +2026-06-24 16:12:31 | INFO | Pipeline Run Date: 2026-06-23 +2026-06-24 16:12:31 | INFO | ================================================================================ +2026-06-24 16:12:31 | INFO | Processing Table-:OQaD | Table type -:FACT | fetcht by-:run_date | operation-:INSERT +2026-06-24 16:12:31 | INFO | Fetching Data from sql server for table-: OQaD .............. +2026-06-24 16:12:31 | INFO | Exists: True +2026-06-24 16:12:31 | INFO | Path: D:\z\airflow-project\src\sql\fact\OQaD.sql +2026-06-24 16:12:31 | INFO | Fetching data for 863 EMPIDs +2026-06-24 16:12:31 | INFO | Fetching OQaD data for run_date=2026-06-23 +2026-06-24 16:12:43 | INFO | fn name is fetch_OQad ------Fetched 458 rows +2026-06-24 16:12:43 | INFO | Fetched total row -: 458 from sql server for table-:OQaD ...........!!! +2026-06-24 16:12:43 | INFO | Fetched 458 rows +2026-06-24 16:12:43 | INFO | _ _ _ _ Deleting Data from ClickHouse for OQaD _ _ _ _ +2026-06-24 16:12:43 | INFO | _ _ _ _Inserting data into clickhouse db from sql server_ _ _ _ +2026-06-24 16:12:43 | ERROR | Failed processing table OQaD +Traceback (most recent call last): + File "D:\z\airflow-project\elt.py", line 243, in elt + load_to_clickhouse( + ~~~~~~~~~~~~~~~~~~^ + client=client, + ^^^^^^^^^^^^^^ + table_name=table_name, + ^^^^^^^^^^^^^^^^^^^^^^ + df=df, + ^^^^^^ + ) + ^ + File "D:\z\airflow-project\clickhouse_task\load_table.py", line 28, in load_to_clickhouse + chunk_df = df.iloc[start:end] + ^^^^^^^ +AttributeError: 'DataFrame' object has no attribute 'iloc' +2026-06-24 16:17:01 | INFO | Pipeline Start Date: 2026-06-23 +2026-06-24 16:17:01 | INFO | ================================================================================ +2026-06-24 16:17:01 | INFO | Hello from data-move Python data pipeline! +2026-06-24 16:17:01 | INFO | Pipeline Run Date: 2026-06-23 +2026-06-24 16:17:01 | INFO | Connecting to databases... +2026-06-24 16:17:02 | INFO | +2026-06-24 16:17:03 | INFO | +2026-06-24 16:17:04 | INFO | Database connections established +2026-06-24 16:17:04 | INFO | Collecting MIDs for: 2026-06-23 +2026-06-24 16:17:04 | INFO | Found 863 MIDs +2026-06-24 16:17:04 | INFO | Pipeline Run Date: 2026-06-23 +2026-06-24 16:17:04 | INFO | ================================================================================ +2026-06-24 16:17:04 | INFO | Processing Table-:OQaD | Table type -:FACT | fetcht by-:run_date | operation-:INSERT +2026-06-24 16:17:04 | INFO | Fetching Data from sql server for table-: OQaD .............. +2026-06-24 16:17:04 | INFO | Exists: True +2026-06-24 16:17:04 | INFO | Path: D:\z\airflow-project\src\sql\fact\OQaD.sql +2026-06-24 16:17:04 | INFO | Fetching data for 863 EMPIDs +2026-06-24 16:17:04 | INFO | Fetching OQaD data for run_date=2026-06-23 +2026-06-24 16:17:10 | INFO | fn name is fetch_OQad ------Fetched 458 rows +2026-06-24 16:17:10 | INFO | Fetched total row -: 458 from sql server for table-:OQaD ...........!!! +2026-06-24 16:17:10 | INFO | Fetched 458 rows +2026-06-24 16:17:10 | INFO | _ _ _ _ Deleting Data from ClickHouse for OQaD _ _ _ _ +2026-06-24 16:17:11 | INFO | _ _ _ _Inserting data into clickhouse db from sql server_ _ _ _ +2026-06-24 16:17:11 | INFO | Inserted rows 0 to 458 +2026-06-24 16:17:11 | INFO | OQaD: inserted 458 rows into ClickHouse +2026-06-24 16:17:11 | INFO | OQaD loaded successfully (458 rows) +2026-06-24 16:17:11 | INFO | ================================================================================ +2026-06-24 16:17:11 | INFO | Pipeline Completed Successfully +2026-06-24 16:17:11 | INFO | ================================================================================ +2026-06-24 16:17:11 | INFO | Pipeline completed successfully. pipeline_trigeered_on_date=2026-06-24last_successful_run_date=2026-06-23 +2026-06-24 16:17:31 | INFO | Pipeline Start Date: 2026-06-23 +2026-06-24 16:17:31 | INFO | ================================================================================ +2026-06-24 16:17:31 | INFO | Hello from data-move Python data pipeline! +2026-06-24 16:17:31 | INFO | Pipeline Run Date: 2026-06-23 +2026-06-24 16:17:31 | INFO | Connecting to databases... +2026-06-24 16:17:31 | INFO | +2026-06-24 16:17:33 | INFO | +2026-06-24 16:17:33 | INFO | Database connections established +2026-06-24 16:17:33 | INFO | Collecting MIDs for: 2026-06-23 +2026-06-24 16:17:34 | INFO | Found 863 MIDs +2026-06-24 16:17:34 | INFO | Pipeline Run Date: 2026-06-23 +2026-06-24 16:17:34 | INFO | ================================================================================ +2026-06-24 16:17:34 | INFO | Processing Table-:OQaD | Table type -:FACT | fetcht by-:run_date | operation-:INSERT +2026-06-24 16:17:34 | INFO | Fetching Data from sql server for table-: OQaD .............. +2026-06-24 16:17:34 | INFO | Exists: True +2026-06-24 16:17:34 | INFO | Path: D:\z\airflow-project\src\sql\fact\OQaD.sql +2026-06-24 16:17:34 | INFO | Fetching data for 863 EMPIDs +2026-06-24 16:17:34 | INFO | Fetching OQaD data for run_date=2026-06-23 +2026-06-24 16:17:40 | INFO | fn name is fetch_OQad ------Fetched 458 rows +2026-06-24 16:17:40 | INFO | Fetched total row -: 458 from sql server for table-:OQaD ...........!!! +2026-06-24 16:17:40 | INFO | Fetched 458 rows +2026-06-24 16:17:40 | INFO | _ _ _ _ Deleting Data from ClickHouse for OQaD _ _ _ _ +2026-06-24 16:17:41 | INFO | _ _ _ _Inserting data into clickhouse db from sql server_ _ _ _ +2026-06-24 16:17:41 | INFO | Inserted rows 0 to 458 +2026-06-24 16:17:41 | INFO | OQaD: inserted 458 rows into ClickHouse +2026-06-24 16:17:41 | INFO | OQaD loaded successfully (458 rows) +2026-06-24 16:17:41 | INFO | ================================================================================ +2026-06-24 16:17:41 | INFO | Pipeline Completed Successfully +2026-06-24 16:17:41 | INFO | ================================================================================ +2026-06-24 16:17:41 | INFO | Pipeline completed successfully. pipeline_trigeered_on_date=2026-06-24last_successful_run_date=2026-06-23 diff --git a/logs/etl_20260625.log b/logs/etl_20260625.log new file mode 100644 index 0000000..22ba959 --- /dev/null +++ b/logs/etl_20260625.log @@ -0,0 +1,90 @@ +2026-06-25 10:23:52 | INFO | Pipeline Start Date: 2026-06-24 +2026-06-25 10:23:52 | INFO | ================================================================================ +2026-06-25 10:23:52 | INFO | Hello from data-move Python data pipeline! +2026-06-25 10:23:52 | INFO | Pipeline Run Date: 2026-06-24 +2026-06-25 10:23:52 | INFO | Connecting to databases... +2026-06-25 10:23:54 | INFO | +2026-06-25 10:23:57 | INFO | +2026-06-25 10:23:58 | INFO | Database connections established +2026-06-25 10:23:58 | INFO | Collecting MIDs for: 2026-06-24 +2026-06-25 10:23:58 | INFO | Found 868 MIDs +2026-06-25 10:23:59 | INFO | Pipeline Run Date: 2026-06-24 +2026-06-25 10:23:59 | INFO | ================================================================================ +2026-06-25 10:23:59 | INFO | Processing Table-:OQaD | Table type -:FACT | fetcht by-:run_date | operation-:INSERT +2026-06-25 10:23:59 | INFO | Fetching Data from sql server for table-: OQaD .............. +2026-06-25 10:23:59 | ERROR | Failed processing table OQaD +Traceback (most recent call last): + File "D:\z\airflow-project\elt2.py", line 172, in elt + df=registry[f"fetch_{table_name}"]()(sql_engine, table_name , table_type, mids, run_date) + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^ +TypeError: fetch_OQaD() missing 5 required positional arguments: 'engine', 'table_name', 'table_type', 'empids', and 'run_date' +2026-06-25 10:23:59 | WARNING | Pipeline failed. Retry 1/3. Error: fetch_OQaD() missing 5 required positional arguments: 'engine', 'table_name', 'table_type', 'empids', and 'run_date' +2026-06-25 10:24:04 | INFO | ================================================================================ +2026-06-25 10:24:04 | INFO | Hello from data-move Python data pipeline! +2026-06-25 10:24:04 | INFO | Pipeline Run Date: 2026-06-24 +2026-06-25 10:24:04 | INFO | Connecting to databases... +2026-06-25 10:24:05 | INFO | +2026-06-25 10:24:05 | INFO | +2026-06-25 10:24:06 | INFO | Database connections established +2026-06-25 10:24:06 | INFO | Collecting MIDs for: 2026-06-24 +2026-06-25 10:24:06 | INFO | Found 868 MIDs +2026-06-25 10:24:06 | INFO | Pipeline Run Date: 2026-06-24 +2026-06-25 10:24:06 | INFO | ================================================================================ +2026-06-25 10:24:06 | INFO | Processing Table-:OQaD | Table type -:FACT | fetcht by-:run_date | operation-:INSERT +2026-06-25 10:24:06 | INFO | Fetching Data from sql server for table-: OQaD .............. +2026-06-25 10:24:06 | ERROR | Failed processing table OQaD +Traceback (most recent call last): + File "D:\z\airflow-project\elt2.py", line 172, in elt + df=registry[f"fetch_{table_name}"]()(sql_engine, table_name , table_type, mids, run_date) + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^ +TypeError: fetch_OQaD() missing 5 required positional arguments: 'engine', 'table_name', 'table_type', 'empids', and 'run_date' +2026-06-25 10:24:06 | WARNING | Pipeline failed. Retry 2/3. Error: fetch_OQaD() missing 5 required positional arguments: 'engine', 'table_name', 'table_type', 'empids', and 'run_date' +2026-06-25 10:24:11 | INFO | ================================================================================ +2026-06-25 10:24:11 | INFO | Hello from data-move Python data pipeline! +2026-06-25 10:24:11 | INFO | Pipeline Run Date: 2026-06-24 +2026-06-25 10:24:11 | INFO | Connecting to databases... +2026-06-25 10:24:12 | INFO | +2026-06-25 10:24:12 | INFO | +2026-06-25 10:24:12 | INFO | Database connections established +2026-06-25 10:24:12 | INFO | Collecting MIDs for: 2026-06-24 +2026-06-25 10:24:13 | INFO | Found 868 MIDs +2026-06-25 10:24:13 | INFO | Pipeline Run Date: 2026-06-24 +2026-06-25 10:24:13 | INFO | ================================================================================ +2026-06-25 10:24:13 | INFO | Processing Table-:OQaD | Table type -:FACT | fetcht by-:run_date | operation-:INSERT +2026-06-25 10:24:13 | INFO | Fetching Data from sql server for table-: OQaD .............. +2026-06-25 10:24:13 | ERROR | Failed processing table OQaD +Traceback (most recent call last): + File "D:\z\airflow-project\elt2.py", line 172, in elt + df=registry[f"fetch_{table_name}"]()(sql_engine, table_name , table_type, mids, run_date) + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^ +TypeError: fetch_OQaD() missing 5 required positional arguments: 'engine', 'table_name', 'table_type', 'empids', and 'run_date' +2026-06-25 11:19:10 | INFO | Pipeline Start Date: 2026-06-24 +2026-06-25 11:19:10 | INFO | ================================================================================ +2026-06-25 11:19:10 | INFO | Hello from data-move Python data pipeline! +2026-06-25 11:19:10 | INFO | Pipeline Run Date: 2026-06-24 +2026-06-25 11:19:10 | INFO | Connecting to databases... +2026-06-25 11:19:11 | INFO | +2026-06-25 11:19:13 | INFO | +2026-06-25 11:19:14 | INFO | Database connections established +2026-06-25 11:19:14 | INFO | Collecting MIDs for: 2026-06-24 +2026-06-25 11:19:14 | INFO | Found 868 MIDs +2026-06-25 11:19:15 | INFO | Pipeline Run Date: 2026-06-24 +2026-06-25 11:19:15 | INFO | ================================================================================ +2026-06-25 11:19:15 | INFO | Processing Table-:OQaD | Table type -:FACT | fetcht by-:run_date | operation-:INSERT +2026-06-25 11:19:15 | INFO | Fetching Data from sql server for table-: OQaD .............. +2026-06-25 11:19:15 | INFO | Exists: True +2026-06-25 11:19:15 | INFO | Path: D:\z\airflow-project\src\sql\fact\OQaD.sql +2026-06-25 11:19:15 | INFO | Fetching data for 868 EMPIDs +2026-06-25 11:19:15 | INFO | Fetching OQaD data for run_date=2026-06-24 +2026-06-25 11:19:21 | INFO | fn name is fetch_OQad ------Fetched 469 rows +2026-06-25 11:19:21 | INFO | Fetched total row -: 469 from sql server for table-:OQaD ...........!!! +2026-06-25 11:19:21 | INFO | Fetched 469 rows +2026-06-25 11:19:21 | INFO | _ _ _ _ Deleting Data from ClickHouse for OQaD _ _ _ _ +2026-06-25 11:19:21 | INFO | _ _ _ _Inserting data into clickhouse db from sql server_ _ _ _ +2026-06-25 11:19:21 | INFO | Inserted rows 0 to 469 +2026-06-25 11:19:21 | INFO | OQaD: inserted 469 rows into ClickHouse +2026-06-25 11:19:21 | INFO | OQaD loaded successfully (469 rows) +2026-06-25 11:19:21 | INFO | ================================================================================ +2026-06-25 11:19:21 | INFO | Pipeline Completed Successfully +2026-06-25 11:19:21 | INFO | ================================================================================ +2026-06-25 11:19:21 | INFO | Pipeline completed successfully. pipeline_trigeered_on_date=2026-06-25last_successful_run_date=2026-06-24 diff --git a/main2.py b/main2.py index 57b5c0b..ddebd6f 100644 --- a/main2.py +++ b/main2.py @@ -46,113 +46,13 @@ from mids import ( from src.bridge import * from src.fact import * from src.dim import * - - -# ========================================================== -# Helpers -# ========================================================== - -def get_dates_from_yaml(filename: str): - with open(filename, "r") as file: - data = yaml.safe_load(file) - - start_date = date.fromisoformat( - str(data["pipeline"]["start_date"]) - ) - - end_date = date.fromisoformat( - str(data["pipeline"]["end_date"]) - ) - flag=str(data["pipeline"]["flag"]) - - return start_date, end_date , flag - - -def write_table_to_yaml( - data: dict, - run_date: date, - filename: str | None = None -): - """Write table data to a YAML file.""" - - if filename is None: - filename = f"elt_pipeline_{run_date}.yml" - - with open(filename, "w") as file: - yaml.dump( - data, - file, - default_flow_style=False, - sort_keys=False - ) - - print(f"Table written to {filename}") +from elt import * -def table_exists( - client, - table_name: str, -) -> bool: - - return bool( - client.command( - f"EXISTS TABLE {table_name}" - ) - ) - - -# ========================================================== -# Main -# ========================================================== - -def elt(run_date : date): - - log.info("=" * 80) - log.info("Hello from data-move Python data pipeline!") - - # ------------------------------------------------------ - # Run Date - # ------------------------------------------------------ - - - - log.info( - "Pipeline Run Date: %s", - run_date, - ) - - # ------------------------------------------------------ - # Connections - # ------------------------------------------------------ - - log.info( - "Connecting to databases..." - ) - - sql_engine = build_sql_server_engine() - clickhouse_engine = build_clickhouse_engine() - client = get_clickhouse_client() - - log.info( - "Database connections established" - ) - - # ------------------------------------------------------ - # mids Keys - # ------------------------------------------------------ - - mids = MID_TABLE_COV( - sql_engine, - run_date, - ) - - emp_visit_df = MID_TABLE_COV1( - sql_engine, - run_date, - ) +def main() : # ------------------------------------------------------ # Config @@ -163,128 +63,8 @@ def elt(run_date : date): "r", ) as file: - config = yaml.safe_load(file) + table_config = yaml.safe_load(file) - # ------------------------------------------------------ - # Process Tables - # ------------------------------------------------------ - for table in config["tables"]: - table_name = table["name"] - operation = table["operation"] - fetch_by = table["fetch_by"] - table_type=table["type"] - - log.info("=" * 80) - log.info(f"Processing Table-:{table_name} | Table type -:{table_type} | fetcht by-:{fetch_by} | operation-:{operation}" ) - - try: - - # ------------------------------------------ - # Fetch Data - # ------------------------------------------ - - - log.info(f"Fetching Data from sql server for table-: {table_name} ..............") - fetch_list=["mids" ,"run_date", "reason_id"] - if fetch_by in fetch_list : - fn_name = f"fetch_{table_name}" - fn = globals()[fn_name] - df=fn(sql_engine, table_name , table_type, mids, run_date) - else: - df = fetch_data(sql_engine ,table_name,table_type) - - log.info(f"Fetched total row -: {len(df)} from sql server for table-:{table_name} ...........!!!") - - if df.is_empty(): - - log.warning( - "%s returned no rows", - table_name, - ) - - continue - - log.info( - "Fetched %s rows", - len(df), - ) - - # ------------------------------------------ - # Create Table If Missing - # ------------------------------------------ - - exists = table_exists( - client, - table_name, - ) - - if not exists: - - log.info( - "Creating table %s", - table_name, - ) - - create_clickhouse_table( - df=df, - table_name=table_name, - clickhouse_engine=clickhouse_engine, - ) - - # ------------------------------------------ - # Existing Table Logic - # ------------------------------------------ - - else: - - if operation == "DELETE+INSERT": - - truncate_table( - client, - table_name, - ) - - else: - - delete_existing_data( - client=client, - table_name=table_name, - run_date=run_date, - mids=mids, - emp_visit_df=emp_visit_df, - ) - - # ------------------------------------------ - # Load Data - # ------------------------------------------ - log.info("_ _ _ _Inserting data into clickhouse db from sql server_ _ _ _") - load_to_clickhouse( - client=client, - table_name=table_name, - df=df, - ) - - log.info( - "%s loaded successfully (%s rows)", - table_name, - len(df), - ) - - except Exception: - - log.exception( - "Failed processing table %s", - table_name, - ) - - raise - - log.info("=" * 80) - log.info("Pipeline Completed Successfully") - log.info("=" * 80) - - -def main() : config_file = Path("Pipeline_config.yml") @@ -336,7 +116,7 @@ def main() : for attempt in range(3): try: - elt(run_date) + elt(run_date , table_config) successful_dates.append({ 'pipeline_trigeered_on_date': str(date.today()), @@ -370,7 +150,7 @@ def main() : ) sleep(5) - + start_date=start_date + timedelta(days=1) @@ -392,7 +172,8 @@ def main() : with open(filename_failed, "w") as f: yaml.dump(failed_dates, f, default_flow_style=False, - sort_keys=False) + sort_keys=False) + if __name__ == "__main__": diff --git a/main3.py b/main3.py new file mode 100644 index 0000000..3f6e79c --- /dev/null +++ b/main3.py @@ -0,0 +1,181 @@ +# /// script +# requires-python = ">=3.11" +# dependencies = [ +# "polars>=0.20.0", +# "pyarrow>=18.0.0", +# "sqlalchemy>=2.0.0", +# "pyodbc>=5.0.0", +# "clickhouse-connect>=0.7.0", +# "clickhouse-sqlalchemy>=0.3.2", +# "pyyaml>=6.0.3", +# "python-dotenv>=1.0.0", +# ] +# /// + + +from __future__ import annotations +from time import sleep +import sys +from datetime import date, datetime, timedelta + +import polars as pl +import yaml + + +from log import log + +from clickhouse_task.create_table import create_clickhouse_table +from clickhouse_task.delete_task import ( + delete_existing_data, + truncate_table, +) + +from clickhouse_task.load_table import load_to_clickhouse + +from db_con.connection import ( + build_sql_server_engine, + build_clickhouse_engine, + get_clickhouse_client, +) + +from mids import ( + MID_TABLE_COV, + MID_TABLE_COV1, +) + +from src.bridge import * +from src.fact_updated import * +from src.dim import * +from elt2 import * + + + + + +def main() : + + # ------------------------------------------------------ + # Config + # ------------------------------------------------------ + + with open( + "y.yml", + "r", + ) as file: + + table_config = yaml.safe_load(file) + + + config_file = Path("Pipeline_config.yml") + + if not config_file.exists(): + default_config = { + "pipeline": { + "run_date": None, + "status": None, + "error_message": None, + } + } + + with open(config_file, "w") as f: + yaml.safe_dump(default_config, f) + + with open(config_file, "r") as f: + config = yaml.safe_load(f) + + + + p_start_date, p_end_date , flag= get_dates_from_yaml("elt_pipeline_custom_dates.yml") + if flag =="Y" : + start_date=p_start_date + end_date=p_end_date + + elif len(sys.argv) > 1: + start_date = datetime.strptime( + sys.argv[1], + "%Y-%m-%d", + ).date() + end_date=start_date + timedelta(days=1) + else: + start_date = date.today() - timedelta(days=1) + end_date=start_date + + log.info( + "Pipeline Start Date: %s", + start_date, + ) + + + failed_dates=[] + successful_dates=[] + filename_successful = "successful_Pipeline_dates_config.yml" + filename_failed = "failed_Pipeline_dates_config.yml" + + while start_date <=end_date: + run_date = start_date + + for attempt in range(3): + try: + elt(run_date , table_config) + + successful_dates.append({ + 'pipeline_trigeered_on_date': str(date.today()), + 'last_successful_run_date': run_date, + }) + + log.info( + f"Pipeline completed successfully. " + f"pipeline_trigeered_on_date={date.today()}" + f"last_successful_run_date={run_date}" + ) + + break + + except Exception as e: + + + + failed_dates.append({ + 'pipeline_trigeered_on_date': str(date.today()), + 'failed_run_date': run_date, + "attempt" : attempt + }) + + + if attempt == 2: + raise + + log.warning( + f"Pipeline failed. Retry {attempt + 1}/3. Error: {e}" + ) + + sleep(5) + + + start_date=start_date + timedelta(days=1) + + + + with open(filename_successful, "w") as f: + yaml.dump( + successful_dates, + f, + default_flow_style=False, + sort_keys=False, + ) + if len(failed_dates) == 0 : + failed_dates.append({ + 'pipeline_trigeered_on_date': str(date.today()), + 'failed_run_date': "none", + "attempt" : "none" + }) + with open(filename_failed, "w") as f: + yaml.dump(failed_dates, + f, default_flow_style=False, + sort_keys=False) + + + +if __name__ == "__main__": + + main() \ No newline at end of file diff --git a/src/fact.py b/src/fact.py index d875571..2c3a583 100644 --- a/src/fact.py +++ b/src/fact.py @@ -135,175 +135,6 @@ def fetch_additional_visibility( engine: Engine, -def fetch_OQaD( - sql_engine: Engine, - table_name: str, - table_type: str, - mids: list[int], - run_date: date -) -> pl.DataFrame: - - # ───────────────────────────────────────────── - # INNER HELPERS (defined once, used below) - # ───────────────────────────────────────────── - - client = get_clickhouse_client() - - # ── Does a ClickHouse table exist? ──────────── - def table_exists(client, table_name: str) -> bool: - - return bool(client.command(f"EXISTS TABLE {table_name}")) - - # ── STEP 1: Who submitted yesterday in SQL Server? ─── - def fetch_quiz_empids(engine: Engine, run_date: date) -> pl.DataFrame: - - - # Format date ONCE safely — avoids f-string injection bugs - run_date_str = run_date.strftime("%Y-%m-%d") - next_date_str = (run_date + timedelta(days=1)).strftime("%Y-%m-%d") - - sql = f""" - WITH MID_TABLE_COV1 AS - ( - -- Records CREATED yesterday - SELECT EmpId, CAST(VisitDate AS DATE) AS VisitDate - FROM OneApp_KelloggsMT.dbo.T_OQAD - WHERE CreateDate >= '{run_date_str}' - AND CreateDate < '{next_date_str}' - - UNION ALL - - -- Records UPDATED yesterday (different rows, safe to UNION ALL) - SELECT EmpId, CAST(VisitDate AS DATE) AS VisitDate - FROM OneApp_KelloggsMT.dbo.T_OQAD - WHERE UpdateDate >= '{run_date_str}' - AND UpdateDate < '{next_date_str}' - ), - QUIZ AS - ( - SELECT DISTINCT - E.EmpId AS empid, - CAST(DQ.VisitDate AS DATE) AS visitdate - FROM OneApp_KelloggsMT.dbo.T_OQAD DQ - INNER JOIN OneApp_KelloggsMT.dbo.vw_Employee_Detail E - ON DQ.EmpId = E.EmpId - INNER JOIN OneApp_KelloggsMT.dbo.Master_OQAD_Question QU - ON DQ.QuestionId = QU.QuestionId - INNER JOIN OneApp_KelloggsMT.dbo.Master_OQAD_Category QC - ON QU.QuestionCategoryId = QC.QuestionCategoryId - WHERE E.EmpName NOT LIKE '%TEST%' -- exclude test employees - AND E.RightId = 6 -- only field reps - AND ( - E.ResignDate IS NULL - OR CAST(E.ResignDate AS DATE) >= '{run_date_str}' - ) - AND EXISTS ( -- ✅ EXISTS beats IN for large sets - SELECT 1 - FROM MID_TABLE_COV1 A - WHERE A.EmpId = DQ.EmpId - AND A.VisitDate = CAST(DQ.VisitDate AS DATE) - ) - ) - SELECT * FROM QUIZ - """ - - log.info("Fetching quiz empids for run_date=%s", run_date_str) - df = pl.read_database(query=sql, connection=engine) - log.info("Fetched %s (EmpId, VisitDate) pairs from SQL Server", len(df)) - return df - - # ── STEP 2: Who do we ALREADY have in ClickHouse? ─── - def get_empids_clickhouse_OQAD( - client, - table_name: str = "OQaD", - ) -> pl.DataFrame: - - - if not table_exists(client, table_name): - log.warning("Table '%s' does not exist in ClickHouse.", table_name) - return pl.DataFrame(schema={"empid": pl.Int64, "visitdate": pl.Date}) - - query = f""" - SELECT DISTINCT - employee_id AS empid, - visit_date AS visitdate - FROM {table_name} - """ - - arrow_table = client.query_arrow(query) - df = pl.from_arrow(arrow_table) - log.info("Fetched %s existing (EmpId, VisitDate) pairs from ClickHouse", len(df)) - return df - - # ── STEP 3: Who is NEW? (in SQL Server but NOT yet in ClickHouse) ─── - def find_new_empids( - sql_df: pl.DataFrame, - ch_df: pl.DataFrame, - ) -> list[int]: - - - new_df = sql_df.join( - ch_df, - on=["empid", "visitdate"], - how="anti", # ✅ anti = keep rows NOT found in ch_df - ) - - if new_df.is_empty(): - log.warning("No new EmpIds found for table=%s — nothing to fetch.", table_name) - return [0] # sentinel value — the .sql WHERE will return 0 rows safely - - empids = new_df["empid"].unique().to_list() - log.info("Found %s NEW empids to fetch for %s", len(empids), table_name) - return empids - - # ── STEP 4: Fetch full quiz data for new empids ─── - def fetch_data( - engine: Engine, - table_name: str, - table_type: str, - empids: list[int], - run_date: date, - ) -> pl.DataFrame: - - - run_date_str = run_date.strftime("%Y-%m-%d") - empid_list = ", ".join(str(e) for e in empids) # "101, 102, 103" - - sql_file = Path("src") / "sql" / "fact" / f"{table_name}.sql" - log.info("Loading SQL from: %s (exists=%s)", sql_file.resolve(), sql_file.exists()) - - with open(sql_file, "r", encoding="utf-8") as f: - sql_template = f.read() - - sql = sql_template.format( - empid_list=empid_list, - run_date=run_date_str, - ) - - log.info("Fetching full OQaD data for %s empids, run_date=%s", len(empids), run_date_str) - df = pl.read_database(query=sql, connection=engine) - log.info("Fetched %s rows from SQL Server for table=%s", len(df), table_name) - return df - - # ───────────────────────────────────────────── - # MAIN FLOW (the 4 steps, clearly sequenced) - # ───────────────────────────────────────────── - - qf = fetch_quiz_empids(sql_engine, run_date) # Step 1 - db_df = get_empids_clickhouse_OQAD(client, table_name) # Step 2 - empids = find_new_empids(qf, db_df) # Step 3 - - df = fetch_data( # Step 4 - engine=sql_engine, - table_name=table_name, - table_type=table_type, - empids=empids, - run_date=run_date, - ) - - log.info("fetch_OQaD complete — returning %s rows", len(df)) - return df - # def fetch_OQaD( # sql_engine: Engine, # table_name: str, @@ -312,204 +143,373 @@ def fetch_OQaD( # run_date: date # ) -> pl.DataFrame: +# # ───────────────────────────────────────────── +# # INNER HELPERS (defined once, used below) +# # ───────────────────────────────────────────── -# client= get_clickhouse_client() -# def table_exists( -# client, -# table_name: str, -# ) -> bool: +# client = get_clickhouse_client() -# return bool( -# client.command( -# f"EXISTS TABLE {table_name}" -# ) -# ) +# # ── Does a ClickHouse table exist? ──────────── +# def table_exists(client, table_name: str) -> bool: + +# return bool(client.command(f"EXISTS TABLE {table_name}")) + +# # ── STEP 1: Who submitted yesterday in SQL Server? ─── +# def fetch_quiz_empids(engine: Engine, run_date: date) -> pl.DataFrame: + + +# # Format date ONCE safely — avoids f-string injection bugs +# run_date_str = run_date.strftime("%Y-%m-%d") +# next_date_str = (run_date + timedelta(days=1)).strftime("%Y-%m-%d") + +# sql = f""" +# WITH MID_TABLE_COV1 AS +# ( +# -- Records CREATED yesterday +# SELECT EmpId, CAST(VisitDate AS DATE) AS VisitDate +# FROM OneApp_KelloggsMT.dbo.T_OQAD +# WHERE CreateDate >= '{run_date_str}' +# AND CreateDate < '{next_date_str}' + +# UNION ALL + +# -- Records UPDATED yesterday (different rows, safe to UNION ALL) +# SELECT EmpId, CAST(VisitDate AS DATE) AS VisitDate +# FROM OneApp_KelloggsMT.dbo.T_OQAD +# WHERE UpdateDate >= '{run_date_str}' +# AND UpdateDate < '{next_date_str}' +# ), +# QUIZ AS +# ( +# SELECT DISTINCT +# E.EmpId AS empid, +# CAST(DQ.VisitDate AS DATE) AS visitdate +# FROM OneApp_KelloggsMT.dbo.T_OQAD DQ +# INNER JOIN OneApp_KelloggsMT.dbo.vw_Employee_Detail E +# ON DQ.EmpId = E.EmpId +# INNER JOIN OneApp_KelloggsMT.dbo.Master_OQAD_Question QU +# ON DQ.QuestionId = QU.QuestionId +# INNER JOIN OneApp_KelloggsMT.dbo.Master_OQAD_Category QC +# ON QU.QuestionCategoryId = QC.QuestionCategoryId +# WHERE E.EmpName NOT LIKE '%TEST%' -- exclude test employees +# AND E.RightId = 6 -- only field reps +# AND ( +# E.ResignDate IS NULL +# OR CAST(E.ResignDate AS DATE) >= '{run_date_str}' +# ) +# AND EXISTS ( -- ✅ EXISTS beats IN for large sets +# SELECT 1 +# FROM MID_TABLE_COV1 A +# WHERE A.EmpId = DQ.EmpId +# AND A.VisitDate = CAST(DQ.VisitDate AS DATE) +# ) +# ) +# SELECT * FROM QUIZ +# """ + +# log.info("Fetching quiz empids for run_date=%s", run_date_str) +# df = pl.read_database(query=sql, connection=engine) +# log.info("Fetched %s (EmpId, VisitDate) pairs from SQL Server", len(df)) +# return df + +# # ── STEP 2: Who do we ALREADY have in ClickHouse? ─── +# def get_empids_clickhouse_OQAD( +# client, +# table_name: str = "OQaD", +# ) -> pl.DataFrame: + + +# if not table_exists(client, table_name): +# log.warning("Table '%s' does not exist in ClickHouse.", table_name) +# return pl.DataFrame(schema={"empid": pl.Int64, "visitdate": pl.Date}) + +# query = f""" +# SELECT DISTINCT +# employee_id AS empid, +# visit_date AS visitdate +# FROM {table_name} +# """ + +# arrow_table = client.query_arrow(query) +# df = pl.from_arrow(arrow_table) +# log.info("Fetched %s existing (EmpId, VisitDate) pairs from ClickHouse", len(df)) +# return df + +# # ── STEP 3: Who is NEW? (in SQL Server but NOT yet in ClickHouse) ─── +# def find_new_empids( +# sql_df: pl.DataFrame, +# ch_df: pl.DataFrame, +# ) -> list[int]: + + +# new_df = sql_df.join( +# ch_df, +# on=["empid", "visitdate"], +# how="anti", # ✅ anti = keep rows NOT found in ch_df +# ) + +# if new_df.is_empty(): +# log.warning("No new EmpIds found for table=%s — nothing to fetch.", table_name) +# return [0] # sentinel value — the .sql WHERE will return 0 rows safely + +# empids = new_df["empid"].unique().to_list() +# log.info("Found %s NEW empids to fetch for %s", len(empids), table_name) +# return empids + +# # ── STEP 4: Fetch full quiz data for new empids ─── +# def fetch_data( +# engine: Engine, +# table_name: str, +# table_type: str, +# empids: list[int], +# run_date: date, +# ) -> pl.DataFrame: + + +# run_date_str = run_date.strftime("%Y-%m-%d") +# empid_list = ", ".join(str(e) for e in empids) # "101, 102, 103" + +# sql_file = Path("src") / "sql" / "fact" / f"{table_name}.sql" +# log.info("Loading SQL from: %s (exists=%s)", sql_file.resolve(), sql_file.exists()) + +# with open(sql_file, "r", encoding="utf-8") as f: +# sql_template = f.read() + +# sql = sql_template.format( +# empid_list=empid_list, +# run_date=run_date_str, +# ) + +# log.info("Fetching full OQaD data for %s empids, run_date=%s", len(empids), run_date_str) +# df = pl.read_database(query=sql, connection=engine) +# log.info("Fetched %s rows from SQL Server for table=%s", len(df), table_name) +# return df + +# # ───────────────────────────────────────────── +# # MAIN FLOW (the 4 steps, clearly sequenced) +# # ───────────────────────────────────────────── + +# qf = fetch_quiz_empids(sql_engine, run_date) # Step 1 +# db_df = get_empids_clickhouse_OQAD(client, table_name) # Step 2 +# empids = find_new_empids(qf, db_df) # Step 3 + +# df = fetch_data( # Step 4 +# engine=sql_engine, +# table_name=table_name, +# table_type=table_type, +# empids=empids, +# run_date=run_date, +# ) + +# log.info("fetch_OQaD complete — returning %s rows", len(df)) +# return df + +def fetch_OQaD( + sql_engine: Engine, + table_name: str, + table_type: str, + mids: list[int], + run_date: date +) -> pl.DataFrame: + + + client= get_clickhouse_client() + def table_exists( + client, + table_name: str, + ) -> bool: + + return bool( + client.command( + f"EXISTS TABLE {table_name}" + ) + ) -# def fetch_quiz_empids(engine: Engine, run_date : date) -> pl.DataFrame: + def fetch_quiz_empids(engine: Engine, run_date : date) -> pl.DataFrame: -# sql_template = f""" -# WITH MID_TABLE_COV1 AS -# ( -# SELECT EmpId, VisitDate -# FROM OneApp_KelloggsMT.dbo.T_OQAD -# WHERE CreateDate >= {run_date} -# AND CreateDate < DATEADD(DAY,1,'{run_date}') + sql_template = f""" + WITH MID_TABLE_COV1 AS + ( + SELECT EmpId, VisitDate + FROM OneApp_KelloggsMT.dbo.T_OQAD + WHERE CreateDate >= {run_date} + AND CreateDate < DATEADD(DAY,1,'{run_date}') -# UNION + UNION -# SELECT EmpId, VisitDate -# FROM OneApp_KelloggsMT.dbo.T_OQAD -# WHERE UpdateDate >= {run_date} -# AND UpdateDate < DATEADD(DAY,1, '{run_date}') -# ), + SELECT EmpId, VisitDate + FROM OneApp_KelloggsMT.dbo.T_OQAD + WHERE UpdateDate >= {run_date} + AND UpdateDate < DATEADD(DAY,1, '{run_date}') + ), -# QUIZ AS -# ( -# SELECT Distinct E.EmpId as empid -# , CONVERT(date,DQ.VisitDate) AS visitdate -# FROM OneApp_KelloggsMT.dbo.T_OQAD DQ INNER JOIN -# OneApp_KelloggsMT.dbo.vw_Employee_Detail E ON DQ.EmpId = E.EmpId inner join -# OneApp_KelloggsMT.dbo.Master_OQAD_Question QU on DQ.QuestionId= qu.QuestionId inner join -# OneApp_KelloggsMT.dbo.Master_OQAD_Category qc on qu.QuestionCategoryId= qc.QuestionCategoryId -# where e.EmpName not like 'test%' and e.RightId in (6) -# and (E.ResignDate is null or E.ResignDate>=''+CONVERT(VARCHAR,'{run_date}')+'') AND E.EmpName NOT LIKE '%TEST%' -# AND DQ.EmpId IN (SELECT EmpId FROM MID_TABLE_COV1 A WHERE -# DQ.EmpId=A.EmpId AND CONVERT(date,VisitDate)=CONVERT(date,A.VisitDate) ) -# ) select * from quiz -# """ -# sql = sql_template.format( -# run_date=run_date.strftime("%Y-%m-%d") -# ) + QUIZ AS + ( + SELECT Distinct E.EmpId as empid + , CONVERT(date,DQ.VisitDate) AS visitdate + FROM OneApp_KelloggsMT.dbo.T_OQAD DQ INNER JOIN + OneApp_KelloggsMT.dbo.vw_Employee_Detail E ON DQ.EmpId = E.EmpId inner join + OneApp_KelloggsMT.dbo.Master_OQAD_Question QU on DQ.QuestionId= qu.QuestionId inner join + OneApp_KelloggsMT.dbo.Master_OQAD_Category qc on qu.QuestionCategoryId= qc.QuestionCategoryId + where e.EmpName not like 'test%' and e.RightId in (6) + and (E.ResignDate is null or E.ResignDate>=''+CONVERT(VARCHAR,'{run_date}')+'') AND E.EmpName NOT LIKE '%TEST%' + AND DQ.EmpId IN (SELECT EmpId FROM MID_TABLE_COV1 A WHERE + DQ.EmpId=A.EmpId AND CONVERT(date,VisitDate)=CONVERT(date,A.VisitDate) ) + ) select * from quiz + """ + sql = sql_template.format( + run_date=run_date.strftime("%Y-%m-%d") + ) -# log.info(f"Fetching quiz_empids data for EMPID and Visitid") + log.info(f"Fetching quiz_empids data for EMPID and Visitid") -# df = pl.read_database( -# query=sql, -# connection=engine -# ) + df = pl.read_database( + query=sql, + connection=engine + ) -# log.info(f"Fetched {len(df):,} total empid and visitdate fetched for OQAD from SQL Server") + log.info(f"Fetched {len(df):,} total empid and visitdate fetched for OQAD from SQL Server") -# return df + return df -# def get_empids_clickhouse_OQAD( -# client, -# table_name: str = "OQaD", -# ) -> pl.DataFrame: + def get_empids_clickhouse_OQAD( + client, + table_name: str = "OQaD", + ) -> pl.DataFrame: -# if not table_exists(client, table_name): -# log.warning(f"Table '{table_name}' does not exist.") -# return pl.DataFrame( -# schema={ -# "empid": pl.Int64, -# "visitdate": pl.Date, -# } -# ) + if not table_exists(client, table_name): + log.warning(f"Table '{table_name}' does not exist.") + return pl.DataFrame( + schema={ + "empid": pl.Int64, + "visitdate": pl.Date, + } + ) -# query = f""" -# SELECT DISTINCT -# employee_id AS empid, -# visit_date AS visitdate -# FROM {table_name} -# """ + query = f""" + SELECT DISTINCT + employee_id AS empid, + visit_date AS visitdate + FROM {table_name} + """ -# # ClickHouse -> PyArrow -> Polars -# arrow_table = client.query_arrow(query) + # ClickHouse -> PyArrow -> Polars + arrow_table = client.query_arrow(query) -# return pl.from_arrow(arrow_table) + return pl.from_arrow(arrow_table) -# qf=fetch_quiz_empids(sql_engine,run_date) -# db_df = get_empids_clickhouse_OQAD(client) + qf=fetch_quiz_empids(sql_engine,run_date) + db_df = get_empids_clickhouse_OQAD(client) -# matched = qf.join( -# db_df, -# on=["empid", "visitdate"], -# how="inner", -# ) + matched = qf.join( + db_df, + on=["empid", "visitdate"], + how="inner", + ) -# if matched.is_empty(): + if matched.is_empty(): -# empids=[0] -# log.warning( -# "%s Matched df in OQaD returned no rows", -# table_name, -# ) + empids=[0] + log.warning( + "%s Matched df in OQaD returned no rows", + table_name, + ) -# else: -# empids=matched["empid"].to_list() + else: + empids=matched["empid"].to_list() -# log.info(f"Fetched {len(empids):,} matched empids fetched for OQAD ") + log.info(f"Fetched {len(empids):,} matched empids fetched for OQAD ") -# def fetch_data( -# engine: Engine, -# table_name: str, -# table_type: str, -# empids: list[int], -# run_date: date -# ) -> pl.DataFrame: + def fetch_data( + engine: Engine, + table_name: str, + table_type: str, + empids: list[int], + run_date: date + ) -> pl.DataFrame: -# empid_list = ",".join(str(empid) for empid in empids) + empid_list = ",".join(str(empid) for empid in empids) -# sql_file = Path("src") / "sql" / "fact" / f"{table_name}.sql" + sql_file = Path("src") / "sql" / "fact" / f"{table_name}.sql" -# log.info(f"Exists: {sql_file.exists()}") -# log.info(f"Path: {sql_file.resolve()}") + log.info(f"Exists: {sql_file.exists()}") + log.info(f"Path: {sql_file.resolve()}") -# with open(sql_file, "r", encoding="utf-8") as f: -# sql_template = f.read() + with open(sql_file, "r", encoding="utf-8") as f: + sql_template = f.read() -# sql = sql_template.format( -# empid_list=empid_list, -# run_date=run_date.strftime("%Y-%m-%d") -# ) + sql = sql_template.format( + empid_list=empid_list, + run_date=run_date.strftime("%Y-%m-%d") + ) -# log.info(f"Fetching data for {len(empids):,} EMPIDs") + log.info(f"Fetching data for {len(empids):,} EMPIDs") -# log.info("Fetching OQaD data for run_date=%s", run_date) + log.info("Fetching OQaD data for run_date=%s", run_date) -# df = pl.read_database( -# query=sql, -# connection=engine, -# ) + df = pl.read_database( + query=sql, + connection=engine, + ) -# log.info("fn name is fetch_OQad ------Fetched %s rows", len(df)) + log.info("fn name is fetch_OQad ------Fetched %s rows", len(df)) -# return df -# df=fetch_data( engine=sql_engine, -# table_name=table_name, -# table_type=table_type, -# empids=empids, -# run_date=run_date -# ) -# log.info(f"Fetched {len(df):,} rows from SQL Server") + return df + df=fetch_data( engine=sql_engine, + table_name=table_name, + table_type=table_type, + empids=empids, + run_date=run_date + ) + log.info(f"Fetched {len(df):,} rows from SQL Server") -# return df + return df -# def fetch_OQaD( -# engine: Engine, -# table_name: str, -# table_type: str, -# empids: list[int], -# run_date: date -# ) -> pl.DataFrame: +def fetch_OQaD( + engine: Engine, + table_name: str, + table_type: str, + empids: list[int], + run_date: date +) -> pl.DataFrame: -# empid_list = ",".join(str(empid) for empid in empids) + empid_list = ",".join(str(empid) for empid in empids) -# sql_file = Path("src") / "sql" / "fact" / f"{table_name}.sql" + sql_file = Path("src") / "sql" / "fact" / f"{table_name}.sql" -# log.info(f"Exists: {sql_file.exists()}") -# log.info(f"Path: {sql_file.resolve()}") + log.info(f"Exists: {sql_file.exists()}") + log.info(f"Path: {sql_file.resolve()}") -# with open(sql_file, "r", encoding="utf-8") as f: -# sql_template = f.read() + with open(sql_file, "r", encoding="utf-8") as f: + sql_template = f.read() -# sql = sql_template.format( -# empid_list=empid_list, -# run_date=run_date.strftime("%Y-%m-%d") -# ) + sql = sql_template.format( + empid_list=empid_list, + run_date=run_date.strftime("%Y-%m-%d") + ) -# log.info(f"Fetching data for {len(empids):,} EMPIDs") + log.info(f"Fetching data for {len(empids):,} EMPIDs") -# log.info("Fetching OQaD data for run_date=%s", run_date) + log.info("Fetching OQaD data for run_date=%s", run_date) -# df = pl.read_database( -# query=sql, -# connection=engine, -# ) + df = pl.read_database( + query=sql, + connection=engine, + ) -# log.info("fn name is fetch_OQad ------Fetched %s rows", len(df)) + log.info("fn name is fetch_OQad ------Fetched %s rows", len(df)) -# return df + return df diff --git a/src/fact_updated.py b/src/fact_updated.py new file mode 100644 index 0000000..3a30b4f --- /dev/null +++ b/src/fact_updated.py @@ -0,0 +1,903 @@ +from pathlib import Path +import polars as pl +from sqlalchemy import Engine +from datetime import date , timedelta +from log import log + + + + +from db_con.connection import ( + build_sql_server_engine, + build_clickhouse_engine, + get_clickhouse_client, +) + + + + + + + + + +registry = {} + +def register(func): + registry[func.__name__] = func + return func + + + + +# def fetch_data( +# engine: Engine, +# table_name: str, +# table_type: str, +# mids: list[int], +# run_date: date +# ) -> pl.DataFrame: + +# if not mids: +# log.warning("No MIDs — nothing to fetch.") +# return pl.DataFrame() + +# mid_list = ",".join(str(mid) for mid in mids) + +# sql_file = Path("src") / "sql" / f"{table_type.lower()}" / f"{table_name}.sql" + +# with open(sql_file, "r", encoding="utf-8") as f: +# sql_template = f.read() + +# sql = sql_template.format( +# mid_list=mid_list, +# run_date=run_date.strftime("%Y-%m-%d") +# ) + +# log.info(f"Fetching data for {len(mids):,} MIDs") + +# df = pl.read_database( +# query=sql, +# connection=engine +# ) + +# log.info(f"Fetched {len(df):,} rows from SQL Server") + +# return df + + +@register +def fetch_SOS_OneApp( engine: Engine, + table_name: str, + table_type: str, + mids: list[int], + run_date: date +) -> pl.DataFrame: + + + + if not mids: + log.warning("No MIDs — nothing to fetch.") + return pl.DataFrame() + log.info(f" Start Fetching data for these {len(mids):} MIDs ") + mid_list = ",".join(str(mid) for mid in mids) + log.info(f"Start Fetching data for these {len(mids):} MIDs or based on this date {run_date}") + sql_file = Path("src") / "sql" / f"{table_type.lower()}" / f"{table_name}.sql" + + with open(sql_file, "r", encoding="utf-8") as f: + sql_template = f.read() + + sql = sql_template.format( + mid_list=mid_list, + run_date=run_date.strftime("%Y-%m-%d") + ) + log.info(f"Fetching data for {len(mids):,} MIDs") + + df = pl.read_database( + query=sql, + connection=engine + ) + log.info(f"Fetched {len(df):,} rows from SQL Server") + + return df + + +@register +def fetch_additional_visibility( engine: Engine, + table_name: str, + table_type: str, + mids: list[int], + run_date: date +) -> pl.DataFrame: + + + + if not mids: + log.warning("No MIDs — nothing to fetch.") + return pl.DataFrame() + + log.info(f" Start Fetching data for these {len(mids):} MIDs ") + mid_list = ",".join(str(mid) for mid in mids) + log.info(f"Start Fetching data for these {len(mids):} MIDs or based on this date {run_date}") + sql_file = Path("src") / "sql" / f"{table_type.lower()}" / f"{table_name}.sql" + + with open(sql_file, "r", encoding="utf-8") as f: + sql_template = f.read() + + sql = sql_template.format( + mid_list=mid_list, + run_date=run_date.strftime("%Y-%m-%d") + ) + log.info(f"Fetching data for {len(mids):,} MIDs") + + df = pl.read_database( + query=sql, + connection=engine + ) + log.info(f"Fetched {len(df):,} rows from SQL Server") + + return df + + + +# def fetch_OQaD( +# sql_engine: Engine, +# table_name: str, +# table_type: str, +# mids: list[int], +# run_date: date +# ) -> pl.DataFrame: + +# # ───────────────────────────────────────────── +# # INNER HELPERS (defined once, used below) +# # ───────────────────────────────────────────── + +# client = get_clickhouse_client() + +# # ── Does a ClickHouse table exist? ──────────── +# def table_exists(client, table_name: str) -> bool: + +# return bool(client.command(f"EXISTS TABLE {table_name}")) + +# # ── STEP 1: Who submitted yesterday in SQL Server? ─── +# def fetch_quiz_empids(engine: Engine, run_date: date) -> pl.DataFrame: + + +# # Format date ONCE safely — avoids f-string injection bugs +# run_date_str = run_date.strftime("%Y-%m-%d") +# next_date_str = (run_date + timedelta(days=1)).strftime("%Y-%m-%d") + +# sql = f""" +# WITH MID_TABLE_COV1 AS +# ( +# -- Records CREATED yesterday +# SELECT EmpId, CAST(VisitDate AS DATE) AS VisitDate +# FROM OneApp_KelloggsMT.dbo.T_OQAD +# WHERE CreateDate >= '{run_date_str}' +# AND CreateDate < '{next_date_str}' + +# UNION ALL + +# -- Records UPDATED yesterday (different rows, safe to UNION ALL) +# SELECT EmpId, CAST(VisitDate AS DATE) AS VisitDate +# FROM OneApp_KelloggsMT.dbo.T_OQAD +# WHERE UpdateDate >= '{run_date_str}' +# AND UpdateDate < '{next_date_str}' +# ), +# QUIZ AS +# ( +# SELECT DISTINCT +# E.EmpId AS empid, +# CAST(DQ.VisitDate AS DATE) AS visitdate +# FROM OneApp_KelloggsMT.dbo.T_OQAD DQ +# INNER JOIN OneApp_KelloggsMT.dbo.vw_Employee_Detail E +# ON DQ.EmpId = E.EmpId +# INNER JOIN OneApp_KelloggsMT.dbo.Master_OQAD_Question QU +# ON DQ.QuestionId = QU.QuestionId +# INNER JOIN OneApp_KelloggsMT.dbo.Master_OQAD_Category QC +# ON QU.QuestionCategoryId = QC.QuestionCategoryId +# WHERE E.EmpName NOT LIKE '%TEST%' -- exclude test employees +# AND E.RightId = 6 -- only field reps +# AND ( +# E.ResignDate IS NULL +# OR CAST(E.ResignDate AS DATE) >= '{run_date_str}' +# ) +# AND EXISTS ( -- ✅ EXISTS beats IN for large sets +# SELECT 1 +# FROM MID_TABLE_COV1 A +# WHERE A.EmpId = DQ.EmpId +# AND A.VisitDate = CAST(DQ.VisitDate AS DATE) +# ) +# ) +# SELECT * FROM QUIZ +# """ + +# log.info("Fetching quiz empids for run_date=%s", run_date_str) +# df = pl.read_database(query=sql, connection=engine) +# log.info("Fetched %s (EmpId, VisitDate) pairs from SQL Server", len(df)) +# return df + +# # ── STEP 2: Who do we ALREADY have in ClickHouse? ─── +# def get_empids_clickhouse_OQAD( +# client, +# table_name: str = "OQaD", +# ) -> pl.DataFrame: + + +# if not table_exists(client, table_name): +# log.warning("Table '%s' does not exist in ClickHouse.", table_name) +# return pl.DataFrame(schema={"empid": pl.Int64, "visitdate": pl.Date}) + +# query = f""" +# SELECT DISTINCT +# employee_id AS empid, +# visit_date AS visitdate +# FROM {table_name} +# """ + +# arrow_table = client.query_arrow(query) +# df = pl.from_arrow(arrow_table) +# log.info("Fetched %s existing (EmpId, VisitDate) pairs from ClickHouse", len(df)) +# return df + +# # ── STEP 3: Who is NEW? (in SQL Server but NOT yet in ClickHouse) ─── +# def find_new_empids( +# sql_df: pl.DataFrame, +# ch_df: pl.DataFrame, +# ) -> list[int]: + + +# new_df = sql_df.join( +# ch_df, +# on=["empid", "visitdate"], +# how="anti", # ✅ anti = keep rows NOT found in ch_df +# ) + +# if new_df.is_empty(): +# log.warning("No new EmpIds found for table=%s — nothing to fetch.", table_name) +# return [0] # sentinel value — the .sql WHERE will return 0 rows safely + +# empids = new_df["empid"].unique().to_list() +# log.info("Found %s NEW empids to fetch for %s", len(empids), table_name) +# return empids + +# # ── STEP 4: Fetch full quiz data for new empids ─── +# def fetch_data( +# engine: Engine, +# table_name: str, +# table_type: str, +# empids: list[int], +# run_date: date, +# ) -> pl.DataFrame: + + +# run_date_str = run_date.strftime("%Y-%m-%d") +# empid_list = ", ".join(str(e) for e in empids) # "101, 102, 103" + +# sql_file = Path("src") / "sql" / "fact" / f"{table_name}.sql" +# log.info("Loading SQL from: %s (exists=%s)", sql_file.resolve(), sql_file.exists()) + +# with open(sql_file, "r", encoding="utf-8") as f: +# sql_template = f.read() + +# sql = sql_template.format( +# empid_list=empid_list, +# run_date=run_date_str, +# ) + +# log.info("Fetching full OQaD data for %s empids, run_date=%s", len(empids), run_date_str) +# df = pl.read_database(query=sql, connection=engine) +# log.info("Fetched %s rows from SQL Server for table=%s", len(df), table_name) +# return df + +# # ───────────────────────────────────────────── +# # MAIN FLOW (the 4 steps, clearly sequenced) +# # ───────────────────────────────────────────── + +# qf = fetch_quiz_empids(sql_engine, run_date) # Step 1 +# db_df = get_empids_clickhouse_OQAD(client, table_name) # Step 2 +# empids = find_new_empids(qf, db_df) # Step 3 + +# df = fetch_data( # Step 4 +# engine=sql_engine, +# table_name=table_name, +# table_type=table_type, +# empids=empids, +# run_date=run_date, +# ) + +# log.info("fetch_OQaD complete — returning %s rows", len(df)) +# return df + + +@register +def fetch_OQaD( + sql_engine: Engine, + table_name: str, + table_type: str, + mids: list[int], + run_date: date +) -> pl.DataFrame: + + + client= get_clickhouse_client() + def table_exists( + client, + table_name: str, + ) -> bool: + + return bool( + client.command( + f"EXISTS TABLE {table_name}" + ) + ) + + + + def fetch_quiz_empids(engine: Engine, run_date : date) -> pl.DataFrame: + + sql_template = f""" + WITH MID_TABLE_COV1 AS + ( + SELECT EmpId, VisitDate + FROM OneApp_KelloggsMT.dbo.T_OQAD + WHERE CreateDate >= {run_date} + AND CreateDate < DATEADD(DAY,1,'{run_date}') + + UNION + + SELECT EmpId, VisitDate + FROM OneApp_KelloggsMT.dbo.T_OQAD + WHERE UpdateDate >= {run_date} + AND UpdateDate < DATEADD(DAY,1, '{run_date}') + ), + + QUIZ AS + ( + SELECT Distinct E.EmpId as empid + , CONVERT(date,DQ.VisitDate) AS visitdate + FROM OneApp_KelloggsMT.dbo.T_OQAD DQ INNER JOIN + OneApp_KelloggsMT.dbo.vw_Employee_Detail E ON DQ.EmpId = E.EmpId inner join + OneApp_KelloggsMT.dbo.Master_OQAD_Question QU on DQ.QuestionId= qu.QuestionId inner join + OneApp_KelloggsMT.dbo.Master_OQAD_Category qc on qu.QuestionCategoryId= qc.QuestionCategoryId + where e.EmpName not like 'test%' and e.RightId in (6) + and (E.ResignDate is null or E.ResignDate>=''+CONVERT(VARCHAR,'{run_date}')+'') AND E.EmpName NOT LIKE '%TEST%' + AND DQ.EmpId IN (SELECT EmpId FROM MID_TABLE_COV1 A WHERE + DQ.EmpId=A.EmpId AND CONVERT(date,VisitDate)=CONVERT(date,A.VisitDate) ) + ) select * from quiz + """ + sql = sql_template.format( + run_date=run_date.strftime("%Y-%m-%d") + ) + + log.info(f"Fetching quiz_empids data for EMPID and Visitid") + + df = pl.read_database( + query=sql, + connection=engine + ) + + + log.info(f"Fetched {len(df):,} total empid and visitdate fetched for OQAD from SQL Server") + + return df + + + def get_empids_clickhouse_OQAD( + client, + table_name: str = "OQaD", + ) -> pl.DataFrame: + + if not table_exists(client, table_name): + log.warning(f"Table '{table_name}' does not exist.") + return pl.DataFrame( + schema={ + "empid": pl.Int64, + "visitdate": pl.Date, + } + ) + + query = f""" + SELECT DISTINCT + employee_id AS empid, + visit_date AS visitdate + FROM {table_name} + """ + + # ClickHouse -> PyArrow -> Polars + arrow_table = client.query_arrow(query) + + return pl.from_arrow(arrow_table) + + + + qf=fetch_quiz_empids(sql_engine,run_date) + db_df = get_empids_clickhouse_OQAD(client) + + matched = qf.join( + db_df, + on=["empid", "visitdate"], + how="inner", + ) + + if matched.is_empty(): + + empids=[0] + log.warning( + "%s Matched df in OQaD returned no rows", + table_name, + ) + + else: + empids=matched["empid"].to_list() + + + log.info(f"Fetched {len(empids):,} matched empids fetched for OQAD ") + + def fetch_data( + engine: Engine, + table_name: str, + table_type: str, + empids: list[int], + run_date: date + ) -> pl.DataFrame: + + empid_list = ",".join(str(empid) for empid in empids) + + + sql_file = Path("src") / "sql" / "fact" / f"{table_name}.sql" + + log.info(f"Exists: {sql_file.exists()}") + log.info(f"Path: {sql_file.resolve()}") + + with open(sql_file, "r", encoding="utf-8") as f: + sql_template = f.read() + + sql = sql_template.format( + empid_list=empid_list, + run_date=run_date.strftime("%Y-%m-%d") + ) + + log.info(f"Fetching data for {len(empids):,} EMPIDs") + + log.info("Fetching OQaD data for run_date=%s", run_date) + + df = pl.read_database( + query=sql, + connection=engine, + ) + + log.info("fn name is fetch_OQad ------Fetched %s rows", len(df)) + + return df + df=fetch_data( engine=sql_engine, + table_name=table_name, + table_type=table_type, + empids=empids, + run_date=run_date + ) + log.info(f"Fetched {len(df):,} rows from SQL Server") + + return df + + +@register +def fetch_OQaD( + engine: Engine, + table_name: str, + table_type: str, + empids: list[int], + run_date: date +) -> pl.DataFrame: + + empid_list = ",".join(str(empid) for empid in empids) + + + sql_file = Path("src") / "sql" / "fact" / f"{table_name}.sql" + + log.info(f"Exists: {sql_file.exists()}") + log.info(f"Path: {sql_file.resolve()}") + + with open(sql_file, "r", encoding="utf-8") as f: + sql_template = f.read() + + sql = sql_template.format( + empid_list=empid_list, + run_date=run_date.strftime("%Y-%m-%d") + ) + + log.info(f"Fetching data for {len(empids):,} EMPIDs") + + log.info("Fetching OQaD data for run_date=%s", run_date) + + df = pl.read_database( + query=sql, + connection=engine, + ) + + log.info("fn name is fetch_OQad ------Fetched %s rows", len(df)) + + return df + + + + +@register + +def fetch_Coverage( engine: Engine, + table_name: str, + table_type: str, + mids: list[int], + run_date: date +) -> pl.DataFrame: + + + + if not mids: + log.warning("No MIDs — nothing to fetch.") + return pl.DataFrame() + log.info(f"Start Fetching data for these {len(mids):} MIDs or based on this date {run_date}") + mid_list = ",".join(str(mid) for mid in mids) + + sql_file = Path("src") / "sql" / f"{table_type.lower()}" / f"{table_name}.sql" + + with open(sql_file, "r", encoding="utf-8") as f: + sql_template = f.read() + + sql = sql_template.format( + mid_list=mid_list, + run_date=run_date.strftime("%Y-%m-%d") + ) + log.info(f"Fetching data for {len(mids):,} MIDs") + + df = pl.read_database( + query=sql, + connection=engine + ) + log.info(f"Fetched {len(df):,} rows from SQL Server") + + return df + + +@register +def fetch_Survey( engine: Engine, + table_name: str, + table_type: str, + mids: list[int], + run_date: date +) -> pl.DataFrame: + + + + if not mids: + log.warning("No MIDs — nothing to fetch.") + return pl.DataFrame() + + mid_list = ",".join(str(mid) for mid in mids) + log.info(f"Start Fetching data for these {len(mids):} MIDs or based on this date {run_date}") + sql_file = Path("src") / "sql" / f"{table_type.lower()}" / f"{table_name}.sql" + + with open(sql_file, "r", encoding="utf-8") as f: + sql_template = f.read() + + sql = sql_template.format( + mid_list=mid_list, + run_date=run_date.strftime("%Y-%m-%d") + ) + log.info(f"Fetching data for {len(mids):,} MIDs") + + df = pl.read_database( + query=sql, + connection=engine + ) + log.info(f"Fetched {len(df):,} rows from SQL Server") + + return df + + + + +@register +def fetch_Login( engine: Engine, + table_name: str, + table_type: str, + mids: list[int], + run_date: date +) -> pl.DataFrame: + + + + # if not mids: + # log.warning("No MIDs — nothing to fetch.") + # return pl.DataFrame() + + mid_list = ",".join(str(mid) for mid in mids) + log.info(f"Start Fetching data for these {len(mids):} MIDs or based on this date {run_date}") + sql_file = Path("src") / "sql" / f"{table_type.lower()}" / f"{table_name}.sql" + + with open(sql_file, "r", encoding="utf-8") as f: + sql_template = f.read() + + sql = sql_template.format( + # mid_list=mid_list, + run_date=run_date.strftime("%Y-%m-%d") + ) + log.info(f"Fetching data for {len(mids):,} MIDs") + + df = pl.read_database( + query=sql, + connection=engine + ) + log.info(f"Fetched {len(df):,} rows from SQL Server") + + return df + + + + + +@register +def fetch_Stock_Details( engine: Engine, + table_name: str, + table_type: str, + mids: list[int], + run_date: date +) -> pl.DataFrame: + + + + if not mids: + log.warning("No MIDs — nothing to fetch.") + return pl.DataFrame() + + mid_list = ",".join(str(mid) for mid in mids) + log.info(f"Start Fetching data for these {len(mids):} MIDs or based on this date {run_date}") + sql_file = Path("src") / "sql" / f"{table_type.lower()}" / f"{table_name}.sql" + + with open(sql_file, "r", encoding="utf-8") as f: + sql_template = f.read() + + sql = sql_template.format( + mid_list=mid_list, + run_date=run_date.strftime("%Y-%m-%d") + ) + log.info(f"Fetching data for {len(mids):,} MIDs") + + df = pl.read_database( + query=sql, + connection=engine + ) + log.info(f"Fetched {len(df):,} rows from SQL Server") + + return df + + + + +# def fetch_Coverage( engine: Engine, +# table_name: str, +# table_type: str, +# mids: list[int], +# run_date: date +# ) -> pl.DataFrame: + + + +# if not mids: +# log.warning("No MIDs — nothing to fetch.") +# return pl.DataFrame() + +# mid_list = ",".join(str(mid) for mid in mids) +# log.info(f"Start Fetching data for these {len(mids):} MIDs or based on this date {run_date}") +# sql_file = Path("src") / "sql" / f"{table_type.lower()}" / f"{table_name}.sql" + +# with open(sql_file, "r", encoding="utf-8") as f: +# sql_template = f.read() + +# sql = sql_template.format( +# mid_list=mid_list, +# run_date=run_date.strftime("%Y-%m-%d") +# ) +# log.info(f"Fetching data for {len(mids):,} MIDs") + +# df = pl.read_database( +# query=sql, +# connection=engine +# ) +# log.info(f"Fetched {len(df):,} rows from SQL Server") + +# return df + + +@register +def fetch_Attendance( + engine: Engine, + table_name: str, + table_type: str, + mids: list[int], + run_date: date | None = None, + days_back: int = 15 +) -> pl.DataFrame: + """ + Fetch attendance source data. + + Default: + end_date = yesterday + start_date = yesterday - 15 days + """ + + if run_date is None: + run_date = date.today() - timedelta(days=1) + + start_date = run_date - timedelta(days=days_back) + + sql_file = Path("src") / "sql" / f"{table_type.lower()}" / f"{table_name}.sql" + + with open(sql_file, "r", encoding="utf-8") as f: + sql_template = f.read() + + sql = sql_template.format( + start_date=start_date.strftime("%Y-%m-%d"), + run_date=run_date.strftime("%Y-%m-%d") + ) + log.info( + f"Fetching Attendance data from {start_date} to {run_date}" + ) + + df = pl.read_database( + query=sql, + connection=engine + ) + + log.info( + f"Fetched {len(df):,} attendance rows " + f"for {df['employee_id'].n_unique():,} employees" + ) + + return df + + +@register +def fetch_Journey_Plan( engine: Engine, + table_name: str, + table_type: str, + mids: list[int], + run_date: date +) -> pl.DataFrame: + + + + if not mids: + log.warning("No MIDs — nothing to fetch.") + return pl.DataFrame() + + log.info(f" Start Fetching data for these {len(mids):} MIDs ") + mid_list = ",".join(str(mid) for mid in mids) + log.info(f"Start Fetching data for these {len(mids):} MIDs or based on this date {run_date}") + sql_file = Path("src") / "sql" / f"{table_type.lower()}" / f"{table_name}.sql" + + with open(sql_file, "r", encoding="utf-8") as f: + sql_template = f.read() + + sql = sql_template.format( + mid_list=mid_list, + run_date=run_date.strftime("%Y-%m-%d") + ) + log.info(f"Fetching data for {len(mids):,} MIDs") + + df = pl.read_database( + query=sql, + connection=engine + ) + log.info(f"Fetched {len(df):,} rows from SQL Server") + + return df + + +@register +def fetch_PaidVisibility( engine: Engine, + table_name: str, + table_type: str, + mids: list[int], + run_date: date +) -> pl.DataFrame: + + + + if not mids: + log.warning("No MIDs — nothing to fetch.") + return pl.DataFrame() + + log.info(f" Start Fetching data for these {len(mids):} MIDs ") + mid_list = ",".join(str(mid) for mid in mids) + log.info(f"Start Fetching data for these {len(mids):} MIDs or based on this date {run_date}") + sql_file = Path("src") / "sql" / f"{table_type.lower()}" / f"{table_name}.sql" + + with open(sql_file, "r", encoding="utf-8") as f: + sql_template = f.read() + + sql = sql_template.format( + mid_list=mid_list, + run_date=run_date.strftime("%Y-%m-%d") + ) + log.info(f"Fetching data for {len(mids):,} MIDs") + + df = pl.read_database( + query=sql, + connection=engine + ) + log.info(f"Fetched {len(df):,} rows from SQL Server") + + return df + +@register +def fetch_Web_Logins( engine: Engine, + table_name: str, + table_type: str, + mids: list[int], + run_date: date +) -> pl.DataFrame: + + + + if not mids: + log.warning("No MIDs — nothing to fetch.") + return pl.DataFrame() + + log.info(f" Start Fetching data for these {len(mids):} MIDs ") + mid_list = ",".join(str(mid) for mid in mids) + log.info(f"Start Fetching data for these {len(mids):} MIDs or based on this date {run_date}") + sql_file = Path("src") / "sql" / f"{table_type.lower()}" / f"{table_name}.sql" + + with open(sql_file, "r", encoding="utf-8") as f: + sql_template = f.read() + + sql = sql_template.format( + mid_list=mid_list, + run_date=run_date.strftime("%Y-%m-%d") + ) + log.info(f"Fetching data for {len(mids):,} MIDs") + + df = pl.read_database( + query=sql, + connection=engine + ) + log.info(f"Fetched {len(df):,} rows from SQL Server") + + return df + +@register +def fetch_Promotion( + engine: Engine, + table_name: str, + table_type: str, + mids: list[int], + run_date: date +) -> pl.DataFrame: + + if not mids: + log.warning("No MIDs — nothing to fetch.") + return pl.DataFrame() + + mid_list = ",".join(str(mid) for mid in mids) + + sql_file = Path("src") / "sql" / f"{table_type.lower()}" / f"{table_name}.sql" + + with open(sql_file, "r", encoding="utf-8") as f: + sql_template = f.read() + + sql = sql_template.format( + mid_list=mid_list, + run_date=run_date.strftime("%Y-%m-%d") + ) + + log.info(f"Fetching data for {len(mids):,} MIDs") + + df = pl.read_database( + query=sql, + connection=engine + ) + + log.info(f"Fetched {len(df):,} rows from SQL Server") + + return df \ No newline at end of file diff --git a/src/sql/fact/OQaD.sql b/src/sql/fact/OQaD.sql index bf6feff..0324f67 100644 --- a/src/sql/fact/OQaD.sql +++ b/src/sql/fact/OQaD.sql @@ -1,76 +1,40 @@ + WITH MID_TABLE_COV1 AS ( - - SELECT EmpId, CAST(VisitDate AS DATE) AS VisitDate - FROM OneApp_KelloggsMT.dbo.T_OQAD - WHERE CreateDate >= '{run_date}' - AND CreateDate < DATEADD(DAY, 1, '{run_date}') - - UNION ALL - - SELECT EmpId, CAST(VisitDate AS DATE) AS VisitDate - FROM OneApp_KelloggsMT.dbo.T_OQAD - WHERE UpdateDate >= '{run_date}' - AND UpdateDate < DATEADD(DAY, 1, '{run_date}') +SELECT EmpId,VisitDate FROM OneApp_KelloggsMT.dbo.T_OQAD + WHERE CONVERT(VARCHAR,CreateDate,101)= CONVERT(VARCHAR,cast('{run_date}' as date),101) + UNION + SELECT EmpId,VisitDate FROM OneApp_KelloggsMT.dbo.T_OQAD + WHERE CONVERT(VARCHAR,UpdateDate,101)= CONVERT(VARCHAR,cast('{run_date}' as date),101) ), - -QUIZ AS -( - SELECT DISTINCT - E.EmpId, - E.EmpName, - E.SupervisorId, - E.SupervisorName, - E.DesignationName, - E.CityName, - E.StateName, - E.RegionName, - CAST(DQ.VisitDate AS DATE) AS VisitDate, - DQ.QuestionId, - DQ.AnswerId, - QC.QuestionCategoryId, - QC.QuestionCategory - FROM OneApp_KelloggsMT.dbo.T_OQAD DQ - INNER JOIN OneApp_KelloggsMT.dbo.vw_Employee_Detail E - ON DQ.EmpId = E.EmpId - INNER JOIN OneApp_KelloggsMT.dbo.Master_OQAD_Question QU - ON DQ.QuestionId = QU.QuestionId - INNER JOIN OneApp_KelloggsMT.dbo.Master_OQAD_Category QC - ON QU.QuestionCategoryId = QC.QuestionCategoryId - WHERE E.EmpName NOT LIKE '%TEST%' - AND E.RightId = 6 - AND (E.ResignDate IS NULL OR CAST(E.ResignDate AS DATE) >= '{run_date}') - AND EXISTS ( - SELECT 1 - FROM MID_TABLE_COV1 A - WHERE A.EmpId = DQ.EmpId - AND A.VisitDate = CAST(DQ.VisitDate AS DATE) - ) - -- ✅ Exclude EmpIds already loaded into ClickHouse - AND E.EmpId NOT IN ({empid_list}) -) - -SELECT - 40148 AS project_id, - Q.EmpId AS employee_id, - 0 AS process_id, - Q.VisitDate AS visit_date, - Q.QuestionCategoryId AS question_category_id, - Q.QuestionCategory AS question_category, - QM.QuestionId AS question_id, - QM.Question AS question, - ISNULL(QA.AnswerId, 0) AS answer_id, - ISNULL(QA.Answer, '') AS answer, - CASE - WHEN QA.AnswerId IS NULL THEN 'Not Answer' - WHEN QA.RightAnswer = 1 THEN 'Y' - WHEN QA.RightAnswer IS NULL THEN 'Not Answer' - ELSE 'N' - END AS correct_answer, - GETDATE() AS update_date, - 'SP-Pius' AS update_by -FROM QUIZ Q -INNER JOIN OneApp_KelloggsMT.dbo.Master_OQAD_Question QM - ON Q.QuestionId = QM.QuestionId -LEFT JOIN OneApp_KelloggsMT.dbo.Master_OQAD_Answer QA - ON Q.AnswerId = QA.AnswerId \ No newline at end of file +QUIZ +As +( +SELECT Distinct E.EmpId, E.EmpName, E.SupervisorId, E.SupervisorName, E.DesignationName, E.CityName, E.StateName, E.RegionName +, CONVERT(VARCHAR,DQ.VisitDate,101) AS VisitDate +, DQ.QuestionId, DQ.AnswerId, qc.QuestionCategoryId, qc.QuestionCategory +FROM OneApp_KelloggsMT.dbo.T_OQAD DQ INNER JOIN +OneApp_KelloggsMT.dbo.vw_Employee_Detail E ON DQ.EmpId = E.EmpId inner join +OneApp_KelloggsMT.dbo.Master_OQAD_Question QU on DQ.QuestionId= qu.QuestionId inner join +OneApp_KelloggsMT.dbo.Master_OQAD_Category qc on qu.QuestionCategoryId= qc.QuestionCategoryId +where e.EmpName not like 'test%' and e.RightId in (6) +and (E.ResignDate is null or E.ResignDate>=''+CONVERT(VARCHAR,cast('{run_date}' as date),101)+'') AND E.EmpName NOT LIKE '%TEST%' +AND DQ.EmpId IN (SELECT EmpId FROM MID_TABLE_COV1 A WHERE +DQ.EmpId=A.EmpId AND CONVERT(date,DQ.VisitDate,101)=CONVERT(date,A.VisitDate,101) ) +) +, OQaD (project_id, +employee_id,process_id,visit_date,question_category_id,question_category,question_id, +question,answer_id,answer,correct_answer,update_date,update_by) +AS ( +Select '40148' AS ProjectId,Q.EmpId,0,Q.VisitDate,Q.QuestionCategoryId,Q.QuestionCategory, + QM.QuestionId, QM.Question +, CASE WHEN QA.AnswerId IS NULL THEN 0 ELSE QA.AnswerId END AS AnswerId +,CASE WHEN QA.ANSWER IS NULL THEN '' ELSE QA.ANSWER END AS Answer +, CASE WHEN QA.AnswerId IS NULL THEN 'Not Answer' ELSE CASE WHEN QA.RightAnswer = 1 THEN 'Y' +ELSE CASE WHEN QA.RightAnswer IS NULL THEN 'Not Answer' ELSE 'N' END END END AS RightAnswer,GETDATE(),'SP-Pius' +FROM QUIZ Q INNER JOIN +OneApp_KelloggsMT.dbo.Master_OQAD_Question QM ON Q.QuestionId = QM.QuestionId Left JOIN +OneApp_KelloggsMT.dbo.Master_OQAD_Answer QA ON Q.AnswerId= QA.AnswerId + where Q.EmpId not in ( {empid_list}) + ) +select * from OQaD \ No newline at end of file diff --git a/successful_Pipeline_dates_config.yml b/successful_Pipeline_dates_config.yml index 278d2a6..80a569b 100644 --- a/successful_Pipeline_dates_config.yml +++ b/successful_Pipeline_dates_config.yml @@ -1,2 +1,2 @@ -- pipeline_trigeered_on_date: '2026-06-23' - last_successful_run_date: 2026-06-22 +- pipeline_trigeered_on_date: '2026-06-25' + last_successful_run_date: 2026-06-24 diff --git a/tables.yml b/tables.yml new file mode 100644 index 0000000..e8d33cd --- /dev/null +++ b/tables.yml @@ -0,0 +1 @@ +fn(sql_engine, table_name , table_type, mids, run_date) \ No newline at end of file