Přílohy ze všech 3 email pipeline → SeaweedFS (globální SHA-256 dedup)

Sjednocení ukládání příloh do jednoho blob storu na Tower1 (SeaweedFS Filer),
content-addressed cesta /mail-attachments/ab/cd/<sha256> přes sdílený
seaweed_store.py. Tři zdroje, jeden dedup:

- mailstore: mailstore_attachments_poc.py (pole seaweed_attachments[])
- Graph: 3_download_attachments v1.4→v1.5 (upload při stažení nové přílohy;
  attachments_index dostává seaweed_path/url/synced_at) + backfill graph
- JNJ: jnj_tower_ingest v1.2→v1.3 (upload při parse .msg; attachments[]
  dostává sha256/seaweed_path/url + doc-level seaweed_synced_at) + backfill jnj

Backfill skripty jsou idempotentní (batch+resume, --retry-errors). Výpadek
SeaweedFS žádnou pipeline neshodí (jen warning, doplní backfill).

Ověřeno: 114 726 objektů / 53.3 GB, 0 nesynchronizovaných dokumentů,
globální dedup mezi větvemi funguje.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-13 21:43:01 +02:00
parent 35e6310dac
commit 6bcb721eb4
9 changed files with 1008 additions and 20 deletions
+1 -1
View File
@@ -56,7 +56,7 @@ if _REQ_FILE.exists():
# Definice pipeline (step_id, label, executable filename)
STEPS = [
("1b", "Graph delta sync", "1b_parse_emails_graph_delta_v1.0.py"),
("3", "Download attachments", "3_download_attachments_v1.4.py"),
("3", "Download attachments", "3_download_attachments_v1.5.py"),
("4", "Unwrap S/MIME", "4_unwrap_smime_v1.0.py"),
("5", "Enrich fulltext (PG)", "5_enrich_fulltext_emails_v1.3.py"),
]
@@ -1,14 +1,14 @@
"""
download_attachments_v1.4.py
Nazev: download_attachments_v1.4.py
Verze: 1.4
Datum: 2026-06-04
download_attachments_v1.5.py
Nazev: download_attachments_v1.5.py
Verze: 1.5
Datum: 2026-06-13
Autor: vladimir.buzalka
Popis:
Stahuje skutecne prilohy (is_inline=False) vsech emailu z MongoDB
pres Microsoft Graph API a uklada je do adresare
/mnt/Emails/<schranka>/Attachments/.
/mnt/Emails/<schranka>/Attachments/ a zaroven do SeaweedFS (Tower1).
Bez argumentu --mailbox projede vsechny kolekce v `emaily` mimo
NON_MAILBOX_COLLECTIONS a SKIP_MAILBOXES.
@@ -20,7 +20,13 @@ Popis:
Po ulozeni aktualizuje MongoDB:
- v email dokumentu: kazda priloha dostane file_hash + local_path
- kolekce emaily.attachments_index: _id=hash, filename, ...
- kolekce emaily.attachments_index: _id=hash, filename, ...,
seaweed_path, seaweed_url, seaweed_synced_at (viz seaweed_store.py)
SeaweedFS: nova priloha se krome disku nahraje i do Fileru na Tower1
(/mail-attachments/ab/cd/<hash>, dedup dle obsahu, sdilene s mailstore
vetvi). Vypadek SeaweedFS pipeline neshodi — soubor i index se zapisou,
seaweed pole pak doplni seaweed_attachments_backfill_graph.py.
NOVE v 1.4:
- Spravne zpracovani vsech typu priloh:
@@ -60,6 +66,9 @@ Historie verzi:
1.3 2026-06-02 Primarni stazeni pres graph_att_id; --mailbox volitelny
1.4 2026-06-04 itemAttachment/referenceAttachment handling; retry s backoffem;
permanentni tagging chyb (attachment_missing / attachment_reference)
1.5 2026-06-13 Nova priloha se zaroven nahrava do SeaweedFS (Tower1) pres
sdileny seaweed_store.py; index dostane seaweed_path/url/synced_at.
Vypadek SeaweedFS pipeline neshodi (fallback = backfill skript).
"""
import sys
@@ -78,6 +87,9 @@ import msal
import requests
from pymongo import MongoClient, UpdateOne
sys.path.insert(0, str(Path(__file__).resolve().parent))
import seaweed_store as sw
if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8", errors="replace")
@@ -93,7 +105,7 @@ MONGO_COL_INDEX = "attachments_index"
EMAILS_BASE_DIR = Path("/mnt/Emails")
LOG_FILE = Path(__file__).parent / "parse_emails_errors.log"
SCRIPT_VERSION = "1.4"
SCRIPT_VERSION = "1.5"
BATCH_SIZE = 50
# Typy příloh které přeskočíme (S/MIME podpisy, certifikáty)
@@ -377,7 +389,7 @@ def save_attachment(content: bytes, original_name: str, mime_type: str,
file_path = att_dir / filename
file_path.write_bytes(content)
col_index.insert_one({
doc = {
"_id": hash_val,
"filename": filename,
"local_path": filename,
@@ -386,7 +398,21 @@ def save_attachment(content: bytes, original_name: str, mime_type: str,
"mailbox": mailbox,
"first_seen_at": datetime.now(timezone.utc).replace(tzinfo=None),
"ref_count": 1,
})
}
# Zaroven do SeaweedFS (dedup dle obsahu, sdilene s mailstore vetvi).
# Vypadek SeaweedFS NESMI shodit pipeline — soubor + index se zapisou vzdy,
# seaweed pole pak doplni seaweed_attachments_backfill_graph.py.
try:
path, url, _ = sw.store(hash_val, content, mime_type)
doc["seaweed_path"] = path
doc["seaweed_url"] = url
doc["seaweed_synced_at"] = datetime.now(timezone.utc).replace(tzinfo=None)
except Exception as e:
logging.warning("SeaweedFS upload selhal pro %s (%s): %s",
filename, hash_val[:12], e)
col_index.insert_one(doc)
return hash_val, filename, True
@@ -405,13 +431,17 @@ def process_mailbox(client, mailbox: str, args) -> dict:
col_emails = client[MONGO_DB][mongo_col]
col_index = client[MONGO_DB][MONGO_COL_INDEX]
# source=mailstore zpravy maji vlastni stahovani priloh (mailstore_ingest);
# v zive schrance uz neexistuji, takze Graph fetch by jen selhal (nenalezeno).
# Tato pipeline je proto kompletne ignoruje. Graph zpravy pole `source` nemaji.
if args.force_recheck:
query = {"has_attachments": True}
query = {"has_attachments": True, "source": {"$ne": "mailstore"}}
else:
# priloha "ke zpracovani" = neni inline, nema file_hash, neni oznacena
# jako missing/reference
query = {
"has_attachments": True,
"source": {"$ne": "mailstore"},
"attachments": {
"$elemMatch": {
"is_inline": False,
+40
View File
@@ -0,0 +1,40 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Deploy the SeaweedFS changes to /mnt/user/Scripts on Unraid (.76) over SFTP.

The SSH password is NOT stored in this file. It is taken from the
UNRAID_SSH_PASSWORD environment variable, or asked for interactively when
the variable is not set.

Steps:
  1. verify that every file in FILES exists locally (abort before connecting
     otherwise, so a typo cannot leave a half-deployed state),
  2. upload the files to REMOTE,
  3. remove the superseded 3_download_attachments_v1.4.py,
  4. list the uploaded files on the remote side as a sanity check.
"""
import getpass
import os
import sys
import paramiko
from pathlib import Path

LOCAL = Path(r"U:\janssen\Python-runner")
REMOTE = "/mnt/user/Scripts"
HOST = "192.168.1.76"
USER = "root"
FILES = [
    "seaweed_store.py",
    "seaweed_attachments_backfill_graph.py",
    "3_download_attachments_v1.5.py",
    "0_run_pipeline_v1.0.py",
]

# Fail early: do not open a connection if any local source file is missing.
absent = [f for f in FILES if not (LOCAL / f).is_file()]
if absent:
    sys.exit(f"Missing local files in {LOCAL}: {', '.join(absent)}")

# NOTE(review): a password used to be committed in this script; it should be
# considered leaked and rotated on the server.
password = os.environ.get("UNRAID_SSH_PASSWORD") or getpass.getpass(
    f"SSH password for {USER}@{HOST}: ")

c = paramiko.SSHClient()
# NOTE(review): AutoAddPolicy accepts unknown host keys without verification;
# kept for compatibility with the existing workflow, but a known_hosts based
# policy would be safer.
c.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
    c.connect(HOST, username=USER, password=password, timeout=10)
    sftp = c.open_sftp()
    try:
        for f in FILES:
            lp = LOCAL / f
            rp = f"{REMOTE}/{f}"
            sftp.put(str(lp), rp)
            print(f"PUT {f} ({lp.stat().st_size} B)")

        # Remove the old v1.4 script; it is fine if it is already gone.
        old = f"{REMOTE}/3_download_attachments_v1.4.py"
        try:
            sftp.remove(old)
            print("DEL 3_download_attachments_v1.4.py")
        except IOError as e:
            print(f"(v1.4 uz neni / {e})")

        # Verification: list the freshly uploaded files on the remote side.
        _, out, _ = c.exec_command(f"ls -la {REMOTE}/seaweed_store.py "
                                   f"{REMOTE}/seaweed_attachments_backfill_graph.py "
                                   f"{REMOTE}/3_download_attachments_v1.5.py")
        print(out.read().decode())
    finally:
        sftp.close()
finally:
    # Always release the SSH connection, even when an upload step raised.
    c.close()
@@ -0,0 +1,183 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
seaweed_attachments_backfill_graph.py

One-off backfill: Graph attachments already downloaded to disk -> SeaweedFS
on Tower1.

The source of truth is the collection emaily.attachments_index (deduplicated
by SHA-256 = _id). Every record points to a file
/mnt/Emails/<mailbox>/Attachments/<local_path>.
The script reads the file, verifies its hash, uploads it to SeaweedFS
(idempotently, deduplicated by content — identical content already uploaded
by the mailstore branch is skipped) and adds these fields to the index
document:
    seaweed_path, seaweed_url, seaweed_synced_at

RESUME + BATCH: only records without `seaweed_synced_at` are taken, and every
processed record is marked (even on failure) -> it drops out of the query, no
cursor stays open for long, and the run can be interrupted and restarted at
any time.

Error states (they also receive seaweed_synced_at so that resume does not
loop on them):
    seaweed_file_missing : True   — file not found on disk
    seaweed_hash_mismatch: <hash> — file content hashes differently than _id
                                    (stored under the real content hash)
    seaweed_upload_error : <msg>  — SeaweedFS PUT / connection failed

Usage (inside the python-runner container on Tower, where /mnt/Emails lives):
    docker exec python-runner python /scripts/seaweed_attachments_backfill_graph.py
    ... --dry-run        # writes nothing, only counts
    ... --limit 500      # only N records (test)
    ... --retry-errors   # also re-process records flagged with an error
                         # (main() selects every record without seaweed_path)
"""
import sys
import time
import hashlib
import argparse
from pathlib import Path
from datetime import datetime, timezone
from pymongo import MongoClient, UpdateOne
sys.path.insert(0, str(Path(__file__).resolve().parent))
import seaweed_store as sw
# Force UTF-8 on stdout where the stream supports reconfiguration; characters
# that cannot be encoded are replaced instead of raising.
if hasattr(sys.stdout, "reconfigure"):
    sys.stdout.reconfigure(encoding="utf-8", errors="replace")

# MongoDB connection string and the database / collection holding the
# attachment index that main() reads and updates.
MONGO_URI = "mongodb://192.168.1.76:27017"
MONGO_DB = "emaily"
MONGO_COL_INDEX = "attachments_index"
# Root directory under which main() looks for <mailbox>/Attachments/<local_path>.
EMAILS_BASE_DIR = Path("/mnt/Emails")
# Maximum number of index records fetched (and bulk-updated) per batch.
BATCH = 500
def sha256(b: bytes) -> str:
    """Return the SHA-256 digest of *b* as a lowercase hex string."""
    digest = hashlib.new("sha256")
    digest.update(b)
    return digest.hexdigest()
def main() -> int:
    """Backfill attachment files referenced by the index into SeaweedFS.

    Command-line options:
        --limit N        process at most N records (0 = all)
        --dry-run        write nothing to SeaweedFS or MongoDB, only report
        --retry-errors   select every record without ``seaweed_path`` instead
                         of every record without ``seaweed_synced_at``

    Records are walked in ascending ``_id`` order using keyset pagination
    (``_id > last seen _id``). Each record is therefore visited at most once
    per run, regardless of whether its update removes it from the selection.
    This matters in two situations where a record stays selectable after
    being processed:
      * ``--retry-errors``: a record that fails again still has no
        ``seaweed_path``;
      * ``--dry-run``: nothing is written at all.
    Without pagination the same batch would be fetched over and over. As a
    consequence a dry run now reports on the whole selection (bounded by
    ``--limit`` when given), not only on the first batch.

    Returns:
        Process exit code; 0 on completion.
    """
    ap = argparse.ArgumentParser(description="Backfill Graph priloh -> SeaweedFS")
    ap.add_argument("--limit", type=int, default=0,
                    help="Zpracovat max N zaznamu (0 = vse)")
    ap.add_argument("--dry-run", action="store_true",
                    help="Nic nezapisovat (do SeaweedFS ani do Mongo), jen report")
    ap.add_argument("--retry-errors", action="store_true",
                    help="Znovu i zaznamy oznacene seaweed_file_missing / "
                         "seaweed_hash_mismatch / seaweed_upload_error")
    args = ap.parse_args()

    client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
    client.admin.command("ping")  # fail fast when MongoDB is unreachable
    col = client[MONGO_DB][MONGO_COL_INDEX]

    if args.retry_errors:
        # everything that still has no valid SeaweedFS path
        base_query = {"seaweed_path": {"$exists": False}}
    else:
        base_query = {"seaweed_synced_at": {"$exists": False}}

    total_target = col.count_documents(base_query)
    print("=== Backfill Graph priloh -> SeaweedFS ===")
    print(f"Filer: {sw.SEAWEED_FILER}{sw.BASE_PATH}")
    print(f"Zaznamu ke zpracovani: {total_target}"
          f"{' (DRY-RUN)' if args.dry_run else ''}")
    if total_target == 0:
        print("Neni co delat.")
        return 0

    t0 = time.time()
    done = uploaded = dedup = missing = mismatch = errors = 0
    projection = {"_id": 1, "mailbox": 1, "local_path": 1, "mime_type": 1}
    # Keyset cursor: highest _id handled so far.
    # NOTE(review): assumes all _id values share one BSON type (the SHA-256
    # hex string); $gt does not compare across types.
    last_id = None

    while True:
        if args.limit and done >= args.limit:
            break
        take = BATCH
        if args.limit:
            take = min(BATCH, args.limit - done)

        query = dict(base_query)
        if last_id is not None:
            query["_id"] = {"$gt": last_id}
        docs = list(col.find(query, projection).sort("_id", 1).limit(take))
        if not docs:
            break
        last_id = docs[-1]["_id"]

        ops = []
        for d in docs:
            done += 1
            _id = d["_id"]
            mailbox = d.get("mailbox", "")
            local = d.get("local_path", "")
            mime = d.get("mime_type") or "application/octet-stream"
            now = datetime.now(timezone.utc).replace(tzinfo=None)

            if not local:
                # index record without a file name -> nothing to upload
                missing += 1
                ops.append(UpdateOne({"_id": _id}, {"$set": {
                    "seaweed_file_missing": True, "seaweed_synced_at": now}}))
                continue

            fpath = EMAILS_BASE_DIR / mailbox / "Attachments" / local
            if not fpath.is_file():
                missing += 1
                print(f"  MISS {mailbox}/{local}")
                ops.append(UpdateOne({"_id": _id}, {"$set": {
                    "seaweed_file_missing": True, "seaweed_synced_at": now}}))
                continue

            try:
                data = fpath.read_bytes()
            except OSError as e:
                errors += 1
                print(f"  ERR read {fpath}: {e}")
                ops.append(UpdateOne({"_id": _id}, {"$set": {
                    "seaweed_upload_error": f"read: {e}", "seaweed_synced_at": now}}))
                continue

            real_hash = sha256(data)
            set_fields = {"seaweed_synced_at": now}
            if real_hash != _id:
                mismatch += 1
                # original _id kept for audit; the object itself is stored
                # under the REAL content hash (content-addressed is correct)
                set_fields["seaweed_hash_mismatch"] = _id

            if args.dry_run:
                continue

            try:
                path, url, was_new = sw.store(real_hash, data, mime)
            except Exception as e:
                # best effort: record the failure and carry on with the rest
                errors += 1
                print(f"  ERR put {mailbox}/{local}: {e}")
                ops.append(UpdateOne({"_id": _id}, {"$set": {
                    "seaweed_upload_error": str(e), "seaweed_synced_at": now}}))
                continue

            if was_new:
                uploaded += 1
            else:
                dedup += 1
            set_fields["seaweed_path"] = path
            set_fields["seaweed_url"] = url
            # clear stale error flags from earlier runs on success
            unset = {"seaweed_file_missing": "", "seaweed_upload_error": ""}
            ops.append(UpdateOne({"_id": _id},
                                 {"$set": set_fields, "$unset": unset}))

        if ops and not args.dry_run:
            col.bulk_write(ops, ordered=False)

        rate = done / max(time.time() - t0, 0.001)
        print(f"  {done}/{total_target}  up={uploaded} dedup={dedup} "
              f"miss={missing} mism={mismatch} err={errors}  ({rate:.0f}/s)")

    dt = time.time() - t0
    print(f"\n=== HOTOVO za {dt/60:.1f} min ===")
    print(f"zpracovano={done} nahrano={uploaded} dedup={dedup} "
          f"chybi_soubor={missing} hash_mismatch={mismatch} chyby={errors}")
    return 0
if __name__ == "__main__":
sys.exit(main())
+77
View File
@@ -0,0 +1,77 @@
"""
seaweed_store.py

Shared helper for storing attachments in the SeaweedFS Filer on Tower1
(192.168.1.50).

Used by:
  - 3_download_attachments_v1.5.py          (Graph pipeline, subsequent runs)
  - seaweed_attachments_backfill_graph.py   (one-off backfill of attachments
                                             that were already downloaded)
  - mailstore/mailstore_attachments_poc.py  (mailstore branch — same path scheme)

The path scheme is content-addressed by the SHA-256 of the content:
    /mail-attachments/<ab>/<cd>/<full-hash>
=> identical content from any source (Graph or mailstore) ends up at the same
path and is deduplicated globally.

Upload: PUT with a raw body (a multipart POST times out on the first write to
the Filer!).
"""
import requests
# Base URL of the SeaweedFS Filer; object paths are appended to it directly.
SEAWEED_FILER = "http://192.168.1.50:8888"
# Root directory on the Filer under which all attachment objects are stored.
BASE_PATH = "/mail-attachments"
# Passed as `timeout=` to every HEAD/PUT request issued by this module.
HTTP_TIMEOUT = 60
# Lazy module-level session — connections are reused across calls within one
# process. Created on first use by _session().
_SESSION: requests.Session | None = None
def _session() -> requests.Session:
    """Return the module-wide requests session, creating it on first use."""
    global _SESSION
    if _SESSION is not None:
        return _SESSION
    _SESSION = requests.Session()
    return _SESSION
def seaweed_path(sha256: str) -> str:
    """Build the content-addressed Filer path for a hash.

    The first two and the next two characters of the hash become directory
    levels: /mail-attachments/ab/cd/<hash>.
    """
    first_level = sha256[:2]
    second_level = sha256[2:4]
    return "/".join((BASE_PATH, first_level, second_level, sha256))
def seaweed_url(sha256: str) -> str:
    """Return the full Filer URL (base URL + content-addressed path) for a hash."""
    object_path = seaweed_path(sha256)
    return f"{SEAWEED_FILER}{object_path}"
def exists(path: str, sess: requests.Session | None = None) -> bool:
    """Tell whether an object is present at *path* on the Filer.

    Issues a HEAD request and returns True only for HTTP 200. Any
    requests.RequestException raised by the request is swallowed and
    reported as False.
    """
    http = sess if sess else _session()
    try:
        response = http.head(SEAWEED_FILER + path, timeout=HTTP_TIMEOUT)
    except requests.RequestException:
        return False
    return response.status_code == 200
def put(path: str, data: bytes, mime: str, sess: requests.Session | None = None) -> bool:
    """Upload *data* to *path* on the Filer with a raw-body PUT.

    An empty/None *mime* falls back to application/octet-stream.
    Returns True when the Filer answers 200 or 201, False for any other
    status. Exceptions raised by the request itself are not caught here.
    """
    http = sess if sess else _session()
    content_type = mime if mime else "application/octet-stream"
    response = http.put(
        SEAWEED_FILER + path,
        data=data,
        headers={"Content-Type": content_type},
        timeout=HTTP_TIMEOUT,
    )
    status = response.status_code
    return status == 200 or status == 201
def store(sha256: str, data: bytes, mime: str,
          sess: requests.Session | None = None) -> tuple[str, str, bool]:
    """Store content in SeaweedFS under its hash-derived path.

    The object is uploaded only when exists() does not find it at the
    target path, so repeated calls with the same hash do not re-upload.

    Returns (path, url, uploaded):
      uploaded=True   the object was uploaded by this call
      uploaded=False  the object was already present at the path (dedup hit)

    Raises RuntimeError when put() reports a non-success status;
    requests.RequestException from the PUT propagates unchanged. The caller
    is expected to handle both (the pipeline must not crash, it only skips
    the seaweed fields).
    """
    http = sess if sess else _session()
    target = seaweed_path(sha256)
    location = SEAWEED_FILER + target
    if not exists(target, http):
        if put(target, data, mime, http):
            return target, location, True
        raise RuntimeError(f"SeaweedFS PUT selhal pro {target}")
    return target, location, False