Files
janssen/mailstore/mailstore_attachments_poc.py
administrator 6bcb721eb4 Přílohy ze všech 3 email pipeline → SeaweedFS (globální SHA-256 dedup)
Sjednocení ukládání příloh do jednoho blob storu na Tower1 (SeaweedFS Filer),
content-addressed cesta /mail-attachments/ab/cd/<sha256> přes sdílený
seaweed_store.py. Tři zdroje, jeden dedup:

- mailstore: mailstore_attachments_poc.py (pole seaweed_attachments[])
- Graph: 3_download_attachments v1.4→v1.5 (upload při stažení nové přílohy;
  attachments_index dostává seaweed_path/url/synced_at) + backfill graph
- JNJ: jnj_tower_ingest v1.2→v1.3 (upload při parse .msg; attachments[]
  dostává sha256/seaweed_path/url + doc-level seaweed_synced_at) + backfill jnj

Backfill skripty jsou idempotentní (batch+resume, --retry-errors). Výpadek
SeaweedFS žádnou pipeline neshodí (jen warning, doplní backfill).

Ověřeno: 114 726 objektů / 53.3 GB, 0 nesynchronizovaných dokumentů,
globální dedup mezi větvemi funguje.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-13 21:43:01 +02:00

455 lines
17 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
mailstore_attachments_poc.py — PROOF OF CONCEPT
Ulozi PRILOHY mailstore mailu do SeaweedFS (Tower1, 192.168.1.50) pres Filer
HTTP API. Deduplikace podle SHA-256 obsahu (stejny soubor = stejna cesta = 1 kopie).
Tok:
Mongo (source=mailstore, has_attachments) --> IMAP re-fetch RFC822 (podle
mailstore_uid + mailstore_folder) --> extrakce priloh --> PUT do Seaweedu.
Volitelne (--write-back) zapise do Mongo k mailu pole `seaweed_attachments`
(filename, sha256, seaweed_path/url) - NEDESTRUKTIVNE, vedle puvodnich metadat.
SeaweedFS (z Trilium "2026-06-13 SeaweedFS na Tower1"):
- Filer: http://192.168.1.50:8888 (anonymni, bez auth - PoC v interni siti)
- bloby na array pod paritou; LIMIT 1 GB/volume, max 50 volumu (~50 GB).
- upload = PUT raw telo (POST multipart na prvni zapis casto timeoutuje).
Pouziti:
python mailstore_attachments_poc.py vladimir.buzalka@buzalka.cz --limit 25
python mailstore_attachments_poc.py vladimir.buzalka@buzalka.cz --limit 25 --write-back
python mailstore_attachments_poc.py vladimir.buzalka@buzalka.cz --limit 5 --dry-run
Graphova vetev se NEMENI (ta uklada na /mnt/Emails). Tohle je samostatne PoC
nad SeaweedFS.
"""
from __future__ import annotations

import argparse
import base64 as _b64
import email
import hashlib
import imaplib
import os
import re
import ssl
import sys
import time
from datetime import datetime, timezone
from email.errors import HeaderParseError
from email.header import decode_header

import requests
from pymongo import MongoClient
# --- config ------------------------------------------------------------------
# Connection settings. Each value may be overridden by an environment variable
# of the same name; the literals are only fallbacks so that existing
# invocations keep working unchanged.
MS_HOST = os.environ.get("MS_HOST", "192.168.1.53")
IMAP_PORT = int(os.environ.get("IMAP_PORT", "143"))
MS_USER = os.environ.get("MS_USER", "admin")
# NOTE(review): a live credential is committed here as the fallback value.
# Rotate it, supply MS_PASS through the environment and drop the literal.
MS_PASS = os.environ.get("MS_PASS", "*$N(B)vMUym!%")
MONGO_URI = os.environ.get("MONGO_URI", "mongodb://192.168.1.76:27017")
MONGO_DB = os.environ.get("MONGO_DB", "emaily")
# Paths appended to the Filer URL always start with "/", so a trailing slash
# on an overridden URL is removed to avoid "//" in request URLs.
SEAWEED_FILER = os.environ.get("SEAWEED_FILER", "http://192.168.1.50:8888").rstrip("/")
BASE_PATH = "/mail-attachments"  # root directory inside the SeaweedFS Filer
HTTP_TIMEOUT = 60  # passed as `timeout=` to every Filer HTTP request
# --- helpery (prevzato z mailstore_ingest_v1.0.py) ---------------------------
def _safe_decode(b: bytes, enc) -> str:
    """Decode *b* to text, trying *enc* first, then UTF-8, then Latin-1.

    Falsy codec names are skipped. A codec that is unknown (``LookupError``)
    or not usable as a codec name (``TypeError``) is skipped as well.
    Undecodable bytes are substituted rather than raising, because every
    attempt uses ``errors="replace"``.
    """
    candidates = [name for name in (enc, "utf-8", "latin-1") if name]
    for codec in candidates:
        try:
            text = b.decode(codec, errors="replace")
        except (LookupError, TypeError):
            pass
        else:
            return text
    # Last resort, kept for parity with the candidate loop above.
    return b.decode("utf-8", errors="replace")
def dec(s) -> str:
    """Decode an RFC 2047 encoded header value into a single-line string.

    Args:
        s: Header value (may contain ``=?charset?enc?...?=`` words) or a
            falsy value.

    Returns:
        The decoded text with CR and LF each replaced by a space and the
        result stripped; ``""`` for a falsy input.
    """
    if not s:
        return ""
    try:
        chunks = decode_header(s)
    except HeaderParseError:
        # A malformed encoded-word (e.g. broken base64) must not abort the
        # whole run; keep the raw text instead.
        chunks = [(str(s), None)]
    decoded = [
        _safe_decode(txt, enc) if isinstance(txt, bytes) else txt
        for txt, enc in chunks
    ]
    return "".join(decoded).replace("\r", " ").replace("\n", " ").strip()
def encode_mutf7(s: str) -> str:
    """Encode an IMAP folder name as modified UTF-7 (RFC 3501).

    Printable ASCII (0x20-0x7e) passes through unchanged, except that ``&``
    becomes ``&-``. Every maximal run of other characters is emitted as
    ``&<base64 of UTF-16BE>-`` with the padding removed and ``/`` replaced
    by ``,``.
    """
    def _shift(match):
        run = match.group(0)
        if run == "&":
            return "&-"
        packed = _b64.b64encode(run.encode("utf-16-be")).decode("ascii")
        return "&" + packed.rstrip("=").replace("/", ",") + "-"

    # The alternation matches either a literal ampersand or one maximal run
    # of characters outside the printable ASCII range.
    return re.sub(r"&|[^\x20-\x7e]+", _shift, s)
def imap_connect(retries: int = 6, delay: float = 5.0) -> imaplib.IMAP4:
    """Open a STARTTLS-upgraded, logged-in IMAP connection to MailStore.

    Args:
        retries: Maximum number of connection attempts (must be >= 1).
        delay: Seconds to wait between two attempts.

    Returns:
        An authenticated ``imaplib.IMAP4`` instance.

    Raises:
        ValueError: If ``retries`` is smaller than 1.
        imaplib.IMAP4.error, OSError: The error of the last attempt once all
            attempts have failed.
    """
    if retries < 1:
        raise ValueError("retries must be >= 1")
    last = None
    for attempt in range(1, retries + 1):
        conn = None
        try:
            # NOTE(review): hostname and certificate verification are switched
            # off, so the TLS layer does not authenticate the server.
            ctx = ssl.create_default_context()
            ctx.check_hostname = False
            ctx.verify_mode = ssl.CERT_NONE
            conn = imaplib.IMAP4(MS_HOST, IMAP_PORT)
            conn.starttls(ssl_context=ctx)
            conn.login(MS_USER, MS_PASS)
            return conn
        except (imaplib.IMAP4.abort, imaplib.IMAP4.error, OSError) as ex:
            last = ex
            # Do not leak the socket when STARTTLS or LOGIN failed after the
            # TCP connection had already been established.
            if conn is not None:
                try:
                    conn.shutdown()
                except OSError:
                    pass
            if attempt == retries:
                # Nothing follows the last attempt, so do not sleep.
                print(f" ! imap_connect {attempt}/{retries} selhal: {ex}", flush=True)
                break
            print(f" ! imap_connect {attempt}/{retries} selhal: {ex} -> cekam {delay:.0f}s",
                  flush=True)
            time.sleep(delay)
    raise last
def imap_select(M: imaplib.IMAP4, folder: str):
    """SELECT *folder* read-only and return imaplib's ``(typ, data)`` result.

    The folder name is converted to modified UTF-7 and sent as an IMAP quoted
    string. Backslash and double quote are backslash-escaped, as quoted
    strings require, so such names cannot break the command.
    """
    mailbox = encode_mutf7(folder).replace("\\", "\\\\").replace('"', '\\"')
    return M.select(f'"{mailbox}"', readonly=True)
def fetch_eml_by_seq(M: imaplib.IMAP4, folder: str, seq: int,
                     expect_mid: str | None = None) -> bytes | None:
    """Fetch the raw RFC822 message at sequence number *seq* of *folder*.

    Per the original author's note, the stored ``mailstore_uid`` is really a
    sequence number, not an IMAP UID. As a guard against sequence drift, the
    fetched message's Message-ID can be compared with *expect_mid*.

    Args:
        M: Logged-in IMAP connection.
        folder: Folder name (not yet mUTF-7 encoded).
        seq: Message sequence number inside the folder.
        expect_mid: Expected Message-ID; when truthy, a mismatch yields None.

    Returns:
        The raw message bytes, or None when the folder cannot be selected,
        the server rejects the command, nothing is returned, or the
        Message-ID does not match.

    Raises:
        imaplib.IMAP4.abort: The connection died; the caller should reconnect.
    """
    try:
        typ, _ = imap_select(M, folder)
        if typ != "OK":
            return None
        typ, data = M.fetch(str(seq), "(RFC822)")
    except imaplib.IMAP4.abort:
        # abort derives from error; it has to reach the caller's reconnect logic.
        raise
    except imaplib.IMAP4.error:
        # imaplib raises `error` for a BAD reply (e.g. the server rejecting
        # the sequence number); report it as "not fetched".
        return None
    if typ != "OK" or not data or not isinstance(data[0], tuple):
        return None
    raw = data[0][1]
    if expect_mid:
        # get() may hand back a Header object (headers with raw 8-bit bytes),
        # which has no strip(); normalise to str first.
        got = str(email.message_from_bytes(raw).get("Message-ID") or "").strip()
        if got != expect_mid:
            return None
    return raw
# Leading sequence number of a FETCH response item, e.g. b"12 (BODY[...] {34}".
_SEQ_RX = re.compile(rb"^(\d+)\s")


def scan_folder_midmap(M: imaplib.IMAP4, folder: str) -> dict:
    """Scan *folder* and map each Message-ID to its current sequence number.

    Meant for folders whose stored sequence numbers have drifted (the
    original author cites live folders such as Sent Items that received mail
    during the ingest).

    Args:
        M: Logged-in IMAP connection.
        folder: Folder name (not yet mUTF-7 encoded).

    Returns:
        ``{message_id: sequence_number}``. Empty when the folder cannot be
        selected. If a Message-ID occurs more than once, the highest
        sequence number wins.
    """
    typ, data = imap_select(M, folder)
    if typ != "OK":
        return {}
    # SELECT reports the number of messages in the folder.
    total = int(data[0]) if data and data[0] else 0
    midmap = {}
    batch = 2000  # headers requested per FETCH command
    for lo in range(1, total + 1, batch):
        hi = min(lo + batch - 1, total)
        typ, msgs = M.fetch(f"{lo}:{hi}", "(BODY.PEEK[HEADER.FIELDS (MESSAGE-ID)])")
        if typ != "OK":
            # A refused range contributes nothing; carry on with the next one.
            continue
        for it in msgs or ():
            if not isinstance(it, tuple):
                continue
            meta, hdr = it[0], it[1]
            mseq = _SEQ_RX.match(meta or b"")
            # get() may return a Header object for headers with raw 8-bit
            # bytes; str() keeps strip() from failing on it.
            mid = str(email.message_from_bytes(hdr or b"").get("Message-ID") or "").strip()
            if mseq and mid:
                midmap[mid] = int(mseq.group(1))
    return midmap
def fetch_raw_by_seq(M: imaplib.IMAP4, seq: int) -> bytes | None:
    """Fetch the raw RFC822 message at *seq* from the already selected folder.

    Returns:
        The message bytes, or None when the server answers with a non-OK
        status, returns no message data, or rejects the command.

    Raises:
        imaplib.IMAP4.abort: The connection died; the caller should reconnect.
    """
    try:
        typ, data = M.fetch(str(seq), "(RFC822)")
    except imaplib.IMAP4.abort:
        # abort derives from error; let the caller's reconnect logic see it.
        raise
    except imaplib.IMAP4.error:
        # A BAD reply is a per-message failure, not a reason to stop the run.
        return None
    if typ != "OK" or not data or not isinstance(data[0], tuple):
        return None
    return data[0][1]
def extract_attachments(raw: bytes) -> list[dict]:
    """Extract the attachments of a raw RFC822 message.

    A non-multipart part counts as an attachment when its Content-Disposition
    type is ``attachment``, or when it has a file name and is neither
    ``text/plain`` nor ``text/html``. Parts with an empty payload are skipped.

    Args:
        raw: Complete message source.

    Returns:
        One dict per attachment with the keys ``filename`` (decoded, or a
        placeholder when missing), ``mime_type``, ``is_inline`` and
        ``content`` (decoded payload bytes).
    """
    msg = email.message_from_bytes(raw)
    atts = []
    for part in msg.walk():
        if part.is_multipart():
            continue
        ct = part.get_content_type()
        # Use the parsed, lower-cased disposition type. A substring test on
        # the whole header also matches file names ("attachment-1.txt",
        # "inline.png") and misses differently cased values.
        disposition = part.get_content_disposition()
        fname = part.get_filename()
        is_att = disposition == "attachment" or (
            fname and ct not in ("text/plain", "text/html"))
        if not is_att:
            continue
        payload = part.get_payload(decode=True)
        if not payload:
            continue
        atts.append({
            "filename": dec(fname) or "(bez nazvu)",
            "mime_type": ct,
            "is_inline": disposition == "inline",
            "content": payload,
        })
    return atts
# --- SeaweedFS Filer ---------------------------------------------------------
def seaweed_path(sha256: str) -> str:
    """Build the content-addressed Filer path ``<BASE_PATH>/ab/cd/<hash>``.

    The first two and the next two characters of the digest form two
    directory levels; the full digest is the object name.
    """
    level_one, level_two = sha256[:2], sha256[2:4]
    return "/".join((BASE_PATH, level_one, level_two, sha256))
def seaweed_exists(sess: requests.Session, path: str) -> bool:
    """Report whether *path* already exists in the Filer (HEAD returns 200).

    Any ``requests`` error is reported as "does not exist".
    """
    try:
        resp = sess.head(SEAWEED_FILER + path, timeout=HTTP_TIMEOUT)
    except requests.RequestException:
        return False
    return resp.status_code == 200
def seaweed_put(sess: requests.Session, path: str, data: bytes, mime: str) -> bool:
    """PUT *data* as the raw request body to *path* in the Filer.

    A falsy *mime* is sent as ``application/octet-stream``. Returns True when
    the Filer answers 200 or 201; ``requests`` exceptions propagate.
    """
    content_type = mime if mime else "application/octet-stream"
    resp = sess.put(
        SEAWEED_FILER + path,
        data=data,
        headers={"Content-Type": content_type},
        timeout=HTTP_TIMEOUT,
    )
    return resp.status_code in (200, 201)
# --- main --------------------------------------------------------------------
def main() -> int:
    """Command-line entry point: copy mailstore attachments into SeaweedFS.

    Three processing modes, chosen from the flags:

    * ``--remap-failed``: for every folder holding mails flagged
      ``seaweed_fetch_failed``, build a Message-ID -> sequence map, fetch the
      mails by their current sequence and upload their attachments.
    * ``--write-back`` without ``--dry-run``: resumable batch mode; every
      processed mail gets ``seaweed_synced_at`` and therefore drops out of
      the query.
    * otherwise: a single pass that uploads (or, with ``--dry-run``, only
      reports) and never writes to Mongo.

    With ``--dry-run`` nothing is written, neither to SeaweedFS nor to Mongo.

    Returns:
        0 once the selected pass has finished.
    """
    ap = argparse.ArgumentParser(description="PoC: mailstore prilohy -> SeaweedFS")
    ap.add_argument("mailbox", help="Schranka = Mongo kolekce")
    ap.add_argument("--limit", type=int, default=0,
                    help="Kolik MAILU s prilohami zpracovat (0 = VSECHNY)")
    ap.add_argument("--write-back", action="store_true",
                    help="Zapis do Mongo pole `seaweed_attachments` (nedestruktivne). "
                         "Zaroven zapne RESUME (preskoci uz hotove maily).")
    ap.add_argument("--dry-run", action="store_true",
                    help="Nic neukladej, jen ukaz co by se stalo")
    ap.add_argument("--remap-failed", action="store_true",
                    help="Dofetchne maily oznacene seaweed_fetch_failed pres mapu "
                         "Message-ID->seq (pro slozky s driftlou sekvenci, napr. Sent Items).")
    ap.add_argument("--log-file", default=None,
                    help="Presmeruj vystup do souboru (line-buffered) - pro dlouhy beh "
                         "na serveru, prezije pad spojeni.")
    args = ap.parse_args()

    if args.log_file:
        # Left open on purpose for the lifetime of the process: stdout and
        # stderr are rebound to it, so all later output lands in the file.
        _f = open(args.log_file, "a", buffering=1, encoding="utf-8")
        sys.stdout = _f
        sys.stderr = _f

    t0 = time.time()
    print(f"=== mailstore prilohy -> SeaweedFS (PoC) | {args.mailbox} ===")
    print(f"Filer: {SEAWEED_FILER}{BASE_PATH} | limit mailu: {args.limit}"
          f"{' [DRY-RUN]' if args.dry_run else ''}"
          f"{' [WRITE-BACK]' if args.write_back else ''}")

    mongo = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
    mongo.admin.command("ping")
    coll = mongo[MONGO_DB][args.mailbox]

    # Resumable mode pages through the collection by itself: each processed
    # mail receives seaweed_synced_at, stops matching the query, and the next
    # find() returns the next unprocessed mails. No long-lived cursor is
    # needed, and an interrupted run simply continues where it stopped.
    resumable = args.write_back and not args.dry_run
    BATCH = 500
    q = {"source": "mailstore", "has_attachments": True}
    if resumable:
        q["seaweed_synced_at"] = {"$exists": False}
    proj = {"mailstore_uid": 1, "mailstore_folder": 1, "subject": 1}
    total_q = coll.count_documents(q)
    print(f"Mailu s prilohami ke zpracovani: {total_q:,}"
          f"{' (limit ' + str(args.limit) + ')' if args.limit else ''}", flush=True)

    # Per-attachment output only for small (PoC) runs; large runs print just
    # progress lines and errors.
    verbose = bool(args.limit and args.limit <= 100)
    sess = requests.Session()
    M = imap_connect()
    n_mail = n_att = n_up = n_dedup = n_err = 0
    bytes_up = 0

    def upload_raw(raw, subj):
        """Upload the attachments of one raw message; return their metadata.

        An attachment whose path already exists in the Filer is counted as
        deduplicated; otherwise it is uploaded (or only reported in dry-run).
        NOTE(review): a metadata entry is appended even when the upload
        failed, so the recorded path may not exist yet. This looks
        deliberate, so that resume always converges.
        """
        nonlocal n_att, n_up, n_dedup, n_err, bytes_up
        enriched = []
        for a in extract_attachments(raw):
            n_att += 1
            h = hashlib.sha256(a["content"]).hexdigest()
            path = seaweed_path(h)
            size = len(a["content"])
            status = "?"
            try:
                if seaweed_exists(sess, path):
                    n_dedup += 1
                    status = "dedup"
                elif args.dry_run:
                    status = "by-upload"
                elif seaweed_put(sess, path, a["content"], a["mime_type"]):
                    n_up += 1
                    bytes_up += size
                    status = "upload"
                else:
                    n_err += 1
                    status = "ERR-put"
            except requests.RequestException as ex:
                n_err += 1
                status = f"ERR:{type(ex).__name__}"
            if verbose or status.startswith("ERR"):
                print(f" [{subj:50}] {status:9} {size:>8}B {a['filename'][:40]}", flush=True)
            enriched.append({
                "filename": a["filename"],
                "mime_type": a["mime_type"],
                "is_inline": a["is_inline"],
                "size_bytes": size,
                "sha256": h,
                "seaweed_path": path,
                "seaweed_url": SEAWEED_FILER + path,
            })
        return enriched

    def handle_doc(d, M):
        """Fetch one mail and upload its attachments.

        Returns ``(M, failed, enriched)``; ``M`` is the connection to keep
        using, which is a new one after a reconnect.
        """
        nonlocal n_err
        uid = d.get("mailstore_uid")
        folder = d.get("mailstore_folder")
        mid = d.get("_id")
        subj = (d.get("subject") or "")[:50]
        if not uid or not folder:
            return M, True, []
        try:
            seq = int(uid)
        except (TypeError, ValueError):
            # A non-numeric stored sequence is a per-mail failure, not a crash.
            n_err += 1
            print(f" [seq={uid}] neplatne mailstore_uid", flush=True)
            return M, True, []
        try:
            raw = fetch_eml_by_seq(M, folder, seq, mid)
        except imaplib.IMAP4.abort:
            # The connection died: reconnect and retry once (never crash).
            try:
                M.logout()
            except Exception:
                pass
            try:
                M = imap_connect()
                raw = fetch_eml_by_seq(M, folder, seq, mid)
            except Exception as ex:
                n_err += 1
                print(f" [seq={uid}] fetch selhal i po reconnectu: {type(ex).__name__}", flush=True)
                return M, True, []
        except imaplib.IMAP4.error as ex:
            # The server rejected the command (BAD); the connection itself is
            # still usable, so mark only this mail as failed.
            n_err += 1
            print(f" [seq={uid}] fetch selhal: {type(ex).__name__}: {ex}", flush=True)
            return M, True, []
        if not raw:
            print(f" [seq={uid}] EML se nepodarilo stahnout / Message-ID nesedi", flush=True)
            return M, True, []
        return M, False, upload_raw(raw, subj)

    def progress():
        """Print one running-totals line."""
        el = time.time() - t0
        print(f" ... {n_mail:,} mailu | {n_up:,} nahrano / {n_dedup:,} dedup / "
              f"{n_err} chyb | {bytes_up/1024/1024:.0f} MB | {el/60:.1f} min", flush=True)

    try:
        if args.remap_failed:
            # Re-fetch mails flagged seaweed_fetch_failed: per folder, build
            # the Message-ID -> current sequence map, fetch by the correct
            # sequence and upload the attachments.
            # --dry-run must leave Mongo untouched as well.
            write_mongo = not args.dry_run
            fq = {"source": "mailstore", "seaweed_fetch_failed": True}
            folders = coll.distinct("mailstore_folder", fq)
            print(f"Slozek s failed maily: {len(folders)}", flush=True)
            for folder in folders:
                cnt = coll.count_documents({**fq, "mailstore_folder": folder})
                print(f"== {folder.split('/')[-1]} ({cnt} mailu) - skenuji mapu Message-ID...", flush=True)
                try:
                    midmap = scan_folder_midmap(M, folder)
                except imaplib.IMAP4.abort:
                    try:
                        M.logout()
                    except Exception:
                        pass
                    M = imap_connect()
                    midmap = scan_folder_midmap(M, folder)
                print(f" mapa: {len(midmap)} zprav", flush=True)
                imap_select(M, folder)  # stay in the folder for the fetches below
                for d in coll.find({**fq, "mailstore_folder": folder},
                                   {"subject": 1}):
                    n_mail += 1
                    mid = d["_id"]
                    seq = midmap.get(mid)
                    subj = (d.get("subject") or "")[:50]
                    if not seq:
                        # Not present in the folder: flag as unresolvable.
                        if write_mongo:
                            coll.update_one({"_id": mid},
                                            {"$set": {"seaweed_unresolved": True}})
                        continue
                    try:
                        raw = fetch_raw_by_seq(M, seq)
                    except imaplib.IMAP4.abort:
                        try:
                            M.logout()
                        except Exception:
                            pass
                        M = imap_connect()
                        imap_select(M, folder)
                        raw = fetch_raw_by_seq(M, seq)
                    if not raw:
                        if write_mongo:
                            coll.update_one({"_id": mid}, {"$set": {"seaweed_unresolved": True}})
                        continue
                    enriched = upload_raw(raw, subj)
                    if write_mongo:
                        coll.update_one({"_id": mid},
                                        {"$set": {"seaweed_attachments": enriched,
                                                  "seaweed_synced_at": datetime.now(timezone.utc)},
                                         "$unset": {"seaweed_fetch_failed": ""}})
                    if n_mail % 200 == 0:
                        progress()
        elif resumable:
            # Batch mode: every processed mail gets seaweed_synced_at, even
            # with an empty or failed attachment, so the query always shrinks
            # and the loop cannot spin forever.
            while True:
                batch = list(coll.find(q, proj).limit(BATCH))
                if not batch:
                    break
                for d in batch:
                    M, failed, enriched = handle_doc(d, M)
                    n_mail += 1
                    set_fields = {"seaweed_synced_at": datetime.now(timezone.utc)}
                    if failed:
                        set_fields["seaweed_fetch_failed"] = True
                    else:
                        set_fields["seaweed_attachments"] = enriched
                    coll.update_one({"_id": d["_id"]}, {"$set": set_fields})
                    if n_mail % 200 == 0:
                        progress()
                    if args.limit and n_mail >= args.limit:
                        break
                if args.limit and n_mail >= args.limit:
                    break
        else:
            cur = coll.find(q, proj)
            if args.limit:
                cur = cur.limit(args.limit)
            for d in cur:
                M, _failed, _enriched = handle_doc(d, M)
                n_mail += 1
                if n_mail % 200 == 0:
                    progress()
    finally:
        # Release the connections even when a pass aborts with an exception.
        try:
            M.logout()
        except Exception:
            pass
        sess.close()
        mongo.close()

    print("-" * 64)
    print(f"Mailu zpracovano: {n_mail}")
    print(f"Priloh celkem: {n_att}")
    print(f"Nahrano novych: {n_up} ({bytes_up/1024/1024:.1f} MB)")
    print(f"Deduplikovano: {n_dedup} (uz v SeaweedFS)")
    print(f"Chyb: {n_err}")
    print(f"Trvalo: {time.time()-t0:.1f}s")
    return 0
if __name__ == "__main__":
sys.exit(main())