Compare commits

1 Commits

Author SHA1 Message Date
Ankit Malik 1d5ad2d793 1st commit 2026-06-25 11:20:22 +05:30
14 changed files with 2322 additions and 620 deletions
+15 -5
View File
@@ -20,12 +20,22 @@ def load_to_clickhouse(
log.warning(f"{table_name}: DataFrame is empty. Skipping.")
return
arrow_table = df.to_arrow()
chunk_size = 10000
client.insert_arrow(
table=table_name,
arrow_table=arrow_table,
)
for start in range(0, len(df), chunk_size):
end = start + chunk_size
chunk_df = df.slice(start, chunk_size)
arrow_table = chunk_df.to_arrow()
client.insert_arrow(
table=table_name,
arrow_table=arrow_table,
)
log.info(
f"Inserted rows {start:,} to {min(end, len(df)):,}"
)
log.info(
f"{table_name}: inserted {len(df):,} rows into ClickHouse"
+3 -1
View File
@@ -86,5 +86,7 @@ def get_clickhouse_client():
port=CH_PORT,
username=CH_USER,
password=CH_PASS,
database=CH_DB
database=CH_DB,
connect_timeout=30,
send_receive_timeout=600,
)
+266
View File
@@ -0,0 +1,266 @@
from __future__ import annotations
from time import sleep
import sys
from datetime import date, datetime, timedelta
import polars as pl
import yaml
from log import log
from clickhouse_task.create_table import create_clickhouse_table
from clickhouse_task.delete_task import (
delete_existing_data,
truncate_table,
)
from clickhouse_task.load_table import load_to_clickhouse
from db_con.connection import (
build_sql_server_engine,
build_clickhouse_engine,
get_clickhouse_client,
)
from mids import (
MID_TABLE_COV,
MID_TABLE_COV1,
)
from src.bridge import *
from src.fact import *
from src.dim import *
# ==========================================================
# Helpers
# ==========================================================
def get_dates_from_yaml(filename: str):
    """Read the pipeline date window and flag from a YAML file.

    The document must contain a top-level ``pipeline`` mapping with
    ``start_date``, ``end_date`` and ``flag`` entries.

    Args:
        filename: Path of the YAML file to read.

    Returns:
        A ``(start_date, end_date, flag)`` tuple; the two dates are parsed
        from their ISO representation and the flag is returned as ``str``.
    """
    with open(filename, "r") as handle:
        pipeline_cfg = yaml.safe_load(handle)["pipeline"]

    def _as_date(key):
        # str() first: the YAML loader may already have produced a non-string
        # value, and date.fromisoformat only accepts strings.
        return date.fromisoformat(str(pipeline_cfg[key]))

    first_day = _as_date("start_date")
    last_day = _as_date("end_date")
    return first_day, last_day, str(pipeline_cfg["flag"])
def write_table_to_yaml(
    data: dict,
    run_date: date,
    filename: str | None = None
):
    """Dump ``data`` to a YAML file and report the path on stdout.

    Args:
        data: Mapping to serialise; key order is kept as given.
        run_date: Used to build the default file name.
        filename: Target path. When ``None`` the file is named
            ``elt_pipeline_<run_date>.yml``.
    """
    target = f"elt_pipeline_{run_date}.yml" if filename is None else filename

    with open(target, "w") as sink:
        # Block style, original key order.
        yaml.dump(data, sink, default_flow_style=False, sort_keys=False)

    print(f"Table written to {target}")
def table_exists(
    client,
    table_name: str,
) -> bool:
    """Return whether ``EXISTS TABLE <table_name>`` yields a truthy result.

    Args:
        client: Object exposing a ``command(sql)`` method.
        table_name: Name interpolated verbatim into the statement.
    """
    statement = f"EXISTS TABLE {table_name}"
    outcome = client.command(statement)
    return bool(outcome)
# ==========================================================
# elt ()
# ==========================================================
def elt(run_date : date , config: dict):
    """Load every table listed in ``config["tables"]`` into ClickHouse.

    For each table entry the data is fetched: through a ``fetch_<name>``
    function looked up in this module's globals when ``fetch_by`` is one of
    ``mids`` / ``run_date`` / ``reason_id``, otherwise through ``fetch_data``.
    A table that ``table_exists`` reports as missing is created; an existing
    one is truncated when ``operation == "DELETE+INSERT"`` or otherwise
    cleaned with ``delete_existing_data``. The frame is then handed to
    ``load_to_clickhouse``. Tables whose fetch returns an empty frame are
    skipped.

    Args:
        run_date: Date passed to the key-collection, fetch and delete helpers.
        config: Mapping with a ``tables`` list; each entry provides ``name``,
            ``operation``, ``fetch_by`` and ``type``.

    Raises:
        Exception: Any error raised while processing a table is logged with
            its traceback and re-raised, so later tables are not processed.
    """
    banner = "=" * 80
    # fetch_by values that route to a table-specific fetch_<name> function.
    keyed_fetch_modes = ("mids", "run_date", "reason_id")

    log.info(banner)
    log.info("Hello from data-move Python data pipeline!")
    log.info("Pipeline Run Date: %s", run_date)

    # --- Connections ---------------------------------------------------
    log.info("Connecting to databases...")
    sql_engine = build_sql_server_engine()
    clickhouse_engine = build_clickhouse_engine()
    client = get_clickhouse_client()
    log.info("Database connections established")

    # --- Keys collected once per run -----------------------------------
    mids = MID_TABLE_COV(sql_engine, run_date)
    visit_frame = MID_TABLE_COV1(sql_engine, run_date)

    # --- Per-table processing ------------------------------------------
    for table in config["tables"]:
        table_name = table["name"]
        operation = table["operation"]
        fetch_by = table["fetch_by"]
        table_type = table["type"]

        log.info("Pipeline Run Date: %s", run_date)
        log.info(banner)
        log.info(f"Processing Table-:{table_name} | Table type -:{table_type} | fetcht by-:{fetch_by} | operation-:{operation}" )

        try:
            # Fetch
            log.info(f"Fetching Data from sql server for table-: {table_name} ..............")
            if fetch_by in keyed_fetch_modes:
                fetcher = globals()[f"fetch_{table_name}"]
                df = fetcher(sql_engine, table_name, table_type, mids, run_date)
            else:
                df = fetch_data(sql_engine, table_name, table_type)
            log.info(f"Fetched total row -: {len(df)} from sql server for table-:{table_name} ...........!!!")

            if df.is_empty():
                log.warning("%s returned no rows", table_name)
                continue
            log.info("Fetched %s rows", len(df))

            # Prepare the target: clean an existing table, or create it.
            if table_exists(client, table_name):
                if operation == "DELETE+INSERT":
                    truncate_table(client, table_name)
                else:
                    delete_existing_data(
                        client=client,
                        table_name=table_name,
                        run_date=run_date,
                        mids=mids,
                        emp_visit_df=visit_frame,
                    )
            else:
                log.info("Creating table %s", table_name)
                create_clickhouse_table(
                    df=df,
                    table_name=table_name,
                    clickhouse_engine=clickhouse_engine,
                )

            # Load
            log.info("_ _ _ _Inserting data into clickhouse db from sql server_ _ _ _")
            load_to_clickhouse(client=client, table_name=table_name, df=df)
            log.info("%s loaded successfully (%s rows)", table_name, len(df))
        except Exception:
            # Record the traceback here, then let the caller decide what to do.
            log.exception("Failed processing table %s", table_name)
            raise

    log.info(banner)
    log.info("Pipeline Completed Successfully")
    log.info(banner)
+264
View File
@@ -0,0 +1,264 @@
from __future__ import annotations
from time import sleep
import sys
from datetime import date, datetime, timedelta
import polars as pl
import yaml
from log import log
from clickhouse_task.create_table import create_clickhouse_table
from clickhouse_task.delete_task import (
delete_existing_data,
truncate_table,
)
from clickhouse_task.load_table import load_to_clickhouse
from db_con.connection import (
build_sql_server_engine,
build_clickhouse_engine,
get_clickhouse_client,
)
from mids import (
MID_TABLE_COV,
MID_TABLE_COV1,
)
from src.bridge import *
from src.fact_updated import *
from src.dim import *
# ==========================================================
# Helpers
# ==========================================================
def get_dates_from_yaml(filename: str):
    """Return ``(start_date, end_date, flag)`` read from a YAML file.

    Values are taken from the document's ``pipeline`` mapping. Both dates
    are parsed from ISO format; ``flag`` is converted to ``str``.

    Args:
        filename: Path of the YAML file to read.
    """
    with open(filename, "r") as stream:
        document = yaml.safe_load(stream)

    section = document["pipeline"]
    # Parse in the order start, end so a bad start_date is reported first.
    window = [
        date.fromisoformat(str(section[key]))
        for key in ("start_date", "end_date")
    ]
    return window[0], window[1], str(section["flag"])
def write_table_to_yaml(
    data: dict,
    run_date: date,
    filename: str | None = None
):
    """Serialise ``data`` as YAML and print where it was written.

    Args:
        data: Mapping to write; keys are emitted in their existing order.
        run_date: Supplies the date part of the default file name.
        filename: Output path; defaults to ``elt_pipeline_<run_date>.yml``
            when ``None``.
    """
    if filename is not None:
        out_path = filename
    else:
        out_path = f"elt_pipeline_{run_date}.yml"

    dump_options = {"default_flow_style": False, "sort_keys": False}
    with open(out_path, "w") as out_file:
        yaml.dump(data, out_file, **dump_options)

    print(f"Table written to {out_path}")
def table_exists(
    client,
    table_name: str,
) -> bool:
    """Report whether the client's ``EXISTS TABLE`` check is truthy.

    Args:
        client: Object exposing a ``command(sql)`` method.
        table_name: Name placed as-is after ``EXISTS TABLE``.
    """
    reply = client.command(f"EXISTS TABLE {table_name}")
    if reply:
        return True
    return False
# ==========================================================
# elt ()
# ==========================================================
def elt(run_date : date , config: dict):
    """Move each table named in ``config["tables"]`` into ClickHouse.

    Per table: fetch a frame (via ``registry["fetch_<name>"]`` when
    ``fetch_by`` is ``mids``, ``run_date`` or ``reason_id``; via
    ``fetch_data`` otherwise), skip it if the frame is empty, create the
    target when ``table_exists`` says it is missing, otherwise truncate it
    (``operation == "DELETE+INSERT"``) or call ``delete_existing_data``, and
    finally pass the frame to ``load_to_clickhouse``.

    Args:
        run_date: Date forwarded to the key-collection, fetch and delete
            helpers.
        config: Mapping with a ``tables`` list whose entries carry ``name``,
            ``operation``, ``fetch_by`` and ``type``.

    Raises:
        Exception: A failure while processing a table is logged with its
            traceback and re-raised; remaining tables are not processed.
    """
    separator = "=" * 80
    # fetch_by values served by a table-specific function from ``registry``.
    registry_modes = ("mids", "run_date", "reason_id")

    log.info(separator)
    log.info("Hello from data-move Python data pipeline!")
    log.info("Pipeline Run Date: %s", run_date)

    # Connections
    log.info("Connecting to databases...")
    sql_engine = build_sql_server_engine()
    clickhouse_engine = build_clickhouse_engine()
    client = get_clickhouse_client()
    log.info("Database connections established")

    # Keys shared by all tables of this run
    mids = MID_TABLE_COV(sql_engine, run_date)
    visits = MID_TABLE_COV1(sql_engine, run_date)

    for table in config["tables"]:
        table_name = table["name"]
        operation = table["operation"]
        fetch_by = table["fetch_by"]
        table_type = table["type"]

        log.info("Pipeline Run Date: %s", run_date)
        log.info(separator)
        log.info(f"Processing Table-:{table_name} | Table type -:{table_type} | fetcht by-:{fetch_by} | operation-:{operation}" )

        try:
            log.info(f"Fetching Data from sql server for table-: {table_name} ..............")
            if fetch_by not in registry_modes:
                df = fetch_data(sql_engine, table_name, table_type)
            else:
                df = registry[f"fetch_{table_name}"](sql_engine, table_name, table_type, mids, run_date)
            log.info(f"Fetched total row -: {len(df)} from sql server for table-:{table_name} ...........!!!")

            if df.is_empty():
                log.warning("%s returned no rows", table_name)
                continue
            log.info("Fetched %s rows", len(df))

            already_there = table_exists(client, table_name)
            if already_there and operation == "DELETE+INSERT":
                truncate_table(client, table_name)
            elif already_there:
                delete_existing_data(
                    client=client,
                    table_name=table_name,
                    run_date=run_date,
                    mids=mids,
                    emp_visit_df=visits,
                )
            else:
                log.info("Creating table %s", table_name)
                create_clickhouse_table(
                    df=df,
                    table_name=table_name,
                    clickhouse_engine=clickhouse_engine,
                )

            log.info("_ _ _ _Inserting data into clickhouse db from sql server_ _ _ _")
            load_to_clickhouse(client=client, table_name=table_name, df=df)
            log.info("%s loaded successfully (%s rows)", table_name, len(df))
        except Exception:
            # Log with traceback, then propagate to the caller unchanged.
            log.exception("Failed processing table %s", table_name)
            raise

    log.info(separator)
    log.info("Pipeline Completed Successfully")
    log.info(separator)
+1 -1
View File
@@ -1,3 +1,3 @@
- pipeline_trigeered_on_date: '2026-06-23'
- pipeline_trigeered_on_date: '2026-06-25'
failed_run_date: none
attempt: none
+240
View File
@@ -0,0 +1,240 @@
2026-06-24 15:55:46 | INFO | Pipeline Start Date: 2026-06-23
2026-06-24 15:55:46 | INFO | ================================================================================
2026-06-24 15:55:46 | INFO | Hello from data-move Python data pipeline!
2026-06-24 15:55:46 | INFO | Pipeline Run Date: 2026-06-23
2026-06-24 15:55:46 | INFO | Connecting to databases...
2026-06-24 15:55:48 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x000002C8151D4730>
2026-06-24 15:55:52 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x000002C8162F0B90>
2026-06-24 15:55:52 | INFO | Database connections established
2026-06-24 15:55:52 | INFO | Collecting MIDs for: 2026-06-23
2026-06-24 15:55:52 | INFO | Found 863 MIDs
2026-06-24 15:55:52 | WARNING | Pipeline failed. Retry 1/3. Error: 'tables'
2026-06-24 15:55:57 | INFO | ================================================================================
2026-06-24 15:55:57 | INFO | Hello from data-move Python data pipeline!
2026-06-24 15:55:57 | INFO | Pipeline Run Date: 2026-06-23
2026-06-24 15:55:57 | INFO | Connecting to databases...
2026-06-24 15:55:58 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x000002C81600B550>
2026-06-24 15:55:58 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x000002C8162F5A90>
2026-06-24 15:55:59 | INFO | Database connections established
2026-06-24 15:55:59 | INFO | Collecting MIDs for: 2026-06-23
2026-06-24 15:55:59 | INFO | Found 863 MIDs
2026-06-24 15:55:59 | WARNING | Pipeline failed. Retry 2/3. Error: 'tables'
2026-06-24 15:56:04 | INFO | ================================================================================
2026-06-24 15:56:04 | INFO | Hello from data-move Python data pipeline!
2026-06-24 15:56:04 | INFO | Pipeline Run Date: 2026-06-23
2026-06-24 15:56:04 | INFO | Connecting to databases...
2026-06-24 15:56:04 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x000002C8162F59A0>
2026-06-24 15:56:05 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x000002C8162F68A0>
2026-06-24 15:56:05 | INFO | Database connections established
2026-06-24 15:56:05 | INFO | Collecting MIDs for: 2026-06-23
2026-06-24 15:56:05 | INFO | Found 863 MIDs
2026-06-24 16:10:30 | INFO | Pipeline Start Date: 2026-06-23
2026-06-24 16:10:30 | INFO | ================================================================================
2026-06-24 16:10:30 | INFO | Hello from data-move Python data pipeline!
2026-06-24 16:10:30 | INFO | Pipeline Run Date: 2026-06-23
2026-06-24 16:10:30 | INFO | Connecting to databases...
2026-06-24 16:10:31 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x000001B29EE8C470>
2026-06-24 16:10:32 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x000001B29FEBCA50>
2026-06-24 16:10:32 | INFO | Database connections established
2026-06-24 16:10:32 | INFO | Collecting MIDs for: 2026-06-23
2026-06-24 16:10:33 | INFO | Found 863 MIDs
2026-06-24 16:10:33 | WARNING | Pipeline failed. Retry 1/3. Error: 'tables'
2026-06-24 16:10:38 | INFO | ================================================================================
2026-06-24 16:10:38 | INFO | Hello from data-move Python data pipeline!
2026-06-24 16:10:38 | INFO | Pipeline Run Date: 2026-06-23
2026-06-24 16:10:38 | INFO | Connecting to databases...
2026-06-24 16:10:38 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x000001B29FBB7550>
2026-06-24 16:10:39 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x000001B29FECD9A0>
2026-06-24 16:10:40 | INFO | Database connections established
2026-06-24 16:10:40 | INFO | Collecting MIDs for: 2026-06-23
2026-06-24 16:10:40 | INFO | Found 863 MIDs
2026-06-24 16:10:40 | WARNING | Pipeline failed. Retry 2/3. Error: 'tables'
2026-06-24 16:10:45 | INFO | ================================================================================
2026-06-24 16:10:45 | INFO | Hello from data-move Python data pipeline!
2026-06-24 16:10:45 | INFO | Pipeline Run Date: 2026-06-23
2026-06-24 16:10:45 | INFO | Connecting to databases...
2026-06-24 16:10:45 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x000001B29FECD8B0>
2026-06-24 16:10:46 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x000001B29FECE7B0>
2026-06-24 16:10:46 | INFO | Database connections established
2026-06-24 16:10:46 | INFO | Collecting MIDs for: 2026-06-23
2026-06-24 16:10:47 | INFO | Found 863 MIDs
2026-06-24 16:12:02 | INFO | Pipeline Start Date: 2026-06-23
2026-06-24 16:12:02 | INFO | ================================================================================
2026-06-24 16:12:02 | INFO | Hello from data-move Python data pipeline!
2026-06-24 16:12:02 | INFO | Pipeline Run Date: 2026-06-23
2026-06-24 16:12:02 | INFO | Connecting to databases...
2026-06-24 16:12:02 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x00000258DBAE05D0>
2026-06-24 16:12:03 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x00000258DCB8CA50>
2026-06-24 16:12:04 | INFO | Database connections established
2026-06-24 16:12:04 | INFO | Collecting MIDs for: 2026-06-23
2026-06-24 16:12:04 | INFO | Found 863 MIDs
2026-06-24 16:12:05 | INFO | Pipeline Run Date: 2026-06-23
2026-06-24 16:12:05 | INFO | ================================================================================
2026-06-24 16:12:05 | INFO | Processing Table-:OQaD | Table type -:FACT | fetcht by-:run_date | operation-:INSERT
2026-06-24 16:12:05 | INFO | Fetching Data from sql server for table-: OQaD ..............
2026-06-24 16:12:05 | INFO | Exists: True
2026-06-24 16:12:05 | INFO | Path: D:\z\airflow-project\src\sql\fact\OQaD.sql
2026-06-24 16:12:05 | INFO | Fetching data for 863 EMPIDs
2026-06-24 16:12:05 | INFO | Fetching OQaD data for run_date=2026-06-23
2026-06-24 16:12:11 | INFO | fn name is fetch_OQad ------Fetched 458 rows
2026-06-24 16:12:11 | INFO | Fetched total row -: 458 from sql server for table-:OQaD ...........!!!
2026-06-24 16:12:11 | INFO | Fetched 458 rows
2026-06-24 16:12:11 | INFO | _ _ _ _ Deleting Data from ClickHouse for OQaD _ _ _ _
2026-06-24 16:12:11 | INFO | _ _ _ _Inserting data into clickhouse db from sql server_ _ _ _
2026-06-24 16:12:11 | ERROR | Failed processing table OQaD
Traceback (most recent call last):
File "D:\z\airflow-project\elt.py", line 243, in elt
load_to_clickhouse(
~~~~~~~~~~~~~~~~~~^
client=client,
^^^^^^^^^^^^^^
table_name=table_name,
^^^^^^^^^^^^^^^^^^^^^^
df=df,
^^^^^^
)
^
File "D:\z\airflow-project\clickhouse_task\load_table.py", line 28, in load_to_clickhouse
chunk_df = df.iloc[start:end]
^^^^^^^
AttributeError: 'DataFrame' object has no attribute 'iloc'
2026-06-24 16:12:11 | WARNING | Pipeline failed. Retry 1/3. Error: 'DataFrame' object has no attribute 'iloc'
2026-06-24 16:12:16 | INFO | ================================================================================
2026-06-24 16:12:16 | INFO | Hello from data-move Python data pipeline!
2026-06-24 16:12:16 | INFO | Pipeline Run Date: 2026-06-23
2026-06-24 16:12:16 | INFO | Connecting to databases...
2026-06-24 16:12:17 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x00000258DCB6C650>
2026-06-24 16:12:17 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x00000258DCB99A90>
2026-06-24 16:12:18 | INFO | Database connections established
2026-06-24 16:12:18 | INFO | Collecting MIDs for: 2026-06-23
2026-06-24 16:12:18 | INFO | Found 863 MIDs
2026-06-24 16:12:18 | INFO | Pipeline Run Date: 2026-06-23
2026-06-24 16:12:18 | INFO | ================================================================================
2026-06-24 16:12:18 | INFO | Processing Table-:OQaD | Table type -:FACT | fetcht by-:run_date | operation-:INSERT
2026-06-24 16:12:18 | INFO | Fetching Data from sql server for table-: OQaD ..............
2026-06-24 16:12:18 | INFO | Exists: True
2026-06-24 16:12:18 | INFO | Path: D:\z\airflow-project\src\sql\fact\OQaD.sql
2026-06-24 16:12:18 | INFO | Fetching data for 863 EMPIDs
2026-06-24 16:12:18 | INFO | Fetching OQaD data for run_date=2026-06-23
2026-06-24 16:12:24 | INFO | fn name is fetch_OQad ------Fetched 458 rows
2026-06-24 16:12:24 | INFO | Fetched total row -: 458 from sql server for table-:OQaD ...........!!!
2026-06-24 16:12:24 | INFO | Fetched 458 rows
2026-06-24 16:12:24 | INFO | _ _ _ _ Deleting Data from ClickHouse for OQaD _ _ _ _
2026-06-24 16:12:24 | INFO | _ _ _ _Inserting data into clickhouse db from sql server_ _ _ _
2026-06-24 16:12:24 | ERROR | Failed processing table OQaD
Traceback (most recent call last):
File "D:\z\airflow-project\elt.py", line 243, in elt
load_to_clickhouse(
~~~~~~~~~~~~~~~~~~^
client=client,
^^^^^^^^^^^^^^
table_name=table_name,
^^^^^^^^^^^^^^^^^^^^^^
df=df,
^^^^^^
)
^
File "D:\z\airflow-project\clickhouse_task\load_table.py", line 28, in load_to_clickhouse
chunk_df = df.iloc[start:end]
^^^^^^^
AttributeError: 'DataFrame' object has no attribute 'iloc'
2026-06-24 16:12:24 | WARNING | Pipeline failed. Retry 2/3. Error: 'DataFrame' object has no attribute 'iloc'
2026-06-24 16:12:29 | INFO | ================================================================================
2026-06-24 16:12:29 | INFO | Hello from data-move Python data pipeline!
2026-06-24 16:12:29 | INFO | Pipeline Run Date: 2026-06-23
2026-06-24 16:12:29 | INFO | Connecting to databases...
2026-06-24 16:12:30 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x00000258DCB9A120>
2026-06-24 16:12:30 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x00000258DCB9A8A0>
2026-06-24 16:12:31 | INFO | Database connections established
2026-06-24 16:12:31 | INFO | Collecting MIDs for: 2026-06-23
2026-06-24 16:12:31 | INFO | Found 863 MIDs
2026-06-24 16:12:31 | INFO | Pipeline Run Date: 2026-06-23
2026-06-24 16:12:31 | INFO | ================================================================================
2026-06-24 16:12:31 | INFO | Processing Table-:OQaD | Table type -:FACT | fetcht by-:run_date | operation-:INSERT
2026-06-24 16:12:31 | INFO | Fetching Data from sql server for table-: OQaD ..............
2026-06-24 16:12:31 | INFO | Exists: True
2026-06-24 16:12:31 | INFO | Path: D:\z\airflow-project\src\sql\fact\OQaD.sql
2026-06-24 16:12:31 | INFO | Fetching data for 863 EMPIDs
2026-06-24 16:12:31 | INFO | Fetching OQaD data for run_date=2026-06-23
2026-06-24 16:12:43 | INFO | fn name is fetch_OQad ------Fetched 458 rows
2026-06-24 16:12:43 | INFO | Fetched total row -: 458 from sql server for table-:OQaD ...........!!!
2026-06-24 16:12:43 | INFO | Fetched 458 rows
2026-06-24 16:12:43 | INFO | _ _ _ _ Deleting Data from ClickHouse for OQaD _ _ _ _
2026-06-24 16:12:43 | INFO | _ _ _ _Inserting data into clickhouse db from sql server_ _ _ _
2026-06-24 16:12:43 | ERROR | Failed processing table OQaD
Traceback (most recent call last):
File "D:\z\airflow-project\elt.py", line 243, in elt
load_to_clickhouse(
~~~~~~~~~~~~~~~~~~^
client=client,
^^^^^^^^^^^^^^
table_name=table_name,
^^^^^^^^^^^^^^^^^^^^^^
df=df,
^^^^^^
)
^
File "D:\z\airflow-project\clickhouse_task\load_table.py", line 28, in load_to_clickhouse
chunk_df = df.iloc[start:end]
^^^^^^^
AttributeError: 'DataFrame' object has no attribute 'iloc'
2026-06-24 16:17:01 | INFO | Pipeline Start Date: 2026-06-23
2026-06-24 16:17:01 | INFO | ================================================================================
2026-06-24 16:17:01 | INFO | Hello from data-move Python data pipeline!
2026-06-24 16:17:01 | INFO | Pipeline Run Date: 2026-06-23
2026-06-24 16:17:01 | INFO | Connecting to databases...
2026-06-24 16:17:02 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x0000018D3B1F05D0>
2026-06-24 16:17:03 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x0000018D3C29CA50>
2026-06-24 16:17:04 | INFO | Database connections established
2026-06-24 16:17:04 | INFO | Collecting MIDs for: 2026-06-23
2026-06-24 16:17:04 | INFO | Found 863 MIDs
2026-06-24 16:17:04 | INFO | Pipeline Run Date: 2026-06-23
2026-06-24 16:17:04 | INFO | ================================================================================
2026-06-24 16:17:04 | INFO | Processing Table-:OQaD | Table type -:FACT | fetcht by-:run_date | operation-:INSERT
2026-06-24 16:17:04 | INFO | Fetching Data from sql server for table-: OQaD ..............
2026-06-24 16:17:04 | INFO | Exists: True
2026-06-24 16:17:04 | INFO | Path: D:\z\airflow-project\src\sql\fact\OQaD.sql
2026-06-24 16:17:04 | INFO | Fetching data for 863 EMPIDs
2026-06-24 16:17:04 | INFO | Fetching OQaD data for run_date=2026-06-23
2026-06-24 16:17:10 | INFO | fn name is fetch_OQad ------Fetched 458 rows
2026-06-24 16:17:10 | INFO | Fetched total row -: 458 from sql server for table-:OQaD ...........!!!
2026-06-24 16:17:10 | INFO | Fetched 458 rows
2026-06-24 16:17:10 | INFO | _ _ _ _ Deleting Data from ClickHouse for OQaD _ _ _ _
2026-06-24 16:17:11 | INFO | _ _ _ _Inserting data into clickhouse db from sql server_ _ _ _
2026-06-24 16:17:11 | INFO | Inserted rows 0 to 458
2026-06-24 16:17:11 | INFO | OQaD: inserted 458 rows into ClickHouse
2026-06-24 16:17:11 | INFO | OQaD loaded successfully (458 rows)
2026-06-24 16:17:11 | INFO | ================================================================================
2026-06-24 16:17:11 | INFO | Pipeline Completed Successfully
2026-06-24 16:17:11 | INFO | ================================================================================
2026-06-24 16:17:11 | INFO | Pipeline completed successfully. pipeline_trigeered_on_date=2026-06-24last_successful_run_date=2026-06-23
2026-06-24 16:17:31 | INFO | Pipeline Start Date: 2026-06-23
2026-06-24 16:17:31 | INFO | ================================================================================
2026-06-24 16:17:31 | INFO | Hello from data-move Python data pipeline!
2026-06-24 16:17:31 | INFO | Pipeline Run Date: 2026-06-23
2026-06-24 16:17:31 | INFO | Connecting to databases...
2026-06-24 16:17:31 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x0000022F667F05D0>
2026-06-24 16:17:33 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x0000022F678CCA50>
2026-06-24 16:17:33 | INFO | Database connections established
2026-06-24 16:17:33 | INFO | Collecting MIDs for: 2026-06-23
2026-06-24 16:17:34 | INFO | Found 863 MIDs
2026-06-24 16:17:34 | INFO | Pipeline Run Date: 2026-06-23
2026-06-24 16:17:34 | INFO | ================================================================================
2026-06-24 16:17:34 | INFO | Processing Table-:OQaD | Table type -:FACT | fetcht by-:run_date | operation-:INSERT
2026-06-24 16:17:34 | INFO | Fetching Data from sql server for table-: OQaD ..............
2026-06-24 16:17:34 | INFO | Exists: True
2026-06-24 16:17:34 | INFO | Path: D:\z\airflow-project\src\sql\fact\OQaD.sql
2026-06-24 16:17:34 | INFO | Fetching data for 863 EMPIDs
2026-06-24 16:17:34 | INFO | Fetching OQaD data for run_date=2026-06-23
2026-06-24 16:17:40 | INFO | fn name is fetch_OQad ------Fetched 458 rows
2026-06-24 16:17:40 | INFO | Fetched total row -: 458 from sql server for table-:OQaD ...........!!!
2026-06-24 16:17:40 | INFO | Fetched 458 rows
2026-06-24 16:17:40 | INFO | _ _ _ _ Deleting Data from ClickHouse for OQaD _ _ _ _
2026-06-24 16:17:41 | INFO | _ _ _ _Inserting data into clickhouse db from sql server_ _ _ _
2026-06-24 16:17:41 | INFO | Inserted rows 0 to 458
2026-06-24 16:17:41 | INFO | OQaD: inserted 458 rows into ClickHouse
2026-06-24 16:17:41 | INFO | OQaD loaded successfully (458 rows)
2026-06-24 16:17:41 | INFO | ================================================================================
2026-06-24 16:17:41 | INFO | Pipeline Completed Successfully
2026-06-24 16:17:41 | INFO | ================================================================================
2026-06-24 16:17:41 | INFO | Pipeline completed successfully. pipeline_trigeered_on_date=2026-06-24last_successful_run_date=2026-06-23
+90
View File
@@ -0,0 +1,90 @@
2026-06-25 10:23:52 | INFO | Pipeline Start Date: 2026-06-24
2026-06-25 10:23:52 | INFO | ================================================================================
2026-06-25 10:23:52 | INFO | Hello from data-move Python data pipeline!
2026-06-25 10:23:52 | INFO | Pipeline Run Date: 2026-06-24
2026-06-25 10:23:52 | INFO | Connecting to databases...
2026-06-25 10:23:54 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x000001D3489C90D0>
2026-06-25 10:23:57 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x000001D349ACA490>
2026-06-25 10:23:58 | INFO | Database connections established
2026-06-25 10:23:58 | INFO | Collecting MIDs for: 2026-06-24
2026-06-25 10:23:58 | INFO | Found 868 MIDs
2026-06-25 10:23:59 | INFO | Pipeline Run Date: 2026-06-24
2026-06-25 10:23:59 | INFO | ================================================================================
2026-06-25 10:23:59 | INFO | Processing Table-:OQaD | Table type -:FACT | fetcht by-:run_date | operation-:INSERT
2026-06-25 10:23:59 | INFO | Fetching Data from sql server for table-: OQaD ..............
2026-06-25 10:23:59 | ERROR | Failed processing table OQaD
Traceback (most recent call last):
File "D:\z\airflow-project\elt2.py", line 172, in elt
df=registry[f"fetch_{table_name}"]()(sql_engine, table_name , table_type, mids, run_date)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^
TypeError: fetch_OQaD() missing 5 required positional arguments: 'engine', 'table_name', 'table_type', 'empids', and 'run_date'
2026-06-25 10:23:59 | WARNING | Pipeline failed. Retry 1/3. Error: fetch_OQaD() missing 5 required positional arguments: 'engine', 'table_name', 'table_type', 'empids', and 'run_date'
2026-06-25 10:24:04 | INFO | ================================================================================
2026-06-25 10:24:04 | INFO | Hello from data-move Python data pipeline!
2026-06-25 10:24:04 | INFO | Pipeline Run Date: 2026-06-24
2026-06-25 10:24:04 | INFO | Connecting to databases...
2026-06-25 10:24:05 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x000001D349AC3950>
2026-06-25 10:24:05 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x000001D349B68E60>
2026-06-25 10:24:06 | INFO | Database connections established
2026-06-25 10:24:06 | INFO | Collecting MIDs for: 2026-06-24
2026-06-25 10:24:06 | INFO | Found 868 MIDs
2026-06-25 10:24:06 | INFO | Pipeline Run Date: 2026-06-24
2026-06-25 10:24:06 | INFO | ================================================================================
2026-06-25 10:24:06 | INFO | Processing Table-:OQaD | Table type -:FACT | fetcht by-:run_date | operation-:INSERT
2026-06-25 10:24:06 | INFO | Fetching Data from sql server for table-: OQaD ..............
2026-06-25 10:24:06 | ERROR | Failed processing table OQaD
Traceback (most recent call last):
File "D:\z\airflow-project\elt2.py", line 172, in elt
df=registry[f"fetch_{table_name}"]()(sql_engine, table_name , table_type, mids, run_date)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^
TypeError: fetch_OQaD() missing 5 required positional arguments: 'engine', 'table_name', 'table_type', 'empids', and 'run_date'
2026-06-25 10:24:06 | WARNING | Pipeline failed. Retry 2/3. Error: fetch_OQaD() missing 5 required positional arguments: 'engine', 'table_name', 'table_type', 'empids', and 'run_date'
2026-06-25 10:24:11 | INFO | ================================================================================
2026-06-25 10:24:11 | INFO | Hello from data-move Python data pipeline!
2026-06-25 10:24:11 | INFO | Pipeline Run Date: 2026-06-24
2026-06-25 10:24:11 | INFO | Connecting to databases...
2026-06-25 10:24:12 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x000001D349B68D70>
2026-06-25 10:24:12 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x000001D349B69C70>
2026-06-25 10:24:12 | INFO | Database connections established
2026-06-25 10:24:12 | INFO | Collecting MIDs for: 2026-06-24
2026-06-25 10:24:13 | INFO | Found 868 MIDs
2026-06-25 10:24:13 | INFO | Pipeline Run Date: 2026-06-24
2026-06-25 10:24:13 | INFO | ================================================================================
2026-06-25 10:24:13 | INFO | Processing Table-:OQaD | Table type -:FACT | fetcht by-:run_date | operation-:INSERT
2026-06-25 10:24:13 | INFO | Fetching Data from sql server for table-: OQaD ..............
2026-06-25 10:24:13 | ERROR | Failed processing table OQaD
Traceback (most recent call last):
File "D:\z\airflow-project\elt2.py", line 172, in elt
df=registry[f"fetch_{table_name}"]()(sql_engine, table_name , table_type, mids, run_date)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^
TypeError: fetch_OQaD() missing 5 required positional arguments: 'engine', 'table_name', 'table_type', 'empids', and 'run_date'
2026-06-25 11:19:10 | INFO | Pipeline Start Date: 2026-06-24
2026-06-25 11:19:10 | INFO | ================================================================================
2026-06-25 11:19:10 | INFO | Hello from data-move Python data pipeline!
2026-06-25 11:19:10 | INFO | Pipeline Run Date: 2026-06-24
2026-06-25 11:19:10 | INFO | Connecting to databases...
2026-06-25 11:19:11 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x000002B17CBD0890>
2026-06-25 11:19:13 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x000002B17DCE0910>
2026-06-25 11:19:14 | INFO | Database connections established
2026-06-25 11:19:14 | INFO | Collecting MIDs for: 2026-06-24
2026-06-25 11:19:14 | INFO | Found 868 MIDs
2026-06-25 11:19:15 | INFO | Pipeline Run Date: 2026-06-24
2026-06-25 11:19:15 | INFO | ================================================================================
2026-06-25 11:19:15 | INFO | Processing Table-:OQaD | Table type -:FACT | fetcht by-:run_date | operation-:INSERT
2026-06-25 11:19:15 | INFO | Fetching Data from sql server for table-: OQaD ..............
2026-06-25 11:19:15 | INFO | Exists: True
2026-06-25 11:19:15 | INFO | Path: D:\z\airflow-project\src\sql\fact\OQaD.sql
2026-06-25 11:19:15 | INFO | Fetching data for 868 EMPIDs
2026-06-25 11:19:15 | INFO | Fetching OQaD data for run_date=2026-06-24
2026-06-25 11:19:21 | INFO | fn name is fetch_OQad ------Fetched 469 rows
2026-06-25 11:19:21 | INFO | Fetched total row -: 469 from sql server for table-:OQaD ...........!!!
2026-06-25 11:19:21 | INFO | Fetched 469 rows
2026-06-25 11:19:21 | INFO | _ _ _ _ Deleting Data from ClickHouse for OQaD _ _ _ _
2026-06-25 11:19:21 | INFO | _ _ _ _Inserting data into clickhouse db from sql server_ _ _ _
2026-06-25 11:19:21 | INFO | Inserted rows 0 to 469
2026-06-25 11:19:21 | INFO | OQaD: inserted 469 rows into ClickHouse
2026-06-25 11:19:21 | INFO | OQaD loaded successfully (469 rows)
2026-06-25 11:19:21 | INFO | ================================================================================
2026-06-25 11:19:21 | INFO | Pipeline Completed Successfully
2026-06-25 11:19:21 | INFO | ================================================================================
2026-06-25 11:19:21 | INFO | Pipeline completed successfully. pipeline_trigeered_on_date=2026-06-25last_successful_run_date=2026-06-24
+5 -224
View File
@@ -46,113 +46,13 @@ from mids import (
from src.bridge import *
from src.fact import *
from src.dim import *
# ==========================================================
# Helpers
# ==========================================================
def get_dates_from_yaml(filename: str):
    """Read the pipeline date window and its flag from a YAML file.

    The file must contain a ``pipeline`` mapping with ``start_date``,
    ``end_date`` and ``flag`` entries.

    Returns:
        A ``(start_date, end_date, flag)`` tuple. Both dates are
        ``datetime.date`` objects; the flag is returned as a string.
    """
    with open(filename, "r") as stream:
        loaded = yaml.safe_load(stream)

    # Each value is passed through str() before ISO parsing, so it does not
    # matter whether the loader produced a string or an already-parsed date.
    window = [
        date.fromisoformat(str(loaded["pipeline"][key]))
        for key in ("start_date", "end_date")
    ]
    pipeline_flag = str(loaded["pipeline"]["flag"])
    return window[0], window[1], pipeline_flag
def write_table_to_yaml(
    data: dict,
    run_date: date,
    filename: str | None = None
):
    """Dump *data* to a YAML file and print the path that was written.

    When *filename* is omitted, the output file is named
    ``elt_pipeline_<run_date>.yml``. Keys are written in insertion order
    using block (non-flow) style.
    """
    target = f"elt_pipeline_{run_date}.yml" if filename is None else filename
    dump_options = {"default_flow_style": False, "sort_keys": False}

    with open(target, "w") as out:
        yaml.dump(data, out, **dump_options)

    print(f"Table written to {target}")
from elt import *
def table_exists(
    client,
    table_name: str,
) -> bool:
    """Return the truthiness of ``EXISTS TABLE <table_name>`` run via *client*.

    *client* only needs a ``command(sql)`` method; whatever it returns is
    coerced with ``bool()``.
    """
    statement = f"EXISTS TABLE {table_name}"
    outcome = client.command(statement)
    return bool(outcome)
# ==========================================================
# Main
# ==========================================================
def elt(run_date : date):
log.info("=" * 80)
log.info("Hello from data-move Python data pipeline!")
# ------------------------------------------------------
# Run Date
# ------------------------------------------------------
log.info(
"Pipeline Run Date: %s",
run_date,
)
# ------------------------------------------------------
# Connections
# ------------------------------------------------------
log.info(
"Connecting to databases..."
)
sql_engine = build_sql_server_engine()
clickhouse_engine = build_clickhouse_engine()
client = get_clickhouse_client()
log.info(
"Database connections established"
)
# ------------------------------------------------------
# mids Keys
# ------------------------------------------------------
mids = MID_TABLE_COV(
sql_engine,
run_date,
)
emp_visit_df = MID_TABLE_COV1(
sql_engine,
run_date,
)
def main() :
# ------------------------------------------------------
# Config
@@ -163,128 +63,8 @@ def elt(run_date : date):
"r",
) as file:
config = yaml.safe_load(file)
table_config = yaml.safe_load(file)
# ------------------------------------------------------
# Process Tables
# ------------------------------------------------------
for table in config["tables"]:
table_name = table["name"]
operation = table["operation"]
fetch_by = table["fetch_by"]
table_type=table["type"]
log.info("=" * 80)
log.info(f"Processing Table-:{table_name} | Table type -:{table_type} | fetcht by-:{fetch_by} | operation-:{operation}" )
try:
# ------------------------------------------
# Fetch Data
# ------------------------------------------
log.info(f"Fetching Data from sql server for table-: {table_name} ..............")
fetch_list=["mids" ,"run_date", "reason_id"]
if fetch_by in fetch_list :
fn_name = f"fetch_{table_name}"
fn = globals()[fn_name]
df=fn(sql_engine, table_name , table_type, mids, run_date)
else:
df = fetch_data(sql_engine ,table_name,table_type)
log.info(f"Fetched total row -: {len(df)} from sql server for table-:{table_name} ...........!!!")
if df.is_empty():
log.warning(
"%s returned no rows",
table_name,
)
continue
log.info(
"Fetched %s rows",
len(df),
)
# ------------------------------------------
# Create Table If Missing
# ------------------------------------------
exists = table_exists(
client,
table_name,
)
if not exists:
log.info(
"Creating table %s",
table_name,
)
create_clickhouse_table(
df=df,
table_name=table_name,
clickhouse_engine=clickhouse_engine,
)
# ------------------------------------------
# Existing Table Logic
# ------------------------------------------
else:
if operation == "DELETE+INSERT":
truncate_table(
client,
table_name,
)
else:
delete_existing_data(
client=client,
table_name=table_name,
run_date=run_date,
mids=mids,
emp_visit_df=emp_visit_df,
)
# ------------------------------------------
# Load Data
# ------------------------------------------
log.info("_ _ _ _Inserting data into clickhouse db from sql server_ _ _ _")
load_to_clickhouse(
client=client,
table_name=table_name,
df=df,
)
log.info(
"%s loaded successfully (%s rows)",
table_name,
len(df),
)
except Exception:
log.exception(
"Failed processing table %s",
table_name,
)
raise
log.info("=" * 80)
log.info("Pipeline Completed Successfully")
log.info("=" * 80)
def main() :
config_file = Path("Pipeline_config.yml")
@@ -336,7 +116,7 @@ def main() :
for attempt in range(3):
try:
elt(run_date)
elt(run_date , table_config)
successful_dates.append({
'pipeline_trigeered_on_date': str(date.today()),
@@ -395,6 +175,7 @@ def main() :
sort_keys=False)
if __name__ == "__main__":
main()
+181
View File
@@ -0,0 +1,181 @@
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "polars>=0.20.0",
# "pyarrow>=18.0.0",
# "sqlalchemy>=2.0.0",
# "pyodbc>=5.0.0",
# "clickhouse-connect>=0.7.0",
# "clickhouse-sqlalchemy>=0.3.2",
# "pyyaml>=6.0.3",
# "python-dotenv>=1.0.0",
# ]
# ///
from __future__ import annotations
from time import sleep
import sys
from datetime import date, datetime, timedelta
import polars as pl
import yaml
from log import log
from clickhouse_task.create_table import create_clickhouse_table
from clickhouse_task.delete_task import (
delete_existing_data,
truncate_table,
)
from clickhouse_task.load_table import load_to_clickhouse
from db_con.connection import (
build_sql_server_engine,
build_clickhouse_engine,
get_clickhouse_client,
)
from mids import (
MID_TABLE_COV,
MID_TABLE_COV1,
)
from src.bridge import *
from src.fact_updated import *
from src.dim import *
from elt2 import *
def _dump_yaml_report(path, entries) -> None:
    """Write *entries* to *path* as block-style YAML, keeping key order."""
    with open(path, "w") as f:
        yaml.dump(
            entries,
            f,
            default_flow_style=False,
            sort_keys=False,
        )


def main() :
    """Run ``elt`` for every date in the selected window, retrying failures.

    Date window selection, in priority order:
      1. ``elt_pipeline_custom_dates.yml`` when its flag equals ``"Y"``.
      2. The first command-line argument (``YYYY-MM-DD``).
      3. Yesterday only.

    Each date is attempted up to three times with a 5 second pause between
    attempts. Successful and failed attempts are written to
    ``successful_Pipeline_dates_config.yml`` and
    ``failed_Pipeline_dates_config.yml``.

    Raises:
        Exception: whatever ``elt`` raised on the third failed attempt for a
            date. The failed-attempt report is written before re-raising.
    """
    # Local import so this function does not depend on a star import to
    # supply ``Path``.
    from pathlib import Path

    # ------------------------------------------------------
    # Config
    # ------------------------------------------------------
    with open(
        "y.yml",
        "r",
    ) as file:
        table_config = yaml.safe_load(file)

    config_file = Path("Pipeline_config.yml")

    if not config_file.exists():
        default_config = {
            "pipeline": {
                "run_date": None,
                "status": None,
                "error_message": None,
            }
        }
        with open(config_file, "w") as f:
            yaml.safe_dump(default_config, f)

    with open(config_file, "r") as f:
        # Parsed but not consulted below; the load is kept so a malformed
        # file still fails before any pipeline work starts.
        config = yaml.safe_load(f)  # noqa: F841

    p_start_date, p_end_date, flag = get_dates_from_yaml("elt_pipeline_custom_dates.yml")

    if flag == "Y":
        start_date = p_start_date
        end_date = p_end_date
    elif len(sys.argv) > 1:
        start_date = datetime.strptime(
            sys.argv[1],
            "%Y-%m-%d",
        ).date()
        # NOTE(review): the loop below includes end_date, so a CLI date
        # processes two days (the given date and the next one) — confirm
        # this is intended; the default branch processes a single day.
        end_date = start_date + timedelta(days=1)
    else:
        start_date = date.today() - timedelta(days=1)
        end_date = start_date

    log.info(
        "Pipeline Start Date: %s",
        start_date,
    )

    failed_dates = []
    successful_dates = []
    filename_successful = "successful_Pipeline_dates_config.yml"
    filename_failed = "failed_Pipeline_dates_config.yml"
    max_attempts = 3

    try:
        while start_date <= end_date:
            run_date = start_date

            for attempt in range(max_attempts):
                try:
                    elt(run_date, table_config)
                except Exception as e:
                    failed_dates.append({
                        'pipeline_trigeered_on_date': str(date.today()),
                        'failed_run_date': run_date,
                        "attempt": attempt
                    })
                    if attempt == max_attempts - 1:
                        raise
                    log.warning(
                        f"Pipeline failed. Retry {attempt + 1}/3. Error: {e}"
                    )
                    sleep(5)
                else:
                    successful_dates.append({
                        'pipeline_trigeered_on_date': str(date.today()),
                        'last_successful_run_date': run_date,
                    })
                    # The two fields are separated by a space; previously they
                    # were emitted fused together on one token.
                    log.info(
                        "Pipeline completed successfully. "
                        "pipeline_trigeered_on_date=%s "
                        "last_successful_run_date=%s",
                        date.today(),
                        run_date,
                    )
                    break

            start_date = start_date + timedelta(days=1)
    except Exception:
        # Persist the recorded failures before propagating; otherwise the
        # failure report is lost on exactly the runs that fail.
        _dump_yaml_report(filename_failed, failed_dates)
        raise

    _dump_yaml_report(filename_successful, successful_dates)

    if not failed_dates:
        # Placeholder entry so the failed-dates file is never an empty list.
        failed_dates.append({
            'pipeline_trigeered_on_date': str(date.today()),
            'failed_run_date': "none",
            "attempt": "none"
        })

    _dump_yaml_report(filename_failed, failed_dates)
if __name__ == "__main__":
main()
+332 -332
View File
@@ -135,175 +135,6 @@ def fetch_additional_visibility( engine: Engine,
def fetch_OQaD(
    sql_engine: Engine,
    table_name: str,
    table_type: str,
    mids: list[int],
    run_date: date
) -> pl.DataFrame:
    """Fetch OQaD rows for employees whose (empid, visitdate) pairs are missing in ClickHouse.

    Steps:
      1. Read from SQL Server the (empid, visitdate) pairs whose T_OQAD rows
         were created or updated on ``run_date``.
      2. Read the distinct (empid, visitdate) pairs already present in the
         ClickHouse table ``table_name``.
      3. Anti-join the two to keep the pairs not yet in ClickHouse.
      4. Render ``src/sql/fact/<table_name>.sql`` with those employee ids and
         ``run_date`` and run it against SQL Server.

    Args:
        sql_engine: SQLAlchemy engine used for both SQL Server queries.
        table_name: ClickHouse table to compare against; also names the
            ``.sql`` template file.
        table_type: Part of the common fetcher call signature; not used to
            build any query here.
        mids: Part of the common fetcher call signature; not used here.
        run_date: Day whose created/updated records drive the fetch.

    Returns:
        The result of the rendered template query as a Polars DataFrame.
    """

    # ── Does a ClickHouse table exist? ────────────
    def table_exists(client, table_name: str) -> bool:
        return bool(client.command(f"EXISTS TABLE {table_name}"))

    # ── STEP 1: which pairs were created/updated on run_date? ───
    def fetch_quiz_empids(engine: Engine, run_date: date) -> pl.DataFrame:
        # Both bounds are formatted from a date object with a fixed pattern,
        # so the interpolated text is always a quoted YYYY-MM-DD literal.
        run_date_str = run_date.strftime("%Y-%m-%d")
        next_date_str = (run_date + timedelta(days=1)).strftime("%Y-%m-%d")

        sql = f"""
            WITH MID_TABLE_COV1 AS
            (
            -- Records CREATED yesterday
            SELECT EmpId, CAST(VisitDate AS DATE) AS VisitDate
            FROM OneApp_KelloggsMT.dbo.T_OQAD
            WHERE CreateDate >= '{run_date_str}'
            AND CreateDate < '{next_date_str}'
            UNION ALL
            -- Records UPDATED yesterday (different rows, safe to UNION ALL)
            SELECT EmpId, CAST(VisitDate AS DATE) AS VisitDate
            FROM OneApp_KelloggsMT.dbo.T_OQAD
            WHERE UpdateDate >= '{run_date_str}'
            AND UpdateDate < '{next_date_str}'
            ),
            QUIZ AS
            (
            SELECT DISTINCT
            E.EmpId AS empid,
            CAST(DQ.VisitDate AS DATE) AS visitdate
            FROM OneApp_KelloggsMT.dbo.T_OQAD DQ
            INNER JOIN OneApp_KelloggsMT.dbo.vw_Employee_Detail E
            ON DQ.EmpId = E.EmpId
            INNER JOIN OneApp_KelloggsMT.dbo.Master_OQAD_Question QU
            ON DQ.QuestionId = QU.QuestionId
            INNER JOIN OneApp_KelloggsMT.dbo.Master_OQAD_Category QC
            ON QU.QuestionCategoryId = QC.QuestionCategoryId
            WHERE E.EmpName NOT LIKE '%TEST%' -- exclude test employees
            AND E.RightId = 6 -- only field reps
            AND (
            E.ResignDate IS NULL
            OR CAST(E.ResignDate AS DATE) >= '{run_date_str}'
            )
            AND EXISTS ( -- ✅ EXISTS beats IN for large sets
            SELECT 1
            FROM MID_TABLE_COV1 A
            WHERE A.EmpId = DQ.EmpId
            AND A.VisitDate = CAST(DQ.VisitDate AS DATE)
            )
            )
            SELECT * FROM QUIZ
            """
        log.info("Fetching quiz empids for run_date=%s", run_date_str)
        df = pl.read_database(query=sql, connection=engine)
        log.info("Fetched %s (EmpId, VisitDate) pairs from SQL Server", len(df))
        return df

    # ── STEP 2: which pairs does ClickHouse already hold? ───
    def get_empids_clickhouse_OQAD(
        client,
        table_name: str = "OQaD",
    ) -> pl.DataFrame:
        if not table_exists(client, table_name):
            log.warning("Table '%s' does not exist in ClickHouse.", table_name)
            # Empty frame with the join columns so the anti-join still works.
            return pl.DataFrame(schema={"empid": pl.Int64, "visitdate": pl.Date})

        query = f"""
            SELECT DISTINCT
            employee_id AS empid,
            visit_date AS visitdate
            FROM {table_name}
            """
        arrow_table = client.query_arrow(query)
        df = pl.from_arrow(arrow_table)
        log.info("Fetched %s existing (EmpId, VisitDate) pairs from ClickHouse", len(df))
        return df

    # ── STEP 3: pairs in SQL Server but not yet in ClickHouse ───
    def find_new_empids(
        sql_df: pl.DataFrame,
        ch_df: pl.DataFrame,
    ) -> list[int]:
        new_df = sql_df.join(
            ch_df,
            on=["empid", "visitdate"],
            how="anti",  # keep rows of sql_df with no match in ch_df
        )
        if new_df.is_empty():
            log.warning("No new EmpIds found for table=%s — nothing to fetch.", table_name)
            # Sentinel id keeps the rendered id list non-empty.
            # NOTE(review): relies on no real employee having id 0 — confirm.
            return [0]

        empids = new_df["empid"].unique().to_list()
        log.info("Found %s NEW empids to fetch for %s", len(empids), table_name)
        return empids

    # ── STEP 4: fetch full quiz data for the selected empids ───
    def fetch_data(
        engine: Engine,
        table_name: str,
        table_type: str,
        empids: list[int],
        run_date: date,
    ) -> pl.DataFrame:
        run_date_str = run_date.strftime("%Y-%m-%d")
        empid_list = ", ".join(str(e) for e in empids)  # "101, 102, 103"

        sql_file = Path("src") / "sql" / "fact" / f"{table_name}.sql"
        log.info("Loading SQL from: %s (exists=%s)", sql_file.resolve(), sql_file.exists())
        with open(sql_file, "r", encoding="utf-8") as f:
            sql_template = f.read()

        sql = sql_template.format(
            empid_list=empid_list,
            run_date=run_date_str,
        )
        log.info("Fetching full OQaD data for %s empids, run_date=%s", len(empids), run_date_str)
        df = pl.read_database(query=sql, connection=engine)
        log.info("Fetched %s rows from SQL Server for table=%s", len(df), table_name)
        return df

    # ─────────────────────────────────────────────
    # MAIN FLOW (the 4 steps, in order)
    # ─────────────────────────────────────────────
    client = get_clickhouse_client()
    try:
        qf = fetch_quiz_empids(sql_engine, run_date)             # Step 1
        db_df = get_empids_clickhouse_OQAD(client, table_name)   # Step 2
    finally:
        # The client is created for this call only; release it once the
        # single ClickHouse read is done, even if a query raised.
        client.close()

    empids = find_new_empids(qf, db_df)                          # Step 3
    df = fetch_data(                                             # Step 4
        engine=sql_engine,
        table_name=table_name,
        table_type=table_type,
        empids=empids,
        run_date=run_date,
    )
    log.info("fetch_OQaD complete — returning %s rows", len(df))
    return df
# def fetch_OQaD(
# sql_engine: Engine,
# table_name: str,
@@ -312,205 +143,374 @@ def fetch_OQaD(
# run_date: date
# ) -> pl.DataFrame:
# # ─────────────────────────────────────────────
# # INNER HELPERS (defined once, used below)
# # ─────────────────────────────────────────────
# client= get_clickhouse_client()
# def table_exists(
# client,
# table_name: str,
# ) -> bool:
# client = get_clickhouse_client()
# return bool(
# client.command(
# f"EXISTS TABLE {table_name}"
# )
# )
# # ── Does a ClickHouse table exist? ────────────
# def table_exists(client, table_name: str) -> bool:
# return bool(client.command(f"EXISTS TABLE {table_name}"))
# # ── STEP 1: Who submitted yesterday in SQL Server? ───
# def fetch_quiz_empids(engine: Engine, run_date: date) -> pl.DataFrame:
# # Format date ONCE safely — avoids f-string injection bugs
# run_date_str = run_date.strftime("%Y-%m-%d")
# next_date_str = (run_date + timedelta(days=1)).strftime("%Y-%m-%d")
# def fetch_quiz_empids(engine: Engine, run_date : date) -> pl.DataFrame:
# sql_template = f"""
# WITH MID_TABLE_COV1 AS
# sql = f"""
# WITH MID_TABLE_COV1 AS
# (
# SELECT EmpId, VisitDate
# FROM OneApp_KelloggsMT.dbo.T_OQAD
# WHERE CreateDate >= {run_date}
# AND CreateDate < DATEADD(DAY,1,'{run_date}')
# -- Records CREATED yesterday
# SELECT EmpId, CAST(VisitDate AS DATE) AS VisitDate
# FROM OneApp_KelloggsMT.dbo.T_OQAD
# WHERE CreateDate >= '{run_date_str}'
# AND CreateDate < '{next_date_str}'
# UNION
# UNION ALL
# SELECT EmpId, VisitDate
# FROM OneApp_KelloggsMT.dbo.T_OQAD
# WHERE UpdateDate >= {run_date}
# AND UpdateDate < DATEADD(DAY,1, '{run_date}')
# -- Records UPDATED yesterday (different rows, safe to UNION ALL)
# SELECT EmpId, CAST(VisitDate AS DATE) AS VisitDate
# FROM OneApp_KelloggsMT.dbo.T_OQAD
# WHERE UpdateDate >= '{run_date_str}'
# AND UpdateDate < '{next_date_str}'
# ),
# QUIZ AS
# (
# SELECT Distinct E.EmpId as empid
# , CONVERT(date,DQ.VisitDate) AS visitdate
# FROM OneApp_KelloggsMT.dbo.T_OQAD DQ INNER JOIN
# OneApp_KelloggsMT.dbo.vw_Employee_Detail E ON DQ.EmpId = E.EmpId inner join
# OneApp_KelloggsMT.dbo.Master_OQAD_Question QU on DQ.QuestionId= qu.QuestionId inner join
# OneApp_KelloggsMT.dbo.Master_OQAD_Category qc on qu.QuestionCategoryId= qc.QuestionCategoryId
# where e.EmpName not like 'test%' and e.RightId in (6)
# and (E.ResignDate is null or E.ResignDate>=''+CONVERT(VARCHAR,'{run_date}')+'') AND E.EmpName NOT LIKE '%TEST%'
# AND DQ.EmpId IN (SELECT EmpId FROM MID_TABLE_COV1 A WHERE
# DQ.EmpId=A.EmpId AND CONVERT(date,VisitDate)=CONVERT(date,A.VisitDate) )
# ) select * from quiz
# """
# sql = sql_template.format(
# run_date=run_date.strftime("%Y-%m-%d")
# )
# log.info(f"Fetching quiz_empids data for EMPID and Visitid")
# df = pl.read_database(
# query=sql,
# connection=engine
# )
# log.info(f"Fetched {len(df):,} total empid and visitdate fetched for OQAD from SQL Server")
# return df
# def get_empids_clickhouse_OQAD(
# client,
# table_name: str = "OQaD",
# ) -> pl.DataFrame:
# if not table_exists(client, table_name):
# log.warning(f"Table '{table_name}' does not exist.")
# return pl.DataFrame(
# schema={
# "empid": pl.Int64,
# "visitdate": pl.Date,
# }
# )
# query = f"""
# SELECT DISTINCT
# employee_id AS empid,
# visit_date AS visitdate
# FROM {table_name}
# """
# E.EmpId AS empid,
# CAST(DQ.VisitDate AS DATE) AS visitdate
# FROM OneApp_KelloggsMT.dbo.T_OQAD DQ
# INNER JOIN OneApp_KelloggsMT.dbo.vw_Employee_Detail E
# ON DQ.EmpId = E.EmpId
# INNER JOIN OneApp_KelloggsMT.dbo.Master_OQAD_Question QU
# ON DQ.QuestionId = QU.QuestionId
# INNER JOIN OneApp_KelloggsMT.dbo.Master_OQAD_Category QC
# ON QU.QuestionCategoryId = QC.QuestionCategoryId
# WHERE E.EmpName NOT LIKE '%TEST%' -- exclude test employees
# AND E.RightId = 6 -- only field reps
# AND (
# E.ResignDate IS NULL
# OR CAST(E.ResignDate AS DATE) >= '{run_date_str}'
# )
# AND EXISTS ( -- ✅ EXISTS beats IN for large sets
# SELECT 1
# FROM MID_TABLE_COV1 A
# WHERE A.EmpId = DQ.EmpId
# AND A.VisitDate = CAST(DQ.VisitDate AS DATE)
# )
# )
# SELECT * FROM QUIZ
# """
# # ClickHouse -> PyArrow -> Polars
# arrow_table = client.query_arrow(query)
# log.info("Fetching quiz empids for run_date=%s", run_date_str)
# df = pl.read_database(query=sql, connection=engine)
# log.info("Fetched %s (EmpId, VisitDate) pairs from SQL Server", len(df))
# return df
# return pl.from_arrow(arrow_table)
# # ── STEP 2: Who do we ALREADY have in ClickHouse? ───
# def get_empids_clickhouse_OQAD(
# client,
# table_name: str = "OQaD",
# ) -> pl.DataFrame:
# if not table_exists(client, table_name):
# log.warning("Table '%s' does not exist in ClickHouse.", table_name)
# return pl.DataFrame(schema={"empid": pl.Int64, "visitdate": pl.Date})
# qf=fetch_quiz_empids(sql_engine,run_date)
# db_df = get_empids_clickhouse_OQAD(client)
# query = f"""
# SELECT DISTINCT
# employee_id AS empid,
# visit_date AS visitdate
# FROM {table_name}
# """
# matched = qf.join(
# db_df,
# arrow_table = client.query_arrow(query)
# df = pl.from_arrow(arrow_table)
# log.info("Fetched %s existing (EmpId, VisitDate) pairs from ClickHouse", len(df))
# return df
# # ── STEP 3: Who is NEW? (in SQL Server but NOT yet in ClickHouse) ───
# def find_new_empids(
# sql_df: pl.DataFrame,
# ch_df: pl.DataFrame,
# ) -> list[int]:
# new_df = sql_df.join(
# ch_df,
# on=["empid", "visitdate"],
# how="inner",
# how="anti", # ✅ anti = keep rows NOT found in ch_df
# )
# if matched.is_empty():
# if new_df.is_empty():
# log.warning("No new EmpIds found for table=%s — nothing to fetch.", table_name)
# return [0] # sentinel value — the .sql WHERE will return 0 rows safely
# empids=[0]
# log.warning(
# "%s Matched df in OQaD returned no rows",
# table_name,
# )
# empids = new_df["empid"].unique().to_list()
# log.info("Found %s NEW empids to fetch for %s", len(empids), table_name)
# return empids
# else:
# empids=matched["empid"].to_list()
# # ── STEP 4: Fetch full quiz data for new empids ───
# def fetch_data(
# engine: Engine,
# table_name: str,
# table_type: str,
# empids: list[int],
# run_date: date,
# ) -> pl.DataFrame:
# log.info(f"Fetched {len(empids):,} matched empids fetched for OQAD ")
# run_date_str = run_date.strftime("%Y-%m-%d")
# empid_list = ", ".join(str(e) for e in empids) # "101, 102, 103"
# def fetch_data(
# engine: Engine,
# table_name: str,
# table_type: str,
# empids: list[int],
# run_date: date
# ) -> pl.DataFrame:
# sql_file = Path("src") / "sql" / "fact" / f"{table_name}.sql"
# log.info("Loading SQL from: %s (exists=%s)", sql_file.resolve(), sql_file.exists())
# empid_list = ",".join(str(empid) for empid in empids)
# with open(sql_file, "r", encoding="utf-8") as f:
# sql_template = f.read()
# sql = sql_template.format(
# empid_list=empid_list,
# run_date=run_date_str,
# )
# sql_file = Path("src") / "sql" / "fact" / f"{table_name}.sql"
# log.info("Fetching full OQaD data for %s empids, run_date=%s", len(empids), run_date_str)
# df = pl.read_database(query=sql, connection=engine)
# log.info("Fetched %s rows from SQL Server for table=%s", len(df), table_name)
# return df
# log.info(f"Exists: {sql_file.exists()}")
# log.info(f"Path: {sql_file.resolve()}")
# # ─────────────────────────────────────────────
# # MAIN FLOW (the 4 steps, clearly sequenced)
# # ─────────────────────────────────────────────
# with open(sql_file, "r", encoding="utf-8") as f:
# sql_template = f.read()
# qf = fetch_quiz_empids(sql_engine, run_date) # Step 1
# db_df = get_empids_clickhouse_OQAD(client, table_name) # Step 2
# empids = find_new_empids(qf, db_df) # Step 3
# sql = sql_template.format(
# empid_list=empid_list,
# run_date=run_date.strftime("%Y-%m-%d")
# )
# log.info(f"Fetching data for {len(empids):,} EMPIDs")
# log.info("Fetching OQaD data for run_date=%s", run_date)
# df = pl.read_database(
# query=sql,
# connection=engine,
# )
# log.info("fn name is fetch_OQad ------Fetched %s rows", len(df))
# return df
# df=fetch_data( engine=sql_engine,
# table_name=table_name,
# table_type=table_type,
# empids=empids,
# run_date=run_date
# )
# log.info(f"Fetched {len(df):,} rows from SQL Server")
# return df
# def fetch_OQaD(
# engine: Engine,
# table_name: str,
# table_type: str,
# empids: list[int],
# run_date: date
# ) -> pl.DataFrame:
# empid_list = ",".join(str(empid) for empid in empids)
# sql_file = Path("src") / "sql" / "fact" / f"{table_name}.sql"
# log.info(f"Exists: {sql_file.exists()}")
# log.info(f"Path: {sql_file.resolve()}")
# with open(sql_file, "r", encoding="utf-8") as f:
# sql_template = f.read()
# sql = sql_template.format(
# empid_list=empid_list,
# run_date=run_date.strftime("%Y-%m-%d")
# df = fetch_data( # Step 4
# engine=sql_engine,
# table_name=table_name,
# table_type=table_type,
# empids=empids,
# run_date=run_date,
# )
# log.info(f"Fetching data for {len(empids):,} EMPIDs")
# log.info("Fetching OQaD data for run_date=%s", run_date)
# df = pl.read_database(
# query=sql,
# connection=engine,
# )
# log.info("fn name is fetch_OQad ------Fetched %s rows", len(df))
# log.info("fetch_OQaD complete — returning %s rows", len(df))
# return df
def fetch_OQaD(
    sql_engine: Engine,
    table_name: str,
    table_type: str,
    mids: list[int],
    run_date: date
) -> pl.DataFrame:
    """Fetch OQaD rows for employees whose touched (empid, visitdate) pairs exist in ClickHouse.

    Flow:
      1. Read from SQL Server the (empid, visitdate) pairs whose T_OQAD rows
         were created or updated on ``run_date``.
      2. Read the distinct (empid, visitdate) pairs stored in the ClickHouse
         table ``OQaD``.
      3. Inner-join the two and collect the matching employee ids; when
         nothing matches, the id list is the sentinel ``[0]``.
      4. Render ``src/sql/fact/<table_name>.sql`` with those ids and
         ``run_date`` and run it against SQL Server.

    Args:
        sql_engine: SQLAlchemy engine used for both SQL Server queries.
        table_name: Names the ``.sql`` template file and appears in log output.
        table_type: Part of the common fetcher call signature; not used to
            build any query here.
        mids: Part of the common fetcher call signature; not used here.
        run_date: Day whose created/updated records drive the fetch.

    Returns:
        The result of the rendered template query as a Polars DataFrame.
    """

    def table_exists(
        client,
        table_name: str,
    ) -> bool:
        return bool(
            client.command(
                f"EXISTS TABLE {table_name}"
            )
        )

    def fetch_quiz_empids(engine: Engine, run_date: date) -> pl.DataFrame:
        # Format once and interpolate directly. Previously the text was an
        # f-string that was *also* passed through .format(), which did nothing,
        # and two bounds were emitted unquoted (2026-06-24 parses as integer
        # arithmetic in T-SQL rather than as a date).
        run_date_str = run_date.strftime("%Y-%m-%d")

        # DQ.VisitDate is qualified explicitly in the IN-subquery: an
        # unqualified VisitDate there binds to A.VisitDate, which makes the
        # comparison true for every row.
        sql = f"""
            WITH MID_TABLE_COV1 AS
            (
            SELECT EmpId, VisitDate
            FROM OneApp_KelloggsMT.dbo.T_OQAD
            WHERE CreateDate >= '{run_date_str}'
            AND CreateDate < DATEADD(DAY,1,'{run_date_str}')
            UNION
            SELECT EmpId, VisitDate
            FROM OneApp_KelloggsMT.dbo.T_OQAD
            WHERE UpdateDate >= '{run_date_str}'
            AND UpdateDate < DATEADD(DAY,1, '{run_date_str}')
            ),
            QUIZ AS
            (
            SELECT Distinct E.EmpId as empid
            , CONVERT(date,DQ.VisitDate) AS visitdate
            FROM OneApp_KelloggsMT.dbo.T_OQAD DQ INNER JOIN
            OneApp_KelloggsMT.dbo.vw_Employee_Detail E ON DQ.EmpId = E.EmpId inner join
            OneApp_KelloggsMT.dbo.Master_OQAD_Question QU on DQ.QuestionId= qu.QuestionId inner join
            OneApp_KelloggsMT.dbo.Master_OQAD_Category qc on qu.QuestionCategoryId= qc.QuestionCategoryId
            where e.EmpName not like 'test%' and e.RightId in (6)
            and (E.ResignDate is null or E.ResignDate>=''+CONVERT(VARCHAR,'{run_date_str}')+'') AND E.EmpName NOT LIKE '%TEST%'
            AND DQ.EmpId IN (SELECT EmpId FROM MID_TABLE_COV1 A WHERE
            DQ.EmpId=A.EmpId AND CONVERT(date,DQ.VisitDate)=CONVERT(date,A.VisitDate) )
            ) select * from quiz
            """

        log.info(f"Fetching quiz_empids data for EMPID and Visitid")
        df = pl.read_database(
            query=sql,
            connection=engine
        )
        log.info(f"Fetched {len(df):,} total empid and visitdate fetched for OQAD from SQL Server")
        return df

    def get_empids_clickhouse_OQAD(
        client,
        table_name: str = "OQaD",
    ) -> pl.DataFrame:
        if not table_exists(client, table_name):
            log.warning(f"Table '{table_name}' does not exist.")
            # Empty frame with the join columns so the join below still works.
            return pl.DataFrame(
                schema={
                    "empid": pl.Int64,
                    "visitdate": pl.Date,
                }
            )

        query = f"""
            SELECT DISTINCT
            employee_id AS empid,
            visit_date AS visitdate
            FROM {table_name}
            """
        # ClickHouse -> PyArrow -> Polars
        arrow_table = client.query_arrow(query)
        return pl.from_arrow(arrow_table)

    def fetch_data(
        engine: Engine,
        table_name: str,
        table_type: str,
        empids: list[int],
        run_date: date
    ) -> pl.DataFrame:
        empid_list = ",".join(str(empid) for empid in empids)

        sql_file = Path("src") / "sql" / "fact" / f"{table_name}.sql"
        log.info(f"Exists: {sql_file.exists()}")
        log.info(f"Path: {sql_file.resolve()}")

        with open(sql_file, "r", encoding="utf-8") as f:
            sql_template = f.read()

        sql = sql_template.format(
            empid_list=empid_list,
            run_date=run_date.strftime("%Y-%m-%d")
        )

        log.info(f"Fetching data for {len(empids):,} EMPIDs")
        log.info("Fetching OQaD data for run_date=%s", run_date)
        df = pl.read_database(
            query=sql,
            connection=engine,
        )
        log.info("fn name is fetch_OQad ------Fetched %s rows", len(df))
        return df

    client = get_clickhouse_client()
    try:
        qf = fetch_quiz_empids(sql_engine, run_date)
        db_df = get_empids_clickhouse_OQAD(client)
    finally:
        # The client is created for this call only; release it once the
        # single ClickHouse read is done, even if a query raised.
        client.close()

    matched = qf.join(
        db_df,
        on=["empid", "visitdate"],
        how="inner",
    )

    if matched.is_empty():
        # Sentinel id keeps the rendered id list non-empty.
        # NOTE(review): relies on no real employee having id 0 — confirm.
        empids = [0]
        log.warning(
            "%s Matched df in OQaD returned no rows",
            table_name,
        )
    else:
        empids = matched["empid"].to_list()
        log.info(f"Fetched {len(empids):,} matched empids fetched for OQAD ")

    df = fetch_data(
        engine=sql_engine,
        table_name=table_name,
        table_type=table_type,
        empids=empids,
        run_date=run_date
    )
    log.info(f"Fetched {len(df):,} rows from SQL Server")
    return df
def fetch_OQaD(
    engine: Engine,
    table_name: str,
    table_type: str,
    empids: list[int],
    run_date: date
) -> pl.DataFrame:
    """Render ``src/sql/fact/<table_name>.sql`` for *empids* and run it.

    The template is filled with ``empid_list`` (the ids joined by commas)
    and ``run_date`` (``YYYY-MM-DD``), then executed through *engine*.

    Args:
        engine: SQLAlchemy engine the rendered query is executed on.
        table_name: Names the ``.sql`` template file.
        table_type: Part of the common fetcher call signature; unused here.
        empids: Employee ids substituted into the template.
        run_date: Date substituted into the template.

    Returns:
        The query result as a Polars DataFrame; an empty DataFrame when
        *empids* is empty.
    """
    if not empids:
        # Same guard as the MID-based fetchers: with no ids the rendered
        # id list would be empty, so skip the query instead of sending it.
        log.warning("No EMPIDs — nothing to fetch.")
        return pl.DataFrame()

    empid_list = ",".join(str(empid) for empid in empids)

    sql_file = Path("src") / "sql" / "fact" / f"{table_name}.sql"
    log.info(f"Exists: {sql_file.exists()}")
    log.info(f"Path: {sql_file.resolve()}")

    with open(sql_file, "r", encoding="utf-8") as f:
        sql_template = f.read()

    sql = sql_template.format(
        empid_list=empid_list,
        run_date=run_date.strftime("%Y-%m-%d")
    )

    log.info(f"Fetching data for {len(empids):,} EMPIDs")
    log.info("Fetching OQaD data for run_date=%s", run_date)
    df = pl.read_database(
        query=sql,
        connection=engine,
    )
    log.info("fn name is fetch_OQad ------Fetched %s rows", len(df))
    return df
+903
View File
@@ -0,0 +1,903 @@
from pathlib import Path
import polars as pl
from sqlalchemy import Engine
from datetime import date , timedelta
from log import log
from db_con.connection import (
build_sql_server_engine,
build_clickhouse_engine,
get_clickhouse_client,
)
# Maps a function's ``__name__`` to the function object; filled by ``register``.
registry = {}


def register(func):
    """Store *func* in ``registry`` under its ``__name__`` and return it unchanged.

    Usable as a decorator. Registering another function with the same name
    replaces the earlier entry.
    """
    key = func.__name__
    registry.update({key: func})
    return func
# def fetch_data(
# engine: Engine,
# table_name: str,
# table_type: str,
# mids: list[int],
# run_date: date
# ) -> pl.DataFrame:
# if not mids:
# log.warning("No MIDs — nothing to fetch.")
# return pl.DataFrame()
# mid_list = ",".join(str(mid) for mid in mids)
# sql_file = Path("src") / "sql" / f"{table_type.lower()}" / f"{table_name}.sql"
# with open(sql_file, "r", encoding="utf-8") as f:
# sql_template = f.read()
# sql = sql_template.format(
# mid_list=mid_list,
# run_date=run_date.strftime("%Y-%m-%d")
# )
# log.info(f"Fetching data for {len(mids):,} MIDs")
# df = pl.read_database(
# query=sql,
# connection=engine
# )
# log.info(f"Fetched {len(df):,} rows from SQL Server")
# return df
def _fetch_by_mids(
    engine: Engine,
    table_name: str,
    table_type: str,
    mids: list[int],
    run_date: date
) -> pl.DataFrame:
    """Render ``src/sql/<table_type>/<table_name>.sql`` for *mids* and run it.

    The template is filled with ``mid_list`` (the MIDs joined by commas) and
    ``run_date`` (``YYYY-MM-DD``). Returns an empty DataFrame, without
    touching the database, when *mids* is empty.
    """
    if not mids:
        log.warning("No MIDs — nothing to fetch.")
        return pl.DataFrame()

    log.info(f" Start Fetching data for these {len(mids):} MIDs ")
    mid_list = ",".join(str(mid) for mid in mids)
    log.info(f"Start Fetching data for these {len(mids):} MIDs or based on this date {run_date}")

    # The template directory is the lower-cased table type.
    sql_file = Path("src") / "sql" / f"{table_type.lower()}" / f"{table_name}.sql"
    with open(sql_file, "r", encoding="utf-8") as f:
        sql_template = f.read()

    sql = sql_template.format(
        mid_list=mid_list,
        run_date=run_date.strftime("%Y-%m-%d")
    )

    log.info(f"Fetching data for {len(mids):,} MIDs")
    df = pl.read_database(
        query=sql,
        connection=engine
    )
    log.info(f"Fetched {len(df):,} rows from SQL Server")
    return df


@register
def fetch_SOS_OneApp( engine: Engine,
                      table_name: str,
                      table_type: str,
                      mids: list[int],
                      run_date: date
                      ) -> pl.DataFrame:
    """Fetch rows for *mids* and *run_date* using the table's SQL template.

    Args:
        engine: SQLAlchemy engine the rendered query is executed on.
        table_name: Names the ``.sql`` template file.
        table_type: Lower-cased to select the template sub-directory.
        mids: MIDs substituted into the template.
        run_date: Date substituted into the template.

    Returns:
        The query result as a Polars DataFrame; empty when *mids* is empty.
    """
    return _fetch_by_mids(engine, table_name, table_type, mids, run_date)


@register
def fetch_additional_visibility( engine: Engine,
                                 table_name: str,
                                 table_type: str,
                                 mids: list[int],
                                 run_date: date
                                 ) -> pl.DataFrame:
    """Fetch rows for *mids* and *run_date* using the table's SQL template.

    Args:
        engine: SQLAlchemy engine the rendered query is executed on.
        table_name: Names the ``.sql`` template file.
        table_type: Lower-cased to select the template sub-directory.
        mids: MIDs substituted into the template.
        run_date: Date substituted into the template.

    Returns:
        The query result as a Polars DataFrame; empty when *mids* is empty.
    """
    return _fetch_by_mids(engine, table_name, table_type, mids, run_date)
# def fetch_OQaD(
# sql_engine: Engine,
# table_name: str,
# table_type: str,
# mids: list[int],
# run_date: date
# ) -> pl.DataFrame:
# # ─────────────────────────────────────────────
# # INNER HELPERS (defined once, used below)
# # ─────────────────────────────────────────────
# client = get_clickhouse_client()
# # ── Does a ClickHouse table exist? ────────────
# def table_exists(client, table_name: str) -> bool:
# return bool(client.command(f"EXISTS TABLE {table_name}"))
# # ── STEP 1: Who submitted yesterday in SQL Server? ───
# def fetch_quiz_empids(engine: Engine, run_date: date) -> pl.DataFrame:
# # Format date ONCE safely — avoids f-string injection bugs
# run_date_str = run_date.strftime("%Y-%m-%d")
# next_date_str = (run_date + timedelta(days=1)).strftime("%Y-%m-%d")
# sql = f"""
# WITH MID_TABLE_COV1 AS
# (
# -- Records CREATED yesterday
# SELECT EmpId, CAST(VisitDate AS DATE) AS VisitDate
# FROM OneApp_KelloggsMT.dbo.T_OQAD
# WHERE CreateDate >= '{run_date_str}'
# AND CreateDate < '{next_date_str}'
# UNION ALL
# -- Records UPDATED yesterday (different rows, safe to UNION ALL)
# SELECT EmpId, CAST(VisitDate AS DATE) AS VisitDate
# FROM OneApp_KelloggsMT.dbo.T_OQAD
# WHERE UpdateDate >= '{run_date_str}'
# AND UpdateDate < '{next_date_str}'
# ),
# QUIZ AS
# (
# SELECT DISTINCT
# E.EmpId AS empid,
# CAST(DQ.VisitDate AS DATE) AS visitdate
# FROM OneApp_KelloggsMT.dbo.T_OQAD DQ
# INNER JOIN OneApp_KelloggsMT.dbo.vw_Employee_Detail E
# ON DQ.EmpId = E.EmpId
# INNER JOIN OneApp_KelloggsMT.dbo.Master_OQAD_Question QU
# ON DQ.QuestionId = QU.QuestionId
# INNER JOIN OneApp_KelloggsMT.dbo.Master_OQAD_Category QC
# ON QU.QuestionCategoryId = QC.QuestionCategoryId
# WHERE E.EmpName NOT LIKE '%TEST%' -- exclude test employees
# AND E.RightId = 6 -- only field reps
# AND (
# E.ResignDate IS NULL
# OR CAST(E.ResignDate AS DATE) >= '{run_date_str}'
# )
# AND EXISTS ( -- ✅ EXISTS beats IN for large sets
# SELECT 1
# FROM MID_TABLE_COV1 A
# WHERE A.EmpId = DQ.EmpId
# AND A.VisitDate = CAST(DQ.VisitDate AS DATE)
# )
# )
# SELECT * FROM QUIZ
# """
# log.info("Fetching quiz empids for run_date=%s", run_date_str)
# df = pl.read_database(query=sql, connection=engine)
# log.info("Fetched %s (EmpId, VisitDate) pairs from SQL Server", len(df))
# return df
# # ── STEP 2: Who do we ALREADY have in ClickHouse? ───
# def get_empids_clickhouse_OQAD(
# client,
# table_name: str = "OQaD",
# ) -> pl.DataFrame:
# if not table_exists(client, table_name):
# log.warning("Table '%s' does not exist in ClickHouse.", table_name)
# return pl.DataFrame(schema={"empid": pl.Int64, "visitdate": pl.Date})
# query = f"""
# SELECT DISTINCT
# employee_id AS empid,
# visit_date AS visitdate
# FROM {table_name}
# """
# arrow_table = client.query_arrow(query)
# df = pl.from_arrow(arrow_table)
# log.info("Fetched %s existing (EmpId, VisitDate) pairs from ClickHouse", len(df))
# return df
# # ── STEP 3: Who is NEW? (in SQL Server but NOT yet in ClickHouse) ───
# def find_new_empids(
# sql_df: pl.DataFrame,
# ch_df: pl.DataFrame,
# ) -> list[int]:
# new_df = sql_df.join(
# ch_df,
# on=["empid", "visitdate"],
# how="anti", # ✅ anti = keep rows NOT found in ch_df
# )
# if new_df.is_empty():
# log.warning("No new EmpIds found for table=%s — nothing to fetch.", table_name)
# return [0] # sentinel value — the .sql WHERE will return 0 rows safely
# empids = new_df["empid"].unique().to_list()
# log.info("Found %s NEW empids to fetch for %s", len(empids), table_name)
# return empids
# # ── STEP 4: Fetch full quiz data for new empids ───
# def fetch_data(
# engine: Engine,
# table_name: str,
# table_type: str,
# empids: list[int],
# run_date: date,
# ) -> pl.DataFrame:
# run_date_str = run_date.strftime("%Y-%m-%d")
# empid_list = ", ".join(str(e) for e in empids) # "101, 102, 103"
# sql_file = Path("src") / "sql" / "fact" / f"{table_name}.sql"
# log.info("Loading SQL from: %s (exists=%s)", sql_file.resolve(), sql_file.exists())
# with open(sql_file, "r", encoding="utf-8") as f:
# sql_template = f.read()
# sql = sql_template.format(
# empid_list=empid_list,
# run_date=run_date_str,
# )
# log.info("Fetching full OQaD data for %s empids, run_date=%s", len(empids), run_date_str)
# df = pl.read_database(query=sql, connection=engine)
# log.info("Fetched %s rows from SQL Server for table=%s", len(df), table_name)
# return df
# # ─────────────────────────────────────────────
# # MAIN FLOW (the 4 steps, clearly sequenced)
# # ─────────────────────────────────────────────
# qf = fetch_quiz_empids(sql_engine, run_date) # Step 1
# db_df = get_empids_clickhouse_OQAD(client, table_name) # Step 2
# empids = find_new_empids(qf, db_df) # Step 3
# df = fetch_data( # Step 4
# engine=sql_engine,
# table_name=table_name,
# table_type=table_type,
# empids=empids,
# run_date=run_date,
# )
# log.info("fetch_OQaD complete — returning %s rows", len(df))
# return df
@register
def fetch_OQaD(
    sql_engine: Engine,
    table_name: str,
    table_type: str,
    mids: list[int],
    run_date: date
) -> pl.DataFrame:
    """Fetch OQaD quiz rows from SQL Server for ``run_date``.

    Steps:
      1. Query SQL Server for the distinct (empid, visitdate) pairs of
         ``T_OQAD`` rows created or updated on ``run_date``.
      2. Read the distinct (empid, visitdate) pairs from the ClickHouse
         ``OQaD`` table (an empty frame when the table does not exist).
      3. Inner-join both frames; the matching empids are rendered into the
         ``{empid_list}`` placeholder of ``src/sql/fact/<table_name>.sql``
         (presumably an exclusion list -- confirm against the .sql file).
      4. Run the rendered SQL file and return its result.

    Args:
        sql_engine: SQL Server connection passed to ``pl.read_database``.
        table_name: Base name of the ``.sql`` file under ``src/sql/fact``.
        table_type: Not used here; kept so the signature matches the other
            registered fetchers.
        mids: Not used here; kept for the same reason.
        run_date: Day whose created/updated quiz rows are looked up.

    Returns:
        The rows returned by the rendered SQL file.
    """
    client = get_clickhouse_client()

    def table_exists(client, table_name: str) -> bool:
        """Return the truthiness of ClickHouse's ``EXISTS TABLE`` result."""
        return bool(client.command(f"EXISTS TABLE {table_name}"))

    def fetch_quiz_empids(engine: Engine, run_date: date) -> pl.DataFrame:
        """Return distinct (empid, visitdate) pairs touched on ``run_date``."""
        # Plain (non-f) template: the placeholders are filled by .format()
        # below. Every {run_date} is quoted so SQL Server receives a date
        # literal; unquoted, 2026-06-25 is evaluated as integer subtraction.
        # VisitDate in the correlated subquery is qualified with DQ because an
        # unqualified name binds to A.VisitDate and compares it with itself.
        sql_template = """
            WITH MID_TABLE_COV1 AS
            (
                SELECT EmpId, VisitDate
                FROM OneApp_KelloggsMT.dbo.T_OQAD
                WHERE CreateDate >= '{run_date}'
                AND CreateDate < DATEADD(DAY,1,'{run_date}')
                UNION
                SELECT EmpId, VisitDate
                FROM OneApp_KelloggsMT.dbo.T_OQAD
                WHERE UpdateDate >= '{run_date}'
                AND UpdateDate < DATEADD(DAY,1, '{run_date}')
            ),
            QUIZ AS
            (
                SELECT Distinct E.EmpId as empid
                , CONVERT(date,DQ.VisitDate) AS visitdate
                FROM OneApp_KelloggsMT.dbo.T_OQAD DQ INNER JOIN
                OneApp_KelloggsMT.dbo.vw_Employee_Detail E ON DQ.EmpId = E.EmpId inner join
                OneApp_KelloggsMT.dbo.Master_OQAD_Question QU on DQ.QuestionId= qu.QuestionId inner join
                OneApp_KelloggsMT.dbo.Master_OQAD_Category qc on qu.QuestionCategoryId= qc.QuestionCategoryId
                where e.EmpName not like 'test%' and e.RightId in (6)
                and (E.ResignDate is null or E.ResignDate>=''+CONVERT(VARCHAR,'{run_date}')+'') AND E.EmpName NOT LIKE '%TEST%'
                AND DQ.EmpId IN (SELECT EmpId FROM MID_TABLE_COV1 A WHERE
                DQ.EmpId=A.EmpId AND CONVERT(date,DQ.VisitDate)=CONVERT(date,A.VisitDate) )
            ) select * from quiz
        """
        sql = sql_template.format(
            run_date=run_date.strftime("%Y-%m-%d")
        )
        log.info("Fetching quiz_empids data for EMPID and Visitid")
        df = pl.read_database(
            query=sql,
            connection=engine
        )
        log.info(f"Fetched {len(df):,} total empid and visitdate fetched for OQAD from SQL Server")
        return df

    def get_empids_clickhouse_OQAD(
        client,
        table_name: str = "OQaD",
    ) -> pl.DataFrame:
        """Return distinct (empid, visitdate) pairs stored in ClickHouse."""
        if not table_exists(client, table_name):
            log.warning(f"Table '{table_name}' does not exist.")
            # Typed empty frame so the join below still finds both key columns.
            return pl.DataFrame(
                schema={
                    "empid": pl.Int64,
                    "visitdate": pl.Date,
                }
            )
        query = f"""
            SELECT DISTINCT
            employee_id AS empid,
            visit_date AS visitdate
            FROM {table_name}
        """
        # ClickHouse -> PyArrow -> Polars
        arrow_table = client.query_arrow(query)
        return pl.from_arrow(arrow_table)

    def fetch_data(
        engine: Engine,
        table_name: str,
        table_type: str,
        empids: list[int],
        run_date: date
    ) -> pl.DataFrame:
        """Render ``src/sql/fact/<table_name>.sql`` and run it."""
        empid_list = ",".join(str(empid) for empid in empids)
        sql_file = Path("src") / "sql" / "fact" / f"{table_name}.sql"
        log.info(f"Exists: {sql_file.exists()}")
        log.info(f"Path: {sql_file.resolve()}")
        sql_template = sql_file.read_text(encoding="utf-8")
        sql = sql_template.format(
            empid_list=empid_list,
            run_date=run_date.strftime("%Y-%m-%d")
        )
        log.info(f"Fetching data for {len(empids):,} EMPIDs")
        log.info("Fetching OQaD data for run_date=%s", run_date)
        df = pl.read_database(
            query=sql,
            connection=engine,
        )
        log.info("fn name is fetch_OQad ------Fetched %s rows", len(df))
        return df

    qf = fetch_quiz_empids(sql_engine, run_date)
    db_df = get_empids_clickhouse_OQAD(client)
    matched = qf.join(
        db_df,
        on=["empid", "visitdate"],
        how="inner",
    )
    if matched.is_empty():
        # Placeholder so the comma-joined list rendered into the SQL is never
        # empty (presumably no real EmpId is 0 -- confirm).
        empids = [0]
        log.warning(
            "%s Matched df in OQaD returned no rows",
            table_name,
        )
    else:
        # One employee can match on several visit dates; list each id once.
        empids = matched["empid"].unique(maintain_order=True).to_list()
        log.info(f"Fetched {len(empids):,} matched empids fetched for OQAD ")

    df = fetch_data(
        engine=sql_engine,
        table_name=table_name,
        table_type=table_type,
        empids=empids,
        run_date=run_date
    )
    log.info(f"Fetched {len(df):,} rows from SQL Server")
    return df
@register
def fetch_OQaD(
    engine: Engine,
    table_name: str,
    table_type: str,
    empids: list[int],
    run_date: date
) -> pl.DataFrame:
    """Render ``src/sql/fact/<table_name>.sql`` and run it through ``engine``.

    The template is formatted with ``empid_list`` (the comma-joined
    ``empids``) and ``run_date`` (``YYYY-MM-DD``). ``table_type`` is accepted
    but not used.

    Returns:
        The query result as a polars DataFrame.
    """
    # NOTE(review): a function with this same name is defined and decorated
    # with @register earlier in this file; this later ``def`` rebinds the
    # module-level name. Confirm which definition is meant to be live.
    empid_csv = ",".join(map(str, empids))
    sql_file = Path("src", "sql", "fact", f"{table_name}.sql")

    # Log the template location before attempting to read it.
    log.info(f"Exists: {sql_file.exists()}")
    log.info(f"Path: {sql_file.resolve()}")

    template = sql_file.read_text(encoding="utf-8")
    query = template.format(
        empid_list=empid_csv,
        run_date=run_date.strftime("%Y-%m-%d"),
    )

    log.info(f"Fetching data for {len(empids):,} EMPIDs")
    log.info("Fetching OQaD data for run_date=%s", run_date)
    df = pl.read_database(query=query, connection=engine)
    log.info("fn name is fetch_OQad ------Fetched %s rows", len(df))
    return df
@register
def fetch_Coverage(
    engine: Engine,
    table_name: str,
    table_type: str,
    mids: list[int],
    run_date: date
) -> pl.DataFrame:
    """Run the Coverage SQL template for ``mids`` and ``run_date``.

    The query text is read from
    ``src/sql/<table_type lower-cased>/<table_name>.sql`` and formatted with
    ``mid_list`` (the comma-joined ``mids``) and ``run_date`` (``YYYY-MM-DD``).

    Returns:
        The query result, or an empty schema-less DataFrame (without touching
        the database) when ``mids`` is empty.
    """
    # Guard clause: nothing to substitute into the query without MIDs.
    if not mids:
        log.warning("No MIDs — nothing to fetch.")
        return pl.DataFrame()

    log.info(f"Start Fetching data for these {len(mids):} MIDs or based on this date {run_date}")
    mid_csv = ",".join(map(str, mids))

    template_path = Path("src", "sql", table_type.lower(), f"{table_name}.sql")
    template = template_path.read_text(encoding="utf-8")
    query = template.format(
        mid_list=mid_csv,
        run_date=run_date.strftime("%Y-%m-%d"),
    )

    log.info(f"Fetching data for {len(mids):,} MIDs")
    df = pl.read_database(query=query, connection=engine)
    log.info(f"Fetched {len(df):,} rows from SQL Server")
    return df
@register
def fetch_Survey(
    engine: Engine,
    table_name: str,
    table_type: str,
    mids: list[int],
    run_date: date
) -> pl.DataFrame:
    """Run the Survey SQL template for ``mids`` and ``run_date``.

    Reads ``src/sql/<table_type lower-cased>/<table_name>.sql``, formats it
    with ``mid_list`` (comma-joined ``mids``) and ``run_date``
    (``YYYY-MM-DD``), and executes it through ``engine``.

    Returns:
        The query result, or an empty schema-less DataFrame when ``mids`` is
        empty (no query is issued in that case).
    """
    # Early exit: an empty MID list means there is nothing to query for.
    if not mids:
        log.warning("No MIDs — nothing to fetch.")
        return pl.DataFrame()

    mid_csv = ",".join(map(str, mids))
    log.info(f"Start Fetching data for these {len(mids):} MIDs or based on this date {run_date}")

    template_path = Path("src", "sql", table_type.lower(), f"{table_name}.sql")
    template = template_path.read_text(encoding="utf-8")
    query = template.format(
        mid_list=mid_csv,
        run_date=run_date.strftime("%Y-%m-%d"),
    )

    log.info(f"Fetching data for {len(mids):,} MIDs")
    df = pl.read_database(query=query, connection=engine)
    log.info(f"Fetched {len(df):,} rows from SQL Server")
    return df
@register
def fetch_Login(
    engine: Engine,
    table_name: str,
    table_type: str,
    mids: list[int],
    run_date: date
) -> pl.DataFrame:
    """Run the Login SQL template for ``run_date``.

    Reads ``src/sql/<table_type lower-cased>/<table_name>.sql`` and formats it
    with ``run_date`` (``YYYY-MM-DD``) only. Unlike the other fetchers in this
    file, the MID list is not substituted into the query and an empty ``mids``
    does not short-circuit the fetch; ``mids`` is used only for the log
    messages.

    Args:
        engine: SQL Server connection passed to ``pl.read_database``.
        table_name: Base name of the ``.sql`` template file.
        table_type: Sub-directory of ``src/sql`` (lower-cased before use).
        mids: MIDs of the current run; only their count is logged.
        run_date: Date rendered into the template.

    Returns:
        The query result as a polars DataFrame.
    """
    log.info(f"Start Fetching data for these {len(mids):} MIDs or based on this date {run_date}")

    sql_file = Path("src") / "sql" / table_type.lower() / f"{table_name}.sql"
    sql_template = sql_file.read_text(encoding="utf-8")
    # Only run_date is supplied: the template is not given a mid_list value.
    sql = sql_template.format(
        run_date=run_date.strftime("%Y-%m-%d")
    )

    log.info(f"Fetching data for {len(mids):,} MIDs")
    df = pl.read_database(
        query=sql,
        connection=engine
    )
    log.info(f"Fetched {len(df):,} rows from SQL Server")
    return df
@register
def fetch_Stock_Details(
    engine: Engine,
    table_name: str,
    table_type: str,
    mids: list[int],
    run_date: date
) -> pl.DataFrame:
    """Run the Stock_Details SQL template for ``mids`` and ``run_date``.

    The template at ``src/sql/<table_type lower-cased>/<table_name>.sql`` is
    formatted with ``mid_list`` (comma-joined ``mids``) and ``run_date``
    (``YYYY-MM-DD``) and then executed through ``engine``.

    Returns:
        The query result, or an empty schema-less DataFrame when ``mids`` is
        empty (the database is not queried in that case).
    """
    # Bail out before any file or database access when there are no MIDs.
    if not mids:
        log.warning("No MIDs — nothing to fetch.")
        return pl.DataFrame()

    mid_csv = ",".join(map(str, mids))
    log.info(f"Start Fetching data for these {len(mids):} MIDs or based on this date {run_date}")

    template_path = Path("src", "sql", table_type.lower(), f"{table_name}.sql")
    template = template_path.read_text(encoding="utf-8")
    query = template.format(
        mid_list=mid_csv,
        run_date=run_date.strftime("%Y-%m-%d"),
    )

    log.info(f"Fetching data for {len(mids):,} MIDs")
    df = pl.read_database(query=query, connection=engine)
    log.info(f"Fetched {len(df):,} rows from SQL Server")
    return df
# def fetch_Coverage( engine: Engine,
# table_name: str,
# table_type: str,
# mids: list[int],
# run_date: date
# ) -> pl.DataFrame:
# if not mids:
# log.warning("No MIDs — nothing to fetch.")
# return pl.DataFrame()
# mid_list = ",".join(str(mid) for mid in mids)
# log.info(f"Start Fetching data for these {len(mids):} MIDs or based on this date {run_date}")
# sql_file = Path("src") / "sql" / f"{table_type.lower()}" / f"{table_name}.sql"
# with open(sql_file, "r", encoding="utf-8") as f:
# sql_template = f.read()
# sql = sql_template.format(
# mid_list=mid_list,
# run_date=run_date.strftime("%Y-%m-%d")
# )
# log.info(f"Fetching data for {len(mids):,} MIDs")
# df = pl.read_database(
# query=sql,
# connection=engine
# )
# log.info(f"Fetched {len(df):,} rows from SQL Server")
# return df
@register
def fetch_Attendance(
    engine: Engine,
    table_name: str,
    table_type: str,
    mids: list[int],
    run_date: date | None = None,
    days_back: int = 15
) -> pl.DataFrame:
    """
    Load attendance rows for a date window ending on ``run_date``.

    The window starts ``days_back`` days before ``run_date``. When
    ``run_date`` is omitted it falls back to yesterday, so the default window
    runs from (yesterday - 15 days) to yesterday. ``mids`` is accepted but not
    used by this fetcher.

    The template at ``src/sql/<table_type lower-cased>/<table_name>.sql`` is
    formatted with ``start_date`` and ``run_date`` (both ``YYYY-MM-DD``).
    """
    # Resolve the window end first; the start is derived from it.
    run_date = date.today() - timedelta(days=1) if run_date is None else run_date
    start_date = run_date - timedelta(days=days_back)

    template_path = Path("src", "sql", table_type.lower(), f"{table_name}.sql")
    template = template_path.read_text(encoding="utf-8")
    query = template.format(
        start_date=start_date.strftime("%Y-%m-%d"),
        run_date=run_date.strftime("%Y-%m-%d"),
    )

    log.info(
        f"Fetching Attendance data from {start_date} to {run_date}"
    )
    df = pl.read_database(query=query, connection=engine)
    log.info(
        f"Fetched {len(df):,} attendance rows "
        f"for {df['employee_id'].n_unique():,} employees"
    )
    return df
@register
def fetch_Journey_Plan(
    engine: Engine,
    table_name: str,
    table_type: str,
    mids: list[int],
    run_date: date
) -> pl.DataFrame:
    """Run the Journey_Plan SQL template for ``mids`` and ``run_date``.

    Reads ``src/sql/<table_type lower-cased>/<table_name>.sql``, formats it
    with ``mid_list`` (comma-joined ``mids``) and ``run_date``
    (``YYYY-MM-DD``), and executes it through ``engine``.

    Returns:
        The query result, or an empty schema-less DataFrame when ``mids`` is
        empty (no query is issued in that case).
    """
    # Nothing to fetch without MIDs; skip file and database access entirely.
    if not mids:
        log.warning("No MIDs — nothing to fetch.")
        return pl.DataFrame()

    log.info(f" Start Fetching data for these {len(mids):} MIDs ")
    mid_csv = ",".join(map(str, mids))
    log.info(f"Start Fetching data for these {len(mids):} MIDs or based on this date {run_date}")

    template_path = Path("src", "sql", table_type.lower(), f"{table_name}.sql")
    template = template_path.read_text(encoding="utf-8")
    query = template.format(
        mid_list=mid_csv,
        run_date=run_date.strftime("%Y-%m-%d"),
    )

    log.info(f"Fetching data for {len(mids):,} MIDs")
    df = pl.read_database(query=query, connection=engine)
    log.info(f"Fetched {len(df):,} rows from SQL Server")
    return df
@register
def fetch_PaidVisibility(
    engine: Engine,
    table_name: str,
    table_type: str,
    mids: list[int],
    run_date: date
) -> pl.DataFrame:
    """Run the PaidVisibility SQL template for ``mids`` and ``run_date``.

    The template at ``src/sql/<table_type lower-cased>/<table_name>.sql`` is
    formatted with ``mid_list`` (comma-joined ``mids``) and ``run_date``
    (``YYYY-MM-DD``) before being executed through ``engine``.

    Returns:
        The query result, or an empty schema-less DataFrame when ``mids`` is
        empty (the database is not queried in that case).
    """
    # An empty MID list short-circuits the whole fetch.
    if not mids:
        log.warning("No MIDs — nothing to fetch.")
        return pl.DataFrame()

    log.info(f" Start Fetching data for these {len(mids):} MIDs ")
    mid_csv = ",".join(map(str, mids))
    log.info(f"Start Fetching data for these {len(mids):} MIDs or based on this date {run_date}")

    template_path = Path("src", "sql", table_type.lower(), f"{table_name}.sql")
    template = template_path.read_text(encoding="utf-8")
    query = template.format(
        mid_list=mid_csv,
        run_date=run_date.strftime("%Y-%m-%d"),
    )

    log.info(f"Fetching data for {len(mids):,} MIDs")
    df = pl.read_database(query=query, connection=engine)
    log.info(f"Fetched {len(df):,} rows from SQL Server")
    return df
@register
def fetch_Web_Logins(
    engine: Engine,
    table_name: str,
    table_type: str,
    mids: list[int],
    run_date: date
) -> pl.DataFrame:
    """Run the Web_Logins SQL template for ``mids`` and ``run_date``.

    Reads ``src/sql/<table_type lower-cased>/<table_name>.sql``, formats it
    with ``mid_list`` (comma-joined ``mids``) and ``run_date``
    (``YYYY-MM-DD``), and executes it through ``engine``.

    Returns:
        The query result, or an empty schema-less DataFrame when ``mids`` is
        empty (no query is issued in that case).
    """
    # Return early when there are no MIDs to put into the query.
    if not mids:
        log.warning("No MIDs — nothing to fetch.")
        return pl.DataFrame()

    log.info(f" Start Fetching data for these {len(mids):} MIDs ")
    mid_csv = ",".join(map(str, mids))
    log.info(f"Start Fetching data for these {len(mids):} MIDs or based on this date {run_date}")

    template_path = Path("src", "sql", table_type.lower(), f"{table_name}.sql")
    template = template_path.read_text(encoding="utf-8")
    query = template.format(
        mid_list=mid_csv,
        run_date=run_date.strftime("%Y-%m-%d"),
    )

    log.info(f"Fetching data for {len(mids):,} MIDs")
    df = pl.read_database(query=query, connection=engine)
    log.info(f"Fetched {len(df):,} rows from SQL Server")
    return df
@register
def fetch_Promotion(
    engine: Engine,
    table_name: str,
    table_type: str,
    mids: list[int],
    run_date: date
) -> pl.DataFrame:
    """Run the Promotion SQL template for ``mids`` and ``run_date``.

    The template at ``src/sql/<table_type lower-cased>/<table_name>.sql`` is
    formatted with ``mid_list`` (comma-joined ``mids``) and ``run_date``
    (``YYYY-MM-DD``) and executed through ``engine``.

    Returns:
        The query result, or an empty schema-less DataFrame when ``mids`` is
        empty (the database is not queried in that case).
    """
    # Skip all file and database work when no MIDs were supplied.
    if not mids:
        log.warning("No MIDs — nothing to fetch.")
        return pl.DataFrame()

    mid_csv = ",".join(map(str, mids))

    template_path = Path("src", "sql", table_type.lower(), f"{table_name}.sql")
    template = template_path.read_text(encoding="utf-8")
    query = template.format(
        mid_list=mid_csv,
        run_date=run_date.strftime("%Y-%m-%d"),
    )

    log.info(f"Fetching data for {len(mids):,} MIDs")
    df = pl.read_database(query=query, connection=engine)
    log.info(f"Fetched {len(df):,} rows from SQL Server")
    return df
+35 -71
View File
@@ -1,76 +1,40 @@
WITH MID_TABLE_COV1 AS
(
SELECT EmpId, CAST(VisitDate AS DATE) AS VisitDate
FROM OneApp_KelloggsMT.dbo.T_OQAD
WHERE CreateDate >= '{run_date}'
AND CreateDate < DATEADD(DAY, 1, '{run_date}')
UNION ALL
SELECT EmpId, CAST(VisitDate AS DATE) AS VisitDate
FROM OneApp_KelloggsMT.dbo.T_OQAD
WHERE UpdateDate >= '{run_date}'
AND UpdateDate < DATEADD(DAY, 1, '{run_date}')
SELECT EmpId,VisitDate FROM OneApp_KelloggsMT.dbo.T_OQAD
WHERE CONVERT(VARCHAR,CreateDate,101)= CONVERT(VARCHAR,cast('{run_date}' as date),101)
UNION
SELECT EmpId,VisitDate FROM OneApp_KelloggsMT.dbo.T_OQAD
WHERE CONVERT(VARCHAR,UpdateDate,101)= CONVERT(VARCHAR,cast('{run_date}' as date),101)
),
QUIZ AS
QUIZ
As
(
SELECT DISTINCT
E.EmpId,
E.EmpName,
E.SupervisorId,
E.SupervisorName,
E.DesignationName,
E.CityName,
E.StateName,
E.RegionName,
CAST(DQ.VisitDate AS DATE) AS VisitDate,
DQ.QuestionId,
DQ.AnswerId,
QC.QuestionCategoryId,
QC.QuestionCategory
FROM OneApp_KelloggsMT.dbo.T_OQAD DQ
INNER JOIN OneApp_KelloggsMT.dbo.vw_Employee_Detail E
ON DQ.EmpId = E.EmpId
INNER JOIN OneApp_KelloggsMT.dbo.Master_OQAD_Question QU
ON DQ.QuestionId = QU.QuestionId
INNER JOIN OneApp_KelloggsMT.dbo.Master_OQAD_Category QC
ON QU.QuestionCategoryId = QC.QuestionCategoryId
WHERE E.EmpName NOT LIKE '%TEST%'
AND E.RightId = 6
AND (E.ResignDate IS NULL OR CAST(E.ResignDate AS DATE) >= '{run_date}')
AND EXISTS (
SELECT 1
FROM MID_TABLE_COV1 A
WHERE A.EmpId = DQ.EmpId
AND A.VisitDate = CAST(DQ.VisitDate AS DATE)
)
-- ✅ Exclude EmpIds already loaded into ClickHouse
AND E.EmpId NOT IN ({empid_list})
SELECT Distinct E.EmpId, E.EmpName, E.SupervisorId, E.SupervisorName, E.DesignationName, E.CityName, E.StateName, E.RegionName
, CONVERT(VARCHAR,DQ.VisitDate,101) AS VisitDate
, DQ.QuestionId, DQ.AnswerId, qc.QuestionCategoryId, qc.QuestionCategory
FROM OneApp_KelloggsMT.dbo.T_OQAD DQ INNER JOIN
OneApp_KelloggsMT.dbo.vw_Employee_Detail E ON DQ.EmpId = E.EmpId inner join
OneApp_KelloggsMT.dbo.Master_OQAD_Question QU on DQ.QuestionId= qu.QuestionId inner join
OneApp_KelloggsMT.dbo.Master_OQAD_Category qc on qu.QuestionCategoryId= qc.QuestionCategoryId
where e.EmpName not like 'test%' and e.RightId in (6)
and (E.ResignDate is null or E.ResignDate>=''+CONVERT(VARCHAR,cast('{run_date}' as date),101)+'') AND E.EmpName NOT LIKE '%TEST%'
AND DQ.EmpId IN (SELECT EmpId FROM MID_TABLE_COV1 A WHERE
DQ.EmpId=A.EmpId AND CONVERT(date,DQ.VisitDate,101)=CONVERT(date,A.VisitDate,101) )
)
SELECT
40148 AS project_id,
Q.EmpId AS employee_id,
0 AS process_id,
Q.VisitDate AS visit_date,
Q.QuestionCategoryId AS question_category_id,
Q.QuestionCategory AS question_category,
QM.QuestionId AS question_id,
QM.Question AS question,
ISNULL(QA.AnswerId, 0) AS answer_id,
ISNULL(QA.Answer, '') AS answer,
CASE
WHEN QA.AnswerId IS NULL THEN 'Not Answer'
WHEN QA.RightAnswer = 1 THEN 'Y'
WHEN QA.RightAnswer IS NULL THEN 'Not Answer'
ELSE 'N'
END AS correct_answer,
GETDATE() AS update_date,
'SP-Pius' AS update_by
FROM QUIZ Q
INNER JOIN OneApp_KelloggsMT.dbo.Master_OQAD_Question QM
ON Q.QuestionId = QM.QuestionId
LEFT JOIN OneApp_KelloggsMT.dbo.Master_OQAD_Answer QA
ON Q.AnswerId = QA.AnswerId
, OQaD (project_id,
employee_id,process_id,visit_date,question_category_id,question_category,question_id,
question,answer_id,answer,correct_answer,update_date,update_by)
AS (
Select '40148' AS ProjectId,Q.EmpId,0,Q.VisitDate,Q.QuestionCategoryId,Q.QuestionCategory,
QM.QuestionId, QM.Question
, CASE WHEN QA.AnswerId IS NULL THEN 0 ELSE QA.AnswerId END AS AnswerId
,CASE WHEN QA.ANSWER IS NULL THEN '' ELSE QA.ANSWER END AS Answer
, CASE WHEN QA.AnswerId IS NULL THEN 'Not Answer' ELSE CASE WHEN QA.RightAnswer = 1 THEN 'Y'
ELSE CASE WHEN QA.RightAnswer IS NULL THEN 'Not Answer' ELSE 'N' END END END AS RightAnswer,GETDATE(),'SP-Pius'
FROM QUIZ Q INNER JOIN
OneApp_KelloggsMT.dbo.Master_OQAD_Question QM ON Q.QuestionId = QM.QuestionId Left JOIN
OneApp_KelloggsMT.dbo.Master_OQAD_Answer QA ON Q.AnswerId= QA.AnswerId
where Q.EmpId not in ( {empid_list})
)
select * from OQaD
+2 -2
View File
@@ -1,2 +1,2 @@
- pipeline_trigeered_on_date: '2026-06-23'
last_successful_run_date: 2026-06-22
- pipeline_trigeered_on_date: '2026-06-25'
last_successful_run_date: 2026-06-24
+1
View File
@@ -0,0 +1 @@
fn(sql_engine, table_name , table_type, mids, run_date)