6bcb721eb4
Sjednocení ukládání příloh do jednoho blob storu na Tower1 (SeaweedFS Filer), content-addressed cesta /mail-attachments/ab/cd/<sha256> přes sdílený seaweed_store.py. Tři zdroje, jeden dedup: - mailstore: mailstore_attachments_poc.py (pole seaweed_attachments[]) - Graph: 3_download_attachments v1.4→v1.5 (upload při stažení nové přílohy; attachments_index dostává seaweed_path/url/synced_at) + backfill graph - JNJ: jnj_tower_ingest v1.2→v1.3 (upload při parse .msg; attachments[] dostává sha256/seaweed_path/url + doc-level seaweed_synced_at) + backfill jnj Backfill skripty jsou idempotentní (batch+resume, --retry-errors). Výpadek SeaweedFS žádnou pipeline neshodí (jen warning, doplní backfill). Ověřeno: 114 726 objektů / 53.3 GB, 0 nesynchronizovaných dokumentů, globální dedup mezi větvemi funguje. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
184 lines
6.8 KiB
Python
184 lines
6.8 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
seaweed_attachments_backfill_graph.py
|
|
Jednorazovy backfill: stazene Graph prilohy z disku -> SeaweedFS na Tower1.
|
|
|
|
Zdroj pravdy je kolekce emaily.attachments_index (dedup dle SHA-256 = _id).
|
|
Kazdy zaznam ukazuje na soubor /mnt/Emails/<mailbox>/Attachments/<local_path>.
|
|
Skript soubor nacte, overi hash, nahraje do SeaweedFS (idempotentne, dedup
|
|
dle obsahu — identicky obsah jiz nahrany mailstore vetvi se preskoci) a do
|
|
index dokumentu doplni:
|
|
seaweed_path, seaweed_url, seaweed_synced_at
|
|
|
|
RESUME + BATCH: bere jen zaznamy bez `seaweed_synced_at`, kazdy zpracovany
|
|
oznaci (i pri chybe) -> vypadne z dotazu, kurzor nikdy nezije dlouho a beh
|
|
lze kdykoli prerusit a spustit znovu.
|
|
|
|
Chybove stavy (taky dostanou seaweed_synced_at, aby nezacyklily resume):
|
|
seaweed_file_missing : True — soubor na disku nenalezen
|
|
seaweed_hash_mismatch: <hash> — obsah souboru ma jiny hash nez _id
|
|
(ulozeno pod skutecnym hashem obsahu)
|
|
seaweed_upload_error : <msg> — SeaweedFS PUT/spojeni selhalo
|
|
|
|
Spousteni (v python-runner kontejneru na Toweru, kde je /mnt/Emails):
|
|
docker exec python-runner python /scripts/seaweed_attachments_backfill_graph.py
|
|
... --dry-run # nic nezapise, jen spocita
|
|
... --limit 500 # jen N zaznamu (test)
|
|
... --retry-errors # znovu i zaznamy s seaweed_*_error/missing/mismatch
|
|
"""
|
|
|
|
import sys
|
|
import time
|
|
import hashlib
|
|
import argparse
|
|
from pathlib import Path
|
|
from datetime import datetime, timezone
|
|
|
|
from pymongo import MongoClient, UpdateOne
|
|
|
|
sys.path.insert(0, str(Path(__file__).resolve().parent))
|
|
import seaweed_store as sw
|
|
|
|
# Force UTF-8 on stdout (unencodable characters are replaced instead of
# raising) so printed mailbox/file names cannot crash the run; streams that
# do not offer ``reconfigure`` are left untouched.
_reconfigure = getattr(sys.stdout, "reconfigure", None)
if _reconfigure is not None:
    _reconfigure(encoding="utf-8", errors="replace")
del _reconfigure

# MongoDB holding the attachment index (source of truth; _id is the SHA-256
# of the attachment content).
MONGO_URI = "mongodb://192.168.1.76:27017"
MONGO_DB = "emaily"
MONGO_COL_INDEX = "attachments_index"

# Mount point of the mail archive; a record's file is expected at
# <EMAILS_BASE_DIR>/<mailbox>/Attachments/<local_path>.
EMAILS_BASE_DIR = Path("/mnt/Emails")

# Number of index records fetched (and bulk-written back) per loop iteration.
BATCH = 500


def sha256(b: bytes) -> str:
    """Return the lowercase hex SHA-256 digest of *b*.

    This is the same format as ``attachments_index._id``, so the result can
    be compared directly against a record's ``_id``.
    """
    digest = hashlib.sha256()
    digest.update(b)
    return digest.hexdigest()


# Failure flags written by this script. A record carries at most one of them,
# describing its most recent failed attempt.
_ERROR_FLAGS = ("seaweed_file_missing", "seaweed_upload_error")


def _failure_op(_id, flag: str, value, now: datetime) -> "UpdateOne":
    """Build the Mongo update marking record *_id* as failed with ``flag = value``.

    ``seaweed_synced_at`` is set too, so the record leaves the normal-mode
    query and resume never loops on it. The other failure flag is removed so
    a state left by an earlier attempt does not linger next to the new one.
    """
    stale = {f: "" for f in _ERROR_FLAGS if f != flag}
    return UpdateOne({"_id": _id},
                     {"$set": {flag: value, "seaweed_synced_at": now},
                      "$unset": stale})


def _process_doc(d: dict, dry_run: bool) -> "tuple[tuple[str, ...], UpdateOne | None]":
    """Check one index record against its file on disk and upload it to SeaweedFS.

    Args:
        d: projected ``attachments_index`` document with ``_id``, ``mailbox``,
            ``local_path`` and ``mime_type``.
        dry_run: read and hash the file, but do not upload it and do not
            build a success update.

    Returns:
        ``(labels, op)``. *labels* names the summary counters to bump
        (``missing``, ``error``, ``mismatch``, ``uploaded``, ``dedup``).
        *op* is the ``UpdateOne`` recording the outcome, or ``None`` when a
        dry run stopped after hashing.
    """
    _id = d["_id"]
    # `or ""` also covers explicit nulls; a None mailbox would break the Path join
    mailbox = d.get("mailbox") or ""
    local = d.get("local_path") or ""
    mime = d.get("mime_type") or "application/octet-stream"
    # timezone-naive datetime holding UTC wall time
    now = datetime.now(timezone.utc).replace(tzinfo=None)

    if not local:
        return ("missing",), _failure_op(_id, "seaweed_file_missing", True, now)

    fpath = EMAILS_BASE_DIR / mailbox / "Attachments" / local
    if not fpath.is_file():
        print(f" MISS {mailbox}/{local}")
        return ("missing",), _failure_op(_id, "seaweed_file_missing", True, now)

    try:
        data = fpath.read_bytes()
    except OSError as e:
        print(f" ERR read {fpath}: {e}")
        return ("error",), _failure_op(_id, "seaweed_upload_error",
                                       f"read: {e}", now)

    real_hash = sha256(data)
    mismatched = real_hash != _id
    # A mismatch is counted but is not fatal: the blob is stored under the hash
    # of its real content (content-addressed), and the original _id is kept
    # in seaweed_hash_mismatch for audit.
    labels = ("mismatch",) if mismatched else ()
    if dry_run:
        return labels, None

    try:
        path, url, was_new = sw.store(real_hash, data, mime)
    except Exception as e:  # a SeaweedFS outage is recorded, never fatal
        print(f" ERR put {mailbox}/{local}: {e}")
        return labels + ("error",), _failure_op(_id, "seaweed_upload_error",
                                                str(e), now)

    set_fields = {"seaweed_synced_at": now,
                  "seaweed_path": path,
                  "seaweed_url": url}
    # Success clears every flag left over from earlier attempts.
    unset = {f: "" for f in _ERROR_FLAGS}
    if mismatched:
        set_fields["seaweed_hash_mismatch"] = _id
    else:
        unset["seaweed_hash_mismatch"] = ""
    labels += ("uploaded",) if was_new else ("dedup",)
    return labels, UpdateOne({"_id": _id},
                             {"$set": set_fields, "$unset": unset})


def main() -> int:
    """Backfill downloaded Graph attachments from disk into SeaweedFS.

    Walks ``attachments_index`` in ascending ``_id`` order, at most ``BATCH``
    records per round trip. Each record goes through ``_process_doc``, and
    the resulting updates are bulk-written back (skipped with ``--dry-run``).

    Records are paged by ``_id`` (``$gt`` the last seen ``_id``) instead of
    re-running the same query each time, for two reasons:

    - In ``--retry-errors`` mode a record that fails again still has no
      ``seaweed_path``, so a re-query would return it forever.
    - In ``--dry-run`` nothing is written, so a re-query would count the
      same records repeatedly.

    With keyset paging every record is visited at most once per run. A dry
    run therefore walks all pending records, or ``--limit`` of them.

    Returns:
        Process exit code (always 0; Mongo errors propagate as exceptions).
    """
    from collections import Counter

    ap = argparse.ArgumentParser(description="Backfill Graph priloh -> SeaweedFS")
    ap.add_argument("--limit", type=int, default=0,
                    help="Zpracovat max N zaznamu (0 = vse)")
    ap.add_argument("--dry-run", action="store_true",
                    help="Nic nezapisovat (do SeaweedFS ani do Mongo), jen report")
    ap.add_argument("--retry-errors", action="store_true",
                    help="Znovu i zaznamy oznacene seaweed_file_missing / "
                         "seaweed_hash_mismatch / seaweed_upload_error")
    args = ap.parse_args()

    client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
    client.admin.command("ping")  # fail fast when Mongo is unreachable
    col = client[MONGO_DB][MONGO_COL_INDEX]

    if args.retry_errors:
        # Everything without a valid SeaweedFS path. Hash-mismatch records are
        # stored successfully (they get seaweed_path), so in practice this
        # retries missing-file and upload-error records.
        base_query = {"seaweed_path": {"$exists": False}}
    else:
        base_query = {"seaweed_synced_at": {"$exists": False}}

    total_target = col.count_documents(base_query)
    print("=== Backfill Graph priloh -> SeaweedFS ===")
    print(f"Filer: {sw.SEAWEED_FILER}{sw.BASE_PATH}")
    print(f"Zaznamu ke zpracovani: {total_target}"
          f"{' (DRY-RUN)' if args.dry_run else ''}")
    if total_target == 0:
        print("Neni co delat.")
        return 0

    projection = {"_id": 1, "mailbox": 1, "local_path": 1, "mime_type": 1}
    t0 = time.time()
    done = 0
    counts = Counter()
    last_id = None

    while not (args.limit and done >= args.limit):
        take = min(BATCH, args.limit - done) if args.limit else BATCH
        query = dict(base_query)
        if last_id is not None:
            query["_id"] = {"$gt": last_id}
        # Short-lived cursor per batch; the _id order makes $gt paging exact.
        docs = list(col.find(query, projection).sort("_id", 1).limit(take))
        if not docs:
            break
        last_id = docs[-1]["_id"]

        ops = []
        for d in docs:
            done += 1
            labels, op = _process_doc(d, args.dry_run)
            counts.update(labels)
            if op is not None:
                ops.append(op)

        if ops and not args.dry_run:
            col.bulk_write(ops, ordered=False)

        rate = done / max(time.time() - t0, 0.001)
        print(f" {done}/{total_target} up={counts['uploaded']} "
              f"dedup={counts['dedup']} "
              f"miss={counts['missing']} mism={counts['mismatch']} "
              f"err={counts['error']} ({rate:.0f}/s)")

    dt = time.time() - t0
    print(f"\n=== HOTOVO za {dt/60:.1f} min ===")
    print(f"zpracovano={done} nahrano={counts['uploaded']} "
          f"dedup={counts['dedup']} "
          f"chybi_soubor={counts['missing']} "
          f"hash_mismatch={counts['mismatch']} chyby={counts['error']}")
    return 0


if __name__ == "__main__":
    # SystemExit carries main()'s return value out as the process exit code.
    raise SystemExit(main())