administrator 6bcb721eb4 Attachments from all 3 email pipelines → SeaweedFS (global SHA-256 dedup)
Unify attachment storage into a single blob store on Tower1 (SeaweedFS
Filer), using the content-addressed path /mail-attachments/ab/cd/<sha256>
via the shared seaweed_store.py. Three sources, one dedup:

- mailstore: mailstore_attachments_poc.py (field seaweed_attachments[])
- Graph: 3_download_attachments v1.4→v1.5 (upload when a new attachment is
  downloaded; attachments_index gains seaweed_path/url/synced_at) + graph backfill
- JNJ: jnj_tower_ingest v1.2→v1.3 (upload while parsing the .msg; attachments[]
  gains sha256/seaweed_path/url plus document-level seaweed_synced_at) + jnj backfill
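
A minimal sketch of the content-addressed path scheme, mirroring `seaweed_path()` in seaweed_store.py but hashing the bytes itself. The helper name `content_path` and the sample payloads are illustrative assumptions, not code from the pipelines:

```python
import hashlib

BASE_PATH = "/mail-attachments"  # same prefix as seaweed_store.BASE_PATH


def content_path(data: bytes) -> str:
    """Content-addressed Filer path for raw attachment bytes.

    Two levels of fan-out (first two hex chars, then the next two) spread
    objects over up to 256 * 256 directories; the full hex digest is the
    file name.
    """
    digest = hashlib.sha256(data).hexdigest()
    return f"{BASE_PATH}/{digest[:2]}/{digest[2:4]}/{digest}"


# Identical bytes arriving via Graph and via mailstore map to one path,
# so whichever pipeline uploads second gets a dedup hit.
from_graph = b"%PDF-1.7 same invoice"
from_mailstore = b"%PDF-1.7 same invoice"
assert content_path(from_graph) == content_path(from_mailstore)

print(content_path(b""))
# → /mail-attachments/e3/b0/e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855
```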

The backfill scripts are idempotent (batch+resume, --retry-errors). A SeaweedFS
outage does not crash any pipeline (it only logs a warning; the backfill fills
in the gaps later).
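
One way an idempotent, resumable backfill could be structured, as a sketch only: the backfill scripts themselves are not shown here, so `backfill`, the in-memory `state` checkpoint, the document fields, and the `store` callable (standing in for `seaweed_store.store`) are all assumptions. Re-running skips anything that already has a `seaweed_path`, the cursor gives batch-level resume, and `retry_errors=True` plays the role of `--retry-errors`:

```python
import logging
from typing import Callable

log = logging.getLogger("backfill")

# Hypothetical stand-in type for seaweed_store.store(sha256, data, mime).
StoreFn = Callable[[str, bytes, str], tuple[str, str, bool]]


def backfill(docs: list[dict], store: StoreFn, state: dict,
             batch_size: int = 100, retry_errors: bool = False) -> dict:
    """Upload attachments that still lack a seaweed_path; safe to re-run.

    `state` plays the role of a checkpoint file:
      state["cursor"]  index of the next document to visit (resume point)
      state["errors"]  ids of documents whose upload failed earlier
    """
    state.setdefault("cursor", 0)
    state.setdefault("errors", [])
    if retry_errors:
        # --retry-errors: revisit only previously failed documents.
        failed = set(state["errors"])
        todo = [d for d in docs if d["id"] in failed]
        state["errors"] = []
    else:
        todo = docs[state["cursor"]:]

    for start in range(0, len(todo), batch_size):
        batch = todo[start:start + batch_size]
        for doc in batch:
            if doc.get("seaweed_path"):
                continue  # already synced: idempotent skip
            try:
                path, url, _uploaded = store(doc["sha256"], doc["data"], doc["mime"])
            except (OSError, RuntimeError) as exc:  # outage: warn, never abort
                log.warning("SeaweedFS upload failed for %s: %s", doc["id"], exc)
                state["errors"].append(doc["id"])
                continue
            doc["seaweed_path"], doc["seaweed_url"] = path, url
        if not retry_errors:
            state["cursor"] += len(batch)  # checkpoint after each completed batch
    return state
```

In a real script `state` would be written to disk after each batch, so a crash resumes from the last completed batch instead of from the beginning.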

Verified: 114,726 objects / 53.3 GB, 0 unsynced documents; global dedup
across the branches works.
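
The "0 unsynced documents" figure implies a check along these lines. This is an assumed, simplified version over in-memory JNJ-style documents (an `attachments[]` list whose entries carry `sha256` and `seaweed_path`); the actual verification presumably queried the databases and the Filer directly. `dedup_stats` contrasts attachment references with distinct hashes, showing how many references collapse onto shared objects. The sample hashes and paths are toy values:

```python
def count_unsynced(docs: list[dict]) -> int:
    """Number of documents with at least one attachment lacking seaweed_path."""
    return sum(
        1 for doc in docs
        if any(not att.get("seaweed_path") for att in doc.get("attachments", []))
    )


def dedup_stats(docs: list[dict]) -> tuple[int, int]:
    """(attachment references, distinct SHA-256 objects) across all documents."""
    hashes = [att["sha256"] for doc in docs
              for att in doc.get("attachments", []) if att.get("sha256")]
    return len(hashes), len(set(hashes))


sample = [
    {"attachments": [{"sha256": "h1", "seaweed_path": "/p/h1"}]},
    {"attachments": [{"sha256": "h1", "seaweed_path": "/p/h1"}, {"sha256": "h2"}]},
]
print(count_unsynced(sample), dedup_stats(sample))  # → 1 (3, 2)
```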

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-13 21:43:01 +02:00


"""
seaweed_store.py
Shared helper for storing attachments in the SeaweedFS Filer on Tower1 (192.168.1.50).
Used by:
- 3_download_attachments_v1.5.py (Graph pipeline, subsequent runs)
- seaweed_attachments_backfill_graph.py (one-off backfill of already downloaded attachments)
- mailstore/mailstore_attachments_poc.py (mailstore branch; same path scheme)
The path scheme is content-addressed by the SHA-256 of the content:
/mail-attachments/<ab>/<cd>/<full-hash>
=> identical content from any source (Graph or mailstore) ends up at the same
path and is deduplicated globally.
Upload: PUT with a raw body (a multipart POST times out on the first write to the Filer!).
"""
import requests

SEAWEED_FILER = "http://192.168.1.50:8888"
BASE_PATH = "/mail-attachments"
HTTP_TIMEOUT = 60

# Lazy module-level session: reuses connections across calls within one process.
_SESSION: requests.Session | None = None


def _session() -> requests.Session:
    global _SESSION
    if _SESSION is None:
        _SESSION = requests.Session()
    return _SESSION


def seaweed_path(sha256: str) -> str:
    """Content-deduplicated path: /mail-attachments/ab/cd/<hash>."""
    return f"{BASE_PATH}/{sha256[:2]}/{sha256[2:4]}/{sha256}"


def seaweed_url(sha256: str) -> str:
    return SEAWEED_FILER + seaweed_path(sha256)


def exists(path: str, sess: requests.Session | None = None) -> bool:
    # HEAD probe; any network error is treated as "not present", so the
    # subsequent PUT decides between success and failure.
    sess = sess or _session()
    try:
        r = sess.head(SEAWEED_FILER + path, timeout=HTTP_TIMEOUT)
        return r.status_code == 200
    except requests.RequestException:
        return False


def put(path: str, data: bytes, mime: str, sess: requests.Session | None = None) -> bool:
    # Raw-body PUT (a multipart POST times out on the first write to the Filer).
    sess = sess or _session()
    r = sess.put(SEAWEED_FILER + path, data=data,
                 headers={"Content-Type": mime or "application/octet-stream"},
                 timeout=HTTP_TIMEOUT)
    return r.status_code in (200, 201)


def store(sha256: str, data: bytes, mime: str,
          sess: requests.Session | None = None) -> tuple[str, str, bool]:
    """Store content in SeaweedFS (idempotent, deduplicated by hash).

    Returns (path, url, uploaded):
        uploaded=True  if the object was newly uploaded
        uploaded=False if it already existed at that path (dedup hit)

    Raises requests.RequestException / RuntimeError if the write fails;
    the caller must handle this (the pipeline must not crash, it just
    skips the seaweed fields).
    """
    sess = sess or _session()
    path = seaweed_path(sha256)
    if exists(path, sess):
        return path, SEAWEED_FILER + path, False
    if not put(path, data, mime, sess):
        raise RuntimeError(f"SeaweedFS PUT failed for {path}")
    return path, SEAWEED_FILER + path, True
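
Per its docstring, `store()` raises on a failed write and leaves the handling to the caller. A hedged sketch of such a caller follows; `sync_attachment`, the `seaweed_url`/`seaweed_synced_at` field names, and the injectable `store` parameter are assumptions for illustration (a pipeline would pass the real `seaweed_store.store`). Since `requests.RequestException` subclasses `OSError`, catching `(OSError, RuntimeError)` covers both documented failure modes without importing requests:

```python
import hashlib
import logging
from datetime import datetime, timezone
from typing import Callable

log = logging.getLogger("pipeline")


def sync_attachment(att: dict, data: bytes,
                    store: Callable[[str, bytes, str], tuple[str, str, bool]]) -> bool:
    """Try to mirror one attachment into SeaweedFS; never raises on upload errors.

    On success the record gains the seaweed fields; on failure they stay unset,
    so a later backfill run picks the record up again.
    """
    sha = hashlib.sha256(data).hexdigest()
    att["sha256"] = sha  # recorded even if the upload fails
    try:
        path, url, _uploaded = store(sha, data, att.get("mime", ""))
    except (OSError, RuntimeError) as exc:  # requests.RequestException is an OSError
        log.warning("SeaweedFS unavailable, skipping %s: %s", sha, exc)
        return False
    att["seaweed_path"] = path
    att["seaweed_url"] = url
    att["seaweed_synced_at"] = datetime.now(timezone.utc).isoformat()
    return True
```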