"""
import_CZ_contacts.py

Import Czechia site contacts from a PANORAMA Dashboard xlsx export into the
MySQL table CTMS_contacts.

- Keeps only rows where Country Name == 'Czechia'.
- Takes file_date from the xlsx document properties (dcterms:created).
- Before importing, deletes existing rows with the same file_date and
  country_name == 'Czechia', so re-running the import for one file is
  idempotent.
"""

import os
import xml.etree.ElementTree as ET
import zipfile
from datetime import date, datetime, timezone
from pathlib import Path

import pandas as pd

# ── Configuration ──────────────────────────────────────────────────────────────
SOURCE_FILE = Path(r"U:\PythonProject\Janssen\CTMS\PanoramaContacts\SourceData\PANORAMA Dashboard (33).xlsx")

# SECURITY NOTE(review): a root password is committed in source. It can now be
# overridden through the CTMS_DB_PASSWORD environment variable; the literal
# default is kept only so existing runs behave as before. Rotate the password
# and remove the default once the environment variable is deployed.
DB_CONFIG = {
    "host": "192.168.1.76",
    "port": 3306,
    "user": "root",
    "password": os.environ.get("CTMS_DB_PASSWORD", "Vlado9674+"),
    "database": "studie",
    "charset": "utf8mb4",
}

TABLE = "CTMS_contacts"
COUNTRY = "Czechia"
SHEET = "Site Contacts"
HEADER_ROW = 5  # 0-based → row 6 in Excel

# Number of rows sent per executemany() call; keeps each multi-row INSERT
# statement well below typical max_allowed_packet limits.
BATCH_SIZE = 500

# XML namespace of the dcterms:created element in docProps/core.xml.
_DCTERMS_NS = "http://purl.org/dc/terms/"

# Excel column header → DB column name. Insertion order defines the column
# order of the INSERT statement.
COLUMN_MAP = {
    "Sector": "sector",
    "TA": "ta",
    "Protocol ID": "protocol_id",
    "GTL-GTM/CTM": "gtl_ctm",
    "Country Name": "country_name",
    "LTM Name": "ltm_name",
    "Site ID": "site_id",
    "SM Name": "sm_name",
    "PI Full Name": "pi_full_name",
    "Institution Name": "institution_name",
    "Contact Identifier": "contact_identifier",
    "Title": "contact_title",
    "Last Name": "last_name",
    "First Name": "first_name",
    "Contact Role": "contact_role",
    "Contact Type": "contact_type",
    "Pr St Cont Primary Indicator": "primary_indicator",
    "SUA Reporting Indicator": "sua_reporting_indicator",
    "Financial Disclosure Indicator": "financial_disclosure_indicator",
    "Contact Phone Number": "phone",
    "Alternative Phone Number": "phone_alt",
    "Mobile Phone Number": "phone_mobile",
    "Contact Fax Number": "fax",
    "Contact Email Address": "email",
    "SUA Reporting Email Address": "email_sua",
    "Contact Start Date": "contact_start_date",
    "Contact End Date": "contact_end_date",
    "Degree/qualification": "degree_qualification",
    "Job Title": "job_title",
    "Contact Address Line 1": "address_line1",
    "Contact Address Line 2": "address_line2",
    "Contact Address Line 3": "address_line3",
    "Contact City": "city",
    "Contact Addr State/Province": "state_province",
    "Contact Zip/Postal Code": "zip_postal_code",
}


# ── Helper functions ───────────────────────────────────────────────────────────
def get_file_created_date(xlsx_path: Path) -> date:
    """Return the workbook creation date from docProps/core.xml.

    Reads the ``dcterms:created`` element of the xlsx package, parses it as an
    ISO-8601 timestamp and returns its calendar date in UTC.

    Args:
        xlsx_path: Path (or binary file object) of the xlsx archive.

    Returns:
        The UTC date of the ``dcterms:created`` timestamp.

    Raises:
        KeyError: The archive has no ``docProps/core.xml`` member.
        ValueError: The element is missing, empty, or not a valid ISO
            timestamp.
    """
    with zipfile.ZipFile(xlsx_path) as archive:
        with archive.open("docProps/core.xml") as core_xml:
            root = ET.parse(core_xml).getroot()

    created_el = root.find(f"{{{_DCTERMS_NS}}}created")
    created_text = (created_el.text or "").strip() if created_el is not None else ""
    if not created_text:
        raise ValueError(
            f"{xlsx_path}: docProps/core.xml has no dcterms:created value"
        )

    # datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11 on,
    # so it is normalised to an explicit UTC offset first.
    created = datetime.fromisoformat(created_text.replace("Z", "+00:00"))
    return created.astimezone(timezone.utc).date()


def clean_value(val):
    """Convert a pandas cell value into something the DB driver can store.

    Missing values (None, NaN, NaT, pd.NA) become None, ``pd.Timestamp``
    becomes a plain ``datetime.date``, and everything else (strings, numbers,
    …) is returned unchanged.
    """
    if val is None:
        return None
    # Missing-value detection goes through pd.isna() instead of probing the
    # private ``_value`` attribute: real Timestamps can carry that attribute
    # too (pandas 2.x), which turned every date into None. The is_scalar guard
    # keeps pd.isna() from returning an array for list-like input.
    if pd.api.types.is_scalar(val) and pd.isna(val):
        return None
    if isinstance(val, pd.Timestamp):
        return val.date()
    return val


# ── Main logic ─────────────────────────────────────────────────────────────────
def main():
    """Import the Czechia rows of SOURCE_FILE into TABLE.

    Existing rows with the same file_date and country are deleted and the new
    rows inserted inside a single transaction; on any error the transaction is
    rolled back and the exception re-raised.
    """
    # Imported here rather than at module level so the helpers above can be
    # imported (and tested) on machines without the MySQL driver installed.
    import mysql.connector

    print(f"Soubor : {SOURCE_FILE}")

    # 1) Creation date from the document properties
    file_date = get_file_created_date(SOURCE_FILE)
    print(f"file_date (z docProps): {file_date}")

    # 2) Load the data
    print("Načítám Excel…")
    df = pd.read_excel(SOURCE_FILE, sheet_name=SHEET, header=HEADER_ROW)

    # 3) Filter CZ
    df_cz = df[df["Country Name"] == COUNTRY].copy()
    print(f"Řádků CZ: {len(df_cz)}")

    # 4) Map Excel columns → DB columns
    missing = [src for src in COLUMN_MAP if src not in df_cz.columns]
    if missing:
        # Missing source columns are still imported as NULL (as before), but
        # no longer silently.
        print(f"VAROVÁNÍ: v Excelu chybí sloupce (budou importovány jako NULL): {missing}")
    db_cols = list(COLUMN_MAP.values())
    # reindex() fixes the column order and fills absent columns with NaN,
    # which clean_value() turns into None.
    df_cz = df_cz.rename(columns=COLUMN_MAP).reindex(columns=db_cols)

    rows = [
        [file_date, *(clean_value(cell) for cell in record)]
        for record in df_cz.itertuples(index=False, name=None)
    ]

    placeholders = ", ".join(["%s"] * (len(db_cols) + 1))  # +1 for file_date
    insert_cols = "file_date, " + ", ".join(db_cols)
    sql_insert = f"INSERT INTO {TABLE} ({insert_cols}) VALUES ({placeholders})"

    # 5) Connect to the DB
    print("Připojuji se k MySQL…")
    conn = mysql.connector.connect(**DB_CONFIG)
    try:
        cursor = conn.cursor()
        try:
            # 6) Delete existing rows for the same file_date + CZ
            #    (idempotent import)
            cursor.execute(
                f"DELETE FROM {TABLE} WHERE file_date = %s AND country_name = %s",
                (file_date, COUNTRY),
            )
            deleted = cursor.rowcount
            print(f"Smazáno starých záznamů: {deleted}")

            # 7) Insert in batches instead of one round trip per row
            for start in range(0, len(rows), BATCH_SIZE):
                cursor.executemany(sql_insert, rows[start:start + BATCH_SIZE])

            conn.commit()
        except Exception:
            # Keep the previous data intact if anything fails mid-import.
            conn.rollback()
            raise
        finally:
            cursor.close()
    finally:
        conn.close()

    print(f"Importováno záznamů : {len(rows)}")
    print("Hotovo OK")


if __name__ == "__main__":
    main()