Files
janssen/Panorama/import_to_mongo.py

355 lines
12 KiB
Python

"""
Import Panorama XLSX reportů do MongoDB (databáze: Panorama).
Podporované typy:
- Issues & Deviations → kolekce IssuesAndDeviations (klíč: ID / fuzzy+hash)
- Site Visit Details → kolekce Visits (klíč: Site Visit ID (Technical))
- FUL details → kolekce FUL (klíč: SVR Document Number)
- PANORAMA Site Contacts → kolekce contacts (klíč: hash z Contact Identifier + Protocol ID + Site ID + Contact Role)
Filtr: pouze řádky s Country Name == "Czechia" (neplatí pro contacts, ty se importují za všechny země)
Historie: při změně fields se stará verze uloží do pole history[]
Použití:
python import_to_mongo.py # importuje všechny xlsx z Downloads/
python import_to_mongo.py Downloads/konkretni.xlsx # jeden soubor
"""
import hashlib
import re
import shutil
import sys
from datetime import date, datetime, time
from pathlib import Path

import openpyxl
from pymongo import ASCENDING, MongoClient
from rapidfuzz import fuzz
# --- Deduplication of issue rows that have no ID (see fuzzy_match_existing) ---
FUZZY_FIELDS = ("Description", "Comments", "Action Taken")
FUZZY_MIN_FIELDS = 2  # how many of FUZZY_FIELDS must reach FUZZY_THRESHOLD
FUZZY_THRESHOLD = 90.0  # per-field similarity (%) required by the 2-of-3 rule
FALLBACK_DESC_THRESHOLD = 95.0  # Description similarity (%) when too few fields are comparable

# --- MongoDB connection -------------------------------------------------------
MONGO_URI = "mongodb://192.168.1.76:27017"
DB_NAME = "Panorama"

# --- Input / output folders ---------------------------------------------------
DOWNLOADS_DIR = Path(__file__).parent / "Downloads"
PROCESSED_DIR = DOWNLOADS_DIR.joinpath("Zpracovano")  # imported files are moved here

# --- Row filtering and worksheet layout ---------------------------------------
COUNTRY_FILTER = "Czechia"  # None = import rows of all countries
HEADER_ROW = 5  # 0-indexed row holding the column headers
DATA_START_ROW = HEADER_ROW + 1  # 0-indexed first data row, right below the header
# Supported report types, keyed by an internal name. detect_report_type() returns
# the first entry whose "pattern" is found in the file name. Keys of an entry:
#   "pattern"           - compiled regex searched in the file name
#   "collection"        - MongoDB collection the rows are written to
#   "upsert_key"        - column whose value becomes the document's record_id
#                         (None = no single key column, "composite_keys" is used)
#   "composite_keys"    - columns whose values are hashed together into a "C-..." record_id
#   "no_country_filter" - True = rows of all countries are imported (COUNTRY_FILTER ignored)
#   "indexes"           - extra single-field ascending indexes created on the collection
# IssuesAndDeviations rows without an ID are additionally deduplicated by fuzzy
# matching; import_file switches that on based on the collection name.
REPORT_TYPES = {
    "IssuesAndDeviations": {
        "pattern": re.compile(r"Deviations and Issues\.xlsx$", re.IGNORECASE),
        "collection": "IssuesAndDeviations",
        "upsert_key": "ID",
        "indexes": [
            "fields.Country Name", "fields.Site ID", "fields.Status",
            "fields.Brief Description - Subject ID",
        ],
    },
    "Visits": {
        "pattern": re.compile(r"Site Visit Details\.xlsx$", re.IGNORECASE),
        "collection": "Visits",
        "upsert_key": "Site Visit ID (Technical)",
        "indexes": [
            "fields.Country Name", "fields.Site ID",
            "fields.Site Visit Status", "fields.Site Visit Type",
        ],
    },
    "FUL": {
        "pattern": re.compile(r"FUL details\.xlsx$", re.IGNORECASE),
        "collection": "FUL",
        "upsert_key": "SVR Document Number",
        "indexes": [
            "fields.Country Name", "fields.Site ID",
            "fields.FUL Missing?", "fields.FUL Document Status",
        ],
    },
    "contacts": {
        "pattern": re.compile(r"PANORAMA Site Contacts\.xlsx$", re.IGNORECASE),
        "collection": "contacts",
        # One contact can appear for several protocols/sites/roles, so the key is composite.
        "upsert_key": None,
        "composite_keys": ["Contact Identifier", "Protocol ID", "Site ID", "Contact Role"],
        "no_country_filter": True,
        "indexes": [
            "fields.Country Name", "fields.Site ID",
            "fields.Protocol ID", "fields.Contact Role",
            "fields.Contact Email Address", "fields.Contact Identifier",
        ],
    },
}
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def extract_snapshot_date(filename: str) -> str:
    """Return the snapshot date (YYYY-MM-DD) that prefixes the report file name.

    Only the base name is inspected, so directory components are ignored. If the
    name does not start with a date, today's local date is returned instead.
    """
    base_name = Path(filename).name
    found = re.match(r"(\d{4}-\d{2}-\d{2})", base_name)
    if found is None:
        return datetime.now().strftime("%Y-%m-%d")
    return found.group(1)
def fuzzy_match_existing(collection, fields: dict):
    """Find a stored ID-less issue that approximately matches the incoming row.

    Candidates must agree exactly on Part, Protocol ID, Site ID, Create Date and
    "Brief Description - Subject ID", and must have no ``fields.ID``. The first
    candidate returned by the query that satisfies either rule wins:

    * at least FUZZY_MIN_FIELDS of FUZZY_FIELDS are non-empty on both sides and
      reach FUZZY_THRESHOLD (rapidfuzz ``fuzz.ratio``), or
    * fewer than FUZZY_MIN_FIELDS fields are comparable, Description is one of
      them and its ratio reaches FALLBACK_DESC_THRESHOLD.

    Args:
        collection: pymongo collection whose documents keep row values under "fields".
        fields: cleaned column values of the incoming row.

    Returns:
        The matching document, or None when no candidate qualifies.
    """

    def text_of(source: dict, name: str) -> str:
        # Cells may hold non-string values (e.g. numbers); compare their text form
        # instead of failing on .strip(). Falsy values count as empty.
        value = source.get(name)
        return str(value).strip() if value else ""

    candidates = collection.find({
        "fields.Part": fields.get("Part"),
        "fields.Protocol ID": fields.get("Protocol ID"),
        "fields.Site ID": fields.get("Site ID"),
        "fields.Create Date": fields.get("Create Date"),
        "fields.Brief Description - Subject ID": fields.get("Brief Description - Subject ID"),
        # Only records stored without an ID take part in fuzzy matching.
        "fields.ID": None,
    })
    new_vals = {f: text_of(fields, f) for f in FUZZY_FIELDS}
    for cand in candidates:
        cand_fields = cand.get("fields") or {}
        scores = {}
        for f in FUZZY_FIELDS:
            cand_val = text_of(cand_fields, f)
            # A field is only scored when it is filled in on both sides.
            if new_vals[f] and cand_val:
                scores[f] = fuzz.ratio(new_vals[f], cand_val)
        passing = [f for f, s in scores.items() if s >= FUZZY_THRESHOLD]
        # passing is a subset of scores, so this also implies enough comparable fields.
        if len(passing) >= FUZZY_MIN_FIELDS:
            return cand
        if (
            len(scores) < FUZZY_MIN_FIELDS
            and "Description" in scores
            and scores["Description"] >= FALLBACK_DESC_THRESHOLD
        ):
            return cand
    return None
def detect_report_type(filename: str) -> dict | None:
    """Return the REPORT_TYPES entry whose file-name pattern occurs in *filename*.

    Entries are tried in definition order and the first hit wins; None means the
    file is not a supported report.
    """
    return next(
        (cfg for cfg in REPORT_TYPES.values() if cfg["pattern"].search(filename)),
        None,
    )
def clean_value(val):
    """Normalize one XLSX cell value before it is stored in MongoDB.

    * None stays None.
    * date, datetime and time values become ISO-8601 strings; BSON cannot encode
      ``datetime.date`` or ``datetime.time`` objects directly.
    * Strings are stripped; a string that is empty after stripping becomes None.
    * Any other value (numbers, booleans, ...) is returned unchanged.
    """
    if val is None:
        return None
    # datetime is a subclass of date, so this covers datetime too; time values can
    # come from time-only formatted cells and would otherwise break insert_one.
    if isinstance(val, (date, time)):
        return val.isoformat()
    if isinstance(val, str):
        stripped = val.strip()
        return stripped or None
    return val
# ---------------------------------------------------------------------------
# Import jednoho souboru
# ---------------------------------------------------------------------------
# Columns hashed, in this order, into the "H-..." record_id of an issue row that has
# neither an ID nor a fuzzy match. Changing this list changes the ids of stored records.
_ISSUE_HASH_COLUMNS = (
    "Part",
    "Site ID",
    "Create Date",
    "Description",
    "Brief Description - Subject ID",
    "Comments",
    "Action Taken",
)


def _load_sheet_rows(xlsx_path: str) -> list[tuple]:
    """Read every row of the first worksheet of *xlsx_path* as a tuple of cell values.

    The workbook is opened read-only and closed in ``finally`` so the file handle is
    released even when reading fails.
    """
    wb = openpyxl.load_workbook(xlsx_path, read_only=True)
    try:
        ws = wb[wb.sheetnames[0]]
        return list(ws.iter_rows(values_only=True))
    finally:
        wb.close()


def _hash_id(prefix: str, key_parts: list[str]) -> str:
    """Build the record_id "<prefix>-<first 16 hex chars of SHA-1 of the joined parts>"."""
    digest = hashlib.sha1("|".join(key_parts).encode("utf-8")).hexdigest()[:16]
    return f"{prefix}-{digest}"


def _resolve_record(collection, report_cfg: dict, raw: dict, fields: dict):
    """Determine the record_id of one row and fetch the stored document for it.

    Returns:
        (record_id, existing document or None), or None when the row has no usable
        key (no upsert-key value and the collection uses neither composite keys nor
        fuzzy matching).
    """
    composite_keys = report_cfg.get("composite_keys")
    if composite_keys:
        record_id = _hash_id("C", [str(raw.get(k) or "").strip() for k in composite_keys])
        return record_id, collection.find_one({"record_id": record_id})

    upsert_key = report_cfg["upsert_key"]
    raw_id = raw.get(upsert_key) if upsert_key else None
    if isinstance(raw_id, str) and not raw_id.strip():
        # A blank cell is not an ID; otherwise all such rows would share record_id "".
        raw_id = None
    if raw_id is not None:
        if isinstance(raw_id, (int, float)):
            # Numeric IDs may arrive as floats; 12.0 and 12 must map to the same id.
            record_id = str(int(raw_id))
        else:
            record_id = str(raw_id).strip()
        return record_id, collection.find_one({"record_id": record_id})

    if report_cfg["collection"] == "IssuesAndDeviations":
        existing = fuzzy_match_existing(collection, fields)
        if existing is not None:
            return existing["record_id"], existing
        record_id = _hash_id("H", [str(raw.get(c) or "") for c in _ISSUE_HASH_COLUMNS])
        # The fuzzy rules cannot pair every identical row (e.g. only Comments filled
        # in, or no fuzzy field at all), so look the hash up as well; inserting it a
        # second time would violate the unique record_id index.
        return record_id, collection.find_one({"record_id": record_id})

    return None


def _store_record(collection, record_id: str, existing, fields: dict,
                  filename: str, snapshot_date: str) -> str:
    """Insert or update one record; return "inserted", "changed" or "unchanged".

    When the stored fields differ, the previous fields are pushed onto ``history``
    together with the previous lastSeen date before being replaced.
    """
    if existing is None:
        collection.insert_one({
            "record_id": record_id,
            "fields": fields,
            "sourceFile": filename,
            "firstSeen": snapshot_date,
            "lastSeen": snapshot_date,
            "history": [],
        })
        return "inserted"
    if existing.get("fields") != fields:
        old_entry = {
            "date": existing.get("lastSeen", snapshot_date),
            "fields": existing.get("fields"),
        }
        collection.update_one(
            {"_id": existing["_id"]},
            {
                "$push": {"history": old_entry},
                "$set": {
                    "fields": fields,
                    "sourceFile": filename,
                    "lastSeen": snapshot_date,
                },
            },
        )
        return "changed"
    collection.update_one(
        {"_id": existing["_id"]},
        {"$set": {"lastSeen": snapshot_date, "sourceFile": filename}},
    )
    return "unchanged"


def import_file(xlsx_path: str, collection, report_cfg: dict) -> dict:
    """Import one Panorama XLSX report into *collection* and return statistics.

    The snapshot date comes from the file name (extract_snapshot_date). Rows are
    read from the first worksheet: the header is row HEADER_ROW, data starts at
    DATA_START_ROW (both 0-indexed) and completely empty rows are ignored. Unless
    the report type sets "no_country_filter", only rows whose "Country Name" equals
    COUNTRY_FILTER are imported. Each imported row is inserted, or its stored
    document is updated with the old fields archived in ``history``.

    Args:
        xlsx_path: path to the .xlsx report.
        collection: target pymongo collection.
        report_cfg: the matching REPORT_TYPES entry.

    Returns:
        dict with snapshot, inserted, changed, unchanged, filtered_out (rows removed
        by the country filter), skipped_no_key (rows without a usable key),
        xlsx_count, db_count and protocol_id.
    """
    filename = Path(xlsx_path).name
    snapshot_date = extract_snapshot_date(filename)
    collection_name = report_cfg["collection"]
    apply_country_filter = COUNTRY_FILTER and not report_cfg.get("no_country_filter")

    rows = _load_sheet_rows(xlsx_path)
    if len(rows) > HEADER_ROW:
        header = rows[HEADER_ROW]
        data_rows = [r for r in rows[DATA_START_ROW:] if any(v is not None for v in r)]
    else:
        # Too short to even contain the header row: nothing to import.
        print(f" !!! VAROVANI: {filename} nema radek s hlavickou (radek {HEADER_ROW + 1})")
        header, data_rows = (), []
    xlsx_count = len(data_rows)

    counts = {"inserted": 0, "changed": 0, "unchanged": 0}
    filtered_out = skipped_no_key = 0
    for row in data_rows:
        raw = dict(zip(header, row))
        country = str(raw.get("Country Name") or "").strip()
        if apply_country_filter and country != COUNTRY_FILTER:
            filtered_out += 1
            continue
        # Columns with an empty header cell are dropped.
        fields = {k: clean_value(v) for k, v in raw.items() if k is not None}
        resolved = _resolve_record(collection, report_cfg, raw, fields)
        if resolved is None:
            skipped_no_key += 1
            continue
        record_id, existing = resolved
        outcome = _store_record(collection, record_id, existing, fields, filename, snapshot_date)
        counts[outcome] += 1

    inserted, changed, unchanged = counts["inserted"], counts["changed"], counts["unchanged"]
    processed = inserted + changed + unchanged + filtered_out + skipped_no_key

    # Sanity-check scope: the first Protocol ID among the first 50 data rows.
    protocol_id = None
    for row in data_rows[:50]:
        pid = dict(zip(header, row)).get("Protocol ID")
        if pid:
            protocol_id = str(pid).strip()
            break
    db_count = collection.count_documents({"fields.Protocol ID": protocol_id}) if protocol_id else None

    stats = {
        "snapshot": snapshot_date,
        "inserted": inserted,
        "changed": changed,
        "unchanged": unchanged,
        "filtered_out": filtered_out,
        "skipped_no_key": skipped_no_key,
        "xlsx_count": xlsx_count,
        "db_count": db_count,
        "protocol_id": protocol_id,
    }
    print(f" {collection_name} [{snapshot_date}]: +{inserted} new, ~{changed} changed, ={unchanged} same, -{filtered_out} non-CZ")
    if skipped_no_key:
        print(f" !!! VAROVANI: {skipped_no_key} radku bez klice ({report_cfg['upsert_key']}) preskoceno")
    expected_in_db = xlsx_count - filtered_out - skipped_no_key
    if processed != xlsx_count:
        print(f" !!! VAROVANI: zpracovano {processed} radku, ale v XLSX je {xlsx_count} datovych radku")
    if db_count is not None and db_count != expected_in_db and not report_cfg.get("composite_keys"):
        print(f" !!! VAROVANI: v DB je {db_count} dokumentu pro Protocol ID {protocol_id}, ocekavano {expected_in_db} (XLSX {xlsx_count} - filtered {filtered_out} - bez klice {skipped_no_key})")
    return stats
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
    """Import Panorama XLSX reports into MongoDB.

    Files come from the command-line arguments (missing ones are reported and
    skipped) or, without arguments, from every *.xlsx in DOWNLOADS_DIR. Each file
    whose name matches a REPORT_TYPES pattern is imported and then moved to
    PROCESSED_DIR; files of unknown type are left where they are.
    """
    paths: list[Path] = []
    if len(sys.argv) > 1:
        for arg in sys.argv[1:]:
            p = Path(arg)
            if p.is_file():
                paths.append(p)
            else:
                print(f"Soubor nenalezen: {arg}")
    else:
        paths = sorted(DOWNLOADS_DIR.glob("*.xlsx"))
    if not paths:
        print("Zadne XLSX soubory k importu.")
        return
    print(f"Nalezeno {len(paths)} souboru.\n")

    total = {"inserted": 0, "changed": 0, "unchanged": 0}
    # The with-block closes the client even when an import raises.
    with MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000) as client:
        client.admin.command("ping")  # fail fast if the server is unreachable
        db = client[DB_NAME]
        collections_cache: dict[str, object] = {}

        def get_collection(cfg: dict):
            """Return the collection for *cfg*, creating its indexes on first use."""
            name = cfg["collection"]
            if name not in collections_cache:
                col = db[name]
                col.create_index([("record_id", ASCENDING)], unique=True)
                for idx_field in cfg["indexes"]:
                    col.create_index([(idx_field, ASCENDING)])
                collections_cache[name] = col
            return collections_cache[name]

        # parents=True: when files are passed as arguments, DOWNLOADS_DIR may not exist.
        PROCESSED_DIR.mkdir(parents=True, exist_ok=True)
        for xlsx_path in paths:
            report_cfg = detect_report_type(xlsx_path.name)
            if report_cfg is None:
                print(f"PRESKAKUJI (neznamy typ): {xlsx_path.name}")
                continue
            collection = get_collection(report_cfg)
            print(f"Import: {xlsx_path.name} -> {report_cfg['collection']}")
            stats = import_file(str(xlsx_path), collection, report_cfg)
            for k in total:
                total[k] += stats.get(k, 0)
            dest = PROCESSED_DIR / xlsx_path.name
            shutil.move(str(xlsx_path), str(dest))
            print(" -> presunut do Zpracovano/")
    print(f"\nCelkem: +{total['inserted']} new, ~{total['changed']} changed, ={total['unchanged']} same")
# Run the import only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()