From 6bcb721eb4d957364e6f24c494378320d4a683a5 Mon Sep 17 00:00:00 2001 From: Vladimir Buzalka Date: Sat, 13 Jun 2026 21:43:01 +0200 Subject: [PATCH] =?UTF-8?q?P=C5=99=C3=ADlohy=20ze=20v=C5=A1ech=203=20email?= =?UTF-8?q?=20pipeline=20=E2=86=92=20SeaweedFS=20(glob=C3=A1ln=C3=AD=20SHA?= =?UTF-8?q?-256=20dedup)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sjednocení ukládání příloh do jednoho blob storu na Tower1 (SeaweedFS Filer), content-addressed cesta /mail-attachments/ab/cd/ přes sdílený seaweed_store.py. Tři zdroje, jeden dedup: - mailstore: mailstore_attachments_poc.py (pole seaweed_attachments[]) - Graph: 3_download_attachments v1.4→v1.5 (upload při stažení nové přílohy; attachments_index dostává seaweed_path/url/synced_at) + backfill graph - JNJ: jnj_tower_ingest v1.2→v1.3 (upload při parse .msg; attachments[] dostává sha256/seaweed_path/url + doc-level seaweed_synced_at) + backfill jnj Backfill skripty jsou idempotentní (batch+resume, --retry-errors). Výpadek SeaweedFS žádnou pipeline neshodí (jen warning, doplní backfill). Ověřeno: 114 726 objektů / 53.3 GB, 0 nesynchronizovaných dokumentů, globální dedup mezi větvemi funguje. Co-Authored-By: Claude Opus 4.8 --- ...ngest_v1.2.py => jnj_tower_ingest_v1.3.py} | 50 +- .../seaweed_attachments_backfill_jnj.py | 163 +++++++ Python-runner/0_run_pipeline_v1.0.py | 2 +- ...v1.4.py => 3_download_attachments_v1.5.py} | 50 +- Python-runner/_deploy_seaweed.py | 40 ++ .../seaweed_attachments_backfill_graph.py | 183 +++++++ Python-runner/seaweed_store.py | 77 +++ claude-memory/project_seaweedfs.md | 9 +- mailstore/mailstore_attachments_poc.py | 454 ++++++++++++++++++ 9 files changed, 1008 insertions(+), 20 deletions(-) rename EmailsImport/{jnj_tower_ingest_v1.2.py => jnj_tower_ingest_v1.3.py} (96%) create mode 100644 EmailsImport/seaweed_attachments_backfill_jnj.py rename Python-runner/{3_download_attachments_v1.4.py => 3_download_attachments_v1.5.py} (93%) create mode 100644 Python-runner/_deploy_seaweed.py create mode 100644 Python-runner/seaweed_attachments_backfill_graph.py create mode 100644 Python-runner/seaweed_store.py create mode 100644 mailstore/mailstore_attachments_poc.py diff --git a/EmailsImport/jnj_tower_ingest_v1.2.py b/EmailsImport/jnj_tower_ingest_v1.3.py similarity index 96% rename from EmailsImport/jnj_tower_ingest_v1.2.py rename to EmailsImport/jnj_tower_ingest_v1.3.py index c884a0e..97a143b 100644 --- a/EmailsImport/jnj_tower_ingest_v1.2.py +++ b/EmailsImport/jnj_tower_ingest_v1.3.py @@ -1,10 +1,17 @@ """ -jnj_tower_ingest v1.2 -Nazev: jnj_tower_ingest_v1.2.py -Verze: 1.2.0 -Datum: 2026-06-10 +jnj_tower_ingest v1.3 +Nazev: jnj_tower_ingest_v1.3.py +Verze: 1.3.0 +Datum: 2026-06-13 Autor: vladimir.buzalka +ZMENA 1.3: + PARSE faze pri extrakci priloh zaroven nahraje binarku do SeaweedFS + (Tower1) pres sdileny seaweed_store.py; kazda priloha dostane sha256 + + seaweed_path + seaweed_url. Dokument s prilohami dostane doc-level + seaweed_synced_at. Vypadek SeaweedFS parse neshodi (jen warning). + Backfill jiz naparsovanych: seaweed_attachments_backfill_jnj.py. + Popis: Sjednoceny Tower-side ingest JNJ e-mailu. Spojuje tri drive oddelene casti do jednoho behu (vse bezi v kontejneru python-runner u Monga): @@ -97,6 +104,7 @@ import sys import os import re import glob +import hashlib import logging import argparse import base64 @@ -113,6 +121,9 @@ import olefile from dateutil import parser as dtparser from pymongo import MongoClient, UpdateOne, ASCENDING, TEXT +sys.path.insert(0, str(Path(__file__).resolve().parent)) +import seaweed_store as sw + if hasattr(sys.stdout, "reconfigure"): sys.stdout.reconfigure(encoding="utf-8", errors="replace") @@ -271,18 +282,37 @@ def extract_attachments(msg) -> list: if not fname: continue size = 0 + raw = None try: d = att.data - size = len(d) if d else 0 + if isinstance(d, (bytes, bytearray)): + raw = bytes(d) + size = len(raw) + elif d: + size = len(d) # embedded message apod. — bez bajtu except Exception: pass - result.append({ + mime = safe(att, "mimetype", "mimeType", default="application/octet-stream") + entry = { "filename": fname, "size_bytes": size, - "mime_type": safe(att, "mimetype", "mimeType", default="application/octet-stream"), + "mime_type": mime, "content_id": safe(att, "cid", default=None), "is_inline": bool(safe(att, "isInline", default=False)), - }) + } + # SeaweedFS upload (dedup dle obsahu, sdilene s Graph/mailstore vetvi). + # Vypadek SeaweedFS NESMI shodit parse — pole se proste nedoplni a + # dozene je seaweed_attachments_backfill_jnj.py. + if raw: + try: + h = hashlib.sha256(raw).hexdigest() + path, url, _ = sw.store(h, raw, mime) + entry["sha256"] = h + entry["seaweed_path"] = path + entry["seaweed_url"] = url + except Exception as e: + logging.warning("SeaweedFS upload selhal (%s): %s", fname, e) + result.append(entry) except Exception as e: logging.error("extract_attachments: %s", e) return result @@ -680,6 +710,10 @@ def extract_message(msg_path: Path) -> Optional[dict]: "parse_degraded": parse_degraded, "parsed_at": datetime.now(timezone.utc).replace(tzinfo=None), + # priznak ze prilohy (pokud nejake) jsou v SeaweedFS — pro backfill + "seaweed_synced_at": (datetime.now(timezone.utc).replace(tzinfo=None) + if any(a.get("seaweed_path") for a in attachments) + else None), } except Exception as e: diff --git a/EmailsImport/seaweed_attachments_backfill_jnj.py b/EmailsImport/seaweed_attachments_backfill_jnj.py new file mode 100644 index 0000000..375cb06 --- /dev/null +++ b/EmailsImport/seaweed_attachments_backfill_jnj.py @@ -0,0 +1,163 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +seaweed_attachments_backfill_jnj.py +Jednorazovy backfill: JNJ prilohy z .msg souboru -> SeaweedFS na Tower1. + +JNJ vetev (vbuzalka@its.jnj.com) parsuje .msg z /mnt/JNJEMAILS a do Mongo +uklada u priloh JEN metadata — binarky zustavaly uvnitr .msg. Tento skript +znovuotevre kazdy .msg, vytahne bajty priloh, nahraje je do SeaweedFS a do +mail dokumentu doplni u kazde prilohy sha256 + seaweed_path + seaweed_url +a doc-level seaweed_synced_at. + +PARITA: pouziva PRIMO funkce open_message() a extract_attachments() z +nasazeneho jnj_tower_ingest_v1.3.py (nacteno pres importlib z disku, protoze +nazev s "v1.3" neni validni modul). extract_attachments uz sam uploaduje do +SeaweedFS a vraci obohacene zaznamy — tady jen prepiseme attachments[]. + +RESUME + BATCH: bere has_attachments=True dokumenty bez platneho +seaweed_synced_at; kazdy zpracovany oznaci -> vypadne z dotazu. + +Chybove stavy (taky dostanou seaweed_synced_at, aby nezacyklily resume): + seaweed_file_missing : True — .msg na disku nenalezen + seaweed_parse_error : — .msg nejde otevrit / chyba extrakce + +Spousteni (v python-runner kontejneru na Toweru): + docker exec python-runner python /scripts/seaweed_attachments_backfill_jnj.py + ... --dry-run # nic nezapise, jen spocita (a otevre .msg pro test) + ... --limit 200 # jen N dokumentu + ... --retry-errors # znovu i seaweed_file_missing / seaweed_parse_error +""" + +import sys +import time +import argparse +import importlib.util +from pathlib import Path +from datetime import datetime, timezone + +from pymongo import MongoClient, UpdateOne + +if hasattr(sys.stdout, "reconfigure"): + sys.stdout.reconfigure(encoding="utf-8", errors="replace") + +MONGO_URI = "mongodb://192.168.1.76:27017" +MONGO_DB = "emaily" +MONGO_COL = "vbuzalka@its.jnj.com" +JNJEMAILS = Path("/mnt/JNJEMAILS") +MODULE_PATH = "/scripts/jnj_tower_ingest_v1.3.py" +BATCH = 300 + + +def load_jnj(): + spec = importlib.util.spec_from_file_location("jnj_ingest", MODULE_PATH) + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) + return mod + + +def main() -> int: + ap = argparse.ArgumentParser(description="Backfill JNJ priloh -> SeaweedFS") + ap.add_argument("--limit", type=int, default=0) + ap.add_argument("--dry-run", action="store_true") + ap.add_argument("--retry-errors", action="store_true") + args = ap.parse_args() + + jnj = load_jnj() + + client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000) + client.admin.command("ping") + col = client[MONGO_DB][MONGO_COL] + + not_synced = {"$or": [{"seaweed_synced_at": {"$exists": False}}, + {"seaweed_synced_at": None}]} + if args.retry_errors: + query = {"has_attachments": True, + "$or": [{"seaweed_synced_at": {"$exists": False}}, + {"seaweed_synced_at": None}, + {"seaweed_file_missing": True}, + {"seaweed_parse_error": {"$exists": True}}]} + else: + query = {"has_attachments": True, **not_synced} + + total = col.count_documents(query) + print("=== Backfill JNJ priloh -> SeaweedFS ===") + print(f"Filer: {jnj.sw.SEAWEED_FILER}{jnj.sw.BASE_PATH}") + print(f"Dokumentu s prilohami ke zpracovani: {total}" + f"{' (DRY-RUN)' if args.dry_run else ''}") + if total == 0: + print("Neni co delat.") + return 0 + + t0 = time.time() + done = files = atts_up = missing = errors = 0 + + while True: + if args.limit and done >= args.limit: + break + take = min(BATCH, args.limit - done) if args.limit else BATCH + docs = list(col.find(query, {"_id": 1, "filename": 1}).limit(take)) + if not docs: + break + + ops = [] + for d in docs: + done += 1 + _id = d["_id"] + fname = d.get("filename", "") + now = datetime.now(timezone.utc).replace(tzinfo=None) + fpath = JNJEMAILS / fname + + if not fname or not fpath.is_file(): + missing += 1 + print(f" MISS {fname}") + ops.append(UpdateOne({"_id": _id}, {"$set": { + "seaweed_file_missing": True, "seaweed_synced_at": now}})) + continue + + try: + msg, _mode = jnj.open_message(fpath) + if msg is None: + raise RuntimeError("open_message vratil None") + atts = jnj.extract_attachments(msg) # <- uploaduje do SeaweedFS + try: + msg.close() + except Exception: + pass + except Exception as e: + errors += 1 + print(f" ERR {fname}: {e}") + ops.append(UpdateOne({"_id": _id}, {"$set": { + "seaweed_parse_error": str(e)[:200], "seaweed_synced_at": now}})) + continue + + up = sum(1 for a in atts if a.get("seaweed_path")) + atts_up += up + files += 1 + + if args.dry_run: + continue + + ops.append(UpdateOne({"_id": _id}, { + "$set": {"attachments": atts, "seaweed_synced_at": now}, + "$unset": {"seaweed_file_missing": "", "seaweed_parse_error": ""}})) + + if ops and not args.dry_run: + col.bulk_write(ops, ordered=False) + + rate = done / max(time.time() - t0, 0.001) + print(f" {done}/{total} soubory={files} prilohy_up={atts_up} " + f"miss={missing} err={errors} ({rate:.1f} doc/s)") + + if args.dry_run and not args.limit: + break # dry-run: kurzor se neposouva (nemizi z dotazu) -> 1 pruchod + + dt = time.time() - t0 + print(f"\n=== HOTOVO za {dt/60:.1f} min ===") + print(f"dokumentu={done} zpracovano_souboru={files} nahrano_priloh={atts_up} " + f"chybi_soubor={missing} chyby={errors}") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/Python-runner/0_run_pipeline_v1.0.py b/Python-runner/0_run_pipeline_v1.0.py index 6df66dd..7fb14b3 100644 --- a/Python-runner/0_run_pipeline_v1.0.py +++ b/Python-runner/0_run_pipeline_v1.0.py @@ -56,7 +56,7 @@ if _REQ_FILE.exists(): # Definice pipeline (step_id, label, executable filename) STEPS = [ ("1b", "Graph delta sync", "1b_parse_emails_graph_delta_v1.0.py"), - ("3", "Download attachments", "3_download_attachments_v1.4.py"), + ("3", "Download attachments", "3_download_attachments_v1.5.py"), ("4", "Unwrap S/MIME", "4_unwrap_smime_v1.0.py"), ("5", "Enrich fulltext (PG)", "5_enrich_fulltext_emails_v1.3.py"), ] diff --git a/Python-runner/3_download_attachments_v1.4.py b/Python-runner/3_download_attachments_v1.5.py similarity index 93% rename from Python-runner/3_download_attachments_v1.4.py rename to Python-runner/3_download_attachments_v1.5.py index 487852f..c38243b 100644 --- a/Python-runner/3_download_attachments_v1.4.py +++ b/Python-runner/3_download_attachments_v1.5.py @@ -1,14 +1,14 @@ """ -download_attachments_v1.4.py -Nazev: download_attachments_v1.4.py -Verze: 1.4 -Datum: 2026-06-04 +download_attachments_v1.5.py +Nazev: download_attachments_v1.5.py +Verze: 1.5 +Datum: 2026-06-13 Autor: vladimir.buzalka Popis: Stahuje skutecne prilohy (is_inline=False) vsech emailu z MongoDB pres Microsoft Graph API a uklada je do adresare - /mnt/Emails//Attachments/. + /mnt/Emails//Attachments/ a zaroven do SeaweedFS (Tower1). Bez argumentu --mailbox projede vsechny kolekce v `emaily` mimo NON_MAILBOX_COLLECTIONS a SKIP_MAILBOXES. @@ -20,7 +20,13 @@ Popis: Po ulozeni aktualizuje MongoDB: - v email dokumentu: kazda priloha dostane file_hash + local_path - - kolekce emaily.attachments_index: _id=hash, filename, ... + - kolekce emaily.attachments_index: _id=hash, filename, ..., + seaweed_path, seaweed_url, seaweed_synced_at (viz seaweed_store.py) + + SeaweedFS: nova priloha se krome disku nahraje i do Fileru na Tower1 + (/mail-attachments/ab/cd/, dedup dle obsahu, sdilene s mailstore + vetvi). Vypadek SeaweedFS pipeline neshodi — soubor i index se zapisou, + seaweed pole pak doplni seaweed_attachments_backfill_graph.py. NOVE v 1.4: - Spravne zpracovani vsech typu priloh: @@ -60,6 +66,9 @@ Historie verzi: 1.3 2026-06-02 Primarni stazeni pres graph_att_id; --mailbox volitelny 1.4 2026-06-04 itemAttachment/referenceAttachment handling; retry s backoffem; permanentni tagging chyb (attachment_missing / attachment_reference) + 1.5 2026-06-13 Nova priloha se zaroven nahrava do SeaweedFS (Tower1) pres + sdileny seaweed_store.py; index dostane seaweed_path/url/synced_at. + Vypadek SeaweedFS pipeline neshodi (fallback = backfill skript). """ import sys @@ -78,6 +87,9 @@ import msal import requests from pymongo import MongoClient, UpdateOne +sys.path.insert(0, str(Path(__file__).resolve().parent)) +import seaweed_store as sw + if hasattr(sys.stdout, "reconfigure"): sys.stdout.reconfigure(encoding="utf-8", errors="replace") @@ -93,7 +105,7 @@ MONGO_COL_INDEX = "attachments_index" EMAILS_BASE_DIR = Path("/mnt/Emails") LOG_FILE = Path(__file__).parent / "parse_emails_errors.log" -SCRIPT_VERSION = "1.4" +SCRIPT_VERSION = "1.5" BATCH_SIZE = 50 # Typy příloh které přeskočíme (S/MIME podpisy, certifikáty) @@ -377,7 +389,7 @@ def save_attachment(content: bytes, original_name: str, mime_type: str, file_path = att_dir / filename file_path.write_bytes(content) - col_index.insert_one({ + doc = { "_id": hash_val, "filename": filename, "local_path": filename, @@ -386,7 +398,21 @@ def save_attachment(content: bytes, original_name: str, mime_type: str, "mailbox": mailbox, "first_seen_at": datetime.now(timezone.utc).replace(tzinfo=None), "ref_count": 1, - }) + } + + # Zaroven do SeaweedFS (dedup dle obsahu, sdilene s mailstore vetvi). + # Vypadek SeaweedFS NESMI shodit pipeline — soubor + index se zapisou vzdy, + # seaweed pole pak doplni seaweed_attachments_backfill_graph.py. + try: + path, url, _ = sw.store(hash_val, content, mime_type) + doc["seaweed_path"] = path + doc["seaweed_url"] = url + doc["seaweed_synced_at"] = datetime.now(timezone.utc).replace(tzinfo=None) + except Exception as e: + logging.warning("SeaweedFS upload selhal pro %s (%s): %s", + filename, hash_val[:12], e) + + col_index.insert_one(doc) return hash_val, filename, True @@ -405,13 +431,17 @@ def process_mailbox(client, mailbox: str, args) -> dict: col_emails = client[MONGO_DB][mongo_col] col_index = client[MONGO_DB][MONGO_COL_INDEX] + # source=mailstore zpravy maji vlastni stahovani priloh (mailstore_ingest); + # v zive schrance uz neexistuji, takze Graph fetch by jen selhal (nenalezeno). + # Tato pipeline je proto kompletne ignoruje. Graph zpravy pole `source` nemaji. if args.force_recheck: - query = {"has_attachments": True} + query = {"has_attachments": True, "source": {"$ne": "mailstore"}} else: # priloha "ke zpracovani" = neni inline, nema file_hash, neni oznacena # jako missing/reference query = { "has_attachments": True, + "source": {"$ne": "mailstore"}, "attachments": { "$elemMatch": { "is_inline": False, diff --git a/Python-runner/_deploy_seaweed.py b/Python-runner/_deploy_seaweed.py new file mode 100644 index 0000000..ee76a2b --- /dev/null +++ b/Python-runner/_deploy_seaweed.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +"""Deploy seaweed zmen do /mnt/user/Scripts na Unraidu (.76) pres SFTP.""" +import sys, paramiko +from pathlib import Path + +LOCAL = Path(r"U:\janssen\Python-runner") +REMOTE = "/mnt/user/Scripts" +FILES = [ + "seaweed_store.py", + "seaweed_attachments_backfill_graph.py", + "3_download_attachments_v1.5.py", + "0_run_pipeline_v1.0.py", +] + +c = paramiko.SSHClient() +c.set_missing_host_key_policy(paramiko.AutoAddPolicy()) +c.connect("192.168.1.76", username="root", password="7309208104", timeout=10) +sftp = c.open_sftp() + +for f in FILES: + lp = LOCAL / f + rp = f"{REMOTE}/{f}" + sftp.put(str(lp), rp) + print(f"PUT {f} ({lp.stat().st_size} B)") + +# smaz stary v1.4 +old = f"{REMOTE}/3_download_attachments_v1.4.py" +try: + sftp.remove(old) + print("DEL 3_download_attachments_v1.4.py") +except IOError as e: + print(f"(v1.4 uz neni / {e})") + +# overeni +_, out, _ = c.exec_command(f"ls -la {REMOTE}/seaweed_store.py " + f"{REMOTE}/seaweed_attachments_backfill_graph.py " + f"{REMOTE}/3_download_attachments_v1.5.py") +print(out.read().decode()) +sftp.close(); c.close() diff --git a/Python-runner/seaweed_attachments_backfill_graph.py b/Python-runner/seaweed_attachments_backfill_graph.py new file mode 100644 index 0000000..8bad35f --- /dev/null +++ b/Python-runner/seaweed_attachments_backfill_graph.py @@ -0,0 +1,183 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +seaweed_attachments_backfill_graph.py +Jednorazovy backfill: stazene Graph prilohy z disku -> SeaweedFS na Tower1. + +Zdroj pravdy je kolekce emaily.attachments_index (dedup dle SHA-256 = _id). +Kazdy zaznam ukazuje na soubor /mnt/Emails//Attachments/. +Skript soubor nacte, overi hash, nahraje do SeaweedFS (idempotentne, dedup +dle obsahu — identicky obsah jiz nahrany mailstore vetvi se preskoci) a do +index dokumentu doplni: + seaweed_path, seaweed_url, seaweed_synced_at + +RESUME + BATCH: bere jen zaznamy bez `seaweed_synced_at`, kazdy zpracovany +oznaci (i pri chybe) -> vypadne z dotazu, kurzor nikdy nezije dlouho a beh +lze kdykoli prerusit a spustit znovu. + +Chybove stavy (taky dostanou seaweed_synced_at, aby nezacyklily resume): + seaweed_file_missing : True — soubor na disku nenalezen + seaweed_hash_mismatch: — obsah souboru ma jiny hash nez _id + (ulozeno pod skutecnym hashem obsahu) + seaweed_upload_error : — SeaweedFS PUT/spojeni selhalo + +Spousteni (v python-runner kontejneru na Toweru, kde je /mnt/Emails): + docker exec python-runner python /scripts/seaweed_attachments_backfill_graph.py + ... --dry-run # nic nezapise, jen spocita + ... --limit 500 # jen N zaznamu (test) + ... --retry-errors # znovu i zaznamy s seaweed_*_error/missing/mismatch +""" + +import sys +import time +import hashlib +import argparse +from pathlib import Path +from datetime import datetime, timezone + +from pymongo import MongoClient, UpdateOne + +sys.path.insert(0, str(Path(__file__).resolve().parent)) +import seaweed_store as sw + +if hasattr(sys.stdout, "reconfigure"): + sys.stdout.reconfigure(encoding="utf-8", errors="replace") + +MONGO_URI = "mongodb://192.168.1.76:27017" +MONGO_DB = "emaily" +MONGO_COL_INDEX = "attachments_index" +EMAILS_BASE_DIR = Path("/mnt/Emails") +BATCH = 500 + + +def sha256(b: bytes) -> str: + return hashlib.sha256(b).hexdigest() + + +def main() -> int: + ap = argparse.ArgumentParser(description="Backfill Graph priloh -> SeaweedFS") + ap.add_argument("--limit", type=int, default=0, + help="Zpracovat max N zaznamu (0 = vse)") + ap.add_argument("--dry-run", action="store_true", + help="Nic nezapisovat (do SeaweedFS ani do Mongo), jen report") + ap.add_argument("--retry-errors", action="store_true", + help="Znovu i zaznamy oznacene seaweed_file_missing / " + "seaweed_hash_mismatch / seaweed_upload_error") + args = ap.parse_args() + + client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000) + client.admin.command("ping") + col = client[MONGO_DB][MONGO_COL_INDEX] + + if args.retry_errors: + # znovu vse co nema platnou seaweed cestu + base_query = {"seaweed_path": {"$exists": False}} + else: + base_query = {"seaweed_synced_at": {"$exists": False}} + + total_target = col.count_documents(base_query) + print(f"=== Backfill Graph priloh -> SeaweedFS ===") + print(f"Filer: {sw.SEAWEED_FILER}{sw.BASE_PATH}") + print(f"Zaznamu ke zpracovani: {total_target}" + f"{' (DRY-RUN)' if args.dry_run else ''}") + if total_target == 0: + print("Neni co delat.") + return 0 + + t0 = time.time() + done = uploaded = dedup = missing = mismatch = errors = 0 + + while True: + if args.limit and done >= args.limit: + break + take = BATCH + if args.limit: + take = min(BATCH, args.limit - done) + docs = list(col.find(base_query, + {"_id": 1, "mailbox": 1, "local_path": 1, + "mime_type": 1}).limit(take)) + if not docs: + break + + ops = [] + for d in docs: + done += 1 + _id = d["_id"] + mailbox = d.get("mailbox", "") + local = d.get("local_path", "") + mime = d.get("mime_type") or "application/octet-stream" + now = datetime.now(timezone.utc).replace(tzinfo=None) + + if not local: + missing += 1 + ops.append(UpdateOne({"_id": _id}, {"$set": { + "seaweed_file_missing": True, "seaweed_synced_at": now}})) + continue + + fpath = EMAILS_BASE_DIR / mailbox / "Attachments" / local + if not fpath.is_file(): + missing += 1 + print(f" MISS {mailbox}/{local}") + ops.append(UpdateOne({"_id": _id}, {"$set": { + "seaweed_file_missing": True, "seaweed_synced_at": now}})) + continue + + try: + data = fpath.read_bytes() + except OSError as e: + errors += 1 + print(f" ERR read {fpath}: {e}") + ops.append(UpdateOne({"_id": _id}, {"$set": { + "seaweed_upload_error": f"read: {e}", "seaweed_synced_at": now}})) + continue + + real_hash = sha256(data) + set_fields = {"seaweed_synced_at": now} + if real_hash != _id: + mismatch += 1 + set_fields["seaweed_hash_mismatch"] = _id # puvodni _id pro audit + # ulozime pod SKUTECNYM hashem obsahu (content-addressed je spravne) + + if args.dry_run: + continue + + try: + path, url, was_new = sw.store(real_hash, data, mime) + except Exception as e: + errors += 1 + print(f" ERR put {mailbox}/{local}: {e}") + ops.append(UpdateOne({"_id": _id}, {"$set": { + "seaweed_upload_error": str(e), "seaweed_synced_at": now}})) + continue + + if was_new: + uploaded += 1 + else: + dedup += 1 + set_fields["seaweed_path"] = path + set_fields["seaweed_url"] = url + # vycistit pripadne stare chybove vlajky pri uspechu + unset = {"seaweed_file_missing": "", "seaweed_upload_error": ""} + ops.append(UpdateOne({"_id": _id}, + {"$set": set_fields, "$unset": unset})) + + if ops and not args.dry_run: + col.bulk_write(ops, ordered=False) + + rate = done / max(time.time() - t0, 0.001) + print(f" {done}/{total_target} up={uploaded} dedup={dedup} " + f"miss={missing} mism={mismatch} err={errors} ({rate:.0f}/s)") + + if args.dry_run and not args.limit: + # v dry-run nic nemizi z dotazu -> jeden pruchod a konec + break + + dt = time.time() - t0 + print(f"\n=== HOTOVO za {dt/60:.1f} min ===") + print(f"zpracovano={done} nahrano={uploaded} dedup={dedup} " + f"chybi_soubor={missing} hash_mismatch={mismatch} chyby={errors}") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/Python-runner/seaweed_store.py b/Python-runner/seaweed_store.py new file mode 100644 index 0000000..86ac250 --- /dev/null +++ b/Python-runner/seaweed_store.py @@ -0,0 +1,77 @@ +""" +seaweed_store.py +Sdileny helper pro ukladani priloh do SeaweedFS Fileru na Tower1 (192.168.1.50). + +Pouzivaji: + - 3_download_attachments_v1.5.py (Graph pipeline, dalsi behy) + - seaweed_attachments_backfill_graph.py (jednorazovy backfill jiz stazenych priloh) + - mailstore/mailstore_attachments_poc.py (mailstore vetev — stejne schema cesty) + +Schema cesty je content-addressed dle SHA-256 obsahu: + /mail-attachments/// +=> identicky obsah z libovolneho zdroje (Graph i mailstore) skonci na stejne +ceste a deduplikuje se globalne. + +Upload: PUT raw telo (POST multipart na prvni zapis do Fileru timeoutuje!). +""" + +import requests + +SEAWEED_FILER = "http://192.168.1.50:8888" +BASE_PATH = "/mail-attachments" +HTTP_TIMEOUT = 60 + +# Lazy modulova session — znovupouziti spojeni napric volanimi v ramci procesu. +_SESSION: requests.Session | None = None + + +def _session() -> requests.Session: + global _SESSION + if _SESSION is None: + _SESSION = requests.Session() + return _SESSION + + +def seaweed_path(sha256: str) -> str: + """Cesta deduplikovana podle obsahu: /mail-attachments/ab/cd/.""" + return f"{BASE_PATH}/{sha256[:2]}/{sha256[2:4]}/{sha256}" + + +def seaweed_url(sha256: str) -> str: + return SEAWEED_FILER + seaweed_path(sha256) + + +def exists(path: str, sess: requests.Session | None = None) -> bool: + sess = sess or _session() + try: + r = sess.head(SEAWEED_FILER + path, timeout=HTTP_TIMEOUT) + return r.status_code == 200 + except requests.RequestException: + return False + + +def put(path: str, data: bytes, mime: str, sess: requests.Session | None = None) -> bool: + sess = sess or _session() + r = sess.put(SEAWEED_FILER + path, data=data, + headers={"Content-Type": mime or "application/octet-stream"}, + timeout=HTTP_TIMEOUT) + return r.status_code in (200, 201) + + +def store(sha256: str, data: bytes, mime: str, + sess: requests.Session | None = None) -> tuple[str, str, bool]: + """Ulozi obsah do SeaweedFS (idempotentne, dedup dle hashe). + + Vraci (path, url, uploaded): + uploaded=True pokud byl objekt nove nahran + uploaded=False pokud uz na ceste existoval (dedup hit) + Vyhazuje requests.RequestException / RuntimeError pri selhani zapisu — + volajici si osetri (pipeline nesmi spadnout, jen preskoci seaweed pole). + """ + sess = sess or _session() + path = seaweed_path(sha256) + if exists(path, sess): + return path, SEAWEED_FILER + path, False + if not put(path, data, mime, sess): + raise RuntimeError(f"SeaweedFS PUT selhal pro {path}") + return path, SEAWEED_FILER + path, True diff --git a/claude-memory/project_seaweedfs.md b/claude-memory/project_seaweedfs.md index d66c8e4..976f2ca 100644 --- a/claude-memory/project_seaweedfs.md +++ b/claude-memory/project_seaweedfs.md @@ -9,7 +9,14 @@ metadata: SeaweedFS „na hraní" na Tower1 Unraid (192.168.1.50, root, heslo Vlado7309208104++; SSH jen heslem, klíč nejde). Postaveno 2026-06-12. -- Kontejner `seaweedfs` (chrislusf/seaweedfs, host network), `weed server -filer -s3`: master :9333, volume :8080, filer :8888, S3 :8333 (anonymní). Limit 1 GB/volume, max 50. +- Kontejner `seaweedfs` (chrislusf/seaweedfs, host network), `weed server -filer -s3`: master :9333, volume :8080, filer :8888, S3 :8333 (anonymní). **Limit volume = 30 GB** (ne 1 GB — dřívější údaj v Triliu byl chybný; ověřeno z `/dir/status` pole Version="30GB"), max 50 slotů → efektivní strop ~1,5 TB, pod tím array ~11,5 TB volných. Kapacita není omezení. +- **Upload do Fileru: `PUT` raw tělo** (POST multipart na první zápis timeoutuje!). `requests` i v python-runner kontejneru. Dedup podle SHA-256 obsahu. Cesta `/mail-attachments/ab/cd/` (content-addressed, ab=hash[:2], cd=hash[2:4]). +- **Sdílený modul `seaweed_store.py`** (`/scripts/seaweed_store.py` na Unraidu .76) — jednotné schéma cesty/URL/PUT pro obě větve příloh, takže identický obsah z Graphu i mailstore skončí na téže cestě a globálně se dedupuje. Funkce `store(sha256, data, mime) -> (path, url, uploaded)`. +- **Tři větve příloh → jeden blob store** (globální dedup přes SHA-256, vše přes `seaweed_store.py`): + 1. mailstore: `mailstore/mailstore_attachments_poc.py` (běží z toweru .76), zapisuje pole `seaweed_attachments[]` do mail dokumentů v Mongo `emaily`. + 2. Graph pipeline: `3_download_attachments_v1.5.py` při uložení nové přílohy zároveň pushne do SeaweedFS; do `emaily.attachments_index` (dedup dle SHA-256=_id) zapíše `seaweed_path/seaweed_url/seaweed_synced_at`. Jednorázový backfill: `seaweed_attachments_backfill_graph.py` (čte `/mnt/Emails//Attachments/`). + 3. JNJ pipeline (vbuzalka@its.jnj.com, viz [[python-runner]]/[[graph-email-import]]): `EmailsImport/jnj_tower_ingest_v1.3.py` při parse `.msg` z `/mnt/JNJEMAILS` nahraje binárku přílohy do SeaweedFS, do `attachments[]` v mail doc zapíše `sha256/seaweed_path/seaweed_url` + doc-level `seaweed_synced_at`. Jednorázový backfill: `EmailsImport/seaweed_attachments_backfill_jnj.py` (znovuotevře `.msg` přes importlib reuse `open_message`+`extract_attachments`). + Disk/.msg zůstávají paralelně. Viz [[project-mailstore]] a [[project-python-runner]]. - Bloby: share `SeaweedFS` → `/mnt/user/SeaweedFS/data`, array disk9–11 pod paritou (useCache=no). Filer leveldb + master raft: `/mnt/cache2tb0225dec/appdata/seaweedfs/` (SSD, bez parity). - Noční backup filer metadat: cron 03:30 na Tower1 (`/boot/config/plugins/dynamix/seaweedfs-backup.cron`) spouští `/boot/config/scripts/seaweedfs_meta_backup.sh` — `fs.meta.save` přes weed shell → scp na tower 192.168.1.76 do `/mnt/user/Backup/Critical/SeaweedFS/daily/` (drží 7), neděle navíc `weekly/` (drží 4). Log `/var/log/seaweedfs_backup.log`. Restore: `fs.meta.load`. - Tower (.76, root heslo 7309208104) a Tower1 mají vyměněné SSH klíče — z Tower1 na tower jde psát přímo. diff --git a/mailstore/mailstore_attachments_poc.py b/mailstore/mailstore_attachments_poc.py new file mode 100644 index 0000000..4842209 --- /dev/null +++ b/mailstore/mailstore_attachments_poc.py @@ -0,0 +1,454 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +mailstore_attachments_poc.py — PROOF OF CONCEPT + +Ulozi PRILOHY mailstore mailu do SeaweedFS (Tower1, 192.168.1.50) pres Filer +HTTP API. Deduplikace podle SHA-256 obsahu (stejny soubor = stejna cesta = 1 kopie). + +Tok: + Mongo (source=mailstore, has_attachments) --> IMAP re-fetch RFC822 (podle + mailstore_uid + mailstore_folder) --> extrakce priloh --> PUT do Seaweedu. + Volitelne (--write-back) zapise do Mongo k mailu pole `seaweed_attachments` + (filename, sha256, seaweed_path/url) - NEDESTRUKTIVNE, vedle puvodnich metadat. + +SeaweedFS (z Trilium "2026-06-13 SeaweedFS na Tower1"): + - Filer: http://192.168.1.50:8888 (anonymni, bez auth - PoC v interni siti) + - bloby na array pod paritou; LIMIT 1 GB/volume, max 50 volumu (~50 GB). + - upload = PUT raw telo (POST multipart na prvni zapis casto timeoutuje). + +Pouziti: + python mailstore_attachments_poc.py vladimir.buzalka@buzalka.cz --limit 25 + python mailstore_attachments_poc.py vladimir.buzalka@buzalka.cz --limit 25 --write-back + python mailstore_attachments_poc.py vladimir.buzalka@buzalka.cz --limit 5 --dry-run + +Graphova vetev se NEMENI (ta uklada na /mnt/Emails). Tohle je samostatne PoC +nad SeaweedFS. +""" +from __future__ import annotations + +import argparse +import base64 as _b64 +import email +import hashlib +import imaplib +import ssl +import sys +import re +import time +from datetime import datetime, timezone +from email.header import decode_header + +import requests +from pymongo import MongoClient + +# --- config ------------------------------------------------------------------ +MS_HOST = "192.168.1.53" +IMAP_PORT = 143 +MS_USER = "admin" +MS_PASS = "*$N(B)vMUym!%" + +MONGO_URI = "mongodb://192.168.1.76:27017" +MONGO_DB = "emaily" + +SEAWEED_FILER = "http://192.168.1.50:8888" +BASE_PATH = "/mail-attachments" # koren v SeaweedFS Fileru +HTTP_TIMEOUT = 60 + + +# --- helpery (prevzato z mailstore_ingest_v1.0.py) --------------------------- +def _safe_decode(b: bytes, enc) -> str: + for e in (enc, "utf-8", "latin-1"): + if not e: + continue + try: + return b.decode(e, errors="replace") + except (LookupError, TypeError): + continue + return b.decode("utf-8", errors="replace") + + +def dec(s) -> str: + if not s: + return "" + out = [] + for txt, enc in decode_header(s): + out.append(_safe_decode(txt, enc) if isinstance(txt, bytes) else txt) + return "".join(out).replace("\r", " ").replace("\n", " ").strip() + + +def encode_mutf7(s: str) -> str: + """Nazev IMAP slozky -> modified UTF-7 (RFC 3501).""" + res = [] + i, n = 0, len(s) + while i < n: + ch = s[i] + o = ord(ch) + if 0x20 <= o <= 0x7e: + res.append("&-" if ch == "&" else ch) + i += 1 + else: + j = i + while j < n and not (0x20 <= ord(s[j]) <= 0x7e): + j += 1 + b = s[i:j].encode("utf-16-be") + enc = _b64.b64encode(b).decode("ascii").rstrip("=").replace("/", ",") + res.append("&" + enc + "-") + i = j + return "".join(res) + + +def imap_connect(retries: int = 6, delay: float = 5.0) -> imaplib.IMAP4: + last = None + for attempt in range(1, retries + 1): + try: + ctx = ssl.create_default_context() + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + M = imaplib.IMAP4(MS_HOST, IMAP_PORT) + M.starttls(ssl_context=ctx) + M.login(MS_USER, MS_PASS) + return M + except (imaplib.IMAP4.abort, imaplib.IMAP4.error, OSError) as ex: + last = ex + print(f" ! imap_connect {attempt}/{retries} selhal: {ex} -> cekam {delay:.0f}s", + flush=True) + time.sleep(delay) + raise last + + +def imap_select(M: imaplib.IMAP4, folder: str): + return M.select(f'"{encode_mutf7(folder)}"', readonly=True) + + +def fetch_eml_by_seq(M: imaplib.IMAP4, folder: str, seq: int, + expect_mid: str | None = None) -> bytes | None: + """Stahne syrovy RFC822 zpravy podle SEKVENCNIHO cisla (mailstore_uid drzi + ve skutecnosti sequence, ne IMAP UID; UID nejsou po restartech MailStore + stabilni a jsou derave). Archiv je append-only -> seq stabilni. Pro jistotu + overi Message-ID proti ulozenemu _id (chrani pred pripadnym driftem).""" + typ, _ = imap_select(M, folder) + if typ != "OK": + return None + typ, data = M.fetch(str(seq), "(RFC822)") + if typ != "OK" or not data or not isinstance(data[0], tuple): + return None + raw = data[0][1] + if expect_mid: + got = (email.message_from_bytes(raw).get("Message-ID") or "").strip() + if got != expect_mid: + return None + return raw + + +_SEQ_RX = re.compile(rb"^(\d+)\s") + + +def scan_folder_midmap(M: imaplib.IMAP4, folder: str) -> dict: + """Naskenuje slozku a vrati mapu {Message-ID -> aktualni sekvencni cislo}. + Pouziva se pro slozky, kde ulozene mailstore_uid (seq) driftlo (zive slozky + Sent Items / Odeslana posta dostavaly postu behem ingestu).""" + typ, data = imap_select(M, folder) + if typ != "OK": + return {} + total = int(data[0]) if data and data[0] else 0 + midmap = {} + lo, BATCH = 1, 2000 + while lo <= total: + hi = min(lo + BATCH - 1, total) + typ, msgs = M.fetch(f"{lo}:{hi}", "(BODY.PEEK[HEADER.FIELDS (MESSAGE-ID)])") + for it in msgs: + if not isinstance(it, tuple): + continue + meta, hdr = it[0], it[1] + mseq = _SEQ_RX.match(meta or b"") + mid = (email.message_from_bytes(hdr or b"").get("Message-ID") or "").strip() + if mseq and mid: + midmap[mid] = int(mseq.group(1)) + lo = hi + 1 + return midmap + + +def fetch_raw_by_seq(M: imaplib.IMAP4, seq: int) -> bytes | None: + """Stahne RFC822 podle sekvence (slozka uz musi byt SELECTnuta).""" + typ, data = M.fetch(str(seq), "(RFC822)") + if typ != "OK" or not data or not isinstance(data[0], tuple): + return None + return data[0][1] + + +def extract_attachments(raw: bytes) -> list[dict]: + """Vrati seznam priloh: {filename, mime_type, is_inline, content(bytes)}.""" + msg = email.message_from_bytes(raw) + atts = [] + for part in msg.walk(): + if part.is_multipart(): + continue + ct = part.get_content_type() + disp = str(part.get("Content-Disposition") or "") + fname = part.get_filename() + is_att = "attachment" in disp or (fname and ct not in ("text/plain", "text/html")) + if not is_att: + continue + payload = part.get_payload(decode=True) + if not payload: + continue + atts.append({ + "filename": dec(fname) or "(bez nazvu)", + "mime_type": ct, + "is_inline": "inline" in disp, + "content": payload, + }) + return atts + + +# --- SeaweedFS Filer --------------------------------------------------------- +def seaweed_path(sha256: str) -> str: + """Cesta deduplikovana podle obsahu: /mail-attachments/ab/cd/.""" + return f"{BASE_PATH}/{sha256[:2]}/{sha256[2:4]}/{sha256}" + + +def seaweed_exists(sess: requests.Session, path: str) -> bool: + try: + r = sess.head(SEAWEED_FILER + path, timeout=HTTP_TIMEOUT) + return r.status_code == 200 + except requests.RequestException: + return False + + +def seaweed_put(sess: requests.Session, path: str, data: bytes, mime: str) -> bool: + r = sess.put(SEAWEED_FILER + path, data=data, + headers={"Content-Type": mime or "application/octet-stream"}, + timeout=HTTP_TIMEOUT) + return r.status_code in (200, 201) + + +# --- main -------------------------------------------------------------------- +def main() -> int: + ap = argparse.ArgumentParser(description="PoC: mailstore prilohy -> SeaweedFS") + ap.add_argument("mailbox", help="Schranka = Mongo kolekce") + ap.add_argument("--limit", type=int, default=0, + help="Kolik MAILU s prilohami zpracovat (0 = VSECHNY)") + ap.add_argument("--write-back", action="store_true", + help="Zapis do Mongo pole `seaweed_attachments` (nedestruktivne). " + "Zaroven zapne RESUME (preskoci uz hotove maily).") + ap.add_argument("--dry-run", action="store_true", + help="Nic neukladej, jen ukaz co by se stalo") + ap.add_argument("--remap-failed", action="store_true", + help="Dofetchne maily oznacene seaweed_fetch_failed pres mapu " + "Message-ID->seq (pro slozky s driftlou sekvenci, napr. Sent Items).") + ap.add_argument("--log-file", default=None, + help="Presmeruj vystup do souboru (line-buffered) - pro dlouhy beh " + "na serveru, prezije pad spojeni.") + args = ap.parse_args() + + if args.log_file: + _f = open(args.log_file, "a", buffering=1, encoding="utf-8") + sys.stdout = _f + sys.stderr = _f + + t0 = time.time() + print(f"=== mailstore prilohy -> SeaweedFS (PoC) | {args.mailbox} ===") + print(f"Filer: {SEAWEED_FILER}{BASE_PATH} | limit mailu: {args.limit}" + f"{' [DRY-RUN]' if args.dry_run else ''}" + f"{' [WRITE-BACK]' if args.write_back else ''}") + + mongo = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000) + mongo.admin.command("ping") + coll = mongo[MONGO_DB][args.mailbox] + + # resumable = davkovy samostranlujici rezim (imunni vuci timeoutu kurzoru): + # kazdy zpracovany mail dostane marker seaweed_synced_at -> prestane odpovidat + # dotazu -> dalsi davka vrati dalsi neudelane. Resume po preruseni zdarma. + resumable = args.write_back and not args.dry_run + BATCH = 500 + q = {"source": "mailstore", "has_attachments": True} + if resumable: + q["seaweed_synced_at"] = {"$exists": False} + proj = {"mailstore_uid": 1, "mailstore_folder": 1, "subject": 1} + total_q = coll.count_documents(q) + print(f"Mailu s prilohami ke zpracovani: {total_q:,}" + f"{' (limit ' + str(args.limit) + ')' if args.limit else ''}", flush=True) + + # per-prilohovy vypis jen u malych (PoC) behu; velky beh = jen progress + chyby + verbose = bool(args.limit and args.limit <= 100) + + sess = requests.Session() + M = imap_connect() + + n_mail = n_att = n_up = n_dedup = n_err = 0 + bytes_up = 0 + + def upload_raw(raw, subj): + """Z EML vytahne prilohy a nahraje do SeaweedFS (dedup). Vraci enriched[].""" + nonlocal n_att, n_up, n_dedup, n_err, bytes_up + enriched = [] + for a in extract_attachments(raw): + n_att += 1 + h = hashlib.sha256(a["content"]).hexdigest() + path = seaweed_path(h) + size = len(a["content"]) + status = "?" + try: + if seaweed_exists(sess, path): + n_dedup += 1 + status = "dedup" + elif args.dry_run: + status = "by-upload" + elif seaweed_put(sess, path, a["content"], a["mime_type"]): + n_up += 1 + bytes_up += size + status = "upload" + else: + n_err += 1 + status = "ERR-put" + except requests.RequestException as ex: + n_err += 1 + status = f"ERR:{type(ex).__name__}" + if verbose or status.startswith("ERR"): + print(f" [{subj:50}] {status:9} {size:>8}B {a['filename'][:40]}", flush=True) + enriched.append({ + "filename": a["filename"], + "mime_type": a["mime_type"], + "is_inline": a["is_inline"], + "size_bytes": size, + "sha256": h, + "seaweed_path": path, + "seaweed_url": SEAWEED_FILER + path, + }) + return enriched + + def handle_doc(d, M): + """Stahne EML, nahraje prilohy. Vraci (M, failed, enriched).""" + nonlocal n_att, n_up, n_dedup, n_err, bytes_up + uid = d.get("mailstore_uid") + folder = d.get("mailstore_folder") + mid = d.get("_id") + subj = (d.get("subject") or "")[:50] + if not uid or not folder: + return M, True, [] + try: + raw = fetch_eml_by_seq(M, folder, int(uid), mid) + except imaplib.IMAP4.abort: + # spojeni umrelo -> prepoj a zkus jeste jednou (nikdy nespadni) + try: + M.logout() + except Exception: + pass + try: + M = imap_connect() + raw = fetch_eml_by_seq(M, folder, int(uid), mid) + except Exception as ex: + n_err += 1 + print(f" [seq={uid}] fetch selhal i po reconnectu: {type(ex).__name__}", flush=True) + return M, True, [] + if not raw: + print(f" [seq={uid}] EML se nepodarilo stahnout / Message-ID nesedi", flush=True) + return M, True, [] + return M, False, upload_raw(raw, subj) + + def progress(): + el = time.time() - t0 + print(f" ... {n_mail:,} mailu | {n_up:,} nahrano / {n_dedup:,} dedup / " + f"{n_err} chyb | {bytes_up/1024/1024:.0f} MB | {el/60:.1f} min", flush=True) + + if args.remap_failed: + # Dofetch maily se seaweed_fetch_failed: pro kazdou slozku naskenuj mapu + # Message-ID->aktualni seq, pak fetchni spravnou sekvenci a nahraj prilohy. + fq = {"source": "mailstore", "seaweed_fetch_failed": True} + folders = coll.distinct("mailstore_folder", fq) + print(f"Slozek s failed maily: {len(folders)}", flush=True) + for folder in folders: + cnt = coll.count_documents({**fq, "mailstore_folder": folder}) + print(f"== {folder.split('/')[-1]} ({cnt} mailu) - skenuji mapu Message-ID...", flush=True) + try: + midmap = scan_folder_midmap(M, folder) + except imaplib.IMAP4.abort: + try: + M.logout() + except Exception: + pass + M = imap_connect() + midmap = scan_folder_midmap(M, folder) + print(f" mapa: {len(midmap)} zprav", flush=True) + imap_select(M, folder) # zustan ve slozce pro fetch + for d in coll.find({**fq, "mailstore_folder": folder}, + {"subject": 1}): + n_mail += 1 + mid = d["_id"] + seq = midmap.get(mid) + subj = (d.get("subject") or "")[:50] + if not seq: + # opravdu nenalezeno -> oznac jako definitivne nedohledatelne + coll.update_one({"_id": mid}, + {"$set": {"seaweed_unresolved": True}}) + continue + try: + raw = fetch_raw_by_seq(M, seq) + except imaplib.IMAP4.abort: + try: + M.logout() + except Exception: + pass + M = imap_connect() + imap_select(M, folder) + raw = fetch_raw_by_seq(M, seq) + if not raw: + coll.update_one({"_id": mid}, {"$set": {"seaweed_unresolved": True}}) + continue + enriched = upload_raw(raw, subj) + coll.update_one({"_id": mid}, + {"$set": {"seaweed_attachments": enriched, + "seaweed_synced_at": datetime.now(timezone.utc)}, + "$unset": {"seaweed_fetch_failed": ""}}) + if n_mail % 200 == 0: + progress() + elif resumable: + # davkove: kazdy zpracovany mail dostane seaweed_synced_at (i pri prazdne/ + # chybne priloze -> resume vzdy konverguje, zadne zacykleni) + while True: + batch = list(coll.find(q, proj).limit(BATCH)) + if not batch: + break + for d in batch: + M, failed, enriched = handle_doc(d, M) + n_mail += 1 + set_fields = {"seaweed_synced_at": datetime.now(timezone.utc)} + if failed: + set_fields["seaweed_fetch_failed"] = True + else: + set_fields["seaweed_attachments"] = enriched + coll.update_one({"_id": d["_id"]}, {"$set": set_fields}) + if n_mail % 200 == 0: + progress() + if args.limit and n_mail >= args.limit: + break + if args.limit and n_mail >= args.limit: + break + else: + cur = coll.find(q, proj) + if args.limit: + cur = cur.limit(args.limit) + for d in cur: + M, _failed, _enriched = handle_doc(d, M) + n_mail += 1 + if n_mail % 200 == 0: + progress() + + try: + M.logout() + except Exception: + pass + + print("-" * 64) + print(f"Mailu zpracovano: {n_mail}") + print(f"Priloh celkem: {n_att}") + print(f"Nahrano novych: {n_up} ({bytes_up/1024/1024:.1f} MB)") + print(f"Deduplikovano: {n_dedup} (uz v SeaweedFS)") + print(f"Chyb: {n_err}") + print(f"Trvalo: {time.time()-t0:.1f}s") + return 0 + + +if __name__ == "__main__": + sys.exit(main())