193 lines
7.2 KiB
Python
193 lines
7.2 KiB
Python
"""
|
|
import_CZ_contacts.py
|
|
Importuje kontakty středisek Czechia z PANORAMA Dashboard xlsx do MySQL tabulky CTMS_contacts.
|
|
- Zpracuje všechny *.xlsx soubory ve SOURCE_DIR
|
|
- Filtruje pouze řádky Country Name == 'Czechia'
|
|
- file_date bere z document properties xlsx (dcterms:created)
|
|
- Každý soubor vždy přepíše (delete + insert podle file_date + protocol_id + country)
|
|
"""
|
|
|
|
import zipfile
|
|
import xml.etree.ElementTree as ET
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
import pandas as pd
|
|
import mysql.connector
|
|
|
|
# ── Konfigurace ────────────────────────────────────────────────────────────────
|
|
SOURCE_DIR = Path(r"U:\PythonProject\Janssen\CTMS\PanoramaContacts\SourceData")
|
|
|
|
DB_CONFIG = {
|
|
"host": "192.168.1.76",
|
|
"port": 3306,
|
|
"user": "root",
|
|
"password": "Vlado9674+",
|
|
"database": "studie",
|
|
"charset": "utf8mb4",
|
|
}
|
|
|
|
TABLE = "CTMS_contacts"
|
|
COUNTRY = "Czechia"
|
|
SHEET = "Site Contacts"
|
|
HEADER_ROW = 5 # 0-based → řádek č. 6 v Excelu
|
|
|
|
COL_MAP = {
|
|
"Sector": "sector",
|
|
"TA": "ta",
|
|
"Protocol ID": "protocol_id",
|
|
"GTL-GTM/CTM": "gtl_ctm",
|
|
"Country Name": "country_name",
|
|
"LTM Name": "ltm_name",
|
|
"Site ID": "site_id",
|
|
"SM Name": "sm_name",
|
|
"PI Full Name": "pi_full_name",
|
|
"Institution Name": "institution_name",
|
|
"Contact Identifier": "contact_identifier",
|
|
"Title": "contact_title",
|
|
"Last Name": "last_name",
|
|
"First Name": "first_name",
|
|
"Contact Role": "contact_role",
|
|
"Contact Type": "contact_type",
|
|
"Pr St Cont Primary Indicator": "primary_indicator",
|
|
"SUA Reporting Indicator": "sua_reporting_indicator",
|
|
"Financial Disclosure Indicator": "financial_disclosure_indicator",
|
|
"Contact Phone Number": "phone",
|
|
"Alternative Phone Number": "phone_alt",
|
|
"Mobile Phone Number": "phone_mobile",
|
|
"Contact Fax Number": "fax",
|
|
"Contact Email Address": "email",
|
|
"SUA Reporting Email Address": "email_sua",
|
|
"Contact Start Date": "contact_start_date",
|
|
"Contact End Date": "contact_end_date",
|
|
"Degree/qualification": "degree_qualification",
|
|
"Job Title": "job_title",
|
|
"Contact Address Line 1": "address_line1",
|
|
"Contact Address Line 2": "address_line2",
|
|
"Contact Address Line 3": "address_line3",
|
|
"Contact City": "city",
|
|
"Contact Addr State/Province": "state_province",
|
|
"Contact Zip/Postal Code": "zip_postal_code",
|
|
}
|
|
|
|
|
|
# ── Pomocné funkce ─────────────────────────────────────────────────────────────
|
|
def get_file_created_date(xlsx_path: Path):
|
|
"""Vrátí date z dcterms:created v docProps/core.xml."""
|
|
with zipfile.ZipFile(xlsx_path) as z:
|
|
with z.open("docProps/core.xml") as f:
|
|
root = ET.parse(f).getroot()
|
|
el = root.find("{http://purl.org/dc/terms/}created")
|
|
dt = datetime.fromisoformat(el.text.replace("Z", "+00:00"))
|
|
return dt.astimezone(timezone.utc).date()
|
|
|
|
|
|
def clean_value(val):
|
|
"""Převede NaN / NaT / Timestamp na typy přijatelné MySQL driverem."""
|
|
import math
|
|
if val is None:
|
|
return None
|
|
if isinstance(val, float):
|
|
return None if math.isnan(val) else val
|
|
if isinstance(val, pd.Timestamp):
|
|
return None if pd.isnull(val) else val.date()
|
|
try:
|
|
if pd.isna(val):
|
|
return None
|
|
except Exception:
|
|
pass
|
|
return val
|
|
|
|
|
|
def get_protocol_id(xlsx_path: Path) -> str:
|
|
"""Přečte protocol_id z prvního datového řádku (rychle, bez načtení celého souboru)."""
|
|
df = pd.read_excel(xlsx_path, sheet_name=SHEET, header=HEADER_ROW,
|
|
usecols=["Protocol ID"], nrows=1)
|
|
return str(df["Protocol ID"].iloc[0])
|
|
|
|
|
|
def import_file(xlsx_path: Path, cursor, conn):
|
|
"""Zpracuje jeden xlsx soubor — vždy přepíše (delete + insert)."""
|
|
file_date = get_file_created_date(xlsx_path)
|
|
protocol_id = get_protocol_id(xlsx_path)
|
|
print(f" file_date : {file_date}")
|
|
print(f" protocol_id : {protocol_id}")
|
|
|
|
df = pd.read_excel(xlsx_path, sheet_name=SHEET, header=HEADER_ROW)
|
|
df_cz = df[df["Country Name"] == COUNTRY].copy()
|
|
print(f" radku CZ : {len(df_cz)}")
|
|
|
|
if df_cz.empty:
|
|
print(" -> zadne CZ radky, preskoceno")
|
|
return 0
|
|
|
|
df_cz = df_cz.rename(columns=COL_MAP)
|
|
db_cols = list(COL_MAP.values())
|
|
|
|
# Smazání stávajících záznamů pro tento soubor (přepis)
|
|
cursor.execute(
|
|
f"DELETE FROM {TABLE} "
|
|
f"WHERE file_date = %s AND protocol_id = %s AND country_name = %s",
|
|
(file_date, protocol_id, COUNTRY)
|
|
)
|
|
deleted = cursor.rowcount
|
|
if deleted:
|
|
print(f" prepis : smazano {deleted} starych radku")
|
|
|
|
placeholders = ", ".join(["%s"] * (len(db_cols) + 1))
|
|
insert_cols = "file_date, " + ", ".join(db_cols)
|
|
sql_insert = f"INSERT INTO {TABLE} ({insert_cols}) VALUES ({placeholders})"
|
|
|
|
for _, row in df_cz.iterrows():
|
|
values = [file_date] + [clean_value(row.get(col)) for col in db_cols]
|
|
cursor.execute(sql_insert, values)
|
|
|
|
conn.commit()
|
|
return len(df_cz)
|
|
|
|
|
|
# ── Hlavní logika ──────────────────────────────────────────────────────────────
|
|
def main():
|
|
files = sorted(SOURCE_DIR.glob("*.xlsx"))
|
|
if not files:
|
|
print(f"Zadne xlsx soubory v {SOURCE_DIR}")
|
|
return
|
|
|
|
today = datetime.now(timezone.utc).date()
|
|
print(f"Nalezeno souboru: {len(files)} | dnesni datum: {today}")
|
|
print(f"Pripojuji se k MySQL...")
|
|
conn = mysql.connector.connect(**DB_CONFIG)
|
|
cursor = conn.cursor()
|
|
|
|
summary = []
|
|
for xlsx in files:
|
|
print(f"\n[{xlsx.name}]")
|
|
try:
|
|
file_date = get_file_created_date(xlsx)
|
|
if file_date != today:
|
|
print(f" file_date : {file_date} -> PRESKOCENO (neni dnesni datum)")
|
|
summary.append((xlsx.name, f"preskoceno (file_date={file_date})"))
|
|
continue
|
|
n = import_file(xlsx, cursor, conn)
|
|
if n is None:
|
|
summary.append((xlsx.name, "preskoceno"))
|
|
else:
|
|
summary.append((xlsx.name, f"importovano {n} radku"))
|
|
except Exception as e:
|
|
conn.rollback()
|
|
summary.append((xlsx.name, f"CHYBA: {e}"))
|
|
print(f" CHYBA: {e}")
|
|
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
print("\n" + "=" * 60)
|
|
print("SOUHRN:")
|
|
for name, status in summary:
|
|
print(f" {name:<45} {status}")
|
|
print("=" * 60)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|