#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ mailstore_attachments_poc.py — PROOF OF CONCEPT Ulozi PRILOHY mailstore mailu do SeaweedFS (Tower1, 192.168.1.50) pres Filer HTTP API. Deduplikace podle SHA-256 obsahu (stejny soubor = stejna cesta = 1 kopie). Tok: Mongo (source=mailstore, has_attachments) --> IMAP re-fetch RFC822 (podle mailstore_uid + mailstore_folder) --> extrakce priloh --> PUT do Seaweedu. Volitelne (--write-back) zapise do Mongo k mailu pole `seaweed_attachments` (filename, sha256, seaweed_path/url) - NEDESTRUKTIVNE, vedle puvodnich metadat. SeaweedFS (z Trilium "2026-06-13 SeaweedFS na Tower1"): - Filer: http://192.168.1.50:8888 (anonymni, bez auth - PoC v interni siti) - bloby na array pod paritou; LIMIT 1 GB/volume, max 50 volumu (~50 GB). - upload = PUT raw telo (POST multipart na prvni zapis casto timeoutuje). Pouziti: python mailstore_attachments_poc.py vladimir.buzalka@buzalka.cz --limit 25 python mailstore_attachments_poc.py vladimir.buzalka@buzalka.cz --limit 25 --write-back python mailstore_attachments_poc.py vladimir.buzalka@buzalka.cz --limit 5 --dry-run Graphova vetev se NEMENI (ta uklada na /mnt/Emails). Tohle je samostatne PoC nad SeaweedFS. """ from __future__ import annotations import argparse import base64 as _b64 import email import hashlib import imaplib import ssl import sys import re import time from datetime import datetime, timezone from email.header import decode_header import requests from pymongo import MongoClient # --- config ------------------------------------------------------------------ MS_HOST = "192.168.1.53" IMAP_PORT = 143 MS_USER = "admin" MS_PASS = "*$N(B)vMUym!%" MONGO_URI = "mongodb://192.168.1.76:27017" MONGO_DB = "emaily" SEAWEED_FILER = "http://192.168.1.50:8888" BASE_PATH = "/mail-attachments" # koren v SeaweedFS Fileru HTTP_TIMEOUT = 60 # --- helpery (prevzato z mailstore_ingest_v1.0.py) --------------------------- def _safe_decode(b: bytes, enc) -> str: for e in (enc, "utf-8", "latin-1"): if not e: continue try: return b.decode(e, errors="replace") except (LookupError, TypeError): continue return b.decode("utf-8", errors="replace") def dec(s) -> str: if not s: return "" out = [] for txt, enc in decode_header(s): out.append(_safe_decode(txt, enc) if isinstance(txt, bytes) else txt) return "".join(out).replace("\r", " ").replace("\n", " ").strip() def encode_mutf7(s: str) -> str: """Nazev IMAP slozky -> modified UTF-7 (RFC 3501).""" res = [] i, n = 0, len(s) while i < n: ch = s[i] o = ord(ch) if 0x20 <= o <= 0x7e: res.append("&-" if ch == "&" else ch) i += 1 else: j = i while j < n and not (0x20 <= ord(s[j]) <= 0x7e): j += 1 b = s[i:j].encode("utf-16-be") enc = _b64.b64encode(b).decode("ascii").rstrip("=").replace("/", ",") res.append("&" + enc + "-") i = j return "".join(res) def imap_connect(retries: int = 6, delay: float = 5.0) -> imaplib.IMAP4: last = None for attempt in range(1, retries + 1): try: ctx = ssl.create_default_context() ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE M = imaplib.IMAP4(MS_HOST, IMAP_PORT) M.starttls(ssl_context=ctx) M.login(MS_USER, MS_PASS) return M except (imaplib.IMAP4.abort, imaplib.IMAP4.error, OSError) as ex: last = ex print(f" ! imap_connect {attempt}/{retries} selhal: {ex} -> cekam {delay:.0f}s", flush=True) time.sleep(delay) raise last def imap_select(M: imaplib.IMAP4, folder: str): return M.select(f'"{encode_mutf7(folder)}"', readonly=True) def fetch_eml_by_seq(M: imaplib.IMAP4, folder: str, seq: int, expect_mid: str | None = None) -> bytes | None: """Stahne syrovy RFC822 zpravy podle SEKVENCNIHO cisla (mailstore_uid drzi ve skutecnosti sequence, ne IMAP UID; UID nejsou po restartech MailStore stabilni a jsou derave). Archiv je append-only -> seq stabilni. Pro jistotu overi Message-ID proti ulozenemu _id (chrani pred pripadnym driftem).""" typ, _ = imap_select(M, folder) if typ != "OK": return None typ, data = M.fetch(str(seq), "(RFC822)") if typ != "OK" or not data or not isinstance(data[0], tuple): return None raw = data[0][1] if expect_mid: got = (email.message_from_bytes(raw).get("Message-ID") or "").strip() if got != expect_mid: return None return raw _SEQ_RX = re.compile(rb"^(\d+)\s") def scan_folder_midmap(M: imaplib.IMAP4, folder: str) -> dict: """Naskenuje slozku a vrati mapu {Message-ID -> aktualni sekvencni cislo}. Pouziva se pro slozky, kde ulozene mailstore_uid (seq) driftlo (zive slozky Sent Items / Odeslana posta dostavaly postu behem ingestu).""" typ, data = imap_select(M, folder) if typ != "OK": return {} total = int(data[0]) if data and data[0] else 0 midmap = {} lo, BATCH = 1, 2000 while lo <= total: hi = min(lo + BATCH - 1, total) typ, msgs = M.fetch(f"{lo}:{hi}", "(BODY.PEEK[HEADER.FIELDS (MESSAGE-ID)])") for it in msgs: if not isinstance(it, tuple): continue meta, hdr = it[0], it[1] mseq = _SEQ_RX.match(meta or b"") mid = (email.message_from_bytes(hdr or b"").get("Message-ID") or "").strip() if mseq and mid: midmap[mid] = int(mseq.group(1)) lo = hi + 1 return midmap def fetch_raw_by_seq(M: imaplib.IMAP4, seq: int) -> bytes | None: """Stahne RFC822 podle sekvence (slozka uz musi byt SELECTnuta).""" typ, data = M.fetch(str(seq), "(RFC822)") if typ != "OK" or not data or not isinstance(data[0], tuple): return None return data[0][1] def extract_attachments(raw: bytes) -> list[dict]: """Vrati seznam priloh: {filename, mime_type, is_inline, content(bytes)}.""" msg = email.message_from_bytes(raw) atts = [] for part in msg.walk(): if part.is_multipart(): continue ct = part.get_content_type() disp = str(part.get("Content-Disposition") or "") fname = part.get_filename() is_att = "attachment" in disp or (fname and ct not in ("text/plain", "text/html")) if not is_att: continue payload = part.get_payload(decode=True) if not payload: continue atts.append({ "filename": dec(fname) or "(bez nazvu)", "mime_type": ct, "is_inline": "inline" in disp, "content": payload, }) return atts # --- SeaweedFS Filer --------------------------------------------------------- def seaweed_path(sha256: str) -> str: """Cesta deduplikovana podle obsahu: /mail-attachments/ab/cd/.""" return f"{BASE_PATH}/{sha256[:2]}/{sha256[2:4]}/{sha256}" def seaweed_exists(sess: requests.Session, path: str) -> bool: try: r = sess.head(SEAWEED_FILER + path, timeout=HTTP_TIMEOUT) return r.status_code == 200 except requests.RequestException: return False def seaweed_put(sess: requests.Session, path: str, data: bytes, mime: str) -> bool: r = sess.put(SEAWEED_FILER + path, data=data, headers={"Content-Type": mime or "application/octet-stream"}, timeout=HTTP_TIMEOUT) return r.status_code in (200, 201) # --- main -------------------------------------------------------------------- def main() -> int: ap = argparse.ArgumentParser(description="PoC: mailstore prilohy -> SeaweedFS") ap.add_argument("mailbox", help="Schranka = Mongo kolekce") ap.add_argument("--limit", type=int, default=0, help="Kolik MAILU s prilohami zpracovat (0 = VSECHNY)") ap.add_argument("--write-back", action="store_true", help="Zapis do Mongo pole `seaweed_attachments` (nedestruktivne). " "Zaroven zapne RESUME (preskoci uz hotove maily).") ap.add_argument("--dry-run", action="store_true", help="Nic neukladej, jen ukaz co by se stalo") ap.add_argument("--remap-failed", action="store_true", help="Dofetchne maily oznacene seaweed_fetch_failed pres mapu " "Message-ID->seq (pro slozky s driftlou sekvenci, napr. Sent Items).") ap.add_argument("--log-file", default=None, help="Presmeruj vystup do souboru (line-buffered) - pro dlouhy beh " "na serveru, prezije pad spojeni.") args = ap.parse_args() if args.log_file: _f = open(args.log_file, "a", buffering=1, encoding="utf-8") sys.stdout = _f sys.stderr = _f t0 = time.time() print(f"=== mailstore prilohy -> SeaweedFS (PoC) | {args.mailbox} ===") print(f"Filer: {SEAWEED_FILER}{BASE_PATH} | limit mailu: {args.limit}" f"{' [DRY-RUN]' if args.dry_run else ''}" f"{' [WRITE-BACK]' if args.write_back else ''}") mongo = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000) mongo.admin.command("ping") coll = mongo[MONGO_DB][args.mailbox] # resumable = davkovy samostranlujici rezim (imunni vuci timeoutu kurzoru): # kazdy zpracovany mail dostane marker seaweed_synced_at -> prestane odpovidat # dotazu -> dalsi davka vrati dalsi neudelane. Resume po preruseni zdarma. resumable = args.write_back and not args.dry_run BATCH = 500 q = {"source": "mailstore", "has_attachments": True} if resumable: q["seaweed_synced_at"] = {"$exists": False} proj = {"mailstore_uid": 1, "mailstore_folder": 1, "subject": 1} total_q = coll.count_documents(q) print(f"Mailu s prilohami ke zpracovani: {total_q:,}" f"{' (limit ' + str(args.limit) + ')' if args.limit else ''}", flush=True) # per-prilohovy vypis jen u malych (PoC) behu; velky beh = jen progress + chyby verbose = bool(args.limit and args.limit <= 100) sess = requests.Session() M = imap_connect() n_mail = n_att = n_up = n_dedup = n_err = 0 bytes_up = 0 def upload_raw(raw, subj): """Z EML vytahne prilohy a nahraje do SeaweedFS (dedup). Vraci enriched[].""" nonlocal n_att, n_up, n_dedup, n_err, bytes_up enriched = [] for a in extract_attachments(raw): n_att += 1 h = hashlib.sha256(a["content"]).hexdigest() path = seaweed_path(h) size = len(a["content"]) status = "?" try: if seaweed_exists(sess, path): n_dedup += 1 status = "dedup" elif args.dry_run: status = "by-upload" elif seaweed_put(sess, path, a["content"], a["mime_type"]): n_up += 1 bytes_up += size status = "upload" else: n_err += 1 status = "ERR-put" except requests.RequestException as ex: n_err += 1 status = f"ERR:{type(ex).__name__}" if verbose or status.startswith("ERR"): print(f" [{subj:50}] {status:9} {size:>8}B {a['filename'][:40]}", flush=True) enriched.append({ "filename": a["filename"], "mime_type": a["mime_type"], "is_inline": a["is_inline"], "size_bytes": size, "sha256": h, "seaweed_path": path, "seaweed_url": SEAWEED_FILER + path, }) return enriched def handle_doc(d, M): """Stahne EML, nahraje prilohy. Vraci (M, failed, enriched).""" nonlocal n_att, n_up, n_dedup, n_err, bytes_up uid = d.get("mailstore_uid") folder = d.get("mailstore_folder") mid = d.get("_id") subj = (d.get("subject") or "")[:50] if not uid or not folder: return M, True, [] try: raw = fetch_eml_by_seq(M, folder, int(uid), mid) except imaplib.IMAP4.abort: # spojeni umrelo -> prepoj a zkus jeste jednou (nikdy nespadni) try: M.logout() except Exception: pass try: M = imap_connect() raw = fetch_eml_by_seq(M, folder, int(uid), mid) except Exception as ex: n_err += 1 print(f" [seq={uid}] fetch selhal i po reconnectu: {type(ex).__name__}", flush=True) return M, True, [] if not raw: print(f" [seq={uid}] EML se nepodarilo stahnout / Message-ID nesedi", flush=True) return M, True, [] return M, False, upload_raw(raw, subj) def progress(): el = time.time() - t0 print(f" ... {n_mail:,} mailu | {n_up:,} nahrano / {n_dedup:,} dedup / " f"{n_err} chyb | {bytes_up/1024/1024:.0f} MB | {el/60:.1f} min", flush=True) if args.remap_failed: # Dofetch maily se seaweed_fetch_failed: pro kazdou slozku naskenuj mapu # Message-ID->aktualni seq, pak fetchni spravnou sekvenci a nahraj prilohy. fq = {"source": "mailstore", "seaweed_fetch_failed": True} folders = coll.distinct("mailstore_folder", fq) print(f"Slozek s failed maily: {len(folders)}", flush=True) for folder in folders: cnt = coll.count_documents({**fq, "mailstore_folder": folder}) print(f"== {folder.split('/')[-1]} ({cnt} mailu) - skenuji mapu Message-ID...", flush=True) try: midmap = scan_folder_midmap(M, folder) except imaplib.IMAP4.abort: try: M.logout() except Exception: pass M = imap_connect() midmap = scan_folder_midmap(M, folder) print(f" mapa: {len(midmap)} zprav", flush=True) imap_select(M, folder) # zustan ve slozce pro fetch for d in coll.find({**fq, "mailstore_folder": folder}, {"subject": 1}): n_mail += 1 mid = d["_id"] seq = midmap.get(mid) subj = (d.get("subject") or "")[:50] if not seq: # opravdu nenalezeno -> oznac jako definitivne nedohledatelne coll.update_one({"_id": mid}, {"$set": {"seaweed_unresolved": True}}) continue try: raw = fetch_raw_by_seq(M, seq) except imaplib.IMAP4.abort: try: M.logout() except Exception: pass M = imap_connect() imap_select(M, folder) raw = fetch_raw_by_seq(M, seq) if not raw: coll.update_one({"_id": mid}, {"$set": {"seaweed_unresolved": True}}) continue enriched = upload_raw(raw, subj) coll.update_one({"_id": mid}, {"$set": {"seaweed_attachments": enriched, "seaweed_synced_at": datetime.now(timezone.utc)}, "$unset": {"seaweed_fetch_failed": ""}}) if n_mail % 200 == 0: progress() elif resumable: # davkove: kazdy zpracovany mail dostane seaweed_synced_at (i pri prazdne/ # chybne priloze -> resume vzdy konverguje, zadne zacykleni) while True: batch = list(coll.find(q, proj).limit(BATCH)) if not batch: break for d in batch: M, failed, enriched = handle_doc(d, M) n_mail += 1 set_fields = {"seaweed_synced_at": datetime.now(timezone.utc)} if failed: set_fields["seaweed_fetch_failed"] = True else: set_fields["seaweed_attachments"] = enriched coll.update_one({"_id": d["_id"]}, {"$set": set_fields}) if n_mail % 200 == 0: progress() if args.limit and n_mail >= args.limit: break if args.limit and n_mail >= args.limit: break else: cur = coll.find(q, proj) if args.limit: cur = cur.limit(args.limit) for d in cur: M, _failed, _enriched = handle_doc(d, M) n_mail += 1 if n_mail % 200 == 0: progress() try: M.logout() except Exception: pass print("-" * 64) print(f"Mailu zpracovano: {n_mail}") print(f"Priloh celkem: {n_att}") print(f"Nahrano novych: {n_up} ({bytes_up/1024/1024:.1f} MB)") print(f"Deduplikovano: {n_dedup} (uz v SeaweedFS)") print(f"Chyb: {n_err}") print(f"Trvalo: {time.time()-t0:.1f}s") return 0 if __name__ == "__main__": sys.exit(main())