"""Import Panorama Issues & Deviations XLSX exports into MongoDB.

Database: Panorama
Collection: IssuesAndDeviations
Filter: when COUNTRY_FILTER is set, only rows whose "Country Name" equals it
        are imported; COUNTRY_FILTER = None imports every country.
History: when a record's fields change, the previous version is appended to
         its history[] array.

Usage:
    python import_to_mongo.py                            # import every xlsx in Downloads/
    python import_to_mongo.py Downloads/konkretni.xlsx   # import a single file
"""

import re
import shutil
import sys
from datetime import date, datetime, time
from pathlib import Path
from typing import Optional

# NOTE: openpyxl and pymongo are imported inside the functions that use them
# (import_file / main) so the pure helpers below stay importable and testable
# on machines without the Excel/MongoDB drivers installed.

MONGO_URI = "mongodb://192.168.1.76:27017"
DB_NAME = "Panorama"
COLLECTION_NAME = "IssuesAndDeviations"
DOWNLOADS_DIR = Path(__file__).parent / "Downloads"
PROCESSED_DIR = DOWNLOADS_DIR / "Zpracovano"

COUNTRY_FILTER = None  # None = all countries
HEADER_ROW = 5         # 0-indexed row holding the column headers
DATA_START_ROW = 6     # 0-indexed first data row
UPSERT_KEY = "ID"      # column whose value uniquely identifies a record


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def extract_snapshot_date(filename: str) -> str:
    """Return the snapshot date (YYYY-MM-DD) for an export file.

    The date is taken from a leading ``YYYY-MM-DD`` prefix of the file's base
    name; when the name has no such prefix, today's local date is used.
    """
    match = re.match(r"(\d{4}-\d{2}-\d{2})", Path(filename).name)
    return match.group(1) if match else datetime.now().strftime("%Y-%m-%d")


def clean_value(val):
    """Normalize one cell value so it can be stored in MongoDB.

    * ``None`` stays ``None``.
    * ``datetime`` / ``date`` / ``time`` become ISO-8601 strings (BSON cannot
      encode bare ``date`` or ``time`` objects).
    * Strings are stripped; an empty result becomes ``None``.
    * Anything else is returned unchanged.
    """
    if val is None:
        return None
    if isinstance(val, (datetime, date, time)):
        return val.isoformat()
    if isinstance(val, str):
        stripped = val.strip()
        return stripped if stripped else None
    return val


def _normalize_record_id(value) -> Optional[str]:
    """Convert a raw ID cell to the string key used for upserts.

    Returns ``None`` when the cell is empty (``None`` or blank text) so the
    caller can skip the row. Integral numbers lose their ``.0`` suffix;
    non-integral floats are kept verbatim instead of being truncated, which
    would silently merge distinct records.
    """
    if value is None:
        return None
    if isinstance(value, int):
        return str(int(value))
    if isinstance(value, float):
        return str(int(value)) if value.is_integer() else str(value)
    text = str(value).strip()
    return text or None


def _upsert_record(collection, record_id: str, fields: dict,
                   filename: str, snapshot_date: str) -> str:
    """Insert or update one record; return "inserted", "changed" or "unchanged"."""
    existing = collection.find_one({"record_id": record_id})

    if existing is None:
        collection.insert_one({
            "record_id": record_id,
            "fields": fields,
            "sourceFile": filename,
            "firstSeen": snapshot_date,
            "lastSeen": snapshot_date,
            "history": [],
        })
        return "inserted"

    previous_fields = existing.get("fields")
    if previous_fields != fields:
        # Archive the outgoing version, dated by when it was last observed.
        old_entry = {
            "date": existing.get("lastSeen", snapshot_date),
            "fields": previous_fields,
        }
        collection.update_one(
            {"_id": existing["_id"]},
            {
                "$push": {"history": old_entry},
                "$set": {
                    "fields": fields,
                    "sourceFile": filename,
                    "lastSeen": snapshot_date,
                },
            },
        )
        return "changed"

    collection.update_one(
        {"_id": existing["_id"]},
        {"$set": {"lastSeen": snapshot_date, "sourceFile": filename}},
    )
    return "unchanged"


# ---------------------------------------------------------------------------
# Import of a single file
# ---------------------------------------------------------------------------

def import_file(xlsx_path: str, collection) -> dict:
    """Import one XLSX export into ``collection``.

    Reads the first worksheet, takes column names from row ``HEADER_ROW`` and
    upserts every following row keyed by its ``UPSERT_KEY`` column. Rows
    without an ID are skipped; rows rejected by ``COUNTRY_FILTER`` are counted
    as ``filtered_out``.

    Args:
        xlsx_path: Path of the workbook to import.
        collection: Target collection object (``find_one``, ``insert_one`` and
            ``update_one`` are called on it).

    Returns:
        Dict with keys ``snapshot``, ``inserted``, ``changed``, ``unchanged``
        and ``filtered_out``.

    Raises:
        ValueError: If the sheet has no row at index ``HEADER_ROW``.
    """
    import openpyxl

    filename = Path(xlsx_path).name
    snapshot_date = extract_snapshot_date(filename)

    wb = openpyxl.load_workbook(xlsx_path, read_only=True)
    try:
        ws = wb[wb.sheetnames[0]]
        rows = list(ws.iter_rows(values_only=True))
    finally:
        # Read-only workbooks keep the file handle open until closed.
        wb.close()

    if len(rows) <= HEADER_ROW:
        raise ValueError(
            f"{filename}: expected a header in row {HEADER_ROW + 1}, "
            f"but the sheet has only {len(rows)} row(s)"
        )
    header = rows[HEADER_ROW]

    counts = {"inserted": 0, "changed": 0, "unchanged": 0, "filtered_out": 0}

    for row in rows[DATA_START_ROW:]:
        raw = dict(zip(header, row))

        country = raw.get("Country Name") or ""
        if COUNTRY_FILTER and country != COUNTRY_FILTER:
            counts["filtered_out"] += 1
            continue

        record_id = _normalize_record_id(raw.get(UPSERT_KEY))
        if record_id is None:
            continue

        # Columns without a header name cannot be stored as fields.
        fields = {k: clean_value(v) for k, v in raw.items() if k is not None}

        outcome = _upsert_record(collection, record_id, fields, filename, snapshot_date)
        counts[outcome] += 1

    stats = {"snapshot": snapshot_date, **counts}
    print(
        f" {COLLECTION_NAME} [{snapshot_date}]: "
        f"+{counts['inserted']} new, ~{counts['changed']} changed, "
        f"={counts['unchanged']} same, -{counts['filtered_out']} non-CZ"
    )
    return stats


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

def main():
    """Import the files named on the command line, or every XLSX in Downloads/.

    Each successfully imported file is moved into ``PROCESSED_DIR``. Any error
    aborts the run; the MongoDB client is closed either way.
    """
    paths: list[Path] = []

    if len(sys.argv) > 1:
        for arg in sys.argv[1:]:
            p = Path(arg)
            if p.is_file():
                paths.append(p)
            else:
                print(f"Soubor nenalezen: {arg}")
    else:
        paths = sorted(DOWNLOADS_DIR.glob("*.xlsx"))

    if not paths:
        print("Zadne XLSX soubory k importu.")
        return

    print(f"Nalezeno {len(paths)} souboru.\n")

    from pymongo import ASCENDING, MongoClient

    total = {"inserted": 0, "changed": 0, "unchanged": 0}

    client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
    try:
        # Fail fast if the server is unreachable.
        client.admin.command("ping")
        db = client[DB_NAME]

        collection = db[COLLECTION_NAME]
        collection.create_index([("record_id", ASCENDING)], unique=True)
        collection.create_index([("fields.Country Name", ASCENDING)])
        collection.create_index([("fields.Site ID", ASCENDING)])
        collection.create_index([("fields.Status", ASCENDING)])
        collection.create_index([("fields.Brief Description - Subject ID", ASCENDING)])

        # parents=True: Downloads/ may not exist when files are passed explicitly.
        PROCESSED_DIR.mkdir(parents=True, exist_ok=True)

        for xlsx_path in paths:
            print(f"Import: {xlsx_path.name}")
            stats = import_file(str(xlsx_path), collection)
            for k in total:
                total[k] += stats.get(k, 0)

            dest = PROCESSED_DIR / xlsx_path.name
            shutil.move(str(xlsx_path), str(dest))
            print(" -> presunut do Zpracovano/")
    finally:
        client.close()

    print(f"\nCelkem: +{total['inserted']} new, ~{total['changed']} changed, ={total['unchanged']} same")


if __name__ == "__main__":
    main()