#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
seaweed_attachments_backfill_jnj.py

One-off backfill: JNJ attachments from .msg files -> SeaweedFS on Tower1.

The JNJ branch (vbuzalka@its.jnj.com) parses .msg files from /mnt/JNJEMAILS
and stores ONLY attachment metadata in Mongo — the binaries stayed inside the
.msg files. This script reopens each .msg, pulls out the attachment bytes,
uploads them to SeaweedFS and updates the mail document: every attachment
gets sha256 + seaweed_path + seaweed_url, and the document gets a doc-level
seaweed_synced_at.

PARITY: uses open_message() and extract_attachments() DIRECTLY from the
deployed jnj_tower_ingest_v1.3.py (loaded from disk via importlib, because a
name containing "v1.3" is not a valid module name). extract_attachments
uploads to SeaweedFS by itself and returns the enriched records — this script
only overwrites attachments[].

RESUME + BATCH: selects has_attachments=True documents without a valid
seaweed_synced_at; every processed document gets marked and so drops out of
the query.

Error states (they also get seaweed_synced_at so that resume does not loop):
  seaweed_file_missing : True  — .msg not found on disk (or no filename)
  seaweed_parse_error  : <msg> — .msg cannot be opened / extraction failed

Usage (inside the python-runner container on Tower):
  docker exec python-runner python /scripts/seaweed_attachments_backfill_jnj.py
    ... --dry-run       # writes nothing (neither Mongo nor SeaweedFS); only
                        # opens each .msg as a test. Without --limit only one
                        # batch (BATCH documents) is sampled.
    ... --limit 200     # only N documents
    ... --retry-errors  # also retry seaweed_file_missing / seaweed_parse_error
"""
import sys
import time
import argparse
import importlib.util
from pathlib import Path
from datetime import datetime, timezone

from pymongo import MongoClient, UpdateOne

if hasattr(sys.stdout, "reconfigure"):
    sys.stdout.reconfigure(encoding="utf-8", errors="replace")

MONGO_URI = "mongodb://192.168.1.76:27017"
MONGO_DB = "emaily"
MONGO_COL = "vbuzalka@its.jnj.com"
JNJEMAILS = Path("/mnt/JNJEMAILS")
MODULE_PATH = "/scripts/jnj_tower_ingest_v1.3.py"
BATCH = 300


def load_jnj():
    """Load the deployed JNJ ingest script from MODULE_PATH as a module.

    Loaded by file path because "jnj_tower_ingest_v1.3" is not an importable
    module name.

    Raises:
        ImportError: if no module spec/loader can be built for MODULE_PATH.
    """
    spec = importlib.util.spec_from_file_location("jnj_ingest", MODULE_PATH)
    if spec is None or spec.loader is None:
        raise ImportError(f"Cannot load {MODULE_PATH}")
    mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(mod)
    return mod


def _utcnow_naive():
    """Current UTC time as a naive datetime (the form already stored in Mongo)."""
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _build_query(retry_errors, run_start):
    """Return the Mongo filter for documents that still need the backfill.

    With ``retry_errors`` the previously failed documents are included too,
    but only those marked BEFORE ``run_start``: a document that fails again
    in this run gets a fresh seaweed_synced_at while keeping its error flag,
    and without the bound it would re-match every batch forever.
    """
    pending = [
        {"seaweed_synced_at": {"$exists": False}},
        {"seaweed_synced_at": None},
    ]
    if retry_errors:
        before_run = {"$lt": run_start}
        pending += [
            {"seaweed_file_missing": True, "seaweed_synced_at": before_run},
            {"seaweed_parse_error": {"$exists": True},
             "seaweed_synced_at": before_run},
        ]
    return {"has_attachments": True, "$or": pending}


def _iter_batches(col, query, limit, dry_run):
    """Yield lists of at most BATCH documents (``_id`` + ``filename``) to process.

    Live mode re-queries for every batch: the caller writes its updates
    before asking for the next batch, and processed documents drop out of
    ``query``, so each batch is new work.

    Dry-run writes nothing, so nothing drops out and re-querying would return
    the same documents again; a single cursor is walked instead, capped at
    ``limit`` (or one BATCH when no limit is given — a sample, not a full run).
    """
    projection = {"_id": 1, "filename": 1}
    if dry_run:
        batch = []
        for doc in col.find(query, projection).limit(limit or BATCH):
            batch.append(doc)
            if len(batch) == BATCH:
                yield batch
                batch = []
        if batch:
            yield batch
        return

    taken = 0
    while not (limit and taken >= limit):
        take = min(BATCH, limit - taken) if limit else BATCH
        docs = list(col.find(query, projection).limit(take))
        if not docs:
            return
        taken += len(docs)
        yield docs


def _process_doc(jnj, doc, dry_run):
    """Open one mail's .msg and (outside dry-run) extract + upload its attachments.

    Returns ``(status, update, uploaded)``:
      * ``status``   — "miss" (no .msg on disk), "err" (open/extract failed)
        or "ok";
      * ``update``   — the ``UpdateOne`` to apply, or ``None`` for a
        successful dry-run (nothing was extracted, nothing to store);
      * ``uploaded`` — number of returned attachments carrying a seaweed_path.
    """
    _id = doc["_id"]
    # `or ""`: a stored filename of None would otherwise crash `JNJEMAILS / fname`.
    fname = doc.get("filename") or ""
    now = _utcnow_naive()

    fpath = JNJEMAILS / fname if fname else None
    if fpath is None or not fpath.is_file():
        print(f" MISS {fname}")
        return "miss", UpdateOne({"_id": _id}, {"$set": {
            "seaweed_file_missing": True, "seaweed_synced_at": now}}), 0

    try:
        msg, _mode = jnj.open_message(fpath)
        if msg is None:
            raise RuntimeError("open_message vratil None")
        try:
            # extract_attachments uploads to SeaweedFS itself, so it must not
            # run in dry-run; there only opening the .msg is tested.
            atts = None if dry_run else jnj.extract_attachments(msg)
        finally:
            # Close even when extraction raised, so the handle is not leaked.
            try:
                msg.close()
            except Exception:
                pass  # best-effort: a failing close must not mask the result
    except Exception as e:
        print(f" ERR {fname}: {e}")
        return "err", UpdateOne({"_id": _id}, {"$set": {
            "seaweed_parse_error": str(e)[:200], "seaweed_synced_at": now}}), 0

    if atts is None:
        return "ok", None, 0
    uploaded = sum(1 for a in atts if a.get("seaweed_path"))
    return "ok", UpdateOne({"_id": _id}, {
        "$set": {"attachments": atts, "seaweed_synced_at": now},
        "$unset": {"seaweed_file_missing": "", "seaweed_parse_error": ""}}), uploaded


def _backfill(jnj, col, args):
    """Run the batch loop against ``col``; returns the process exit code."""
    run_start = _utcnow_naive()
    query = _build_query(args.retry_errors, run_start)
    total = col.count_documents(query)
    print("=== Backfill JNJ priloh -> SeaweedFS ===")
    print(f"Filer: {jnj.sw.SEAWEED_FILER}{jnj.sw.BASE_PATH}")
    print(f"Dokumentu s prilohami ke zpracovani: {total}"
          f"{' (DRY-RUN)' if args.dry_run else ''}")
    if total == 0:
        print("Neni co delat.")
        return 0

    t0 = time.time()
    done = files = atts_up = missing = errors = 0
    for docs in _iter_batches(col, query, args.limit, args.dry_run):
        ops = []
        for doc in docs:
            done += 1
            status, update, uploaded = _process_doc(jnj, doc, args.dry_run)
            if status == "miss":
                missing += 1
            elif status == "err":
                errors += 1
            else:
                files += 1
                atts_up += uploaded
            if update is not None and not args.dry_run:
                ops.append(update)
        # Written before the next batch is fetched, so live-mode documents
        # have dropped out of the query by then.
        if ops:
            col.bulk_write(ops, ordered=False)
        rate = done / max(time.time() - t0, 0.001)
        print(f" {done}/{total} soubory={files} prilohy_up={atts_up} "
              f"miss={missing} err={errors} ({rate:.1f} doc/s)")

    dt = time.time() - t0
    print(f"\n=== HOTOVO za {dt/60:.1f} min ===")
    print(f"dokumentu={done} zpracovano_souboru={files} nahrano_priloh={atts_up} "
          f"chybi_soubor={missing} chyby={errors}")
    return 0


def main() -> int:
    """Parse CLI arguments, connect to Mongo and run the backfill.

    Returns:
        Process exit code (0 on completion).
    """
    ap = argparse.ArgumentParser(description="Backfill JNJ priloh -> SeaweedFS")
    ap.add_argument("--limit", type=int, default=0)
    ap.add_argument("--dry-run", action="store_true")
    ap.add_argument("--retry-errors", action="store_true")
    args = ap.parse_args()

    jnj = load_jnj()
    client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
    try:
        client.admin.command("ping")
        col = client[MONGO_DB][MONGO_COL]
        return _backfill(jnj, col, args)
    finally:
        client.close()


if __name__ == "__main__":
    sys.exit(main())