1st commit
This commit is contained in:
@@ -0,0 +1,264 @@
|
||||
|
||||
from __future__ import annotations
|
||||
from time import sleep
|
||||
import sys
|
||||
from datetime import date, datetime, timedelta
|
||||
|
||||
import polars as pl
|
||||
import yaml
|
||||
|
||||
|
||||
from log import log
|
||||
|
||||
from clickhouse_task.create_table import create_clickhouse_table
|
||||
from clickhouse_task.delete_task import (
|
||||
delete_existing_data,
|
||||
truncate_table,
|
||||
)
|
||||
|
||||
from clickhouse_task.load_table import load_to_clickhouse
|
||||
|
||||
from db_con.connection import (
|
||||
build_sql_server_engine,
|
||||
build_clickhouse_engine,
|
||||
get_clickhouse_client,
|
||||
)
|
||||
|
||||
from mids import (
|
||||
MID_TABLE_COV,
|
||||
MID_TABLE_COV1,
|
||||
)
|
||||
|
||||
from src.bridge import *
|
||||
from src.fact_updated import *
|
||||
from src.dim import *
|
||||
|
||||
|
||||
# ==========================================================
|
||||
# Helpers
|
||||
# ==========================================================
|
||||
|
||||
def get_dates_from_yaml(filename: str):
|
||||
with open(filename, "r") as file:
|
||||
data = yaml.safe_load(file)
|
||||
|
||||
start_date = date.fromisoformat(
|
||||
str(data["pipeline"]["start_date"])
|
||||
)
|
||||
|
||||
end_date = date.fromisoformat(
|
||||
str(data["pipeline"]["end_date"])
|
||||
)
|
||||
flag=str(data["pipeline"]["flag"])
|
||||
|
||||
return start_date, end_date , flag
|
||||
|
||||
|
||||
def write_table_to_yaml(
|
||||
data: dict,
|
||||
run_date: date,
|
||||
filename: str | None = None
|
||||
):
|
||||
"""Write table data to a YAML file."""
|
||||
|
||||
if filename is None:
|
||||
filename = f"elt_pipeline_{run_date}.yml"
|
||||
|
||||
with open(filename, "w") as file:
|
||||
yaml.dump(
|
||||
data,
|
||||
file,
|
||||
default_flow_style=False,
|
||||
sort_keys=False
|
||||
)
|
||||
|
||||
print(f"Table written to {filename}")
|
||||
|
||||
|
||||
|
||||
|
||||
def table_exists(
|
||||
client,
|
||||
table_name: str,
|
||||
) -> bool:
|
||||
|
||||
return bool(
|
||||
client.command(
|
||||
f"EXISTS TABLE {table_name}"
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
# ==========================================================
|
||||
# elt ()
|
||||
# ==========================================================
|
||||
|
||||
def elt(run_date : date , config: dict):
|
||||
|
||||
log.info("=" * 80)
|
||||
log.info("Hello from data-move Python data pipeline!")
|
||||
|
||||
# ------------------------------------------------------
|
||||
# Run Date
|
||||
# ------------------------------------------------------
|
||||
|
||||
|
||||
|
||||
log.info(
|
||||
"Pipeline Run Date: %s",
|
||||
run_date,
|
||||
)
|
||||
|
||||
# ------------------------------------------------------
|
||||
# Connections
|
||||
# ------------------------------------------------------
|
||||
|
||||
log.info(
|
||||
"Connecting to databases..."
|
||||
)
|
||||
|
||||
sql_engine = build_sql_server_engine()
|
||||
clickhouse_engine = build_clickhouse_engine()
|
||||
client = get_clickhouse_client()
|
||||
|
||||
log.info(
|
||||
"Database connections established"
|
||||
)
|
||||
|
||||
# ------------------------------------------------------
|
||||
# mids Keys
|
||||
# ------------------------------------------------------
|
||||
|
||||
mids = MID_TABLE_COV(
|
||||
sql_engine,
|
||||
run_date,
|
||||
)
|
||||
|
||||
emp_visit_df = MID_TABLE_COV1(
|
||||
sql_engine,
|
||||
run_date,
|
||||
)
|
||||
|
||||
|
||||
|
||||
|
||||
# ------------------------------------------------------
|
||||
# Process Tables
|
||||
# ------------------------------------------------------
|
||||
for table in config["tables"]:
|
||||
table_name = table["name"]
|
||||
operation = table["operation"]
|
||||
fetch_by = table["fetch_by"]
|
||||
table_type=table["type"]
|
||||
log.info(
|
||||
"Pipeline Run Date: %s",
|
||||
run_date,
|
||||
)
|
||||
|
||||
|
||||
log.info("=" * 80)
|
||||
log.info(f"Processing Table-:{table_name} | Table type -:{table_type} | fetcht by-:{fetch_by} | operation-:{operation}" )
|
||||
|
||||
try:
|
||||
|
||||
# ------------------------------------------
|
||||
# Fetch Data
|
||||
# ------------------------------------------
|
||||
|
||||
|
||||
log.info(f"Fetching Data from sql server for table-: {table_name} ..............")
|
||||
fetch_list=["mids" ,"run_date", "reason_id"]
|
||||
if fetch_by in fetch_list :
|
||||
df=registry[f"fetch_{table_name}"](sql_engine, table_name , table_type, mids, run_date)
|
||||
else:
|
||||
df = fetch_data(sql_engine ,table_name,table_type)
|
||||
|
||||
log.info(f"Fetched total row -: {len(df)} from sql server for table-:{table_name} ...........!!!")
|
||||
|
||||
if df.is_empty():
|
||||
|
||||
log.warning(
|
||||
"%s returned no rows",
|
||||
table_name,
|
||||
)
|
||||
|
||||
continue
|
||||
|
||||
log.info(
|
||||
"Fetched %s rows",
|
||||
len(df),
|
||||
)
|
||||
|
||||
# ------------------------------------------
|
||||
# Create Table If Missing
|
||||
# ------------------------------------------
|
||||
|
||||
exists = table_exists(
|
||||
client,
|
||||
table_name,
|
||||
)
|
||||
|
||||
if not exists:
|
||||
|
||||
log.info(
|
||||
"Creating table %s",
|
||||
table_name,
|
||||
)
|
||||
|
||||
create_clickhouse_table(
|
||||
df=df,
|
||||
table_name=table_name,
|
||||
clickhouse_engine=clickhouse_engine,
|
||||
)
|
||||
|
||||
# ------------------------------------------
|
||||
# Existing Table Logic
|
||||
# ------------------------------------------
|
||||
|
||||
else:
|
||||
|
||||
if operation == "DELETE+INSERT":
|
||||
|
||||
truncate_table(
|
||||
client,
|
||||
table_name,
|
||||
)
|
||||
|
||||
else:
|
||||
|
||||
delete_existing_data(
|
||||
client=client,
|
||||
table_name=table_name,
|
||||
run_date=run_date,
|
||||
mids=mids,
|
||||
emp_visit_df=emp_visit_df,
|
||||
)
|
||||
|
||||
# ------------------------------------------
|
||||
# Load Data
|
||||
# ------------------------------------------
|
||||
log.info("_ _ _ _Inserting data into clickhouse db from sql server_ _ _ _")
|
||||
load_to_clickhouse(
|
||||
client=client,
|
||||
table_name=table_name,
|
||||
df=df,
|
||||
)
|
||||
|
||||
log.info(
|
||||
"%s loaded successfully (%s rows)",
|
||||
table_name,
|
||||
len(df),
|
||||
)
|
||||
|
||||
except Exception:
|
||||
|
||||
log.exception(
|
||||
"Failed processing table %s",
|
||||
table_name,
|
||||
)
|
||||
|
||||
raise
|
||||
|
||||
log.info("=" * 80)
|
||||
log.info("Pipeline Completed Successfully")
|
||||
log.info("=" * 80)
|
||||
Reference in New Issue
Block a user