Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 1d5ad2d793 |
@@ -20,13 +20,23 @@ def load_to_clickhouse(
|
||||
log.warning(f"{table_name}: DataFrame is empty. Skipping.")
|
||||
return
|
||||
|
||||
arrow_table = df.to_arrow()
|
||||
chunk_size = 10000
|
||||
|
||||
for start in range(0, len(df), chunk_size):
|
||||
end = start + chunk_size
|
||||
|
||||
chunk_df = df.slice(start, chunk_size)
|
||||
arrow_table = chunk_df.to_arrow()
|
||||
|
||||
client.insert_arrow(
|
||||
table=table_name,
|
||||
arrow_table=arrow_table,
|
||||
)
|
||||
|
||||
log.info(
|
||||
f"Inserted rows {start:,} to {min(end, len(df)):,}"
|
||||
)
|
||||
|
||||
log.info(
|
||||
f"{table_name}: inserted {len(df):,} rows into ClickHouse"
|
||||
)
|
||||
@@ -86,5 +86,7 @@ def get_clickhouse_client():
|
||||
port=CH_PORT,
|
||||
username=CH_USER,
|
||||
password=CH_PASS,
|
||||
database=CH_DB
|
||||
database=CH_DB,
|
||||
connect_timeout=30,
|
||||
send_receive_timeout=600,
|
||||
)
|
||||
@@ -0,0 +1,266 @@
|
||||
|
||||
from __future__ import annotations
|
||||
from time import sleep
|
||||
import sys
|
||||
from datetime import date, datetime, timedelta
|
||||
|
||||
import polars as pl
|
||||
import yaml
|
||||
|
||||
|
||||
from log import log
|
||||
|
||||
from clickhouse_task.create_table import create_clickhouse_table
|
||||
from clickhouse_task.delete_task import (
|
||||
delete_existing_data,
|
||||
truncate_table,
|
||||
)
|
||||
|
||||
from clickhouse_task.load_table import load_to_clickhouse
|
||||
|
||||
from db_con.connection import (
|
||||
build_sql_server_engine,
|
||||
build_clickhouse_engine,
|
||||
get_clickhouse_client,
|
||||
)
|
||||
|
||||
from mids import (
|
||||
MID_TABLE_COV,
|
||||
MID_TABLE_COV1,
|
||||
)
|
||||
|
||||
from src.bridge import *
|
||||
from src.fact import *
|
||||
from src.dim import *
|
||||
|
||||
|
||||
# ==========================================================
|
||||
# Helpers
|
||||
# ==========================================================
|
||||
|
||||
def get_dates_from_yaml(filename: str):
|
||||
with open(filename, "r") as file:
|
||||
data = yaml.safe_load(file)
|
||||
|
||||
start_date = date.fromisoformat(
|
||||
str(data["pipeline"]["start_date"])
|
||||
)
|
||||
|
||||
end_date = date.fromisoformat(
|
||||
str(data["pipeline"]["end_date"])
|
||||
)
|
||||
flag=str(data["pipeline"]["flag"])
|
||||
|
||||
return start_date, end_date , flag
|
||||
|
||||
|
||||
def write_table_to_yaml(
|
||||
data: dict,
|
||||
run_date: date,
|
||||
filename: str | None = None
|
||||
):
|
||||
"""Write table data to a YAML file."""
|
||||
|
||||
if filename is None:
|
||||
filename = f"elt_pipeline_{run_date}.yml"
|
||||
|
||||
with open(filename, "w") as file:
|
||||
yaml.dump(
|
||||
data,
|
||||
file,
|
||||
default_flow_style=False,
|
||||
sort_keys=False
|
||||
)
|
||||
|
||||
print(f"Table written to {filename}")
|
||||
|
||||
|
||||
|
||||
|
||||
def table_exists(
|
||||
client,
|
||||
table_name: str,
|
||||
) -> bool:
|
||||
|
||||
return bool(
|
||||
client.command(
|
||||
f"EXISTS TABLE {table_name}"
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
# ==========================================================
|
||||
# elt ()
|
||||
# ==========================================================
|
||||
|
||||
def elt(run_date : date , config: dict):
|
||||
|
||||
log.info("=" * 80)
|
||||
log.info("Hello from data-move Python data pipeline!")
|
||||
|
||||
# ------------------------------------------------------
|
||||
# Run Date
|
||||
# ------------------------------------------------------
|
||||
|
||||
|
||||
|
||||
log.info(
|
||||
"Pipeline Run Date: %s",
|
||||
run_date,
|
||||
)
|
||||
|
||||
# ------------------------------------------------------
|
||||
# Connections
|
||||
# ------------------------------------------------------
|
||||
|
||||
log.info(
|
||||
"Connecting to databases..."
|
||||
)
|
||||
|
||||
sql_engine = build_sql_server_engine()
|
||||
clickhouse_engine = build_clickhouse_engine()
|
||||
client = get_clickhouse_client()
|
||||
|
||||
log.info(
|
||||
"Database connections established"
|
||||
)
|
||||
|
||||
# ------------------------------------------------------
|
||||
# mids Keys
|
||||
# ------------------------------------------------------
|
||||
|
||||
mids = MID_TABLE_COV(
|
||||
sql_engine,
|
||||
run_date,
|
||||
)
|
||||
|
||||
emp_visit_df = MID_TABLE_COV1(
|
||||
sql_engine,
|
||||
run_date,
|
||||
)
|
||||
|
||||
|
||||
|
||||
|
||||
# ------------------------------------------------------
|
||||
# Process Tables
|
||||
# ------------------------------------------------------
|
||||
for table in config["tables"]:
|
||||
table_name = table["name"]
|
||||
operation = table["operation"]
|
||||
fetch_by = table["fetch_by"]
|
||||
table_type=table["type"]
|
||||
log.info(
|
||||
"Pipeline Run Date: %s",
|
||||
run_date,
|
||||
)
|
||||
|
||||
|
||||
log.info("=" * 80)
|
||||
log.info(f"Processing Table-:{table_name} | Table type -:{table_type} | fetcht by-:{fetch_by} | operation-:{operation}" )
|
||||
|
||||
try:
|
||||
|
||||
# ------------------------------------------
|
||||
# Fetch Data
|
||||
# ------------------------------------------
|
||||
|
||||
|
||||
log.info(f"Fetching Data from sql server for table-: {table_name} ..............")
|
||||
fetch_list=["mids" ,"run_date", "reason_id"]
|
||||
if fetch_by in fetch_list :
|
||||
fn_name = f"fetch_{table_name}"
|
||||
fn = globals()[fn_name]
|
||||
df=fn(sql_engine, table_name , table_type, mids, run_date)
|
||||
else:
|
||||
df = fetch_data(sql_engine ,table_name,table_type)
|
||||
|
||||
log.info(f"Fetched total row -: {len(df)} from sql server for table-:{table_name} ...........!!!")
|
||||
|
||||
if df.is_empty():
|
||||
|
||||
log.warning(
|
||||
"%s returned no rows",
|
||||
table_name,
|
||||
)
|
||||
|
||||
continue
|
||||
|
||||
log.info(
|
||||
"Fetched %s rows",
|
||||
len(df),
|
||||
)
|
||||
|
||||
# ------------------------------------------
|
||||
# Create Table If Missing
|
||||
# ------------------------------------------
|
||||
|
||||
exists = table_exists(
|
||||
client,
|
||||
table_name,
|
||||
)
|
||||
|
||||
if not exists:
|
||||
|
||||
log.info(
|
||||
"Creating table %s",
|
||||
table_name,
|
||||
)
|
||||
|
||||
create_clickhouse_table(
|
||||
df=df,
|
||||
table_name=table_name,
|
||||
clickhouse_engine=clickhouse_engine,
|
||||
)
|
||||
|
||||
# ------------------------------------------
|
||||
# Existing Table Logic
|
||||
# ------------------------------------------
|
||||
|
||||
else:
|
||||
|
||||
if operation == "DELETE+INSERT":
|
||||
|
||||
truncate_table(
|
||||
client,
|
||||
table_name,
|
||||
)
|
||||
|
||||
else:
|
||||
|
||||
delete_existing_data(
|
||||
client=client,
|
||||
table_name=table_name,
|
||||
run_date=run_date,
|
||||
mids=mids,
|
||||
emp_visit_df=emp_visit_df,
|
||||
)
|
||||
|
||||
# ------------------------------------------
|
||||
# Load Data
|
||||
# ------------------------------------------
|
||||
log.info("_ _ _ _Inserting data into clickhouse db from sql server_ _ _ _")
|
||||
load_to_clickhouse(
|
||||
client=client,
|
||||
table_name=table_name,
|
||||
df=df,
|
||||
)
|
||||
|
||||
log.info(
|
||||
"%s loaded successfully (%s rows)",
|
||||
table_name,
|
||||
len(df),
|
||||
)
|
||||
|
||||
except Exception:
|
||||
|
||||
log.exception(
|
||||
"Failed processing table %s",
|
||||
table_name,
|
||||
)
|
||||
|
||||
raise
|
||||
|
||||
log.info("=" * 80)
|
||||
log.info("Pipeline Completed Successfully")
|
||||
log.info("=" * 80)
|
||||
@@ -0,0 +1,264 @@
|
||||
|
||||
from __future__ import annotations
|
||||
from time import sleep
|
||||
import sys
|
||||
from datetime import date, datetime, timedelta
|
||||
|
||||
import polars as pl
|
||||
import yaml
|
||||
|
||||
|
||||
from log import log
|
||||
|
||||
from clickhouse_task.create_table import create_clickhouse_table
|
||||
from clickhouse_task.delete_task import (
|
||||
delete_existing_data,
|
||||
truncate_table,
|
||||
)
|
||||
|
||||
from clickhouse_task.load_table import load_to_clickhouse
|
||||
|
||||
from db_con.connection import (
|
||||
build_sql_server_engine,
|
||||
build_clickhouse_engine,
|
||||
get_clickhouse_client,
|
||||
)
|
||||
|
||||
from mids import (
|
||||
MID_TABLE_COV,
|
||||
MID_TABLE_COV1,
|
||||
)
|
||||
|
||||
from src.bridge import *
|
||||
from src.fact_updated import *
|
||||
from src.dim import *
|
||||
|
||||
|
||||
# ==========================================================
|
||||
# Helpers
|
||||
# ==========================================================
|
||||
|
||||
def get_dates_from_yaml(filename: str):
|
||||
with open(filename, "r") as file:
|
||||
data = yaml.safe_load(file)
|
||||
|
||||
start_date = date.fromisoformat(
|
||||
str(data["pipeline"]["start_date"])
|
||||
)
|
||||
|
||||
end_date = date.fromisoformat(
|
||||
str(data["pipeline"]["end_date"])
|
||||
)
|
||||
flag=str(data["pipeline"]["flag"])
|
||||
|
||||
return start_date, end_date , flag
|
||||
|
||||
|
||||
def write_table_to_yaml(
|
||||
data: dict,
|
||||
run_date: date,
|
||||
filename: str | None = None
|
||||
):
|
||||
"""Write table data to a YAML file."""
|
||||
|
||||
if filename is None:
|
||||
filename = f"elt_pipeline_{run_date}.yml"
|
||||
|
||||
with open(filename, "w") as file:
|
||||
yaml.dump(
|
||||
data,
|
||||
file,
|
||||
default_flow_style=False,
|
||||
sort_keys=False
|
||||
)
|
||||
|
||||
print(f"Table written to {filename}")
|
||||
|
||||
|
||||
|
||||
|
||||
def table_exists(
|
||||
client,
|
||||
table_name: str,
|
||||
) -> bool:
|
||||
|
||||
return bool(
|
||||
client.command(
|
||||
f"EXISTS TABLE {table_name}"
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
# ==========================================================
|
||||
# elt ()
|
||||
# ==========================================================
|
||||
|
||||
def elt(run_date : date , config: dict):
|
||||
|
||||
log.info("=" * 80)
|
||||
log.info("Hello from data-move Python data pipeline!")
|
||||
|
||||
# ------------------------------------------------------
|
||||
# Run Date
|
||||
# ------------------------------------------------------
|
||||
|
||||
|
||||
|
||||
log.info(
|
||||
"Pipeline Run Date: %s",
|
||||
run_date,
|
||||
)
|
||||
|
||||
# ------------------------------------------------------
|
||||
# Connections
|
||||
# ------------------------------------------------------
|
||||
|
||||
log.info(
|
||||
"Connecting to databases..."
|
||||
)
|
||||
|
||||
sql_engine = build_sql_server_engine()
|
||||
clickhouse_engine = build_clickhouse_engine()
|
||||
client = get_clickhouse_client()
|
||||
|
||||
log.info(
|
||||
"Database connections established"
|
||||
)
|
||||
|
||||
# ------------------------------------------------------
|
||||
# mids Keys
|
||||
# ------------------------------------------------------
|
||||
|
||||
mids = MID_TABLE_COV(
|
||||
sql_engine,
|
||||
run_date,
|
||||
)
|
||||
|
||||
emp_visit_df = MID_TABLE_COV1(
|
||||
sql_engine,
|
||||
run_date,
|
||||
)
|
||||
|
||||
|
||||
|
||||
|
||||
# ------------------------------------------------------
|
||||
# Process Tables
|
||||
# ------------------------------------------------------
|
||||
for table in config["tables"]:
|
||||
table_name = table["name"]
|
||||
operation = table["operation"]
|
||||
fetch_by = table["fetch_by"]
|
||||
table_type=table["type"]
|
||||
log.info(
|
||||
"Pipeline Run Date: %s",
|
||||
run_date,
|
||||
)
|
||||
|
||||
|
||||
log.info("=" * 80)
|
||||
log.info(f"Processing Table-:{table_name} | Table type -:{table_type} | fetcht by-:{fetch_by} | operation-:{operation}" )
|
||||
|
||||
try:
|
||||
|
||||
# ------------------------------------------
|
||||
# Fetch Data
|
||||
# ------------------------------------------
|
||||
|
||||
|
||||
log.info(f"Fetching Data from sql server for table-: {table_name} ..............")
|
||||
fetch_list=["mids" ,"run_date", "reason_id"]
|
||||
if fetch_by in fetch_list :
|
||||
df=registry[f"fetch_{table_name}"](sql_engine, table_name , table_type, mids, run_date)
|
||||
else:
|
||||
df = fetch_data(sql_engine ,table_name,table_type)
|
||||
|
||||
log.info(f"Fetched total row -: {len(df)} from sql server for table-:{table_name} ...........!!!")
|
||||
|
||||
if df.is_empty():
|
||||
|
||||
log.warning(
|
||||
"%s returned no rows",
|
||||
table_name,
|
||||
)
|
||||
|
||||
continue
|
||||
|
||||
log.info(
|
||||
"Fetched %s rows",
|
||||
len(df),
|
||||
)
|
||||
|
||||
# ------------------------------------------
|
||||
# Create Table If Missing
|
||||
# ------------------------------------------
|
||||
|
||||
exists = table_exists(
|
||||
client,
|
||||
table_name,
|
||||
)
|
||||
|
||||
if not exists:
|
||||
|
||||
log.info(
|
||||
"Creating table %s",
|
||||
table_name,
|
||||
)
|
||||
|
||||
create_clickhouse_table(
|
||||
df=df,
|
||||
table_name=table_name,
|
||||
clickhouse_engine=clickhouse_engine,
|
||||
)
|
||||
|
||||
# ------------------------------------------
|
||||
# Existing Table Logic
|
||||
# ------------------------------------------
|
||||
|
||||
else:
|
||||
|
||||
if operation == "DELETE+INSERT":
|
||||
|
||||
truncate_table(
|
||||
client,
|
||||
table_name,
|
||||
)
|
||||
|
||||
else:
|
||||
|
||||
delete_existing_data(
|
||||
client=client,
|
||||
table_name=table_name,
|
||||
run_date=run_date,
|
||||
mids=mids,
|
||||
emp_visit_df=emp_visit_df,
|
||||
)
|
||||
|
||||
# ------------------------------------------
|
||||
# Load Data
|
||||
# ------------------------------------------
|
||||
log.info("_ _ _ _Inserting data into clickhouse db from sql server_ _ _ _")
|
||||
load_to_clickhouse(
|
||||
client=client,
|
||||
table_name=table_name,
|
||||
df=df,
|
||||
)
|
||||
|
||||
log.info(
|
||||
"%s loaded successfully (%s rows)",
|
||||
table_name,
|
||||
len(df),
|
||||
)
|
||||
|
||||
except Exception:
|
||||
|
||||
log.exception(
|
||||
"Failed processing table %s",
|
||||
table_name,
|
||||
)
|
||||
|
||||
raise
|
||||
|
||||
log.info("=" * 80)
|
||||
log.info("Pipeline Completed Successfully")
|
||||
log.info("=" * 80)
|
||||
@@ -1,3 +1,3 @@
|
||||
- pipeline_trigeered_on_date: '2026-06-23'
|
||||
- pipeline_trigeered_on_date: '2026-06-25'
|
||||
failed_run_date: none
|
||||
attempt: none
|
||||
|
||||
@@ -0,0 +1,240 @@
|
||||
2026-06-24 15:55:46 | INFO | Pipeline Start Date: 2026-06-23
|
||||
2026-06-24 15:55:46 | INFO | ================================================================================
|
||||
2026-06-24 15:55:46 | INFO | Hello from data-move Python data pipeline!
|
||||
2026-06-24 15:55:46 | INFO | Pipeline Run Date: 2026-06-23
|
||||
2026-06-24 15:55:46 | INFO | Connecting to databases...
|
||||
2026-06-24 15:55:48 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x000002C8151D4730>
|
||||
2026-06-24 15:55:52 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x000002C8162F0B90>
|
||||
2026-06-24 15:55:52 | INFO | Database connections established
|
||||
2026-06-24 15:55:52 | INFO | Collecting MIDs for: 2026-06-23
|
||||
2026-06-24 15:55:52 | INFO | Found 863 MIDs
|
||||
2026-06-24 15:55:52 | WARNING | Pipeline failed. Retry 1/3. Error: 'tables'
|
||||
2026-06-24 15:55:57 | INFO | ================================================================================
|
||||
2026-06-24 15:55:57 | INFO | Hello from data-move Python data pipeline!
|
||||
2026-06-24 15:55:57 | INFO | Pipeline Run Date: 2026-06-23
|
||||
2026-06-24 15:55:57 | INFO | Connecting to databases...
|
||||
2026-06-24 15:55:58 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x000002C81600B550>
|
||||
2026-06-24 15:55:58 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x000002C8162F5A90>
|
||||
2026-06-24 15:55:59 | INFO | Database connections established
|
||||
2026-06-24 15:55:59 | INFO | Collecting MIDs for: 2026-06-23
|
||||
2026-06-24 15:55:59 | INFO | Found 863 MIDs
|
||||
2026-06-24 15:55:59 | WARNING | Pipeline failed. Retry 2/3. Error: 'tables'
|
||||
2026-06-24 15:56:04 | INFO | ================================================================================
|
||||
2026-06-24 15:56:04 | INFO | Hello from data-move Python data pipeline!
|
||||
2026-06-24 15:56:04 | INFO | Pipeline Run Date: 2026-06-23
|
||||
2026-06-24 15:56:04 | INFO | Connecting to databases...
|
||||
2026-06-24 15:56:04 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x000002C8162F59A0>
|
||||
2026-06-24 15:56:05 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x000002C8162F68A0>
|
||||
2026-06-24 15:56:05 | INFO | Database connections established
|
||||
2026-06-24 15:56:05 | INFO | Collecting MIDs for: 2026-06-23
|
||||
2026-06-24 15:56:05 | INFO | Found 863 MIDs
|
||||
2026-06-24 16:10:30 | INFO | Pipeline Start Date: 2026-06-23
|
||||
2026-06-24 16:10:30 | INFO | ================================================================================
|
||||
2026-06-24 16:10:30 | INFO | Hello from data-move Python data pipeline!
|
||||
2026-06-24 16:10:30 | INFO | Pipeline Run Date: 2026-06-23
|
||||
2026-06-24 16:10:30 | INFO | Connecting to databases...
|
||||
2026-06-24 16:10:31 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x000001B29EE8C470>
|
||||
2026-06-24 16:10:32 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x000001B29FEBCA50>
|
||||
2026-06-24 16:10:32 | INFO | Database connections established
|
||||
2026-06-24 16:10:32 | INFO | Collecting MIDs for: 2026-06-23
|
||||
2026-06-24 16:10:33 | INFO | Found 863 MIDs
|
||||
2026-06-24 16:10:33 | WARNING | Pipeline failed. Retry 1/3. Error: 'tables'
|
||||
2026-06-24 16:10:38 | INFO | ================================================================================
|
||||
2026-06-24 16:10:38 | INFO | Hello from data-move Python data pipeline!
|
||||
2026-06-24 16:10:38 | INFO | Pipeline Run Date: 2026-06-23
|
||||
2026-06-24 16:10:38 | INFO | Connecting to databases...
|
||||
2026-06-24 16:10:38 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x000001B29FBB7550>
|
||||
2026-06-24 16:10:39 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x000001B29FECD9A0>
|
||||
2026-06-24 16:10:40 | INFO | Database connections established
|
||||
2026-06-24 16:10:40 | INFO | Collecting MIDs for: 2026-06-23
|
||||
2026-06-24 16:10:40 | INFO | Found 863 MIDs
|
||||
2026-06-24 16:10:40 | WARNING | Pipeline failed. Retry 2/3. Error: 'tables'
|
||||
2026-06-24 16:10:45 | INFO | ================================================================================
|
||||
2026-06-24 16:10:45 | INFO | Hello from data-move Python data pipeline!
|
||||
2026-06-24 16:10:45 | INFO | Pipeline Run Date: 2026-06-23
|
||||
2026-06-24 16:10:45 | INFO | Connecting to databases...
|
||||
2026-06-24 16:10:45 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x000001B29FECD8B0>
|
||||
2026-06-24 16:10:46 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x000001B29FECE7B0>
|
||||
2026-06-24 16:10:46 | INFO | Database connections established
|
||||
2026-06-24 16:10:46 | INFO | Collecting MIDs for: 2026-06-23
|
||||
2026-06-24 16:10:47 | INFO | Found 863 MIDs
|
||||
2026-06-24 16:12:02 | INFO | Pipeline Start Date: 2026-06-23
|
||||
2026-06-24 16:12:02 | INFO | ================================================================================
|
||||
2026-06-24 16:12:02 | INFO | Hello from data-move Python data pipeline!
|
||||
2026-06-24 16:12:02 | INFO | Pipeline Run Date: 2026-06-23
|
||||
2026-06-24 16:12:02 | INFO | Connecting to databases...
|
||||
2026-06-24 16:12:02 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x00000258DBAE05D0>
|
||||
2026-06-24 16:12:03 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x00000258DCB8CA50>
|
||||
2026-06-24 16:12:04 | INFO | Database connections established
|
||||
2026-06-24 16:12:04 | INFO | Collecting MIDs for: 2026-06-23
|
||||
2026-06-24 16:12:04 | INFO | Found 863 MIDs
|
||||
2026-06-24 16:12:05 | INFO | Pipeline Run Date: 2026-06-23
|
||||
2026-06-24 16:12:05 | INFO | ================================================================================
|
||||
2026-06-24 16:12:05 | INFO | Processing Table-:OQaD | Table type -:FACT | fetcht by-:run_date | operation-:INSERT
|
||||
2026-06-24 16:12:05 | INFO | Fetching Data from sql server for table-: OQaD ..............
|
||||
2026-06-24 16:12:05 | INFO | Exists: True
|
||||
2026-06-24 16:12:05 | INFO | Path: D:\z\airflow-project\src\sql\fact\OQaD.sql
|
||||
2026-06-24 16:12:05 | INFO | Fetching data for 863 EMPIDs
|
||||
2026-06-24 16:12:05 | INFO | Fetching OQaD data for run_date=2026-06-23
|
||||
2026-06-24 16:12:11 | INFO | fn name is fetch_OQad ------Fetched 458 rows
|
||||
2026-06-24 16:12:11 | INFO | Fetched total row -: 458 from sql server for table-:OQaD ...........!!!
|
||||
2026-06-24 16:12:11 | INFO | Fetched 458 rows
|
||||
2026-06-24 16:12:11 | INFO | _ _ _ _ Deleting Data from ClickHouse for OQaD _ _ _ _
|
||||
2026-06-24 16:12:11 | INFO | _ _ _ _Inserting data into clickhouse db from sql server_ _ _ _
|
||||
2026-06-24 16:12:11 | ERROR | Failed processing table OQaD
|
||||
Traceback (most recent call last):
|
||||
File "D:\z\airflow-project\elt.py", line 243, in elt
|
||||
load_to_clickhouse(
|
||||
~~~~~~~~~~~~~~~~~~^
|
||||
client=client,
|
||||
^^^^^^^^^^^^^^
|
||||
table_name=table_name,
|
||||
^^^^^^^^^^^^^^^^^^^^^^
|
||||
df=df,
|
||||
^^^^^^
|
||||
)
|
||||
^
|
||||
File "D:\z\airflow-project\clickhouse_task\load_table.py", line 28, in load_to_clickhouse
|
||||
chunk_df = df.iloc[start:end]
|
||||
^^^^^^^
|
||||
AttributeError: 'DataFrame' object has no attribute 'iloc'
|
||||
2026-06-24 16:12:11 | WARNING | Pipeline failed. Retry 1/3. Error: 'DataFrame' object has no attribute 'iloc'
|
||||
2026-06-24 16:12:16 | INFO | ================================================================================
|
||||
2026-06-24 16:12:16 | INFO | Hello from data-move Python data pipeline!
|
||||
2026-06-24 16:12:16 | INFO | Pipeline Run Date: 2026-06-23
|
||||
2026-06-24 16:12:16 | INFO | Connecting to databases...
|
||||
2026-06-24 16:12:17 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x00000258DCB6C650>
|
||||
2026-06-24 16:12:17 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x00000258DCB99A90>
|
||||
2026-06-24 16:12:18 | INFO | Database connections established
|
||||
2026-06-24 16:12:18 | INFO | Collecting MIDs for: 2026-06-23
|
||||
2026-06-24 16:12:18 | INFO | Found 863 MIDs
|
||||
2026-06-24 16:12:18 | INFO | Pipeline Run Date: 2026-06-23
|
||||
2026-06-24 16:12:18 | INFO | ================================================================================
|
||||
2026-06-24 16:12:18 | INFO | Processing Table-:OQaD | Table type -:FACT | fetcht by-:run_date | operation-:INSERT
|
||||
2026-06-24 16:12:18 | INFO | Fetching Data from sql server for table-: OQaD ..............
|
||||
2026-06-24 16:12:18 | INFO | Exists: True
|
||||
2026-06-24 16:12:18 | INFO | Path: D:\z\airflow-project\src\sql\fact\OQaD.sql
|
||||
2026-06-24 16:12:18 | INFO | Fetching data for 863 EMPIDs
|
||||
2026-06-24 16:12:18 | INFO | Fetching OQaD data for run_date=2026-06-23
|
||||
2026-06-24 16:12:24 | INFO | fn name is fetch_OQad ------Fetched 458 rows
|
||||
2026-06-24 16:12:24 | INFO | Fetched total row -: 458 from sql server for table-:OQaD ...........!!!
|
||||
2026-06-24 16:12:24 | INFO | Fetched 458 rows
|
||||
2026-06-24 16:12:24 | INFO | _ _ _ _ Deleting Data from ClickHouse for OQaD _ _ _ _
|
||||
2026-06-24 16:12:24 | INFO | _ _ _ _Inserting data into clickhouse db from sql server_ _ _ _
|
||||
2026-06-24 16:12:24 | ERROR | Failed processing table OQaD
|
||||
Traceback (most recent call last):
|
||||
File "D:\z\airflow-project\elt.py", line 243, in elt
|
||||
load_to_clickhouse(
|
||||
~~~~~~~~~~~~~~~~~~^
|
||||
client=client,
|
||||
^^^^^^^^^^^^^^
|
||||
table_name=table_name,
|
||||
^^^^^^^^^^^^^^^^^^^^^^
|
||||
df=df,
|
||||
^^^^^^
|
||||
)
|
||||
^
|
||||
File "D:\z\airflow-project\clickhouse_task\load_table.py", line 28, in load_to_clickhouse
|
||||
chunk_df = df.iloc[start:end]
|
||||
^^^^^^^
|
||||
AttributeError: 'DataFrame' object has no attribute 'iloc'
|
||||
2026-06-24 16:12:24 | WARNING | Pipeline failed. Retry 2/3. Error: 'DataFrame' object has no attribute 'iloc'
|
||||
2026-06-24 16:12:29 | INFO | ================================================================================
|
||||
2026-06-24 16:12:29 | INFO | Hello from data-move Python data pipeline!
|
||||
2026-06-24 16:12:29 | INFO | Pipeline Run Date: 2026-06-23
|
||||
2026-06-24 16:12:29 | INFO | Connecting to databases...
|
||||
2026-06-24 16:12:30 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x00000258DCB9A120>
|
||||
2026-06-24 16:12:30 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x00000258DCB9A8A0>
|
||||
2026-06-24 16:12:31 | INFO | Database connections established
|
||||
2026-06-24 16:12:31 | INFO | Collecting MIDs for: 2026-06-23
|
||||
2026-06-24 16:12:31 | INFO | Found 863 MIDs
|
||||
2026-06-24 16:12:31 | INFO | Pipeline Run Date: 2026-06-23
|
||||
2026-06-24 16:12:31 | INFO | ================================================================================
|
||||
2026-06-24 16:12:31 | INFO | Processing Table-:OQaD | Table type -:FACT | fetcht by-:run_date | operation-:INSERT
|
||||
2026-06-24 16:12:31 | INFO | Fetching Data from sql server for table-: OQaD ..............
|
||||
2026-06-24 16:12:31 | INFO | Exists: True
|
||||
2026-06-24 16:12:31 | INFO | Path: D:\z\airflow-project\src\sql\fact\OQaD.sql
|
||||
2026-06-24 16:12:31 | INFO | Fetching data for 863 EMPIDs
|
||||
2026-06-24 16:12:31 | INFO | Fetching OQaD data for run_date=2026-06-23
|
||||
2026-06-24 16:12:43 | INFO | fn name is fetch_OQad ------Fetched 458 rows
|
||||
2026-06-24 16:12:43 | INFO | Fetched total row -: 458 from sql server for table-:OQaD ...........!!!
|
||||
2026-06-24 16:12:43 | INFO | Fetched 458 rows
|
||||
2026-06-24 16:12:43 | INFO | _ _ _ _ Deleting Data from ClickHouse for OQaD _ _ _ _
|
||||
2026-06-24 16:12:43 | INFO | _ _ _ _Inserting data into clickhouse db from sql server_ _ _ _
|
||||
2026-06-24 16:12:43 | ERROR | Failed processing table OQaD
|
||||
Traceback (most recent call last):
|
||||
File "D:\z\airflow-project\elt.py", line 243, in elt
|
||||
load_to_clickhouse(
|
||||
~~~~~~~~~~~~~~~~~~^
|
||||
client=client,
|
||||
^^^^^^^^^^^^^^
|
||||
table_name=table_name,
|
||||
^^^^^^^^^^^^^^^^^^^^^^
|
||||
df=df,
|
||||
^^^^^^
|
||||
)
|
||||
^
|
||||
File "D:\z\airflow-project\clickhouse_task\load_table.py", line 28, in load_to_clickhouse
|
||||
chunk_df = df.iloc[start:end]
|
||||
^^^^^^^
|
||||
AttributeError: 'DataFrame' object has no attribute 'iloc'
|
||||
2026-06-24 16:17:01 | INFO | Pipeline Start Date: 2026-06-23
|
||||
2026-06-24 16:17:01 | INFO | ================================================================================
|
||||
2026-06-24 16:17:01 | INFO | Hello from data-move Python data pipeline!
|
||||
2026-06-24 16:17:01 | INFO | Pipeline Run Date: 2026-06-23
|
||||
2026-06-24 16:17:01 | INFO | Connecting to databases...
|
||||
2026-06-24 16:17:02 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x0000018D3B1F05D0>
|
||||
2026-06-24 16:17:03 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x0000018D3C29CA50>
|
||||
2026-06-24 16:17:04 | INFO | Database connections established
|
||||
2026-06-24 16:17:04 | INFO | Collecting MIDs for: 2026-06-23
|
||||
2026-06-24 16:17:04 | INFO | Found 863 MIDs
|
||||
2026-06-24 16:17:04 | INFO | Pipeline Run Date: 2026-06-23
|
||||
2026-06-24 16:17:04 | INFO | ================================================================================
|
||||
2026-06-24 16:17:04 | INFO | Processing Table-:OQaD | Table type -:FACT | fetcht by-:run_date | operation-:INSERT
|
||||
2026-06-24 16:17:04 | INFO | Fetching Data from sql server for table-: OQaD ..............
|
||||
2026-06-24 16:17:04 | INFO | Exists: True
|
||||
2026-06-24 16:17:04 | INFO | Path: D:\z\airflow-project\src\sql\fact\OQaD.sql
|
||||
2026-06-24 16:17:04 | INFO | Fetching data for 863 EMPIDs
|
||||
2026-06-24 16:17:04 | INFO | Fetching OQaD data for run_date=2026-06-23
|
||||
2026-06-24 16:17:10 | INFO | fn name is fetch_OQad ------Fetched 458 rows
|
||||
2026-06-24 16:17:10 | INFO | Fetched total row -: 458 from sql server for table-:OQaD ...........!!!
|
||||
2026-06-24 16:17:10 | INFO | Fetched 458 rows
|
||||
2026-06-24 16:17:10 | INFO | _ _ _ _ Deleting Data from ClickHouse for OQaD _ _ _ _
|
||||
2026-06-24 16:17:11 | INFO | _ _ _ _Inserting data into clickhouse db from sql server_ _ _ _
|
||||
2026-06-24 16:17:11 | INFO | Inserted rows 0 to 458
|
||||
2026-06-24 16:17:11 | INFO | OQaD: inserted 458 rows into ClickHouse
|
||||
2026-06-24 16:17:11 | INFO | OQaD loaded successfully (458 rows)
|
||||
2026-06-24 16:17:11 | INFO | ================================================================================
|
||||
2026-06-24 16:17:11 | INFO | Pipeline Completed Successfully
|
||||
2026-06-24 16:17:11 | INFO | ================================================================================
|
||||
2026-06-24 16:17:11 | INFO | Pipeline completed successfully. pipeline_trigeered_on_date=2026-06-24last_successful_run_date=2026-06-23
|
||||
2026-06-24 16:17:31 | INFO | Pipeline Start Date: 2026-06-23
|
||||
2026-06-24 16:17:31 | INFO | ================================================================================
|
||||
2026-06-24 16:17:31 | INFO | Hello from data-move Python data pipeline!
|
||||
2026-06-24 16:17:31 | INFO | Pipeline Run Date: 2026-06-23
|
||||
2026-06-24 16:17:31 | INFO | Connecting to databases...
|
||||
2026-06-24 16:17:31 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x0000022F667F05D0>
|
||||
2026-06-24 16:17:33 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x0000022F678CCA50>
|
||||
2026-06-24 16:17:33 | INFO | Database connections established
|
||||
2026-06-24 16:17:33 | INFO | Collecting MIDs for: 2026-06-23
|
||||
2026-06-24 16:17:34 | INFO | Found 863 MIDs
|
||||
2026-06-24 16:17:34 | INFO | Pipeline Run Date: 2026-06-23
|
||||
2026-06-24 16:17:34 | INFO | ================================================================================
|
||||
2026-06-24 16:17:34 | INFO | Processing Table-:OQaD | Table type -:FACT | fetcht by-:run_date | operation-:INSERT
|
||||
2026-06-24 16:17:34 | INFO | Fetching Data from sql server for table-: OQaD ..............
|
||||
2026-06-24 16:17:34 | INFO | Exists: True
|
||||
2026-06-24 16:17:34 | INFO | Path: D:\z\airflow-project\src\sql\fact\OQaD.sql
|
||||
2026-06-24 16:17:34 | INFO | Fetching data for 863 EMPIDs
|
||||
2026-06-24 16:17:34 | INFO | Fetching OQaD data for run_date=2026-06-23
|
||||
2026-06-24 16:17:40 | INFO | fn name is fetch_OQad ------Fetched 458 rows
|
||||
2026-06-24 16:17:40 | INFO | Fetched total row -: 458 from sql server for table-:OQaD ...........!!!
|
||||
2026-06-24 16:17:40 | INFO | Fetched 458 rows
|
||||
2026-06-24 16:17:40 | INFO | _ _ _ _ Deleting Data from ClickHouse for OQaD _ _ _ _
|
||||
2026-06-24 16:17:41 | INFO | _ _ _ _Inserting data into clickhouse db from sql server_ _ _ _
|
||||
2026-06-24 16:17:41 | INFO | Inserted rows 0 to 458
|
||||
2026-06-24 16:17:41 | INFO | OQaD: inserted 458 rows into ClickHouse
|
||||
2026-06-24 16:17:41 | INFO | OQaD loaded successfully (458 rows)
|
||||
2026-06-24 16:17:41 | INFO | ================================================================================
|
||||
2026-06-24 16:17:41 | INFO | Pipeline Completed Successfully
|
||||
2026-06-24 16:17:41 | INFO | ================================================================================
|
||||
2026-06-24 16:17:41 | INFO | Pipeline completed successfully. pipeline_trigeered_on_date=2026-06-24last_successful_run_date=2026-06-23
|
||||
@@ -0,0 +1,90 @@
|
||||
2026-06-25 10:23:52 | INFO | Pipeline Start Date: 2026-06-24
|
||||
2026-06-25 10:23:52 | INFO | ================================================================================
|
||||
2026-06-25 10:23:52 | INFO | Hello from data-move Python data pipeline!
|
||||
2026-06-25 10:23:52 | INFO | Pipeline Run Date: 2026-06-24
|
||||
2026-06-25 10:23:52 | INFO | Connecting to databases...
|
||||
2026-06-25 10:23:54 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x000001D3489C90D0>
|
||||
2026-06-25 10:23:57 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x000001D349ACA490>
|
||||
2026-06-25 10:23:58 | INFO | Database connections established
|
||||
2026-06-25 10:23:58 | INFO | Collecting MIDs for: 2026-06-24
|
||||
2026-06-25 10:23:58 | INFO | Found 868 MIDs
|
||||
2026-06-25 10:23:59 | INFO | Pipeline Run Date: 2026-06-24
|
||||
2026-06-25 10:23:59 | INFO | ================================================================================
|
||||
2026-06-25 10:23:59 | INFO | Processing Table-:OQaD | Table type -:FACT | fetcht by-:run_date | operation-:INSERT
|
||||
2026-06-25 10:23:59 | INFO | Fetching Data from sql server for table-: OQaD ..............
|
||||
2026-06-25 10:23:59 | ERROR | Failed processing table OQaD
|
||||
Traceback (most recent call last):
|
||||
File "D:\z\airflow-project\elt2.py", line 172, in elt
|
||||
df=registry[f"fetch_{table_name}"]()(sql_engine, table_name , table_type, mids, run_date)
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^
|
||||
TypeError: fetch_OQaD() missing 5 required positional arguments: 'engine', 'table_name', 'table_type', 'empids', and 'run_date'
|
||||
2026-06-25 10:23:59 | WARNING | Pipeline failed. Retry 1/3. Error: fetch_OQaD() missing 5 required positional arguments: 'engine', 'table_name', 'table_type', 'empids', and 'run_date'
|
||||
2026-06-25 10:24:04 | INFO | ================================================================================
|
||||
2026-06-25 10:24:04 | INFO | Hello from data-move Python data pipeline!
|
||||
2026-06-25 10:24:04 | INFO | Pipeline Run Date: 2026-06-24
|
||||
2026-06-25 10:24:04 | INFO | Connecting to databases...
|
||||
2026-06-25 10:24:05 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x000001D349AC3950>
|
||||
2026-06-25 10:24:05 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x000001D349B68E60>
|
||||
2026-06-25 10:24:06 | INFO | Database connections established
|
||||
2026-06-25 10:24:06 | INFO | Collecting MIDs for: 2026-06-24
|
||||
2026-06-25 10:24:06 | INFO | Found 868 MIDs
|
||||
2026-06-25 10:24:06 | INFO | Pipeline Run Date: 2026-06-24
|
||||
2026-06-25 10:24:06 | INFO | ================================================================================
|
||||
2026-06-25 10:24:06 | INFO | Processing Table-:OQaD | Table type -:FACT | fetcht by-:run_date | operation-:INSERT
|
||||
2026-06-25 10:24:06 | INFO | Fetching Data from sql server for table-: OQaD ..............
|
||||
2026-06-25 10:24:06 | ERROR | Failed processing table OQaD
|
||||
Traceback (most recent call last):
|
||||
File "D:\z\airflow-project\elt2.py", line 172, in elt
|
||||
df=registry[f"fetch_{table_name}"]()(sql_engine, table_name , table_type, mids, run_date)
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^
|
||||
TypeError: fetch_OQaD() missing 5 required positional arguments: 'engine', 'table_name', 'table_type', 'empids', and 'run_date'
|
||||
2026-06-25 10:24:06 | WARNING | Pipeline failed. Retry 2/3. Error: fetch_OQaD() missing 5 required positional arguments: 'engine', 'table_name', 'table_type', 'empids', and 'run_date'
|
||||
2026-06-25 10:24:11 | INFO | ================================================================================
|
||||
2026-06-25 10:24:11 | INFO | Hello from data-move Python data pipeline!
|
||||
2026-06-25 10:24:11 | INFO | Pipeline Run Date: 2026-06-24
|
||||
2026-06-25 10:24:11 | INFO | Connecting to databases...
|
||||
2026-06-25 10:24:12 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x000001D349B68D70>
|
||||
2026-06-25 10:24:12 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x000001D349B69C70>
|
||||
2026-06-25 10:24:12 | INFO | Database connections established
|
||||
2026-06-25 10:24:12 | INFO | Collecting MIDs for: 2026-06-24
|
||||
2026-06-25 10:24:13 | INFO | Found 868 MIDs
|
||||
2026-06-25 10:24:13 | INFO | Pipeline Run Date: 2026-06-24
|
||||
2026-06-25 10:24:13 | INFO | ================================================================================
|
||||
2026-06-25 10:24:13 | INFO | Processing Table-:OQaD | Table type -:FACT | fetcht by-:run_date | operation-:INSERT
|
||||
2026-06-25 10:24:13 | INFO | Fetching Data from sql server for table-: OQaD ..............
|
||||
2026-06-25 10:24:13 | ERROR | Failed processing table OQaD
|
||||
Traceback (most recent call last):
|
||||
File "D:\z\airflow-project\elt2.py", line 172, in elt
|
||||
df=registry[f"fetch_{table_name}"]()(sql_engine, table_name , table_type, mids, run_date)
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^
|
||||
TypeError: fetch_OQaD() missing 5 required positional arguments: 'engine', 'table_name', 'table_type', 'empids', and 'run_date'
|
||||
2026-06-25 11:19:10 | INFO | Pipeline Start Date: 2026-06-24
|
||||
2026-06-25 11:19:10 | INFO | ================================================================================
|
||||
2026-06-25 11:19:10 | INFO | Hello from data-move Python data pipeline!
|
||||
2026-06-25 11:19:10 | INFO | Pipeline Run Date: 2026-06-24
|
||||
2026-06-25 11:19:10 | INFO | Connecting to databases...
|
||||
2026-06-25 11:19:11 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x000002B17CBD0890>
|
||||
2026-06-25 11:19:13 | INFO | <sqlalchemy.engine.cursor.CursorResult object at 0x000002B17DCE0910>
|
||||
2026-06-25 11:19:14 | INFO | Database connections established
|
||||
2026-06-25 11:19:14 | INFO | Collecting MIDs for: 2026-06-24
|
||||
2026-06-25 11:19:14 | INFO | Found 868 MIDs
|
||||
2026-06-25 11:19:15 | INFO | Pipeline Run Date: 2026-06-24
|
||||
2026-06-25 11:19:15 | INFO | ================================================================================
|
||||
2026-06-25 11:19:15 | INFO | Processing Table-:OQaD | Table type -:FACT | fetcht by-:run_date | operation-:INSERT
|
||||
2026-06-25 11:19:15 | INFO | Fetching Data from sql server for table-: OQaD ..............
|
||||
2026-06-25 11:19:15 | INFO | Exists: True
|
||||
2026-06-25 11:19:15 | INFO | Path: D:\z\airflow-project\src\sql\fact\OQaD.sql
|
||||
2026-06-25 11:19:15 | INFO | Fetching data for 868 EMPIDs
|
||||
2026-06-25 11:19:15 | INFO | Fetching OQaD data for run_date=2026-06-24
|
||||
2026-06-25 11:19:21 | INFO | fn name is fetch_OQad ------Fetched 469 rows
|
||||
2026-06-25 11:19:21 | INFO | Fetched total row -: 469 from sql server for table-:OQaD ...........!!!
|
||||
2026-06-25 11:19:21 | INFO | Fetched 469 rows
|
||||
2026-06-25 11:19:21 | INFO | _ _ _ _ Deleting Data from ClickHouse for OQaD _ _ _ _
|
||||
2026-06-25 11:19:21 | INFO | _ _ _ _Inserting data into clickhouse db from sql server_ _ _ _
|
||||
2026-06-25 11:19:21 | INFO | Inserted rows 0 to 469
|
||||
2026-06-25 11:19:21 | INFO | OQaD: inserted 469 rows into ClickHouse
|
||||
2026-06-25 11:19:21 | INFO | OQaD loaded successfully (469 rows)
|
||||
2026-06-25 11:19:21 | INFO | ================================================================================
|
||||
2026-06-25 11:19:21 | INFO | Pipeline Completed Successfully
|
||||
2026-06-25 11:19:21 | INFO | ================================================================================
|
||||
2026-06-25 11:19:21 | INFO | Pipeline completed successfully. pipeline_trigeered_on_date=2026-06-25last_successful_run_date=2026-06-24
|
||||
@@ -46,113 +46,13 @@ from mids import (
|
||||
from src.bridge import *
|
||||
from src.fact import *
|
||||
from src.dim import *
|
||||
|
||||
|
||||
# ==========================================================
|
||||
# Helpers
|
||||
# ==========================================================
|
||||
|
||||
def get_dates_from_yaml(filename: str):
|
||||
with open(filename, "r") as file:
|
||||
data = yaml.safe_load(file)
|
||||
|
||||
start_date = date.fromisoformat(
|
||||
str(data["pipeline"]["start_date"])
|
||||
)
|
||||
|
||||
end_date = date.fromisoformat(
|
||||
str(data["pipeline"]["end_date"])
|
||||
)
|
||||
flag=str(data["pipeline"]["flag"])
|
||||
|
||||
return start_date, end_date , flag
|
||||
|
||||
|
||||
def write_table_to_yaml(
|
||||
data: dict,
|
||||
run_date: date,
|
||||
filename: str | None = None
|
||||
):
|
||||
"""Write table data to a YAML file."""
|
||||
|
||||
if filename is None:
|
||||
filename = f"elt_pipeline_{run_date}.yml"
|
||||
|
||||
with open(filename, "w") as file:
|
||||
yaml.dump(
|
||||
data,
|
||||
file,
|
||||
default_flow_style=False,
|
||||
sort_keys=False
|
||||
)
|
||||
|
||||
print(f"Table written to {filename}")
|
||||
from elt import *
|
||||
|
||||
|
||||
|
||||
|
||||
def table_exists(
|
||||
client,
|
||||
table_name: str,
|
||||
) -> bool:
|
||||
|
||||
return bool(
|
||||
client.command(
|
||||
f"EXISTS TABLE {table_name}"
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
# ==========================================================
|
||||
# Main
|
||||
# ==========================================================
|
||||
|
||||
def elt(run_date : date):
|
||||
|
||||
log.info("=" * 80)
|
||||
log.info("Hello from data-move Python data pipeline!")
|
||||
|
||||
# ------------------------------------------------------
|
||||
# Run Date
|
||||
# ------------------------------------------------------
|
||||
|
||||
|
||||
|
||||
log.info(
|
||||
"Pipeline Run Date: %s",
|
||||
run_date,
|
||||
)
|
||||
|
||||
# ------------------------------------------------------
|
||||
# Connections
|
||||
# ------------------------------------------------------
|
||||
|
||||
log.info(
|
||||
"Connecting to databases..."
|
||||
)
|
||||
|
||||
sql_engine = build_sql_server_engine()
|
||||
clickhouse_engine = build_clickhouse_engine()
|
||||
client = get_clickhouse_client()
|
||||
|
||||
log.info(
|
||||
"Database connections established"
|
||||
)
|
||||
|
||||
# ------------------------------------------------------
|
||||
# mids Keys
|
||||
# ------------------------------------------------------
|
||||
|
||||
mids = MID_TABLE_COV(
|
||||
sql_engine,
|
||||
run_date,
|
||||
)
|
||||
|
||||
emp_visit_df = MID_TABLE_COV1(
|
||||
sql_engine,
|
||||
run_date,
|
||||
)
|
||||
|
||||
def main() :
|
||||
|
||||
# ------------------------------------------------------
|
||||
# Config
|
||||
@@ -163,128 +63,8 @@ def elt(run_date : date):
|
||||
"r",
|
||||
) as file:
|
||||
|
||||
config = yaml.safe_load(file)
|
||||
table_config = yaml.safe_load(file)
|
||||
|
||||
# ------------------------------------------------------
|
||||
# Process Tables
|
||||
# ------------------------------------------------------
|
||||
for table in config["tables"]:
|
||||
table_name = table["name"]
|
||||
operation = table["operation"]
|
||||
fetch_by = table["fetch_by"]
|
||||
table_type=table["type"]
|
||||
|
||||
log.info("=" * 80)
|
||||
log.info(f"Processing Table-:{table_name} | Table type -:{table_type} | fetcht by-:{fetch_by} | operation-:{operation}" )
|
||||
|
||||
try:
|
||||
|
||||
# ------------------------------------------
|
||||
# Fetch Data
|
||||
# ------------------------------------------
|
||||
|
||||
|
||||
log.info(f"Fetching Data from sql server for table-: {table_name} ..............")
|
||||
fetch_list=["mids" ,"run_date", "reason_id"]
|
||||
if fetch_by in fetch_list :
|
||||
fn_name = f"fetch_{table_name}"
|
||||
fn = globals()[fn_name]
|
||||
df=fn(sql_engine, table_name , table_type, mids, run_date)
|
||||
else:
|
||||
df = fetch_data(sql_engine ,table_name,table_type)
|
||||
|
||||
log.info(f"Fetched total row -: {len(df)} from sql server for table-:{table_name} ...........!!!")
|
||||
|
||||
if df.is_empty():
|
||||
|
||||
log.warning(
|
||||
"%s returned no rows",
|
||||
table_name,
|
||||
)
|
||||
|
||||
continue
|
||||
|
||||
log.info(
|
||||
"Fetched %s rows",
|
||||
len(df),
|
||||
)
|
||||
|
||||
# ------------------------------------------
|
||||
# Create Table If Missing
|
||||
# ------------------------------------------
|
||||
|
||||
exists = table_exists(
|
||||
client,
|
||||
table_name,
|
||||
)
|
||||
|
||||
if not exists:
|
||||
|
||||
log.info(
|
||||
"Creating table %s",
|
||||
table_name,
|
||||
)
|
||||
|
||||
create_clickhouse_table(
|
||||
df=df,
|
||||
table_name=table_name,
|
||||
clickhouse_engine=clickhouse_engine,
|
||||
)
|
||||
|
||||
# ------------------------------------------
|
||||
# Existing Table Logic
|
||||
# ------------------------------------------
|
||||
|
||||
else:
|
||||
|
||||
if operation == "DELETE+INSERT":
|
||||
|
||||
truncate_table(
|
||||
client,
|
||||
table_name,
|
||||
)
|
||||
|
||||
else:
|
||||
|
||||
delete_existing_data(
|
||||
client=client,
|
||||
table_name=table_name,
|
||||
run_date=run_date,
|
||||
mids=mids,
|
||||
emp_visit_df=emp_visit_df,
|
||||
)
|
||||
|
||||
# ------------------------------------------
|
||||
# Load Data
|
||||
# ------------------------------------------
|
||||
log.info("_ _ _ _Inserting data into clickhouse db from sql server_ _ _ _")
|
||||
load_to_clickhouse(
|
||||
client=client,
|
||||
table_name=table_name,
|
||||
df=df,
|
||||
)
|
||||
|
||||
log.info(
|
||||
"%s loaded successfully (%s rows)",
|
||||
table_name,
|
||||
len(df),
|
||||
)
|
||||
|
||||
except Exception:
|
||||
|
||||
log.exception(
|
||||
"Failed processing table %s",
|
||||
table_name,
|
||||
)
|
||||
|
||||
raise
|
||||
|
||||
log.info("=" * 80)
|
||||
log.info("Pipeline Completed Successfully")
|
||||
log.info("=" * 80)
|
||||
|
||||
|
||||
def main() :
|
||||
|
||||
config_file = Path("Pipeline_config.yml")
|
||||
|
||||
@@ -336,7 +116,7 @@ def main() :
|
||||
|
||||
for attempt in range(3):
|
||||
try:
|
||||
elt(run_date)
|
||||
elt(run_date , table_config)
|
||||
|
||||
successful_dates.append({
|
||||
'pipeline_trigeered_on_date': str(date.today()),
|
||||
@@ -395,6 +175,7 @@ def main() :
|
||||
sort_keys=False)
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
main()
|
||||
@@ -0,0 +1,181 @@
|
||||
# /// script
|
||||
# requires-python = ">=3.11"
|
||||
# dependencies = [
|
||||
# "polars>=0.20.0",
|
||||
# "pyarrow>=18.0.0",
|
||||
# "sqlalchemy>=2.0.0",
|
||||
# "pyodbc>=5.0.0",
|
||||
# "clickhouse-connect>=0.7.0",
|
||||
# "clickhouse-sqlalchemy>=0.3.2",
|
||||
# "pyyaml>=6.0.3",
|
||||
# "python-dotenv>=1.0.0",
|
||||
# ]
|
||||
# ///
|
||||
|
||||
|
||||
from __future__ import annotations
|
||||
from time import sleep
|
||||
import sys
|
||||
from datetime import date, datetime, timedelta
|
||||
|
||||
import polars as pl
|
||||
import yaml
|
||||
|
||||
|
||||
from log import log
|
||||
|
||||
from clickhouse_task.create_table import create_clickhouse_table
|
||||
from clickhouse_task.delete_task import (
|
||||
delete_existing_data,
|
||||
truncate_table,
|
||||
)
|
||||
|
||||
from clickhouse_task.load_table import load_to_clickhouse
|
||||
|
||||
from db_con.connection import (
|
||||
build_sql_server_engine,
|
||||
build_clickhouse_engine,
|
||||
get_clickhouse_client,
|
||||
)
|
||||
|
||||
from mids import (
|
||||
MID_TABLE_COV,
|
||||
MID_TABLE_COV1,
|
||||
)
|
||||
|
||||
from src.bridge import *
|
||||
from src.fact_updated import *
|
||||
from src.dim import *
|
||||
from elt2 import *
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def main() :
|
||||
|
||||
# ------------------------------------------------------
|
||||
# Config
|
||||
# ------------------------------------------------------
|
||||
|
||||
with open(
|
||||
"y.yml",
|
||||
"r",
|
||||
) as file:
|
||||
|
||||
table_config = yaml.safe_load(file)
|
||||
|
||||
|
||||
config_file = Path("Pipeline_config.yml")
|
||||
|
||||
if not config_file.exists():
|
||||
default_config = {
|
||||
"pipeline": {
|
||||
"run_date": None,
|
||||
"status": None,
|
||||
"error_message": None,
|
||||
}
|
||||
}
|
||||
|
||||
with open(config_file, "w") as f:
|
||||
yaml.safe_dump(default_config, f)
|
||||
|
||||
with open(config_file, "r") as f:
|
||||
config = yaml.safe_load(f)
|
||||
|
||||
|
||||
|
||||
p_start_date, p_end_date , flag= get_dates_from_yaml("elt_pipeline_custom_dates.yml")
|
||||
if flag =="Y" :
|
||||
start_date=p_start_date
|
||||
end_date=p_end_date
|
||||
|
||||
elif len(sys.argv) > 1:
|
||||
start_date = datetime.strptime(
|
||||
sys.argv[1],
|
||||
"%Y-%m-%d",
|
||||
).date()
|
||||
end_date=start_date + timedelta(days=1)
|
||||
else:
|
||||
start_date = date.today() - timedelta(days=1)
|
||||
end_date=start_date
|
||||
|
||||
log.info(
|
||||
"Pipeline Start Date: %s",
|
||||
start_date,
|
||||
)
|
||||
|
||||
|
||||
failed_dates=[]
|
||||
successful_dates=[]
|
||||
filename_successful = "successful_Pipeline_dates_config.yml"
|
||||
filename_failed = "failed_Pipeline_dates_config.yml"
|
||||
|
||||
while start_date <=end_date:
|
||||
run_date = start_date
|
||||
|
||||
for attempt in range(3):
|
||||
try:
|
||||
elt(run_date , table_config)
|
||||
|
||||
successful_dates.append({
|
||||
'pipeline_trigeered_on_date': str(date.today()),
|
||||
'last_successful_run_date': run_date,
|
||||
})
|
||||
|
||||
log.info(
|
||||
f"Pipeline completed successfully. "
|
||||
f"pipeline_trigeered_on_date={date.today()}"
|
||||
f"last_successful_run_date={run_date}"
|
||||
)
|
||||
|
||||
break
|
||||
|
||||
except Exception as e:
|
||||
|
||||
|
||||
|
||||
failed_dates.append({
|
||||
'pipeline_trigeered_on_date': str(date.today()),
|
||||
'failed_run_date': run_date,
|
||||
"attempt" : attempt
|
||||
})
|
||||
|
||||
|
||||
if attempt == 2:
|
||||
raise
|
||||
|
||||
log.warning(
|
||||
f"Pipeline failed. Retry {attempt + 1}/3. Error: {e}"
|
||||
)
|
||||
|
||||
sleep(5)
|
||||
|
||||
|
||||
start_date=start_date + timedelta(days=1)
|
||||
|
||||
|
||||
|
||||
with open(filename_successful, "w") as f:
|
||||
yaml.dump(
|
||||
successful_dates,
|
||||
f,
|
||||
default_flow_style=False,
|
||||
sort_keys=False,
|
||||
)
|
||||
if len(failed_dates) == 0 :
|
||||
failed_dates.append({
|
||||
'pipeline_trigeered_on_date': str(date.today()),
|
||||
'failed_run_date': "none",
|
||||
"attempt" : "none"
|
||||
})
|
||||
with open(filename_failed, "w") as f:
|
||||
yaml.dump(failed_dates,
|
||||
f, default_flow_style=False,
|
||||
sort_keys=False)
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
main()
|
||||
+290
-290
@@ -135,175 +135,6 @@ def fetch_additional_visibility( engine: Engine,
|
||||
|
||||
|
||||
|
||||
def fetch_OQaD(
|
||||
sql_engine: Engine,
|
||||
table_name: str,
|
||||
table_type: str,
|
||||
mids: list[int],
|
||||
run_date: date
|
||||
) -> pl.DataFrame:
|
||||
|
||||
# ─────────────────────────────────────────────
|
||||
# INNER HELPERS (defined once, used below)
|
||||
# ─────────────────────────────────────────────
|
||||
|
||||
client = get_clickhouse_client()
|
||||
|
||||
# ── Does a ClickHouse table exist? ────────────
|
||||
def table_exists(client, table_name: str) -> bool:
|
||||
|
||||
return bool(client.command(f"EXISTS TABLE {table_name}"))
|
||||
|
||||
# ── STEP 1: Who submitted yesterday in SQL Server? ───
|
||||
def fetch_quiz_empids(engine: Engine, run_date: date) -> pl.DataFrame:
|
||||
|
||||
|
||||
# Format date ONCE safely — avoids f-string injection bugs
|
||||
run_date_str = run_date.strftime("%Y-%m-%d")
|
||||
next_date_str = (run_date + timedelta(days=1)).strftime("%Y-%m-%d")
|
||||
|
||||
sql = f"""
|
||||
WITH MID_TABLE_COV1 AS
|
||||
(
|
||||
-- Records CREATED yesterday
|
||||
SELECT EmpId, CAST(VisitDate AS DATE) AS VisitDate
|
||||
FROM OneApp_KelloggsMT.dbo.T_OQAD
|
||||
WHERE CreateDate >= '{run_date_str}'
|
||||
AND CreateDate < '{next_date_str}'
|
||||
|
||||
UNION ALL
|
||||
|
||||
-- Records UPDATED yesterday (different rows, safe to UNION ALL)
|
||||
SELECT EmpId, CAST(VisitDate AS DATE) AS VisitDate
|
||||
FROM OneApp_KelloggsMT.dbo.T_OQAD
|
||||
WHERE UpdateDate >= '{run_date_str}'
|
||||
AND UpdateDate < '{next_date_str}'
|
||||
),
|
||||
QUIZ AS
|
||||
(
|
||||
SELECT DISTINCT
|
||||
E.EmpId AS empid,
|
||||
CAST(DQ.VisitDate AS DATE) AS visitdate
|
||||
FROM OneApp_KelloggsMT.dbo.T_OQAD DQ
|
||||
INNER JOIN OneApp_KelloggsMT.dbo.vw_Employee_Detail E
|
||||
ON DQ.EmpId = E.EmpId
|
||||
INNER JOIN OneApp_KelloggsMT.dbo.Master_OQAD_Question QU
|
||||
ON DQ.QuestionId = QU.QuestionId
|
||||
INNER JOIN OneApp_KelloggsMT.dbo.Master_OQAD_Category QC
|
||||
ON QU.QuestionCategoryId = QC.QuestionCategoryId
|
||||
WHERE E.EmpName NOT LIKE '%TEST%' -- exclude test employees
|
||||
AND E.RightId = 6 -- only field reps
|
||||
AND (
|
||||
E.ResignDate IS NULL
|
||||
OR CAST(E.ResignDate AS DATE) >= '{run_date_str}'
|
||||
)
|
||||
AND EXISTS ( -- ✅ EXISTS beats IN for large sets
|
||||
SELECT 1
|
||||
FROM MID_TABLE_COV1 A
|
||||
WHERE A.EmpId = DQ.EmpId
|
||||
AND A.VisitDate = CAST(DQ.VisitDate AS DATE)
|
||||
)
|
||||
)
|
||||
SELECT * FROM QUIZ
|
||||
"""
|
||||
|
||||
log.info("Fetching quiz empids for run_date=%s", run_date_str)
|
||||
df = pl.read_database(query=sql, connection=engine)
|
||||
log.info("Fetched %s (EmpId, VisitDate) pairs from SQL Server", len(df))
|
||||
return df
|
||||
|
||||
# ── STEP 2: Who do we ALREADY have in ClickHouse? ───
|
||||
def get_empids_clickhouse_OQAD(
|
||||
client,
|
||||
table_name: str = "OQaD",
|
||||
) -> pl.DataFrame:
|
||||
|
||||
|
||||
if not table_exists(client, table_name):
|
||||
log.warning("Table '%s' does not exist in ClickHouse.", table_name)
|
||||
return pl.DataFrame(schema={"empid": pl.Int64, "visitdate": pl.Date})
|
||||
|
||||
query = f"""
|
||||
SELECT DISTINCT
|
||||
employee_id AS empid,
|
||||
visit_date AS visitdate
|
||||
FROM {table_name}
|
||||
"""
|
||||
|
||||
arrow_table = client.query_arrow(query)
|
||||
df = pl.from_arrow(arrow_table)
|
||||
log.info("Fetched %s existing (EmpId, VisitDate) pairs from ClickHouse", len(df))
|
||||
return df
|
||||
|
||||
# ── STEP 3: Who is NEW? (in SQL Server but NOT yet in ClickHouse) ───
|
||||
def find_new_empids(
|
||||
sql_df: pl.DataFrame,
|
||||
ch_df: pl.DataFrame,
|
||||
) -> list[int]:
|
||||
|
||||
|
||||
new_df = sql_df.join(
|
||||
ch_df,
|
||||
on=["empid", "visitdate"],
|
||||
how="anti", # ✅ anti = keep rows NOT found in ch_df
|
||||
)
|
||||
|
||||
if new_df.is_empty():
|
||||
log.warning("No new EmpIds found for table=%s — nothing to fetch.", table_name)
|
||||
return [0] # sentinel value — the .sql WHERE will return 0 rows safely
|
||||
|
||||
empids = new_df["empid"].unique().to_list()
|
||||
log.info("Found %s NEW empids to fetch for %s", len(empids), table_name)
|
||||
return empids
|
||||
|
||||
# ── STEP 4: Fetch full quiz data for new empids ───
|
||||
def fetch_data(
|
||||
engine: Engine,
|
||||
table_name: str,
|
||||
table_type: str,
|
||||
empids: list[int],
|
||||
run_date: date,
|
||||
) -> pl.DataFrame:
|
||||
|
||||
|
||||
run_date_str = run_date.strftime("%Y-%m-%d")
|
||||
empid_list = ", ".join(str(e) for e in empids) # "101, 102, 103"
|
||||
|
||||
sql_file = Path("src") / "sql" / "fact" / f"{table_name}.sql"
|
||||
log.info("Loading SQL from: %s (exists=%s)", sql_file.resolve(), sql_file.exists())
|
||||
|
||||
with open(sql_file, "r", encoding="utf-8") as f:
|
||||
sql_template = f.read()
|
||||
|
||||
sql = sql_template.format(
|
||||
empid_list=empid_list,
|
||||
run_date=run_date_str,
|
||||
)
|
||||
|
||||
log.info("Fetching full OQaD data for %s empids, run_date=%s", len(empids), run_date_str)
|
||||
df = pl.read_database(query=sql, connection=engine)
|
||||
log.info("Fetched %s rows from SQL Server for table=%s", len(df), table_name)
|
||||
return df
|
||||
|
||||
# ─────────────────────────────────────────────
|
||||
# MAIN FLOW (the 4 steps, clearly sequenced)
|
||||
# ─────────────────────────────────────────────
|
||||
|
||||
qf = fetch_quiz_empids(sql_engine, run_date) # Step 1
|
||||
db_df = get_empids_clickhouse_OQAD(client, table_name) # Step 2
|
||||
empids = find_new_empids(qf, db_df) # Step 3
|
||||
|
||||
df = fetch_data( # Step 4
|
||||
engine=sql_engine,
|
||||
table_name=table_name,
|
||||
table_type=table_type,
|
||||
empids=empids,
|
||||
run_date=run_date,
|
||||
)
|
||||
|
||||
log.info("fetch_OQaD complete — returning %s rows", len(df))
|
||||
return df
|
||||
|
||||
# def fetch_OQaD(
|
||||
# sql_engine: Engine,
|
||||
# table_name: str,
|
||||
@@ -312,83 +143,85 @@ def fetch_OQaD(
|
||||
# run_date: date
|
||||
# ) -> pl.DataFrame:
|
||||
|
||||
# # ─────────────────────────────────────────────
|
||||
# # INNER HELPERS (defined once, used below)
|
||||
# # ─────────────────────────────────────────────
|
||||
|
||||
# client= get_clickhouse_client()
|
||||
# def table_exists(
|
||||
# client,
|
||||
# table_name: str,
|
||||
# ) -> bool:
|
||||
# client = get_clickhouse_client()
|
||||
|
||||
# return bool(
|
||||
# client.command(
|
||||
# f"EXISTS TABLE {table_name}"
|
||||
# )
|
||||
# )
|
||||
# # ── Does a ClickHouse table exist? ────────────
|
||||
# def table_exists(client, table_name: str) -> bool:
|
||||
|
||||
# return bool(client.command(f"EXISTS TABLE {table_name}"))
|
||||
|
||||
# # ── STEP 1: Who submitted yesterday in SQL Server? ───
|
||||
# def fetch_quiz_empids(engine: Engine, run_date: date) -> pl.DataFrame:
|
||||
|
||||
|
||||
# # Format date ONCE safely — avoids f-string injection bugs
|
||||
# run_date_str = run_date.strftime("%Y-%m-%d")
|
||||
# next_date_str = (run_date + timedelta(days=1)).strftime("%Y-%m-%d")
|
||||
|
||||
# def fetch_quiz_empids(engine: Engine, run_date : date) -> pl.DataFrame:
|
||||
|
||||
# sql_template = f"""
|
||||
# sql = f"""
|
||||
# WITH MID_TABLE_COV1 AS
|
||||
# (
|
||||
# SELECT EmpId, VisitDate
|
||||
# -- Records CREATED yesterday
|
||||
# SELECT EmpId, CAST(VisitDate AS DATE) AS VisitDate
|
||||
# FROM OneApp_KelloggsMT.dbo.T_OQAD
|
||||
# WHERE CreateDate >= {run_date}
|
||||
# AND CreateDate < DATEADD(DAY,1,'{run_date}')
|
||||
# WHERE CreateDate >= '{run_date_str}'
|
||||
# AND CreateDate < '{next_date_str}'
|
||||
|
||||
# UNION
|
||||
# UNION ALL
|
||||
|
||||
# SELECT EmpId, VisitDate
|
||||
# -- Records UPDATED yesterday (different rows, safe to UNION ALL)
|
||||
# SELECT EmpId, CAST(VisitDate AS DATE) AS VisitDate
|
||||
# FROM OneApp_KelloggsMT.dbo.T_OQAD
|
||||
# WHERE UpdateDate >= {run_date}
|
||||
# AND UpdateDate < DATEADD(DAY,1, '{run_date}')
|
||||
# WHERE UpdateDate >= '{run_date_str}'
|
||||
# AND UpdateDate < '{next_date_str}'
|
||||
# ),
|
||||
|
||||
# QUIZ AS
|
||||
# (
|
||||
# SELECT Distinct E.EmpId as empid
|
||||
# , CONVERT(date,DQ.VisitDate) AS visitdate
|
||||
# FROM OneApp_KelloggsMT.dbo.T_OQAD DQ INNER JOIN
|
||||
# OneApp_KelloggsMT.dbo.vw_Employee_Detail E ON DQ.EmpId = E.EmpId inner join
|
||||
# OneApp_KelloggsMT.dbo.Master_OQAD_Question QU on DQ.QuestionId= qu.QuestionId inner join
|
||||
# OneApp_KelloggsMT.dbo.Master_OQAD_Category qc on qu.QuestionCategoryId= qc.QuestionCategoryId
|
||||
# where e.EmpName not like 'test%' and e.RightId in (6)
|
||||
# and (E.ResignDate is null or E.ResignDate>=''+CONVERT(VARCHAR,'{run_date}')+'') AND E.EmpName NOT LIKE '%TEST%'
|
||||
# AND DQ.EmpId IN (SELECT EmpId FROM MID_TABLE_COV1 A WHERE
|
||||
# DQ.EmpId=A.EmpId AND CONVERT(date,VisitDate)=CONVERT(date,A.VisitDate) )
|
||||
# ) select * from quiz
|
||||
# SELECT DISTINCT
|
||||
# E.EmpId AS empid,
|
||||
# CAST(DQ.VisitDate AS DATE) AS visitdate
|
||||
# FROM OneApp_KelloggsMT.dbo.T_OQAD DQ
|
||||
# INNER JOIN OneApp_KelloggsMT.dbo.vw_Employee_Detail E
|
||||
# ON DQ.EmpId = E.EmpId
|
||||
# INNER JOIN OneApp_KelloggsMT.dbo.Master_OQAD_Question QU
|
||||
# ON DQ.QuestionId = QU.QuestionId
|
||||
# INNER JOIN OneApp_KelloggsMT.dbo.Master_OQAD_Category QC
|
||||
# ON QU.QuestionCategoryId = QC.QuestionCategoryId
|
||||
# WHERE E.EmpName NOT LIKE '%TEST%' -- exclude test employees
|
||||
# AND E.RightId = 6 -- only field reps
|
||||
# AND (
|
||||
# E.ResignDate IS NULL
|
||||
# OR CAST(E.ResignDate AS DATE) >= '{run_date_str}'
|
||||
# )
|
||||
# AND EXISTS ( -- ✅ EXISTS beats IN for large sets
|
||||
# SELECT 1
|
||||
# FROM MID_TABLE_COV1 A
|
||||
# WHERE A.EmpId = DQ.EmpId
|
||||
# AND A.VisitDate = CAST(DQ.VisitDate AS DATE)
|
||||
# )
|
||||
# )
|
||||
# SELECT * FROM QUIZ
|
||||
# """
|
||||
# sql = sql_template.format(
|
||||
# run_date=run_date.strftime("%Y-%m-%d")
|
||||
# )
|
||||
|
||||
# log.info(f"Fetching quiz_empids data for EMPID and Visitid")
|
||||
|
||||
# df = pl.read_database(
|
||||
# query=sql,
|
||||
# connection=engine
|
||||
# )
|
||||
|
||||
|
||||
# log.info(f"Fetched {len(df):,} total empid and visitdate fetched for OQAD from SQL Server")
|
||||
|
||||
# log.info("Fetching quiz empids for run_date=%s", run_date_str)
|
||||
# df = pl.read_database(query=sql, connection=engine)
|
||||
# log.info("Fetched %s (EmpId, VisitDate) pairs from SQL Server", len(df))
|
||||
# return df
|
||||
|
||||
|
||||
# # ── STEP 2: Who do we ALREADY have in ClickHouse? ───
|
||||
# def get_empids_clickhouse_OQAD(
|
||||
# client,
|
||||
# table_name: str = "OQaD",
|
||||
# ) -> pl.DataFrame:
|
||||
|
||||
|
||||
# if not table_exists(client, table_name):
|
||||
# log.warning(f"Table '{table_name}' does not exist.")
|
||||
# return pl.DataFrame(
|
||||
# schema={
|
||||
# "empid": pl.Int64,
|
||||
# "visitdate": pl.Date,
|
||||
# }
|
||||
# )
|
||||
# log.warning("Table '%s' does not exist in ClickHouse.", table_name)
|
||||
# return pl.DataFrame(schema={"empid": pl.Int64, "visitdate": pl.Date})
|
||||
|
||||
# query = f"""
|
||||
# SELECT DISTINCT
|
||||
@@ -397,119 +230,286 @@ def fetch_OQaD(
|
||||
# FROM {table_name}
|
||||
# """
|
||||
|
||||
# # ClickHouse -> PyArrow -> Polars
|
||||
# arrow_table = client.query_arrow(query)
|
||||
# df = pl.from_arrow(arrow_table)
|
||||
# log.info("Fetched %s existing (EmpId, VisitDate) pairs from ClickHouse", len(df))
|
||||
# return df
|
||||
|
||||
# return pl.from_arrow(arrow_table)
|
||||
# # ── STEP 3: Who is NEW? (in SQL Server but NOT yet in ClickHouse) ───
|
||||
# def find_new_empids(
|
||||
# sql_df: pl.DataFrame,
|
||||
# ch_df: pl.DataFrame,
|
||||
# ) -> list[int]:
|
||||
|
||||
|
||||
|
||||
# qf=fetch_quiz_empids(sql_engine,run_date)
|
||||
# db_df = get_empids_clickhouse_OQAD(client)
|
||||
|
||||
# matched = qf.join(
|
||||
# db_df,
|
||||
# new_df = sql_df.join(
|
||||
# ch_df,
|
||||
# on=["empid", "visitdate"],
|
||||
# how="inner",
|
||||
# how="anti", # ✅ anti = keep rows NOT found in ch_df
|
||||
# )
|
||||
|
||||
# if matched.is_empty():
|
||||
# if new_df.is_empty():
|
||||
# log.warning("No new EmpIds found for table=%s — nothing to fetch.", table_name)
|
||||
# return [0] # sentinel value — the .sql WHERE will return 0 rows safely
|
||||
|
||||
# empids=[0]
|
||||
# log.warning(
|
||||
# "%s Matched df in OQaD returned no rows",
|
||||
# table_name,
|
||||
# )
|
||||
|
||||
# else:
|
||||
# empids=matched["empid"].to_list()
|
||||
|
||||
|
||||
# log.info(f"Fetched {len(empids):,} matched empids fetched for OQAD ")
|
||||
# empids = new_df["empid"].unique().to_list()
|
||||
# log.info("Found %s NEW empids to fetch for %s", len(empids), table_name)
|
||||
# return empids
|
||||
|
||||
# # ── STEP 4: Fetch full quiz data for new empids ───
|
||||
# def fetch_data(
|
||||
# engine: Engine,
|
||||
# table_name: str,
|
||||
# table_type: str,
|
||||
# empids: list[int],
|
||||
# run_date: date
|
||||
# run_date: date,
|
||||
# ) -> pl.DataFrame:
|
||||
|
||||
# empid_list = ",".join(str(empid) for empid in empids)
|
||||
|
||||
# run_date_str = run_date.strftime("%Y-%m-%d")
|
||||
# empid_list = ", ".join(str(e) for e in empids) # "101, 102, 103"
|
||||
|
||||
# sql_file = Path("src") / "sql" / "fact" / f"{table_name}.sql"
|
||||
|
||||
# log.info(f"Exists: {sql_file.exists()}")
|
||||
# log.info(f"Path: {sql_file.resolve()}")
|
||||
# log.info("Loading SQL from: %s (exists=%s)", sql_file.resolve(), sql_file.exists())
|
||||
|
||||
# with open(sql_file, "r", encoding="utf-8") as f:
|
||||
# sql_template = f.read()
|
||||
|
||||
# sql = sql_template.format(
|
||||
# empid_list=empid_list,
|
||||
# run_date=run_date.strftime("%Y-%m-%d")
|
||||
# run_date=run_date_str,
|
||||
# )
|
||||
|
||||
# log.info(f"Fetching data for {len(empids):,} EMPIDs")
|
||||
|
||||
# log.info("Fetching OQaD data for run_date=%s", run_date)
|
||||
|
||||
# df = pl.read_database(
|
||||
# query=sql,
|
||||
# connection=engine,
|
||||
# )
|
||||
|
||||
# log.info("fn name is fetch_OQad ------Fetched %s rows", len(df))
|
||||
|
||||
# log.info("Fetching full OQaD data for %s empids, run_date=%s", len(empids), run_date_str)
|
||||
# df = pl.read_database(query=sql, connection=engine)
|
||||
# log.info("Fetched %s rows from SQL Server for table=%s", len(df), table_name)
|
||||
# return df
|
||||
# df=fetch_data( engine=sql_engine,
|
||||
|
||||
# # ─────────────────────────────────────────────
|
||||
# # MAIN FLOW (the 4 steps, clearly sequenced)
|
||||
# # ─────────────────────────────────────────────
|
||||
|
||||
# qf = fetch_quiz_empids(sql_engine, run_date) # Step 1
|
||||
# db_df = get_empids_clickhouse_OQAD(client, table_name) # Step 2
|
||||
# empids = find_new_empids(qf, db_df) # Step 3
|
||||
|
||||
# df = fetch_data( # Step 4
|
||||
# engine=sql_engine,
|
||||
# table_name=table_name,
|
||||
# table_type=table_type,
|
||||
# empids=empids,
|
||||
# run_date=run_date
|
||||
# run_date=run_date,
|
||||
# )
|
||||
# log.info(f"Fetched {len(df):,} rows from SQL Server")
|
||||
|
||||
# log.info("fetch_OQaD complete — returning %s rows", len(df))
|
||||
# return df
|
||||
|
||||
|
||||
# def fetch_OQaD(
|
||||
# engine: Engine,
|
||||
# table_name: str,
|
||||
# table_type: str,
|
||||
# empids: list[int],
|
||||
# run_date: date
|
||||
# ) -> pl.DataFrame:
|
||||
|
||||
# empid_list = ",".join(str(empid) for empid in empids)
|
||||
def fetch_OQaD(
|
||||
sql_engine: Engine,
|
||||
table_name: str,
|
||||
table_type: str,
|
||||
mids: list[int],
|
||||
run_date: date
|
||||
) -> pl.DataFrame:
|
||||
|
||||
|
||||
# sql_file = Path("src") / "sql" / "fact" / f"{table_name}.sql"
|
||||
client= get_clickhouse_client()
|
||||
def table_exists(
|
||||
client,
|
||||
table_name: str,
|
||||
) -> bool:
|
||||
|
||||
# log.info(f"Exists: {sql_file.exists()}")
|
||||
# log.info(f"Path: {sql_file.resolve()}")
|
||||
return bool(
|
||||
client.command(
|
||||
f"EXISTS TABLE {table_name}"
|
||||
)
|
||||
)
|
||||
|
||||
# with open(sql_file, "r", encoding="utf-8") as f:
|
||||
# sql_template = f.read()
|
||||
|
||||
# sql = sql_template.format(
|
||||
# empid_list=empid_list,
|
||||
# run_date=run_date.strftime("%Y-%m-%d")
|
||||
# )
|
||||
|
||||
# log.info(f"Fetching data for {len(empids):,} EMPIDs")
|
||||
def fetch_quiz_empids(engine: Engine, run_date : date) -> pl.DataFrame:
|
||||
|
||||
# log.info("Fetching OQaD data for run_date=%s", run_date)
|
||||
sql_template = f"""
|
||||
WITH MID_TABLE_COV1 AS
|
||||
(
|
||||
SELECT EmpId, VisitDate
|
||||
FROM OneApp_KelloggsMT.dbo.T_OQAD
|
||||
WHERE CreateDate >= {run_date}
|
||||
AND CreateDate < DATEADD(DAY,1,'{run_date}')
|
||||
|
||||
# df = pl.read_database(
|
||||
# query=sql,
|
||||
# connection=engine,
|
||||
# )
|
||||
UNION
|
||||
|
||||
# log.info("fn name is fetch_OQad ------Fetched %s rows", len(df))
|
||||
SELECT EmpId, VisitDate
|
||||
FROM OneApp_KelloggsMT.dbo.T_OQAD
|
||||
WHERE UpdateDate >= {run_date}
|
||||
AND UpdateDate < DATEADD(DAY,1, '{run_date}')
|
||||
),
|
||||
|
||||
# return df
|
||||
QUIZ AS
|
||||
(
|
||||
SELECT Distinct E.EmpId as empid
|
||||
, CONVERT(date,DQ.VisitDate) AS visitdate
|
||||
FROM OneApp_KelloggsMT.dbo.T_OQAD DQ INNER JOIN
|
||||
OneApp_KelloggsMT.dbo.vw_Employee_Detail E ON DQ.EmpId = E.EmpId inner join
|
||||
OneApp_KelloggsMT.dbo.Master_OQAD_Question QU on DQ.QuestionId= qu.QuestionId inner join
|
||||
OneApp_KelloggsMT.dbo.Master_OQAD_Category qc on qu.QuestionCategoryId= qc.QuestionCategoryId
|
||||
where e.EmpName not like 'test%' and e.RightId in (6)
|
||||
and (E.ResignDate is null or E.ResignDate>=''+CONVERT(VARCHAR,'{run_date}')+'') AND E.EmpName NOT LIKE '%TEST%'
|
||||
AND DQ.EmpId IN (SELECT EmpId FROM MID_TABLE_COV1 A WHERE
|
||||
DQ.EmpId=A.EmpId AND CONVERT(date,VisitDate)=CONVERT(date,A.VisitDate) )
|
||||
) select * from quiz
|
||||
"""
|
||||
sql = sql_template.format(
|
||||
run_date=run_date.strftime("%Y-%m-%d")
|
||||
)
|
||||
|
||||
log.info(f"Fetching quiz_empids data for EMPID and Visitid")
|
||||
|
||||
df = pl.read_database(
|
||||
query=sql,
|
||||
connection=engine
|
||||
)
|
||||
|
||||
|
||||
log.info(f"Fetched {len(df):,} total empid and visitdate fetched for OQAD from SQL Server")
|
||||
|
||||
return df
|
||||
|
||||
|
||||
def get_empids_clickhouse_OQAD(
|
||||
client,
|
||||
table_name: str = "OQaD",
|
||||
) -> pl.DataFrame:
|
||||
|
||||
if not table_exists(client, table_name):
|
||||
log.warning(f"Table '{table_name}' does not exist.")
|
||||
return pl.DataFrame(
|
||||
schema={
|
||||
"empid": pl.Int64,
|
||||
"visitdate": pl.Date,
|
||||
}
|
||||
)
|
||||
|
||||
query = f"""
|
||||
SELECT DISTINCT
|
||||
employee_id AS empid,
|
||||
visit_date AS visitdate
|
||||
FROM {table_name}
|
||||
"""
|
||||
|
||||
# ClickHouse -> PyArrow -> Polars
|
||||
arrow_table = client.query_arrow(query)
|
||||
|
||||
return pl.from_arrow(arrow_table)
|
||||
|
||||
|
||||
|
||||
qf=fetch_quiz_empids(sql_engine,run_date)
|
||||
db_df = get_empids_clickhouse_OQAD(client)
|
||||
|
||||
matched = qf.join(
|
||||
db_df,
|
||||
on=["empid", "visitdate"],
|
||||
how="inner",
|
||||
)
|
||||
|
||||
if matched.is_empty():
|
||||
|
||||
empids=[0]
|
||||
log.warning(
|
||||
"%s Matched df in OQaD returned no rows",
|
||||
table_name,
|
||||
)
|
||||
|
||||
else:
|
||||
empids=matched["empid"].to_list()
|
||||
|
||||
|
||||
log.info(f"Fetched {len(empids):,} matched empids fetched for OQAD ")
|
||||
|
||||
def fetch_data(
|
||||
engine: Engine,
|
||||
table_name: str,
|
||||
table_type: str,
|
||||
empids: list[int],
|
||||
run_date: date
|
||||
) -> pl.DataFrame:
|
||||
|
||||
empid_list = ",".join(str(empid) for empid in empids)
|
||||
|
||||
|
||||
sql_file = Path("src") / "sql" / "fact" / f"{table_name}.sql"
|
||||
|
||||
log.info(f"Exists: {sql_file.exists()}")
|
||||
log.info(f"Path: {sql_file.resolve()}")
|
||||
|
||||
with open(sql_file, "r", encoding="utf-8") as f:
|
||||
sql_template = f.read()
|
||||
|
||||
sql = sql_template.format(
|
||||
empid_list=empid_list,
|
||||
run_date=run_date.strftime("%Y-%m-%d")
|
||||
)
|
||||
|
||||
log.info(f"Fetching data for {len(empids):,} EMPIDs")
|
||||
|
||||
log.info("Fetching OQaD data for run_date=%s", run_date)
|
||||
|
||||
df = pl.read_database(
|
||||
query=sql,
|
||||
connection=engine,
|
||||
)
|
||||
|
||||
log.info("fn name is fetch_OQad ------Fetched %s rows", len(df))
|
||||
|
||||
return df
|
||||
df=fetch_data( engine=sql_engine,
|
||||
table_name=table_name,
|
||||
table_type=table_type,
|
||||
empids=empids,
|
||||
run_date=run_date
|
||||
)
|
||||
log.info(f"Fetched {len(df):,} rows from SQL Server")
|
||||
|
||||
return df
|
||||
|
||||
|
||||
def fetch_OQaD(
|
||||
engine: Engine,
|
||||
table_name: str,
|
||||
table_type: str,
|
||||
empids: list[int],
|
||||
run_date: date
|
||||
) -> pl.DataFrame:
|
||||
|
||||
empid_list = ",".join(str(empid) for empid in empids)
|
||||
|
||||
|
||||
sql_file = Path("src") / "sql" / "fact" / f"{table_name}.sql"
|
||||
|
||||
log.info(f"Exists: {sql_file.exists()}")
|
||||
log.info(f"Path: {sql_file.resolve()}")
|
||||
|
||||
with open(sql_file, "r", encoding="utf-8") as f:
|
||||
sql_template = f.read()
|
||||
|
||||
sql = sql_template.format(
|
||||
empid_list=empid_list,
|
||||
run_date=run_date.strftime("%Y-%m-%d")
|
||||
)
|
||||
|
||||
log.info(f"Fetching data for {len(empids):,} EMPIDs")
|
||||
|
||||
log.info("Fetching OQaD data for run_date=%s", run_date)
|
||||
|
||||
df = pl.read_database(
|
||||
query=sql,
|
||||
connection=engine,
|
||||
)
|
||||
|
||||
log.info("fn name is fetch_OQad ------Fetched %s rows", len(df))
|
||||
|
||||
return df
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,903 @@
|
||||
from pathlib import Path
|
||||
import polars as pl
|
||||
from sqlalchemy import Engine
|
||||
from datetime import date , timedelta
|
||||
from log import log
|
||||
|
||||
|
||||
|
||||
|
||||
from db_con.connection import (
|
||||
build_sql_server_engine,
|
||||
build_clickhouse_engine,
|
||||
get_clickhouse_client,
|
||||
)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
registry = {}
|
||||
|
||||
def register(func):
|
||||
registry[func.__name__] = func
|
||||
return func
|
||||
|
||||
|
||||
|
||||
|
||||
# def fetch_data(
|
||||
# engine: Engine,
|
||||
# table_name: str,
|
||||
# table_type: str,
|
||||
# mids: list[int],
|
||||
# run_date: date
|
||||
# ) -> pl.DataFrame:
|
||||
|
||||
# if not mids:
|
||||
# log.warning("No MIDs — nothing to fetch.")
|
||||
# return pl.DataFrame()
|
||||
|
||||
# mid_list = ",".join(str(mid) for mid in mids)
|
||||
|
||||
# sql_file = Path("src") / "sql" / f"{table_type.lower()}" / f"{table_name}.sql"
|
||||
|
||||
# with open(sql_file, "r", encoding="utf-8") as f:
|
||||
# sql_template = f.read()
|
||||
|
||||
# sql = sql_template.format(
|
||||
# mid_list=mid_list,
|
||||
# run_date=run_date.strftime("%Y-%m-%d")
|
||||
# )
|
||||
|
||||
# log.info(f"Fetching data for {len(mids):,} MIDs")
|
||||
|
||||
# df = pl.read_database(
|
||||
# query=sql,
|
||||
# connection=engine
|
||||
# )
|
||||
|
||||
# log.info(f"Fetched {len(df):,} rows from SQL Server")
|
||||
|
||||
# return df
|
||||
|
||||
|
||||
@register
|
||||
def fetch_SOS_OneApp( engine: Engine,
|
||||
table_name: str,
|
||||
table_type: str,
|
||||
mids: list[int],
|
||||
run_date: date
|
||||
) -> pl.DataFrame:
|
||||
|
||||
|
||||
|
||||
if not mids:
|
||||
log.warning("No MIDs — nothing to fetch.")
|
||||
return pl.DataFrame()
|
||||
log.info(f" Start Fetching data for these {len(mids):} MIDs ")
|
||||
mid_list = ",".join(str(mid) for mid in mids)
|
||||
log.info(f"Start Fetching data for these {len(mids):} MIDs or based on this date {run_date}")
|
||||
sql_file = Path("src") / "sql" / f"{table_type.lower()}" / f"{table_name}.sql"
|
||||
|
||||
with open(sql_file, "r", encoding="utf-8") as f:
|
||||
sql_template = f.read()
|
||||
|
||||
sql = sql_template.format(
|
||||
mid_list=mid_list,
|
||||
run_date=run_date.strftime("%Y-%m-%d")
|
||||
)
|
||||
log.info(f"Fetching data for {len(mids):,} MIDs")
|
||||
|
||||
df = pl.read_database(
|
||||
query=sql,
|
||||
connection=engine
|
||||
)
|
||||
log.info(f"Fetched {len(df):,} rows from SQL Server")
|
||||
|
||||
return df
|
||||
|
||||
|
||||
@register
|
||||
def fetch_additional_visibility( engine: Engine,
|
||||
table_name: str,
|
||||
table_type: str,
|
||||
mids: list[int],
|
||||
run_date: date
|
||||
) -> pl.DataFrame:
|
||||
|
||||
|
||||
|
||||
if not mids:
|
||||
log.warning("No MIDs — nothing to fetch.")
|
||||
return pl.DataFrame()
|
||||
|
||||
log.info(f" Start Fetching data for these {len(mids):} MIDs ")
|
||||
mid_list = ",".join(str(mid) for mid in mids)
|
||||
log.info(f"Start Fetching data for these {len(mids):} MIDs or based on this date {run_date}")
|
||||
sql_file = Path("src") / "sql" / f"{table_type.lower()}" / f"{table_name}.sql"
|
||||
|
||||
with open(sql_file, "r", encoding="utf-8") as f:
|
||||
sql_template = f.read()
|
||||
|
||||
sql = sql_template.format(
|
||||
mid_list=mid_list,
|
||||
run_date=run_date.strftime("%Y-%m-%d")
|
||||
)
|
||||
log.info(f"Fetching data for {len(mids):,} MIDs")
|
||||
|
||||
df = pl.read_database(
|
||||
query=sql,
|
||||
connection=engine
|
||||
)
|
||||
log.info(f"Fetched {len(df):,} rows from SQL Server")
|
||||
|
||||
return df
|
||||
|
||||
|
||||
|
||||
# def fetch_OQaD(
|
||||
# sql_engine: Engine,
|
||||
# table_name: str,
|
||||
# table_type: str,
|
||||
# mids: list[int],
|
||||
# run_date: date
|
||||
# ) -> pl.DataFrame:
|
||||
|
||||
# # ─────────────────────────────────────────────
|
||||
# # INNER HELPERS (defined once, used below)
|
||||
# # ─────────────────────────────────────────────
|
||||
|
||||
# client = get_clickhouse_client()
|
||||
|
||||
# # ── Does a ClickHouse table exist? ────────────
|
||||
# def table_exists(client, table_name: str) -> bool:
|
||||
|
||||
# return bool(client.command(f"EXISTS TABLE {table_name}"))
|
||||
|
||||
# # ── STEP 1: Who submitted yesterday in SQL Server? ───
|
||||
# def fetch_quiz_empids(engine: Engine, run_date: date) -> pl.DataFrame:
|
||||
|
||||
|
||||
# # Format date ONCE safely — avoids f-string injection bugs
|
||||
# run_date_str = run_date.strftime("%Y-%m-%d")
|
||||
# next_date_str = (run_date + timedelta(days=1)).strftime("%Y-%m-%d")
|
||||
|
||||
# sql = f"""
|
||||
# WITH MID_TABLE_COV1 AS
|
||||
# (
|
||||
# -- Records CREATED yesterday
|
||||
# SELECT EmpId, CAST(VisitDate AS DATE) AS VisitDate
|
||||
# FROM OneApp_KelloggsMT.dbo.T_OQAD
|
||||
# WHERE CreateDate >= '{run_date_str}'
|
||||
# AND CreateDate < '{next_date_str}'
|
||||
|
||||
# UNION ALL
|
||||
|
||||
# -- Records UPDATED yesterday (different rows, safe to UNION ALL)
|
||||
# SELECT EmpId, CAST(VisitDate AS DATE) AS VisitDate
|
||||
# FROM OneApp_KelloggsMT.dbo.T_OQAD
|
||||
# WHERE UpdateDate >= '{run_date_str}'
|
||||
# AND UpdateDate < '{next_date_str}'
|
||||
# ),
|
||||
# QUIZ AS
|
||||
# (
|
||||
# SELECT DISTINCT
|
||||
# E.EmpId AS empid,
|
||||
# CAST(DQ.VisitDate AS DATE) AS visitdate
|
||||
# FROM OneApp_KelloggsMT.dbo.T_OQAD DQ
|
||||
# INNER JOIN OneApp_KelloggsMT.dbo.vw_Employee_Detail E
|
||||
# ON DQ.EmpId = E.EmpId
|
||||
# INNER JOIN OneApp_KelloggsMT.dbo.Master_OQAD_Question QU
|
||||
# ON DQ.QuestionId = QU.QuestionId
|
||||
# INNER JOIN OneApp_KelloggsMT.dbo.Master_OQAD_Category QC
|
||||
# ON QU.QuestionCategoryId = QC.QuestionCategoryId
|
||||
# WHERE E.EmpName NOT LIKE '%TEST%' -- exclude test employees
|
||||
# AND E.RightId = 6 -- only field reps
|
||||
# AND (
|
||||
# E.ResignDate IS NULL
|
||||
# OR CAST(E.ResignDate AS DATE) >= '{run_date_str}'
|
||||
# )
|
||||
# AND EXISTS ( -- ✅ EXISTS beats IN for large sets
|
||||
# SELECT 1
|
||||
# FROM MID_TABLE_COV1 A
|
||||
# WHERE A.EmpId = DQ.EmpId
|
||||
# AND A.VisitDate = CAST(DQ.VisitDate AS DATE)
|
||||
# )
|
||||
# )
|
||||
# SELECT * FROM QUIZ
|
||||
# """
|
||||
|
||||
# log.info("Fetching quiz empids for run_date=%s", run_date_str)
|
||||
# df = pl.read_database(query=sql, connection=engine)
|
||||
# log.info("Fetched %s (EmpId, VisitDate) pairs from SQL Server", len(df))
|
||||
# return df
|
||||
|
||||
# # ── STEP 2: Who do we ALREADY have in ClickHouse? ───
|
||||
# def get_empids_clickhouse_OQAD(
|
||||
# client,
|
||||
# table_name: str = "OQaD",
|
||||
# ) -> pl.DataFrame:
|
||||
|
||||
|
||||
# if not table_exists(client, table_name):
|
||||
# log.warning("Table '%s' does not exist in ClickHouse.", table_name)
|
||||
# return pl.DataFrame(schema={"empid": pl.Int64, "visitdate": pl.Date})
|
||||
|
||||
# query = f"""
|
||||
# SELECT DISTINCT
|
||||
# employee_id AS empid,
|
||||
# visit_date AS visitdate
|
||||
# FROM {table_name}
|
||||
# """
|
||||
|
||||
# arrow_table = client.query_arrow(query)
|
||||
# df = pl.from_arrow(arrow_table)
|
||||
# log.info("Fetched %s existing (EmpId, VisitDate) pairs from ClickHouse", len(df))
|
||||
# return df
|
||||
|
||||
# # ── STEP 3: Who is NEW? (in SQL Server but NOT yet in ClickHouse) ───
|
||||
# def find_new_empids(
|
||||
# sql_df: pl.DataFrame,
|
||||
# ch_df: pl.DataFrame,
|
||||
# ) -> list[int]:
|
||||
|
||||
|
||||
# new_df = sql_df.join(
|
||||
# ch_df,
|
||||
# on=["empid", "visitdate"],
|
||||
# how="anti", # ✅ anti = keep rows NOT found in ch_df
|
||||
# )
|
||||
|
||||
# if new_df.is_empty():
|
||||
# log.warning("No new EmpIds found for table=%s — nothing to fetch.", table_name)
|
||||
# return [0] # sentinel value — the .sql WHERE will return 0 rows safely
|
||||
|
||||
# empids = new_df["empid"].unique().to_list()
|
||||
# log.info("Found %s NEW empids to fetch for %s", len(empids), table_name)
|
||||
# return empids
|
||||
|
||||
# # ── STEP 4: Fetch full quiz data for new empids ───
|
||||
# def fetch_data(
|
||||
# engine: Engine,
|
||||
# table_name: str,
|
||||
# table_type: str,
|
||||
# empids: list[int],
|
||||
# run_date: date,
|
||||
# ) -> pl.DataFrame:
|
||||
|
||||
|
||||
# run_date_str = run_date.strftime("%Y-%m-%d")
|
||||
# empid_list = ", ".join(str(e) for e in empids) # "101, 102, 103"
|
||||
|
||||
# sql_file = Path("src") / "sql" / "fact" / f"{table_name}.sql"
|
||||
# log.info("Loading SQL from: %s (exists=%s)", sql_file.resolve(), sql_file.exists())
|
||||
|
||||
# with open(sql_file, "r", encoding="utf-8") as f:
|
||||
# sql_template = f.read()
|
||||
|
||||
# sql = sql_template.format(
|
||||
# empid_list=empid_list,
|
||||
# run_date=run_date_str,
|
||||
# )
|
||||
|
||||
# log.info("Fetching full OQaD data for %s empids, run_date=%s", len(empids), run_date_str)
|
||||
# df = pl.read_database(query=sql, connection=engine)
|
||||
# log.info("Fetched %s rows from SQL Server for table=%s", len(df), table_name)
|
||||
# return df
|
||||
|
||||
# # ─────────────────────────────────────────────
|
||||
# # MAIN FLOW (the 4 steps, clearly sequenced)
|
||||
# # ─────────────────────────────────────────────
|
||||
|
||||
# qf = fetch_quiz_empids(sql_engine, run_date) # Step 1
|
||||
# db_df = get_empids_clickhouse_OQAD(client, table_name) # Step 2
|
||||
# empids = find_new_empids(qf, db_df) # Step 3
|
||||
|
||||
# df = fetch_data( # Step 4
|
||||
# engine=sql_engine,
|
||||
# table_name=table_name,
|
||||
# table_type=table_type,
|
||||
# empids=empids,
|
||||
# run_date=run_date,
|
||||
# )
|
||||
|
||||
# log.info("fetch_OQaD complete — returning %s rows", len(df))
|
||||
# return df
|
||||
|
||||
|
||||
@register
|
||||
def fetch_OQaD(
|
||||
sql_engine: Engine,
|
||||
table_name: str,
|
||||
table_type: str,
|
||||
mids: list[int],
|
||||
run_date: date
|
||||
) -> pl.DataFrame:
|
||||
|
||||
|
||||
client= get_clickhouse_client()
|
||||
def table_exists(
|
||||
client,
|
||||
table_name: str,
|
||||
) -> bool:
|
||||
|
||||
return bool(
|
||||
client.command(
|
||||
f"EXISTS TABLE {table_name}"
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
|
||||
def fetch_quiz_empids(engine: Engine, run_date : date) -> pl.DataFrame:
|
||||
|
||||
sql_template = f"""
|
||||
WITH MID_TABLE_COV1 AS
|
||||
(
|
||||
SELECT EmpId, VisitDate
|
||||
FROM OneApp_KelloggsMT.dbo.T_OQAD
|
||||
WHERE CreateDate >= {run_date}
|
||||
AND CreateDate < DATEADD(DAY,1,'{run_date}')
|
||||
|
||||
UNION
|
||||
|
||||
SELECT EmpId, VisitDate
|
||||
FROM OneApp_KelloggsMT.dbo.T_OQAD
|
||||
WHERE UpdateDate >= {run_date}
|
||||
AND UpdateDate < DATEADD(DAY,1, '{run_date}')
|
||||
),
|
||||
|
||||
QUIZ AS
|
||||
(
|
||||
SELECT Distinct E.EmpId as empid
|
||||
, CONVERT(date,DQ.VisitDate) AS visitdate
|
||||
FROM OneApp_KelloggsMT.dbo.T_OQAD DQ INNER JOIN
|
||||
OneApp_KelloggsMT.dbo.vw_Employee_Detail E ON DQ.EmpId = E.EmpId inner join
|
||||
OneApp_KelloggsMT.dbo.Master_OQAD_Question QU on DQ.QuestionId= qu.QuestionId inner join
|
||||
OneApp_KelloggsMT.dbo.Master_OQAD_Category qc on qu.QuestionCategoryId= qc.QuestionCategoryId
|
||||
where e.EmpName not like 'test%' and e.RightId in (6)
|
||||
and (E.ResignDate is null or E.ResignDate>=''+CONVERT(VARCHAR,'{run_date}')+'') AND E.EmpName NOT LIKE '%TEST%'
|
||||
AND DQ.EmpId IN (SELECT EmpId FROM MID_TABLE_COV1 A WHERE
|
||||
DQ.EmpId=A.EmpId AND CONVERT(date,VisitDate)=CONVERT(date,A.VisitDate) )
|
||||
) select * from quiz
|
||||
"""
|
||||
sql = sql_template.format(
|
||||
run_date=run_date.strftime("%Y-%m-%d")
|
||||
)
|
||||
|
||||
log.info(f"Fetching quiz_empids data for EMPID and Visitid")
|
||||
|
||||
df = pl.read_database(
|
||||
query=sql,
|
||||
connection=engine
|
||||
)
|
||||
|
||||
|
||||
log.info(f"Fetched {len(df):,} total empid and visitdate fetched for OQAD from SQL Server")
|
||||
|
||||
return df
|
||||
|
||||
|
||||
def get_empids_clickhouse_OQAD(
|
||||
client,
|
||||
table_name: str = "OQaD",
|
||||
) -> pl.DataFrame:
|
||||
|
||||
if not table_exists(client, table_name):
|
||||
log.warning(f"Table '{table_name}' does not exist.")
|
||||
return pl.DataFrame(
|
||||
schema={
|
||||
"empid": pl.Int64,
|
||||
"visitdate": pl.Date,
|
||||
}
|
||||
)
|
||||
|
||||
query = f"""
|
||||
SELECT DISTINCT
|
||||
employee_id AS empid,
|
||||
visit_date AS visitdate
|
||||
FROM {table_name}
|
||||
"""
|
||||
|
||||
# ClickHouse -> PyArrow -> Polars
|
||||
arrow_table = client.query_arrow(query)
|
||||
|
||||
return pl.from_arrow(arrow_table)
|
||||
|
||||
|
||||
|
||||
qf=fetch_quiz_empids(sql_engine,run_date)
|
||||
db_df = get_empids_clickhouse_OQAD(client)
|
||||
|
||||
matched = qf.join(
|
||||
db_df,
|
||||
on=["empid", "visitdate"],
|
||||
how="inner",
|
||||
)
|
||||
|
||||
if matched.is_empty():
|
||||
|
||||
empids=[0]
|
||||
log.warning(
|
||||
"%s Matched df in OQaD returned no rows",
|
||||
table_name,
|
||||
)
|
||||
|
||||
else:
|
||||
empids=matched["empid"].to_list()
|
||||
|
||||
|
||||
log.info(f"Fetched {len(empids):,} matched empids fetched for OQAD ")
|
||||
|
||||
def fetch_data(
|
||||
engine: Engine,
|
||||
table_name: str,
|
||||
table_type: str,
|
||||
empids: list[int],
|
||||
run_date: date
|
||||
) -> pl.DataFrame:
|
||||
|
||||
empid_list = ",".join(str(empid) for empid in empids)
|
||||
|
||||
|
||||
sql_file = Path("src") / "sql" / "fact" / f"{table_name}.sql"
|
||||
|
||||
log.info(f"Exists: {sql_file.exists()}")
|
||||
log.info(f"Path: {sql_file.resolve()}")
|
||||
|
||||
with open(sql_file, "r", encoding="utf-8") as f:
|
||||
sql_template = f.read()
|
||||
|
||||
sql = sql_template.format(
|
||||
empid_list=empid_list,
|
||||
run_date=run_date.strftime("%Y-%m-%d")
|
||||
)
|
||||
|
||||
log.info(f"Fetching data for {len(empids):,} EMPIDs")
|
||||
|
||||
log.info("Fetching OQaD data for run_date=%s", run_date)
|
||||
|
||||
df = pl.read_database(
|
||||
query=sql,
|
||||
connection=engine,
|
||||
)
|
||||
|
||||
log.info("fn name is fetch_OQad ------Fetched %s rows", len(df))
|
||||
|
||||
return df
|
||||
df=fetch_data( engine=sql_engine,
|
||||
table_name=table_name,
|
||||
table_type=table_type,
|
||||
empids=empids,
|
||||
run_date=run_date
|
||||
)
|
||||
log.info(f"Fetched {len(df):,} rows from SQL Server")
|
||||
|
||||
return df
|
||||
|
||||
|
||||
@register
|
||||
def fetch_OQaD(
|
||||
engine: Engine,
|
||||
table_name: str,
|
||||
table_type: str,
|
||||
empids: list[int],
|
||||
run_date: date
|
||||
) -> pl.DataFrame:
|
||||
|
||||
empid_list = ",".join(str(empid) for empid in empids)
|
||||
|
||||
|
||||
sql_file = Path("src") / "sql" / "fact" / f"{table_name}.sql"
|
||||
|
||||
log.info(f"Exists: {sql_file.exists()}")
|
||||
log.info(f"Path: {sql_file.resolve()}")
|
||||
|
||||
with open(sql_file, "r", encoding="utf-8") as f:
|
||||
sql_template = f.read()
|
||||
|
||||
sql = sql_template.format(
|
||||
empid_list=empid_list,
|
||||
run_date=run_date.strftime("%Y-%m-%d")
|
||||
)
|
||||
|
||||
log.info(f"Fetching data for {len(empids):,} EMPIDs")
|
||||
|
||||
log.info("Fetching OQaD data for run_date=%s", run_date)
|
||||
|
||||
df = pl.read_database(
|
||||
query=sql,
|
||||
connection=engine,
|
||||
)
|
||||
|
||||
log.info("fn name is fetch_OQad ------Fetched %s rows", len(df))
|
||||
|
||||
return df
|
||||
|
||||
|
||||
|
||||
|
||||
@register
|
||||
|
||||
def fetch_Coverage( engine: Engine,
|
||||
table_name: str,
|
||||
table_type: str,
|
||||
mids: list[int],
|
||||
run_date: date
|
||||
) -> pl.DataFrame:
|
||||
|
||||
|
||||
|
||||
if not mids:
|
||||
log.warning("No MIDs — nothing to fetch.")
|
||||
return pl.DataFrame()
|
||||
log.info(f"Start Fetching data for these {len(mids):} MIDs or based on this date {run_date}")
|
||||
mid_list = ",".join(str(mid) for mid in mids)
|
||||
|
||||
sql_file = Path("src") / "sql" / f"{table_type.lower()}" / f"{table_name}.sql"
|
||||
|
||||
with open(sql_file, "r", encoding="utf-8") as f:
|
||||
sql_template = f.read()
|
||||
|
||||
sql = sql_template.format(
|
||||
mid_list=mid_list,
|
||||
run_date=run_date.strftime("%Y-%m-%d")
|
||||
)
|
||||
log.info(f"Fetching data for {len(mids):,} MIDs")
|
||||
|
||||
df = pl.read_database(
|
||||
query=sql,
|
||||
connection=engine
|
||||
)
|
||||
log.info(f"Fetched {len(df):,} rows from SQL Server")
|
||||
|
||||
return df
|
||||
|
||||
|
||||
@register
|
||||
def fetch_Survey( engine: Engine,
|
||||
table_name: str,
|
||||
table_type: str,
|
||||
mids: list[int],
|
||||
run_date: date
|
||||
) -> pl.DataFrame:
|
||||
|
||||
|
||||
|
||||
if not mids:
|
||||
log.warning("No MIDs — nothing to fetch.")
|
||||
return pl.DataFrame()
|
||||
|
||||
mid_list = ",".join(str(mid) for mid in mids)
|
||||
log.info(f"Start Fetching data for these {len(mids):} MIDs or based on this date {run_date}")
|
||||
sql_file = Path("src") / "sql" / f"{table_type.lower()}" / f"{table_name}.sql"
|
||||
|
||||
with open(sql_file, "r", encoding="utf-8") as f:
|
||||
sql_template = f.read()
|
||||
|
||||
sql = sql_template.format(
|
||||
mid_list=mid_list,
|
||||
run_date=run_date.strftime("%Y-%m-%d")
|
||||
)
|
||||
log.info(f"Fetching data for {len(mids):,} MIDs")
|
||||
|
||||
df = pl.read_database(
|
||||
query=sql,
|
||||
connection=engine
|
||||
)
|
||||
log.info(f"Fetched {len(df):,} rows from SQL Server")
|
||||
|
||||
return df
|
||||
|
||||
|
||||
|
||||
|
||||
@register
|
||||
def fetch_Login( engine: Engine,
|
||||
table_name: str,
|
||||
table_type: str,
|
||||
mids: list[int],
|
||||
run_date: date
|
||||
) -> pl.DataFrame:
|
||||
|
||||
|
||||
|
||||
# if not mids:
|
||||
# log.warning("No MIDs — nothing to fetch.")
|
||||
# return pl.DataFrame()
|
||||
|
||||
mid_list = ",".join(str(mid) for mid in mids)
|
||||
log.info(f"Start Fetching data for these {len(mids):} MIDs or based on this date {run_date}")
|
||||
sql_file = Path("src") / "sql" / f"{table_type.lower()}" / f"{table_name}.sql"
|
||||
|
||||
with open(sql_file, "r", encoding="utf-8") as f:
|
||||
sql_template = f.read()
|
||||
|
||||
sql = sql_template.format(
|
||||
# mid_list=mid_list,
|
||||
run_date=run_date.strftime("%Y-%m-%d")
|
||||
)
|
||||
log.info(f"Fetching data for {len(mids):,} MIDs")
|
||||
|
||||
df = pl.read_database(
|
||||
query=sql,
|
||||
connection=engine
|
||||
)
|
||||
log.info(f"Fetched {len(df):,} rows from SQL Server")
|
||||
|
||||
return df
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@register
|
||||
def fetch_Stock_Details( engine: Engine,
|
||||
table_name: str,
|
||||
table_type: str,
|
||||
mids: list[int],
|
||||
run_date: date
|
||||
) -> pl.DataFrame:
|
||||
|
||||
|
||||
|
||||
if not mids:
|
||||
log.warning("No MIDs — nothing to fetch.")
|
||||
return pl.DataFrame()
|
||||
|
||||
mid_list = ",".join(str(mid) for mid in mids)
|
||||
log.info(f"Start Fetching data for these {len(mids):} MIDs or based on this date {run_date}")
|
||||
sql_file = Path("src") / "sql" / f"{table_type.lower()}" / f"{table_name}.sql"
|
||||
|
||||
with open(sql_file, "r", encoding="utf-8") as f:
|
||||
sql_template = f.read()
|
||||
|
||||
sql = sql_template.format(
|
||||
mid_list=mid_list,
|
||||
run_date=run_date.strftime("%Y-%m-%d")
|
||||
)
|
||||
log.info(f"Fetching data for {len(mids):,} MIDs")
|
||||
|
||||
df = pl.read_database(
|
||||
query=sql,
|
||||
connection=engine
|
||||
)
|
||||
log.info(f"Fetched {len(df):,} rows from SQL Server")
|
||||
|
||||
return df
|
||||
|
||||
|
||||
|
||||
|
||||
# def fetch_Coverage( engine: Engine,
|
||||
# table_name: str,
|
||||
# table_type: str,
|
||||
# mids: list[int],
|
||||
# run_date: date
|
||||
# ) -> pl.DataFrame:
|
||||
|
||||
|
||||
|
||||
# if not mids:
|
||||
# log.warning("No MIDs — nothing to fetch.")
|
||||
# return pl.DataFrame()
|
||||
|
||||
# mid_list = ",".join(str(mid) for mid in mids)
|
||||
# log.info(f"Start Fetching data for these {len(mids):} MIDs or based on this date {run_date}")
|
||||
# sql_file = Path("src") / "sql" / f"{table_type.lower()}" / f"{table_name}.sql"
|
||||
|
||||
# with open(sql_file, "r", encoding="utf-8") as f:
|
||||
# sql_template = f.read()
|
||||
|
||||
# sql = sql_template.format(
|
||||
# mid_list=mid_list,
|
||||
# run_date=run_date.strftime("%Y-%m-%d")
|
||||
# )
|
||||
# log.info(f"Fetching data for {len(mids):,} MIDs")
|
||||
|
||||
# df = pl.read_database(
|
||||
# query=sql,
|
||||
# connection=engine
|
||||
# )
|
||||
# log.info(f"Fetched {len(df):,} rows from SQL Server")
|
||||
|
||||
# return df
|
||||
|
||||
|
||||
@register
|
||||
def fetch_Attendance(
|
||||
engine: Engine,
|
||||
table_name: str,
|
||||
table_type: str,
|
||||
mids: list[int],
|
||||
run_date: date | None = None,
|
||||
days_back: int = 15
|
||||
) -> pl.DataFrame:
|
||||
"""
|
||||
Fetch attendance source data.
|
||||
|
||||
Default:
|
||||
end_date = yesterday
|
||||
start_date = yesterday - 15 days
|
||||
"""
|
||||
|
||||
if run_date is None:
|
||||
run_date = date.today() - timedelta(days=1)
|
||||
|
||||
start_date = run_date - timedelta(days=days_back)
|
||||
|
||||
sql_file = Path("src") / "sql" / f"{table_type.lower()}" / f"{table_name}.sql"
|
||||
|
||||
with open(sql_file, "r", encoding="utf-8") as f:
|
||||
sql_template = f.read()
|
||||
|
||||
sql = sql_template.format(
|
||||
start_date=start_date.strftime("%Y-%m-%d"),
|
||||
run_date=run_date.strftime("%Y-%m-%d")
|
||||
)
|
||||
log.info(
|
||||
f"Fetching Attendance data from {start_date} to {run_date}"
|
||||
)
|
||||
|
||||
df = pl.read_database(
|
||||
query=sql,
|
||||
connection=engine
|
||||
)
|
||||
|
||||
log.info(
|
||||
f"Fetched {len(df):,} attendance rows "
|
||||
f"for {df['employee_id'].n_unique():,} employees"
|
||||
)
|
||||
|
||||
return df
|
||||
|
||||
|
||||
@register
|
||||
def fetch_Journey_Plan( engine: Engine,
|
||||
table_name: str,
|
||||
table_type: str,
|
||||
mids: list[int],
|
||||
run_date: date
|
||||
) -> pl.DataFrame:
|
||||
|
||||
|
||||
|
||||
if not mids:
|
||||
log.warning("No MIDs — nothing to fetch.")
|
||||
return pl.DataFrame()
|
||||
|
||||
log.info(f" Start Fetching data for these {len(mids):} MIDs ")
|
||||
mid_list = ",".join(str(mid) for mid in mids)
|
||||
log.info(f"Start Fetching data for these {len(mids):} MIDs or based on this date {run_date}")
|
||||
sql_file = Path("src") / "sql" / f"{table_type.lower()}" / f"{table_name}.sql"
|
||||
|
||||
with open(sql_file, "r", encoding="utf-8") as f:
|
||||
sql_template = f.read()
|
||||
|
||||
sql = sql_template.format(
|
||||
mid_list=mid_list,
|
||||
run_date=run_date.strftime("%Y-%m-%d")
|
||||
)
|
||||
log.info(f"Fetching data for {len(mids):,} MIDs")
|
||||
|
||||
df = pl.read_database(
|
||||
query=sql,
|
||||
connection=engine
|
||||
)
|
||||
log.info(f"Fetched {len(df):,} rows from SQL Server")
|
||||
|
||||
return df
|
||||
|
||||
|
||||
@register
|
||||
def fetch_PaidVisibility( engine: Engine,
|
||||
table_name: str,
|
||||
table_type: str,
|
||||
mids: list[int],
|
||||
run_date: date
|
||||
) -> pl.DataFrame:
|
||||
|
||||
|
||||
|
||||
if not mids:
|
||||
log.warning("No MIDs — nothing to fetch.")
|
||||
return pl.DataFrame()
|
||||
|
||||
log.info(f" Start Fetching data for these {len(mids):} MIDs ")
|
||||
mid_list = ",".join(str(mid) for mid in mids)
|
||||
log.info(f"Start Fetching data for these {len(mids):} MIDs or based on this date {run_date}")
|
||||
sql_file = Path("src") / "sql" / f"{table_type.lower()}" / f"{table_name}.sql"
|
||||
|
||||
with open(sql_file, "r", encoding="utf-8") as f:
|
||||
sql_template = f.read()
|
||||
|
||||
sql = sql_template.format(
|
||||
mid_list=mid_list,
|
||||
run_date=run_date.strftime("%Y-%m-%d")
|
||||
)
|
||||
log.info(f"Fetching data for {len(mids):,} MIDs")
|
||||
|
||||
df = pl.read_database(
|
||||
query=sql,
|
||||
connection=engine
|
||||
)
|
||||
log.info(f"Fetched {len(df):,} rows from SQL Server")
|
||||
|
||||
return df
|
||||
|
||||
@register
|
||||
def fetch_Web_Logins( engine: Engine,
|
||||
table_name: str,
|
||||
table_type: str,
|
||||
mids: list[int],
|
||||
run_date: date
|
||||
) -> pl.DataFrame:
|
||||
|
||||
|
||||
|
||||
if not mids:
|
||||
log.warning("No MIDs — nothing to fetch.")
|
||||
return pl.DataFrame()
|
||||
|
||||
log.info(f" Start Fetching data for these {len(mids):} MIDs ")
|
||||
mid_list = ",".join(str(mid) for mid in mids)
|
||||
log.info(f"Start Fetching data for these {len(mids):} MIDs or based on this date {run_date}")
|
||||
sql_file = Path("src") / "sql" / f"{table_type.lower()}" / f"{table_name}.sql"
|
||||
|
||||
with open(sql_file, "r", encoding="utf-8") as f:
|
||||
sql_template = f.read()
|
||||
|
||||
sql = sql_template.format(
|
||||
mid_list=mid_list,
|
||||
run_date=run_date.strftime("%Y-%m-%d")
|
||||
)
|
||||
log.info(f"Fetching data for {len(mids):,} MIDs")
|
||||
|
||||
df = pl.read_database(
|
||||
query=sql,
|
||||
connection=engine
|
||||
)
|
||||
log.info(f"Fetched {len(df):,} rows from SQL Server")
|
||||
|
||||
return df
|
||||
|
||||
@register
|
||||
def fetch_Promotion(
|
||||
engine: Engine,
|
||||
table_name: str,
|
||||
table_type: str,
|
||||
mids: list[int],
|
||||
run_date: date
|
||||
) -> pl.DataFrame:
|
||||
|
||||
if not mids:
|
||||
log.warning("No MIDs — nothing to fetch.")
|
||||
return pl.DataFrame()
|
||||
|
||||
mid_list = ",".join(str(mid) for mid in mids)
|
||||
|
||||
sql_file = Path("src") / "sql" / f"{table_type.lower()}" / f"{table_name}.sql"
|
||||
|
||||
with open(sql_file, "r", encoding="utf-8") as f:
|
||||
sql_template = f.read()
|
||||
|
||||
sql = sql_template.format(
|
||||
mid_list=mid_list,
|
||||
run_date=run_date.strftime("%Y-%m-%d")
|
||||
)
|
||||
|
||||
log.info(f"Fetching data for {len(mids):,} MIDs")
|
||||
|
||||
df = pl.read_database(
|
||||
query=sql,
|
||||
connection=engine
|
||||
)
|
||||
|
||||
log.info(f"Fetched {len(df):,} rows from SQL Server")
|
||||
|
||||
return df
|
||||
+35
-71
@@ -1,76 +1,40 @@
|
||||
|
||||
WITH MID_TABLE_COV1 AS
|
||||
(
|
||||
|
||||
SELECT EmpId, CAST(VisitDate AS DATE) AS VisitDate
|
||||
FROM OneApp_KelloggsMT.dbo.T_OQAD
|
||||
WHERE CreateDate >= '{run_date}'
|
||||
AND CreateDate < DATEADD(DAY, 1, '{run_date}')
|
||||
|
||||
UNION ALL
|
||||
|
||||
SELECT EmpId, CAST(VisitDate AS DATE) AS VisitDate
|
||||
FROM OneApp_KelloggsMT.dbo.T_OQAD
|
||||
WHERE UpdateDate >= '{run_date}'
|
||||
AND UpdateDate < DATEADD(DAY, 1, '{run_date}')
|
||||
SELECT EmpId,VisitDate FROM OneApp_KelloggsMT.dbo.T_OQAD
|
||||
WHERE CONVERT(VARCHAR,CreateDate,101)= CONVERT(VARCHAR,cast('{run_date}' as date),101)
|
||||
UNION
|
||||
SELECT EmpId,VisitDate FROM OneApp_KelloggsMT.dbo.T_OQAD
|
||||
WHERE CONVERT(VARCHAR,UpdateDate,101)= CONVERT(VARCHAR,cast('{run_date}' as date),101)
|
||||
),
|
||||
|
||||
QUIZ AS
|
||||
QUIZ
|
||||
As
|
||||
(
|
||||
SELECT DISTINCT
|
||||
E.EmpId,
|
||||
E.EmpName,
|
||||
E.SupervisorId,
|
||||
E.SupervisorName,
|
||||
E.DesignationName,
|
||||
E.CityName,
|
||||
E.StateName,
|
||||
E.RegionName,
|
||||
CAST(DQ.VisitDate AS DATE) AS VisitDate,
|
||||
DQ.QuestionId,
|
||||
DQ.AnswerId,
|
||||
QC.QuestionCategoryId,
|
||||
QC.QuestionCategory
|
||||
FROM OneApp_KelloggsMT.dbo.T_OQAD DQ
|
||||
INNER JOIN OneApp_KelloggsMT.dbo.vw_Employee_Detail E
|
||||
ON DQ.EmpId = E.EmpId
|
||||
INNER JOIN OneApp_KelloggsMT.dbo.Master_OQAD_Question QU
|
||||
ON DQ.QuestionId = QU.QuestionId
|
||||
INNER JOIN OneApp_KelloggsMT.dbo.Master_OQAD_Category QC
|
||||
ON QU.QuestionCategoryId = QC.QuestionCategoryId
|
||||
WHERE E.EmpName NOT LIKE '%TEST%'
|
||||
AND E.RightId = 6
|
||||
AND (E.ResignDate IS NULL OR CAST(E.ResignDate AS DATE) >= '{run_date}')
|
||||
AND EXISTS (
|
||||
SELECT 1
|
||||
FROM MID_TABLE_COV1 A
|
||||
WHERE A.EmpId = DQ.EmpId
|
||||
AND A.VisitDate = CAST(DQ.VisitDate AS DATE)
|
||||
)
|
||||
-- ✅ Exclude EmpIds already loaded into ClickHouse
|
||||
AND E.EmpId NOT IN ({empid_list})
|
||||
SELECT Distinct E.EmpId, E.EmpName, E.SupervisorId, E.SupervisorName, E.DesignationName, E.CityName, E.StateName, E.RegionName
|
||||
, CONVERT(VARCHAR,DQ.VisitDate,101) AS VisitDate
|
||||
, DQ.QuestionId, DQ.AnswerId, qc.QuestionCategoryId, qc.QuestionCategory
|
||||
FROM OneApp_KelloggsMT.dbo.T_OQAD DQ INNER JOIN
|
||||
OneApp_KelloggsMT.dbo.vw_Employee_Detail E ON DQ.EmpId = E.EmpId inner join
|
||||
OneApp_KelloggsMT.dbo.Master_OQAD_Question QU on DQ.QuestionId= qu.QuestionId inner join
|
||||
OneApp_KelloggsMT.dbo.Master_OQAD_Category qc on qu.QuestionCategoryId= qc.QuestionCategoryId
|
||||
where e.EmpName not like 'test%' and e.RightId in (6)
|
||||
and (E.ResignDate is null or E.ResignDate>=''+CONVERT(VARCHAR,cast('{run_date}' as date),101)+'') AND E.EmpName NOT LIKE '%TEST%'
|
||||
AND DQ.EmpId IN (SELECT EmpId FROM MID_TABLE_COV1 A WHERE
|
||||
DQ.EmpId=A.EmpId AND CONVERT(date,DQ.VisitDate,101)=CONVERT(date,A.VisitDate,101) )
|
||||
)
|
||||
|
||||
SELECT
|
||||
40148 AS project_id,
|
||||
Q.EmpId AS employee_id,
|
||||
0 AS process_id,
|
||||
Q.VisitDate AS visit_date,
|
||||
Q.QuestionCategoryId AS question_category_id,
|
||||
Q.QuestionCategory AS question_category,
|
||||
QM.QuestionId AS question_id,
|
||||
QM.Question AS question,
|
||||
ISNULL(QA.AnswerId, 0) AS answer_id,
|
||||
ISNULL(QA.Answer, '') AS answer,
|
||||
CASE
|
||||
WHEN QA.AnswerId IS NULL THEN 'Not Answer'
|
||||
WHEN QA.RightAnswer = 1 THEN 'Y'
|
||||
WHEN QA.RightAnswer IS NULL THEN 'Not Answer'
|
||||
ELSE 'N'
|
||||
END AS correct_answer,
|
||||
GETDATE() AS update_date,
|
||||
'SP-Pius' AS update_by
|
||||
FROM QUIZ Q
|
||||
INNER JOIN OneApp_KelloggsMT.dbo.Master_OQAD_Question QM
|
||||
ON Q.QuestionId = QM.QuestionId
|
||||
LEFT JOIN OneApp_KelloggsMT.dbo.Master_OQAD_Answer QA
|
||||
ON Q.AnswerId = QA.AnswerId
|
||||
, OQaD (project_id,
|
||||
employee_id,process_id,visit_date,question_category_id,question_category,question_id,
|
||||
question,answer_id,answer,correct_answer,update_date,update_by)
|
||||
AS (
|
||||
Select '40148' AS ProjectId,Q.EmpId,0,Q.VisitDate,Q.QuestionCategoryId,Q.QuestionCategory,
|
||||
QM.QuestionId, QM.Question
|
||||
, CASE WHEN QA.AnswerId IS NULL THEN 0 ELSE QA.AnswerId END AS AnswerId
|
||||
,CASE WHEN QA.ANSWER IS NULL THEN '' ELSE QA.ANSWER END AS Answer
|
||||
, CASE WHEN QA.AnswerId IS NULL THEN 'Not Answer' ELSE CASE WHEN QA.RightAnswer = 1 THEN 'Y'
|
||||
ELSE CASE WHEN QA.RightAnswer IS NULL THEN 'Not Answer' ELSE 'N' END END END AS RightAnswer,GETDATE(),'SP-Pius'
|
||||
FROM QUIZ Q INNER JOIN
|
||||
OneApp_KelloggsMT.dbo.Master_OQAD_Question QM ON Q.QuestionId = QM.QuestionId Left JOIN
|
||||
OneApp_KelloggsMT.dbo.Master_OQAD_Answer QA ON Q.AnswerId= QA.AnswerId
|
||||
where Q.EmpId not in ( {empid_list})
|
||||
)
|
||||
select * from OQaD
|
||||
@@ -1,2 +1,2 @@
|
||||
- pipeline_trigeered_on_date: '2026-06-23'
|
||||
last_successful_run_date: 2026-06-22
|
||||
- pipeline_trigeered_on_date: '2026-06-25'
|
||||
last_successful_run_date: 2026-06-24
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
fn(sql_engine, table_name , table_type, mids, run_date)
|
||||
Reference in New Issue
Block a user