""" Import Panorama XLSX reportů do MongoDB (databáze: Panorama). Podporované typy: - Issues & Deviations → kolekce IssuesAndDeviations (klíč: ID / fuzzy+hash) - Site Visit Details → kolekce Visits (klíč: Site Visit ID (Technical)) - FUL details → kolekce FUL (klíč: SVR Document Number) - PANORAMA Site Contacts → kolekce contacts (klíč: Contact Identifier) Filtr: pouze řádky s Country Name == "Czechia" Historie: při změně fields se stará verze uloží do pole history[] Použití: python import_to_mongo.py # importuje všechny xlsx z Downloads/ python import_to_mongo.py Downloads/konkretni.xlsx # jeden soubor """ import hashlib import re import shutil import sys from datetime import datetime, date from pathlib import Path import openpyxl from pymongo import MongoClient, ASCENDING from rapidfuzz import fuzz FUZZY_FIELDS = ("Description", "Comments", "Action Taken") FUZZY_MIN_FIELDS = 2 # počet polí, která musí dosáhnout prahu FUZZY_THRESHOLD = 90.0 # % shoda pro pole v 2-of-3 logice FALLBACK_DESC_THRESHOLD = 95.0 # % shoda Description, když chybí druhé pole MONGO_URI = "mongodb://192.168.1.76:27017" DB_NAME = "Panorama" DOWNLOADS_DIR = Path(__file__).parent / "Downloads" PROCESSED_DIR = DOWNLOADS_DIR / "Zpracovano" COUNTRY_FILTER = "Czechia" # None = všechny země HEADER_ROW = 5 # 0-indexed řádek s hlavičkou DATA_START_ROW = 6 # 0-indexed první datový řádek REPORT_TYPES = { "IssuesAndDeviations": { "pattern": re.compile(r"Deviations and Issues\.xlsx$", re.IGNORECASE), "collection": "IssuesAndDeviations", "upsert_key": "ID", "indexes": [ "fields.Country Name", "fields.Site ID", "fields.Status", "fields.Brief Description - Subject ID", ], }, "Visits": { "pattern": re.compile(r"Site Visit Details\.xlsx$", re.IGNORECASE), "collection": "Visits", "upsert_key": "Site Visit ID (Technical)", "indexes": [ "fields.Country Name", "fields.Site ID", "fields.Site Visit Status", "fields.Site Visit Type", ], }, "FUL": { "pattern": re.compile(r"FUL details\.xlsx$", re.IGNORECASE), "collection": "FUL", "upsert_key": "SVR Document Number", "indexes": [ "fields.Country Name", "fields.Site ID", "fields.FUL Missing?", "fields.FUL Document Status", ], }, "contacts": { "pattern": re.compile(r"PANORAMA Site Contacts\.xlsx$", re.IGNORECASE), "collection": "contacts", "upsert_key": None, "composite_keys": ["Contact Identifier", "Protocol ID", "Site ID", "Contact Role"], "no_country_filter": True, "indexes": [ "fields.Country Name", "fields.Site ID", "fields.Protocol ID", "fields.Contact Role", "fields.Contact Email Address", "fields.Contact Identifier", ], }, } # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def extract_snapshot_date(filename: str) -> str: match = re.match(r"(\d{4}-\d{2}-\d{2})", Path(filename).name) return match.group(1) if match else datetime.now().strftime("%Y-%m-%d") def fuzzy_match_existing(collection, fields: dict): """Najde existující záznam pomocí fuzzy match nad Description/Comments/Action Taken. Vrací existující dokument nebo None. """ candidates = collection.find({ "fields.Part": fields.get("Part"), "fields.Protocol ID": fields.get("Protocol ID"), "fields.Site ID": fields.get("Site ID"), "fields.Create Date": fields.get("Create Date"), "fields.Brief Description - Subject ID": fields.get("Brief Description - Subject ID"), "fields.ID": None, }) new_vals = {f: (fields.get(f) or "").strip() for f in FUZZY_FIELDS} for cand in candidates: cand_fields = cand.get("fields", {}) cand_vals = {f: (cand_fields.get(f) or "").strip() for f in FUZZY_FIELDS} scores = {} for f in FUZZY_FIELDS: if new_vals[f] and cand_vals[f]: scores[f] = fuzz.ratio(new_vals[f], cand_vals[f]) passing = [f for f, s in scores.items() if s >= FUZZY_THRESHOLD] if len(scores) >= FUZZY_MIN_FIELDS and len(passing) >= FUZZY_MIN_FIELDS: return cand if len(scores) < FUZZY_MIN_FIELDS and "Description" in scores and scores["Description"] >= FALLBACK_DESC_THRESHOLD: return cand return None def detect_report_type(filename: str) -> dict | None: for cfg in REPORT_TYPES.values(): if cfg["pattern"].search(filename): return cfg return None def clean_value(val): """Převede datetime na ISO string, None nechá, zbytek strip.""" if val is None: return None if isinstance(val, datetime): return val.isoformat() if isinstance(val, date): return val.isoformat() if isinstance(val, str): val = val.strip() return val if val else None return val # --------------------------------------------------------------------------- # Import jednoho souboru # --------------------------------------------------------------------------- def import_file(xlsx_path: str, collection, report_cfg: dict) -> dict: filename = Path(xlsx_path).name snapshot_date = extract_snapshot_date(filename) upsert_key = report_cfg["upsert_key"] collection_name = report_cfg["collection"] use_fuzzy = (collection_name == "IssuesAndDeviations") apply_country_filter = COUNTRY_FILTER and not report_cfg.get("no_country_filter") wb = openpyxl.load_workbook(xlsx_path, read_only=True) ws = wb[wb.sheetnames[0]] rows = list(ws.iter_rows(values_only=True)) wb.close() header = rows[HEADER_ROW] data_rows = [r for r in rows[DATA_START_ROW:] if any(v is not None for v in r)] xlsx_count = len(data_rows) inserted = changed = unchanged = filtered_out = 0 for row in data_rows: raw = dict(zip(header, row)) country = (raw.get("Country Name") or "") if apply_country_filter and country != COUNTRY_FILTER: filtered_out += 1 continue fields = {} for k, v in raw.items(): if k is None: continue fields[k] = clean_value(v) composite_keys = report_cfg.get("composite_keys") record_id = raw.get(upsert_key) if upsert_key else None has_id = record_id is not None if composite_keys: key_parts = [str(raw.get(k) or "").strip() for k in composite_keys] h = hashlib.sha1("|".join(key_parts).encode("utf-8")).hexdigest()[:16] record_id = f"C-{h}" existing = collection.find_one({"record_id": record_id}) elif has_id: record_id = str(int(record_id)) if isinstance(record_id, (int, float)) else str(record_id).strip() existing = collection.find_one({"record_id": record_id}) elif use_fuzzy: existing = fuzzy_match_existing(collection, fields) if existing is not None: record_id = existing["record_id"] else: key_parts = [ str(raw.get("Part") or ""), str(raw.get("Site ID") or ""), str(raw.get("Create Date") or ""), str(raw.get("Description") or ""), str(raw.get("Brief Description - Subject ID") or ""), str(raw.get("Comments") or ""), str(raw.get("Action Taken") or ""), ] h = hashlib.sha1("|".join(key_parts).encode("utf-8")).hexdigest()[:16] record_id = f"H-{h}" else: filtered_out += 1 continue if existing is None: doc = { "record_id": record_id, "fields": fields, "sourceFile": filename, "firstSeen": snapshot_date, "lastSeen": snapshot_date, "history": [], } collection.insert_one(doc) inserted += 1 elif existing.get("fields") != fields: old_entry = { "date": existing.get("lastSeen", snapshot_date), "fields": existing["fields"], } collection.update_one( {"_id": existing["_id"]}, { "$push": {"history": old_entry}, "$set": { "fields": fields, "sourceFile": filename, "lastSeen": snapshot_date, }, }, ) changed += 1 else: collection.update_one( {"_id": existing["_id"]}, {"$set": {"lastSeen": snapshot_date, "sourceFile": filename}}, ) unchanged += 1 processed = inserted + changed + unchanged + filtered_out protocol_id = None for row in data_rows[:50]: raw = dict(zip(header, row)) pid = raw.get("Protocol ID") if pid: protocol_id = str(pid).strip() break db_count = collection.count_documents({"fields.Protocol ID": protocol_id}) if protocol_id else None stats = { "snapshot": snapshot_date, "inserted": inserted, "changed": changed, "unchanged": unchanged, "filtered_out": filtered_out, "xlsx_count": xlsx_count, "db_count": db_count, "protocol_id": protocol_id, } print(f" {collection_name} [{snapshot_date}]: +{inserted} new, ~{changed} changed, ={unchanged} same, -{filtered_out} non-CZ") expected_in_db = xlsx_count - filtered_out if processed != xlsx_count: print(f" !!! VAROVANI: zpracovano {processed} radku, ale v XLSX je {xlsx_count} datovych radku") if db_count is not None and db_count != expected_in_db and not report_cfg.get("composite_keys"): print(f" !!! VAROVANI: v DB je {db_count} dokumentu pro Protocol ID {protocol_id}, ocekavano {expected_in_db} (XLSX {xlsx_count} - filtered {filtered_out})") return stats # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- def main(): paths: list[Path] = [] if len(sys.argv) > 1: for arg in sys.argv[1:]: p = Path(arg) if p.is_file(): paths.append(p) else: print(f"Soubor nenalezen: {arg}") else: paths = sorted(DOWNLOADS_DIR.glob("*.xlsx")) if not paths: print("Zadne XLSX soubory k importu.") return print(f"Nalezeno {len(paths)} souboru.\n") client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000) client.admin.command("ping") db = client[DB_NAME] collections_cache: dict[str, any] = {} def get_collection(cfg: dict): name = cfg["collection"] if name not in collections_cache: col = db[name] col.create_index([("record_id", ASCENDING)], unique=True) for idx_field in cfg["indexes"]: col.create_index([(idx_field, ASCENDING)]) collections_cache[name] = col return collections_cache[name] PROCESSED_DIR.mkdir(exist_ok=True) total = {"inserted": 0, "changed": 0, "unchanged": 0} for xlsx_path in paths: report_cfg = detect_report_type(xlsx_path.name) if report_cfg is None: print(f"PRESKAKUJI (neznamy typ): {xlsx_path.name}") continue collection = get_collection(report_cfg) print(f"Import: {xlsx_path.name} -> {report_cfg['collection']}") stats = import_file(str(xlsx_path), collection, report_cfg) for k in total: total[k] += stats.get(k, 0) dest = PROCESSED_DIR / xlsx_path.name shutil.move(str(xlsx_path), str(dest)) print(f" -> presunut do Zpracovano/") client.close() print(f"\nCelkem: +{total['inserted']} new, ~{total['changed']} changed, ={total['unchanged']} same") if __name__ == "__main__": main()