# Listing metadata: 355 lines, 12 KiB, Python
"""
Import Panorama XLSX reports into MongoDB (database: Panorama).

Supported report types:
- Issues & Deviations → collection IssuesAndDeviations (key: ID / fuzzy+hash)
- Site Visit Details → collection Visits (key: Site Visit ID (Technical))
- FUL details → collection FUL (key: SVR Document Number)
- PANORAMA Site Contacts → collection contacts (key: hash of Contact Identifier,
  Protocol ID, Site ID and Contact Role)

Filter: only rows with Country Name == "Czechia" (not applied to Site Contacts)
History: when a record's fields change, the previous version is stored in history[]

Usage:
    python import_to_mongo.py                           # imports every xlsx from Downloads/
    python import_to_mongo.py Downloads/konkretni.xlsx  # a single file
"""
|
|
|
|
import hashlib
|
|
import re
|
|
import shutil
|
|
import sys
|
|
from datetime import datetime, date
|
|
from pathlib import Path
|
|
|
|
import openpyxl
|
|
from pymongo import MongoClient, ASCENDING
|
|
from rapidfuzz import fuzz
|
|
|
|
# --- Fuzzy matching of rows that carry no ID --------------------------------
# Text columns compared when searching for an already imported record.
FUZZY_FIELDS = ("Description", "Comments", "Action Taken")
# How many of those columns must reach FUZZY_THRESHOLD (the "2-of-3" rule).
FUZZY_MIN_FIELDS = 2
# Per-column similarity, in percent, required by the 2-of-3 rule.
FUZZY_THRESHOLD = 90.0
# Similarity, in percent, required of Description when no second column is comparable.
FALLBACK_DESC_THRESHOLD = 95.0

# --- MongoDB target ----------------------------------------------------------
MONGO_URI = "mongodb://192.168.1.76:27017"
DB_NAME = "Panorama"

# --- File locations (relative to this script) --------------------------------
DOWNLOADS_DIR = Path(__file__).parent.joinpath("Downloads")
PROCESSED_DIR = DOWNLOADS_DIR.joinpath("Zpracovano")

# --- Row filtering and sheet layout -------------------------------------------
COUNTRY_FILTER = "Czechia"  # None = all countries
HEADER_ROW = 5  # 0-indexed row holding the column headers
DATA_START_ROW = 6  # 0-indexed first data row
|
|
|
|
# Per-report import configuration, keyed by report type.
#
# Each entry provides:
#   pattern           - regex searched in the file name to recognise the report
#   collection        - name of the target MongoDB collection
#   upsert_key        - column whose value becomes the record id (None = not used)
#   composite_keys    - optional; columns hashed together into the record id
#   no_country_filter - optional; when true the COUNTRY_FILTER is not applied
#   indexes           - document paths that get a single-field ascending index
REPORT_TYPES = {
    "IssuesAndDeviations": {
        "pattern": re.compile(r"Deviations and Issues\.xlsx$", re.IGNORECASE),
        "collection": "IssuesAndDeviations",
        "upsert_key": "ID",
        "indexes": [
            "fields.Country Name",
            "fields.Site ID",
            "fields.Status",
            "fields.Brief Description - Subject ID",
        ],
    },
    "Visits": {
        "pattern": re.compile(r"Site Visit Details\.xlsx$", re.IGNORECASE),
        "collection": "Visits",
        "upsert_key": "Site Visit ID (Technical)",
        "indexes": [
            "fields.Country Name",
            "fields.Site ID",
            "fields.Site Visit Status",
            "fields.Site Visit Type",
        ],
    },
    "FUL": {
        "pattern": re.compile(r"FUL details\.xlsx$", re.IGNORECASE),
        "collection": "FUL",
        "upsert_key": "SVR Document Number",
        "indexes": [
            "fields.Country Name",
            "fields.Site ID",
            "fields.FUL Missing?",
            "fields.FUL Document Status",
        ],
    },
    "contacts": {
        "pattern": re.compile(r"PANORAMA Site Contacts\.xlsx$", re.IGNORECASE),
        "collection": "contacts",
        # No single key column: the record id is a hash of composite_keys.
        "upsert_key": None,
        "composite_keys": [
            "Contact Identifier",
            "Protocol ID",
            "Site ID",
            "Contact Role",
        ],
        "no_country_filter": True,
        "indexes": [
            "fields.Country Name",
            "fields.Site ID",
            "fields.Protocol ID",
            "fields.Contact Role",
            "fields.Contact Email Address",
            "fields.Contact Identifier",
        ],
    },
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
|
|
|
|
def extract_snapshot_date(filename: str) -> str:
    """Return the snapshot date encoded at the start of a report file name.

    Only the last path component is inspected. If it begins with a
    ``YYYY-MM-DD`` shaped prefix, that prefix is returned as-is; otherwise the
    current local date is returned in the same format.
    """
    base_name = Path(filename).name
    prefix = re.match(r"(\d{4}-\d{2}-\d{2})", base_name)
    if prefix is None:
        return datetime.now().strftime("%Y-%m-%d")
    return prefix.group(1)
|
|
|
|
|
|
def fuzzy_match_existing(collection, fields: dict):
    """Find an already stored ID-less record that fuzzily matches ``fields``.

    Candidates are the documents in ``collection`` whose Part, Protocol ID,
    Site ID, Create Date and Brief Description - Subject ID equal those in
    ``fields`` and whose ``fields.ID`` is null (MongoDB's null query also
    matches documents where the field is absent).

    For every candidate the columns listed in ``FUZZY_FIELDS`` are compared
    with ``fuzz.ratio``; a column is only scored when it is non-empty on both
    sides. A candidate is accepted when either

    * at least ``FUZZY_MIN_FIELDS`` columns were scored and at least that many
      reach ``FUZZY_THRESHOLD``, or
    * fewer than ``FUZZY_MIN_FIELDS`` columns were scored, Description is one
      of them, and it reaches ``FALLBACK_DESC_THRESHOLD``.

    Args:
        collection: collection object providing ``find``.
        fields: cleaned column-name → value mapping of the incoming row.

    Returns:
        The first accepted candidate document, or ``None`` if there is none.
    """

    def _text(value) -> str:
        # Cell values may be numbers (numeric Excel cells are not strings), so
        # coerce before stripping instead of calling .strip() on them directly.
        return str(value or "").strip()

    candidates = collection.find({
        "fields.Part": fields.get("Part"),
        "fields.Protocol ID": fields.get("Protocol ID"),
        "fields.Site ID": fields.get("Site ID"),
        "fields.Create Date": fields.get("Create Date"),
        "fields.Brief Description - Subject ID": fields.get("Brief Description - Subject ID"),
        "fields.ID": None,
    })

    new_vals = {f: _text(fields.get(f)) for f in FUZZY_FIELDS}

    for cand in candidates:
        cand_fields = cand.get("fields", {})
        cand_vals = {f: _text(cand_fields.get(f)) for f in FUZZY_FIELDS}

        # Score only the columns that have text on both sides.
        scores = {
            f: fuzz.ratio(new_vals[f], cand_vals[f])
            for f in FUZZY_FIELDS
            if new_vals[f] and cand_vals[f]
        }
        passing = [f for f, s in scores.items() if s >= FUZZY_THRESHOLD]

        if len(scores) >= FUZZY_MIN_FIELDS and len(passing) >= FUZZY_MIN_FIELDS:
            return cand
        # Too few comparable columns: fall back to a stricter Description-only check.
        if (
            len(scores) < FUZZY_MIN_FIELDS
            and "Description" in scores
            and scores["Description"] >= FALLBACK_DESC_THRESHOLD
        ):
            return cand

    return None
|
|
|
|
|
|
def detect_report_type(filename: str) -> dict | None:
    """Return the ``REPORT_TYPES`` entry whose pattern is found in ``filename``.

    Entries are tried in the mapping's iteration order and the first hit wins;
    ``None`` is returned when no pattern matches.
    """
    matching = (
        cfg for cfg in REPORT_TYPES.values() if cfg["pattern"].search(filename)
    )
    return next(matching, None)
|
|
|
|
|
|
def clean_value(val):
    """Normalise one spreadsheet cell value before it is stored.

    * ``None`` stays ``None``.
    * ``datetime``, ``date`` and ``time`` values become ISO 8601 strings.
    * Strings are stripped; a string that is empty after stripping becomes ``None``.
    * Anything else (numbers, booleans, ...) is returned unchanged.

    Args:
        val: raw cell value.

    Returns:
        The normalised value as described above.
    """
    # Local import: the module header only imports ``datetime`` and ``date``.
    from datetime import time as _time

    if val is None:
        return None
    # ``datetime`` is a subclass of ``date``, so one check covers both; ``time``
    # is included because time-only cells cannot be BSON-encoded as they are.
    if isinstance(val, (datetime, date, _time)):
        return val.isoformat()
    if isinstance(val, str):
        return val.strip() or None
    return val
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Import of a single file
# ---------------------------------------------------------------------------
|
|
|
|
def import_file(xlsx_path: str, collection, report_cfg: dict) -> dict:
    """Import one XLSX report into ``collection`` and return import statistics.

    The first worksheet is read completely. Row ``HEADER_ROW`` supplies the
    column names and every non-empty row from ``DATA_START_ROW`` onwards is a
    data row. For each data row:

    1. If the country filter applies and ``Country Name`` differs from
       ``COUNTRY_FILTER``, the row is counted as filtered out.
    2. A ``record_id`` is derived:
       * ``composite_keys`` configured → ``C-`` + hash of those columns;
       * otherwise the ``upsert_key`` column, when it has a value;
       * otherwise, for IssuesAndDeviations only, the id of a fuzzily matching
         stored record, or ``H-`` + hash of the row's descriptive columns;
       * otherwise the row is counted as filtered out.
    3. The record is inserted, or - when its fields changed - updated with the
       previous version pushed to ``history``, or only has ``lastSeen`` /
       ``sourceFile`` refreshed.

    Args:
        xlsx_path: path of the workbook to import.
        collection: target collection (``find_one``, ``insert_one``,
            ``update_one``, ``count_documents`` are used).
        report_cfg: one entry of ``REPORT_TYPES``.

    Returns:
        Dict with keys ``snapshot``, ``inserted``, ``changed``, ``unchanged``,
        ``filtered_out``, ``xlsx_count``, ``db_count`` and ``protocol_id``.

    Raises:
        ValueError: if the sheet has no row at index ``HEADER_ROW``.
    """
    filename = Path(xlsx_path).name
    snapshot_date = extract_snapshot_date(filename)
    upsert_key = report_cfg["upsert_key"]
    composite_keys = report_cfg.get("composite_keys")
    collection_name = report_cfg["collection"]
    use_fuzzy = collection_name == "IssuesAndDeviations"
    apply_country_filter = COUNTRY_FILTER and not report_cfg.get("no_country_filter")

    wb = openpyxl.load_workbook(xlsx_path, read_only=True)
    try:
        ws = wb[wb.sheetnames[0]]
        rows = list(ws.iter_rows(values_only=True))
    finally:
        # Close the workbook even when reading the sheet fails.
        wb.close()

    if len(rows) <= HEADER_ROW:
        raise ValueError(
            f"{filename}: sheet has {len(rows)} rows, header expected at row index {HEADER_ROW}"
        )
    header = rows[HEADER_ROW]

    data_rows = [r for r in rows[DATA_START_ROW:] if any(v is not None for v in r)]
    xlsx_count = len(data_rows)

    inserted = changed = unchanged = filtered_out = 0

    for row in data_rows:
        raw = dict(zip(header, row))

        country = raw.get("Country Name") or ""
        if apply_country_filter and country != COUNTRY_FILTER:
            filtered_out += 1
            continue

        # Columns without a header name are dropped.
        fields = {k: clean_value(v) for k, v in raw.items() if k is not None}

        record_id = raw.get(upsert_key) if upsert_key else None

        if composite_keys:
            key_parts = [str(raw.get(k) or "").strip() for k in composite_keys]
            record_id = _short_sha1(key_parts, "C")
            existing = collection.find_one({"record_id": record_id})
        elif record_id is not None:
            # Numeric ids are stored in their integer text form.
            if isinstance(record_id, (int, float)):
                record_id = str(int(record_id))
            else:
                record_id = str(record_id).strip()
            existing = collection.find_one({"record_id": record_id})
        elif use_fuzzy:
            existing = fuzzy_match_existing(collection, fields)
            if existing is not None:
                record_id = existing["record_id"]
            else:
                key_parts = [
                    str(raw.get(name) or "")
                    for name in (
                        "Part",
                        "Site ID",
                        "Create Date",
                        "Description",
                        "Brief Description - Subject ID",
                        "Comments",
                        "Action Taken",
                    )
                ]
                record_id = _short_sha1(key_parts, "H")
                # The fuzzy matcher finds nothing when the text columns are
                # empty, yet the same row may already be stored under this
                # hash from an earlier import. Look it up so the row is
                # updated instead of inserted a second time under the same
                # record_id.
                # NOTE(review): the hash does not include Protocol ID, unlike
                # the fuzzy candidate query - confirm that is intended.
                existing = collection.find_one({"record_id": record_id})
        else:
            # No usable key for this report type: the row cannot be stored.
            filtered_out += 1
            continue

        if existing is None:
            doc = {
                "record_id": record_id,
                "fields": fields,
                "sourceFile": filename,
                "firstSeen": snapshot_date,
                "lastSeen": snapshot_date,
                "history": [],
            }
            collection.insert_one(doc)
            inserted += 1
        elif existing.get("fields") != fields:
            # Keep the previous version, dated by when it was last seen.
            old_entry = {
                "date": existing.get("lastSeen", snapshot_date),
                "fields": existing["fields"],
            }
            collection.update_one(
                {"_id": existing["_id"]},
                {
                    "$push": {"history": old_entry},
                    "$set": {
                        "fields": fields,
                        "sourceFile": filename,
                        "lastSeen": snapshot_date,
                    },
                },
            )
            changed += 1
        else:
            collection.update_one(
                {"_id": existing["_id"]},
                {"$set": {"lastSeen": snapshot_date, "sourceFile": filename}},
            )
            unchanged += 1

    processed = inserted + changed + unchanged + filtered_out

    # Take the Protocol ID from the first of the leading 50 data rows that has one.
    protocol_id = None
    for row in data_rows[:50]:
        pid = dict(zip(header, row)).get("Protocol ID")
        if pid:
            protocol_id = str(pid).strip()
            break
    db_count = (
        collection.count_documents({"fields.Protocol ID": protocol_id})
        if protocol_id
        else None
    )

    stats = {
        "snapshot": snapshot_date,
        "inserted": inserted,
        "changed": changed,
        "unchanged": unchanged,
        "filtered_out": filtered_out,
        "xlsx_count": xlsx_count,
        "db_count": db_count,
        "protocol_id": protocol_id,
    }
    print(f" {collection_name} [{snapshot_date}]: +{inserted} new, ~{changed} changed, ={unchanged} same, -{filtered_out} non-CZ")

    expected_in_db = xlsx_count - filtered_out

    # Sanity checks: every data row must be accounted for, and (except for
    # composite-key reports) the stored count for the protocol should equal
    # the number of rows that passed the filter.
    if processed != xlsx_count:
        print(f" !!! VAROVANI: zpracovano {processed} radku, ale v XLSX je {xlsx_count} datovych radku")
    if db_count is not None and db_count != expected_in_db and not composite_keys:
        print(f" !!! VAROVANI: v DB je {db_count} dokumentu pro Protocol ID {protocol_id}, ocekavano {expected_in_db} (XLSX {xlsx_count} - filtered {filtered_out})")

    return stats


def _short_sha1(parts, prefix: str) -> str:
    """Return ``prefix-`` plus the first 16 hex digits of the SHA-1 of the ``|``-joined parts."""
    digest = hashlib.sha1("|".join(parts).encode("utf-8")).hexdigest()[:16]
    return f"{prefix}-{digest}"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
|
|
|
|
def main():
    """Import the XLSX files named on the command line, or all in ``DOWNLOADS_DIR``.

    Command-line arguments that are not existing files are reported and
    skipped. Files whose name matches no known report type are skipped as
    well. Every imported file is moved to ``PROCESSED_DIR`` afterwards, and a
    summary of inserted / changed / unchanged records is printed at the end.
    """
    paths: list[Path] = []

    if len(sys.argv) > 1:
        for arg in sys.argv[1:]:
            p = Path(arg)
            if p.is_file():
                paths.append(p)
            else:
                print(f"Soubor nenalezen: {arg}")
    else:
        paths = sorted(DOWNLOADS_DIR.glob("*.xlsx"))

    if not paths:
        print("Zadne XLSX soubory k importu.")
        return

    print(f"Nalezeno {len(paths)} souboru.\n")

    total = {"inserted": 0, "changed": 0, "unchanged": 0}

    client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
    try:
        # Fail fast when the server cannot be reached.
        client.admin.command("ping")
        db = client[DB_NAME]

        collections_cache: dict[str, object] = {}

        def get_collection(cfg: dict):
            """Return the collection for ``cfg``, creating its indexes on first use."""
            name = cfg["collection"]
            if name not in collections_cache:
                col = db[name]
                col.create_index([("record_id", ASCENDING)], unique=True)
                for idx_field in cfg["indexes"]:
                    col.create_index([(idx_field, ASCENDING)])
                collections_cache[name] = col
            return collections_cache[name]

        # parents=True: Downloads/ may not exist when files are given by argument.
        PROCESSED_DIR.mkdir(parents=True, exist_ok=True)

        for xlsx_path in paths:
            report_cfg = detect_report_type(xlsx_path.name)
            if report_cfg is None:
                print(f"PRESKAKUJI (neznamy typ): {xlsx_path.name}")
                continue

            collection = get_collection(report_cfg)
            print(f"Import: {xlsx_path.name} -> {report_cfg['collection']}")
            stats = import_file(str(xlsx_path), collection, report_cfg)
            for k in total:
                total[k] += stats.get(k, 0)

            dest = PROCESSED_DIR / xlsx_path.name
            shutil.move(str(xlsx_path), str(dest))
            print(" -> presunut do Zpracovano/")
    finally:
        # Release the connection even when the ping or an import fails.
        client.close()

    print(f"\nCelkem: +{total['inserted']} new, ~{total['changed']} changed, ={total['unchanged']} same")
|
|
|
|
|
|
# Run the import only when this file is executed as a script.
if __name__ == "__main__":
    main()
|