Files
janssen/Python-runner/5_enrich_fulltext_emails_v1.3.py
2026-06-05 21:21:30 +02:00

568 lines
21 KiB
Python

"""
==============================================================================
Skript: enrich_fulltext_emails_v1.3.py
Verze: 1.3
Datum: 2026-06-04
Autor: vladimir.buzalka
Popis:
Vytahne plny text z emailu ulozenych v MongoDB (db: emaily) a ulozi ho do
PostgreSQL (db: MongoEmaily, tabulka: emails) s GIN tsvector indexem.
Emaily se NESTAHUJI znovu - tela uz jsou v Mongo z parse_emails_graph_v1.4
(a refetch_text_bodies_v1.0 pro stare plain-text emaily).
Tento skript jen vybere prvni dostupne telo a posle text do PG na fulltext.
Zmeny v1.3 vs v1.2:
- Bugfix: NON_MAILBOX_COLLECTIONS = {"attachments_index", "sync_state"}
(sync_state pribyla v delta syncu, predtim ji v1.2 brala jako mailbox).
- --index-reset: pred zpracovanim schranky vymaze vsechny jeji emaily z PG
(force re-extract; pouzij kdyz povysis EXTRACTOR_VERSION nebo chces ciste).
- Vylepseny header per-mailbox: ukaze pocet v Mongu, v PG a k zpracovani.
Zmeny v1.2 vs v1.1:
- S/MIME emaily: pokud unwrap_smime_v1.0 ulozil smime_body_text/smime_body_html,
pouzije se PREFEROVANE pred bezvyznamnym wrapper telem.
- body_source: nova hodnota "smime".
- EXTRACTOR_VERSION=1.2 -> vsechny existujici emaily v PG se preparsuji.
Zmeny v1.1 vs v1.0:
- Fallback poradi rozsireno o body_text.
- body_source umi novou hodnotu "text" (plne plain-text telo, max 2 MB).
Zdroj:
MongoDB 192.168.1.76 db=emaily kolekce=<mailbox>
(krome NON_MAILBOX_COLLECTIONS)
Cil:
PostgreSQL 192.168.1.76 db=MongoEmaily tabulka=emails
tsvector config 'soubory' (sdileny - simple + unaccent)
Inkrementalita:
Pokud (mailbox, message_id) jiz existuje a extractor_version je aktualni
a modified_at v Mongo neni novejsi -> skip. Pri zmene verze extractoru
se vse preparsuje. --index-reset to obejde a smaze PG pred behom.
Spusteni:
python enrich_fulltext_emails_v1.3.py # vsechny schranky
python enrich_fulltext_emails_v1.3.py --mailbox ordinace@buzalkova.cz
python enrich_fulltext_emails_v1.3.py --limit 500 # test
python enrich_fulltext_emails_v1.3.py --mailbox X --index-reset # smaze PG schranky a re-extrahuje vsechno
python enrich_fulltext_emails_v1.3.py --index-reset # smaze CELY index a postavi znovu (POMALE!)
==============================================================================
"""
from __future__ import annotations
import argparse
import re
import sys
import time
import traceback
from datetime import datetime, timezone
from typing import Optional
import psycopg
from bs4 import BeautifulSoup
from pymongo import MongoClient
# --- configuration ----------------------------------------------------------
# Source MongoDB server and database holding one collection per mailbox.
MONGO_URI = "mongodb://192.168.1.76:27017"
MONGO_DB = "emaily"
# Target PostgreSQL connection string.
# NOTE(review): credentials are hard-coded in the source file; consider loading
# them from the environment or a secrets store instead.
PG_DSN = ("host=192.168.1.76 port=5432 dbname=MongoEmaily "
"user=vladimir.buzalka password=Vlado7309208104++")
# Written to every row; rows stored with a different value are re-extracted.
EXTRACTOR_VERSION = "1.2" # Do NOT change unless you change the fallback logic!
MAX_TEXT_BYTES = 5 * 1024 * 1024 # plain text max 5 MB
# Collections in `emaily` that are NOT mailboxes (never processed)
NON_MAILBOX_COLLECTIONS = {"attachments_index", "sync_state"}
# Number of rows buffered before they are upserted into PostgreSQL.
BATCH_SIZE = 100
# --- SCHEMA -----------------------------------------------------------------
# Idempotent DDL executed by main() on every start:
#   * enables the unaccent and pg_trgm extensions,
#   * creates the text search configuration 'soubory' (a copy of 'simple'
#     whose word mappings go through unaccent) when it does not exist yet,
#   * creates the `emails` table with a stored, generated `tsv` column built
#     from subject, sender, recipients, attachment names and body (the
#     concatenated text is cut to 800000 characters before to_tsvector),
#   * creates the GIN / trigram / btree indexes used for searching.
# NOTE(review): the inline SQL comment on `body_source` lists only
# 'html' | 'preview' | 'empty', but process_mailbox also writes 'smime'
# and 'text'.
SCHEMA_SQL = """
CREATE EXTENSION IF NOT EXISTS unaccent;
CREATE EXTENSION IF NOT EXISTS pg_trgm;
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_ts_config WHERE cfgname = 'soubory') THEN
CREATE TEXT SEARCH CONFIGURATION soubory ( COPY = simple );
ALTER TEXT SEARCH CONFIGURATION soubory
ALTER MAPPING FOR hword, hword_part, word
WITH unaccent, simple;
END IF;
END$$;
CREATE TABLE IF NOT EXISTS emails (
id BIGSERIAL PRIMARY KEY,
mailbox TEXT NOT NULL,
message_id TEXT NOT NULL,
graph_id TEXT,
conversation_id TEXT,
folder_path TEXT,
subject TEXT,
sender_email TEXT,
sender_name TEXT,
to_addrs TEXT,
cc_addrs TEXT,
bcc_addrs TEXT,
sent_at TIMESTAMPTZ,
received_at TIMESTAMPTZ,
modified_at TIMESTAMPTZ,
is_read BOOLEAN,
is_draft BOOLEAN,
has_attachments BOOLEAN,
attachment_count INT,
attachments_summary TEXT,
body TEXT,
body_length INT,
body_source TEXT, -- 'html' | 'preview' | 'empty'
tsv tsvector GENERATED ALWAYS AS (
to_tsvector('soubory'::regconfig,
left(
coalesce(subject, '') || ' ' ||
coalesce(sender_email, '') || ' ' ||
coalesce(sender_name, '') || ' ' ||
coalesce(to_addrs, '') || ' ' ||
coalesce(cc_addrs, '') || ' ' ||
coalesce(attachments_summary, '') || ' ' ||
coalesce(body, ''),
800000)
)
) STORED,
extracted_at TIMESTAMPTZ DEFAULT now(),
extractor_version TEXT,
ok BOOLEAN,
error TEXT,
UNIQUE (mailbox, message_id)
);
CREATE INDEX IF NOT EXISTS emails_tsv_gin ON emails USING gin(tsv);
CREATE INDEX IF NOT EXISTS emails_subject_trgm ON emails USING gin(subject gin_trgm_ops);
CREATE INDEX IF NOT EXISTS emails_sender_email_idx ON emails(sender_email);
CREATE INDEX IF NOT EXISTS emails_mailbox_idx ON emails(mailbox);
CREATE INDEX IF NOT EXISTS emails_received_idx ON emails(received_at DESC);
CREATE INDEX IF NOT EXISTS emails_conv_idx ON emails(conversation_id);
"""
# --- HELPERS ----------------------------------------------------------------
# C0 control characters except TAB (\x09), LF (\x0a) and CR (\x0d); this
# includes NUL (\x00). Stripped by _clean_for_pg before text is sent to PG.
_CTRL_RX = re.compile(r"[\x00-\x08\x0b\x0c\x0e-\x1f]")
# Runs of spaces/tabs; html_to_text collapses each run to a single space.
_WS_RX = re.compile(r"[ \t]+")
# Three or more consecutive newlines; html_to_text squeezes them to two.
_NL_RX = re.compile(r"\n{3,}")
def _clean_for_pg(s: str) -> str:
    """Return *s* with every character matched by ``_CTRL_RX`` removed.

    Falsy input (``None`` or an empty string) yields ``""``.
    """
    return _CTRL_RX.sub("", s) if s else ""
def _truncate(s: str) -> str:
    """Clean *s* and cap its UTF-8 encoding at ``MAX_TEXT_BYTES`` bytes.

    The text is first passed through ``_clean_for_pg``. When the encoded form
    is too long it is cut at the byte limit and decoded with
    ``errors="ignore"``, so a multi-byte character split by the cut is
    dropped rather than producing invalid text.
    """
    cleaned = _clean_for_pg(s or "")
    if cleaned:
        encoded = cleaned.encode("utf-8", errors="replace")
        if len(encoded) > MAX_TEXT_BYTES:
            return encoded[:MAX_TEXT_BYTES].decode("utf-8", errors="ignore")
    return cleaned
def html_to_text(html: str) -> str:
    """Convert an HTML fragment to plain text, one non-empty line per row.

    Parsing is attempted with the ``lxml`` backend and falls back to
    ``html.parser`` if that raises. ``<script>``, ``<style>`` and ``<head>``
    elements are removed before the text is extracted. Within each line,
    runs of spaces/tabs are collapsed and the line is stripped; lines that
    end up empty are discarded.
    """
    if not html:
        return ""
    try:
        soup = BeautifulSoup(html, "lxml")
    except Exception:
        soup = BeautifulSoup(html, "html.parser")
    for node in soup(["script", "style", "head"]):
        node.decompose()
    raw = soup.get_text(separator="\n")
    kept = []
    for line in raw.split("\n"):
        line = _WS_RX.sub(" ", line).strip()
        if line:
            kept.append(line)
    # Empty lines were already dropped above, so this only acts as a guard
    # against runs of three or more newlines.
    return _NL_RX.sub("\n\n", "\n".join(kept))
def fmt_recipients(recipients: list, kind: str) -> str:
    """Render the recipients of one *kind* as a ``"; "``-separated string.

    Only dict entries whose ``"type"`` equals *kind* are used. Each is
    rendered as ``Name <email>``, or just the email, or just the name,
    depending on which of the (stripped) fields are non-empty; entries with
    neither are skipped.
    """
    if not recipients:
        return ""
    formatted = []
    for entry in recipients:
        if not isinstance(entry, dict) or entry.get("type") != kind:
            continue
        name = (entry.get("name") or "").strip()
        email = (entry.get("email") or "").strip()
        if email:
            formatted.append(f"{name} <{email}>" if name else email)
        elif name:
            formatted.append(name)
    return "; ".join(formatted)
def fmt_attachments(attachments: list) -> str:
    """Join attachment file names with ``" | "``.

    Only the first 20 entries of *attachments* are inspected. Non-dict
    entries are ignored; for dicts the ``"name"`` key is preferred and
    ``"filename"`` is the fallback. Entries without a name are skipped.
    """
    if not attachments:
        return ""
    names = (
        item.get("name") or item.get("filename") or ""
        for item in attachments[:20]
        if isinstance(item, dict)
    )
    return " | ".join(label for label in names if label)
def _short(s, n=60):
    """Return a single-line preview of *s*, at most *n* characters plus "...".

    The value is converted with ``str``, newlines become spaces and the
    result is stripped. Falsy input yields an empty string.
    """
    if not s:
        return ""
    flat = str(s).replace("\n", " ").strip()
    if len(flat) > n:
        return flat[:n] + "..."
    return flat
def _now() -> datetime:
    """Return the current time as a timezone-aware UTC ``datetime``."""
    utc = timezone.utc
    return datetime.now(tz=utc)
def _aware_utc(dt: Optional[datetime]) -> Optional[datetime]:
    """Normalise a datetime to timezone-aware UTC.

    A naive value is taken to already be in UTC and is only tagged with the
    UTC zone; an aware value is converted to UTC. ``None`` is passed through.
    """
    if dt is None:
        return None
    if dt.tzinfo is not None:
        return dt.astimezone(timezone.utc)
    return dt.replace(tzinfo=timezone.utc)
# --- MAIN LOOP --------------------------------------------------------------
def _is_up_to_date(prev: Optional[tuple], doc: dict) -> bool:
    """Tell whether the PG row described by *prev* still matches *doc*.

    *prev* is ``(extractor_version, ok, body_source, modified_at)`` as read
    from PG, or ``None`` when the message is not indexed yet. The row is
    considered stale when:

    * it was written by a different extractor version or with ``ok`` false,
    * Mongo has ``smime_unwrapped`` set but PG ``body_source`` is not
      ``"smime"`` (the unwrap step ran after the previous enrich), or
    * ``modified_at`` in Mongo is newer than the value stored in PG.
    """
    if not prev or prev[0] != EXTRACTOR_VERSION or not prev[1]:
        return False
    if doc.get("smime_unwrapped") and prev[2] != "smime":
        return False
    mongo_mtime = doc.get("modified_at")
    if isinstance(mongo_mtime, datetime):
        pg_mtime = prev[3]
        if pg_mtime is None or _aware_utc(mongo_mtime) > _aware_utc(pg_mtime):
            return False
    return True


def _base_row(mailbox: str, msg_id, doc: dict) -> dict:
    """Build the PG row for *doc* with metadata filled in and an empty body."""
    sender = doc.get("sender") or {}
    if not isinstance(sender, dict):
        # A malformed sender must not abort the whole mailbox.
        sender = {}
    recipients = doc.get("recipients") or []
    attachments = doc.get("attachments") or []
    inner = doc.get("smime_inner_attachments") or []
    if inner:
        # Attachments found inside the S/MIME envelope are listed too,
        # tagged so they can be told apart in the summary.
        attachments = list(attachments) + [
            {"filename": (a.get("filename") or "") + " [smime]"}
            for a in inner
            if isinstance(a, dict) and a.get("filename")
        ]
    return {
        "mailbox": mailbox,
        "message_id": msg_id,
        "graph_id": doc.get("graph_id"),
        "conversation_id": doc.get("conversation_id"),
        "folder_path": doc.get("folder_path"),
        "subject": doc.get("subject") or "",
        "sender_email": sender.get("email"),
        "sender_name": sender.get("name"),
        "to_addrs": fmt_recipients(recipients, "to"),
        "cc_addrs": fmt_recipients(recipients, "cc"),
        "bcc_addrs": fmt_recipients(recipients, "bcc"),
        # Timestamps coming from Mongo are naive but meant as UTC. They are
        # tagged tz-aware so that PG TIMESTAMPTZ stores the right instant
        # instead of shifting it by the session time zone.
        "sent_at": _aware_utc(doc.get("sent_at")),
        "received_at": _aware_utc(doc.get("received_at")),
        "modified_at": _aware_utc(doc.get("modified_at")),
        "is_read": doc.get("is_read"),
        "is_draft": doc.get("is_draft"),
        "has_attachments": doc.get("has_attachments"),
        "attachment_count": doc.get("attachment_count"),
        "attachments_summary": fmt_attachments(attachments),
        "body": None,
        "body_length": 0,
        "body_source": "empty",
        "extracted_at": _now(),
        "extractor_version": EXTRACTOR_VERSION,
        "ok": False,
        "error": None,
    }


def _pick_body(doc: dict) -> tuple[str, str]:
    """Choose the first available body of *doc*.

    Fallback order: unwrapped S/MIME content (text and HTML-as-text joined,
    prefixed by the inner subject when present), ``body_html`` converted to
    text, ``body_text``, ``body_preview``.

    Returns:
        ``(text, body_source)`` with ``body_source`` being ``"smime"``,
        ``"html"``, ``"text"``, ``"preview"`` or ``"empty"``.
    """
    if doc.get("smime_unwrapped"):
        s_text = doc.get("smime_body_text") or ""
        s_html = doc.get("smime_body_html") or ""
        s_html_text = html_to_text(s_html) if s_html else ""
        combined = "\n\n".join(p for p in (s_text, s_html_text) if p)
        s_subject = doc.get("smime_subject") or ""
        if s_subject:
            combined = f"Subject: {s_subject}\n\n{combined}"
        if combined:
            return combined, "smime"
    html = doc.get("body_html") or ""
    h_text = html_to_text(html) if html else ""
    if h_text:
        return h_text, "html"
    plain = doc.get("body_text") or ""
    if plain:
        return plain, "text"
    preview = doc.get("body_preview") or ""
    if preview:
        return preview, "preview"
    return "", "empty"


def process_mailbox(pg: psycopg.Connection, mongo_coll, mailbox: str,
                    limit: Optional[int] = None,
                    index_reset: bool = False) -> dict:
    """Index one mailbox collection from MongoDB into the PG ``emails`` table.

    Args:
        pg: Open PostgreSQL connection; rows are committed batch by batch.
        mongo_coll: Mongo collection holding the emails of this mailbox.
        mailbox: Mailbox name, stored in the ``mailbox`` column.
        limit: Optional cap applied to the Mongo cursor, i.e. to the number
            of documents scanned (skipped ones included).
        index_reset: When true, every PG row of the mailbox is deleted
            first, so all documents are extracted again.

    Returns:
        Counters for the run: ``mailbox``, ``processed``, ``ok``,
        ``errors``, ``skipped`` and ``empty_body``.
    """
    # --index-reset: wipe everything stored for this mailbox in PG
    if index_reset:
        with pg.cursor() as cur:
            cur.execute("DELETE FROM emails WHERE mailbox = %s", (mailbox,))
            deleted = cur.rowcount
        pg.commit()
        print(f"[{mailbox}] --index-reset: smazano {deleted} radku v PG")

    # Existing PG rows (fast incremental lookup).
    # tuple = (extractor_version, ok, body_source, modified_at)
    with pg.cursor() as cur:
        cur.execute(
            "SELECT message_id, extractor_version, ok, body_source, modified_at "
            "FROM emails WHERE mailbox = %s",
            (mailbox,),
        )
        existing = {row[0]: (row[1], row[2], row[3], row[4])
                    for row in cur.fetchall()}

    mongo_total = mongo_coll.estimated_document_count()
    pg_total = len(existing)
    pg_uptodate = sum(1 for v in existing.values()
                      if v[0] == EXTRACTOR_VERSION and v[1])
    to_process_estimate = mongo_total - pg_uptodate
    print(f"\n========== {mailbox} ==========")
    print(f" v Mongu: {mongo_total}")
    print(f" v PG: {pg_total} (z toho ext_v={EXTRACTOR_VERSION} & ok=true: {pg_uptodate})")
    print(f" k zpracovani: ~{to_process_estimate}{' (limit=' + str(limit) + ')' if limit else ''}")
    # NOTE: this fast path is purely count based. When the counts match,
    # documents modified in Mongo (or unwrapped S/MIME bodies added later)
    # are not detected until the next run that actually scans the mailbox.
    if to_process_estimate <= 0 and not index_reset and not limit:
        print(" Nic noveho ke zpracovani.")
        return {"mailbox": mailbox, "processed": 0, "ok": 0, "errors": 0,
                "skipped": pg_uptodate, "empty_body": 0}

    proj = {
        "_id": 1, "graph_id": 1, "conversation_id": 1, "folder_path": 1,
        "subject": 1, "sender": 1, "recipients": 1,
        "sent_at": 1, "received_at": 1, "modified_at": 1,
        "is_read": 1, "is_draft": 1,
        "has_attachments": 1, "attachment_count": 1, "attachments": 1,
        "body_html": 1, "body_text": 1, "body_preview": 1,
        "smime_unwrapped": 1, "smime_body_text": 1, "smime_body_html": 1,
        "smime_subject": 1, "smime_inner_attachments": 1,
    }
    cursor = mongo_coll.find({}, proj, no_cursor_timeout=True)
    if limit:
        cursor = cursor.limit(limit)

    processed = ok = errors = skipped = empty_body = 0
    queue: list[dict] = []
    try:
        for n, doc in enumerate(cursor, 1):
            msg_id = doc.get("_id") or ""
            if _is_up_to_date(existing.get(msg_id), doc):
                skipped += 1
                continue

            row = _base_row(mailbox, msg_id, doc)
            status = "OK "
            try:
                text, source = _pick_body(doc)
                row["body_source"] = source
                if not text:
                    empty_body += 1
                body = _truncate(text)
                row["body"] = body if body else None
                row["body_length"] = len(body)
                row["ok"] = True
                ok += 1
            except Exception as e:
                # Extraction failures are recorded in the row (ok=False)
                # instead of aborting the mailbox.
                row["error"] = f"{type(e).__name__}: {e}"[:500]
                status = "ERR"
                errors += 1

            queue.append(row)
            processed += 1
            if processed % 200 == 0 or processed == 1:
                subj = _short(row["subject"], 50)
                print(f" [{n:>6}|p={processed:>5}] {status} {row['body_source']:<7} "
                      f"{row['body_length']:>7}ch | {subj}", flush=True)
            if len(queue) >= BATCH_SIZE:
                _flush(pg, queue)
                queue.clear()
    finally:
        # The cursor is opened with no_cursor_timeout, so it has to be
        # closed explicitly.
        cursor.close()

    if queue:
        _flush(pg, queue)
    return {"mailbox": mailbox, "processed": processed, "ok": ok,
            "errors": errors, "skipped": skipped, "empty_body": empty_body}
# Parameterised upsert keyed on (mailbox, message_id): inserts a new row or,
# on conflict, overwrites every listed column of the existing one. The named
# placeholders are filled from the row dicts that _flush passes to
# executemany. The generated `tsv` column is not listed here.
UPSERT_SQL = """
INSERT INTO emails
(mailbox, message_id, graph_id, conversation_id, folder_path,
subject, sender_email, sender_name, to_addrs, cc_addrs, bcc_addrs,
sent_at, received_at, modified_at, is_read, is_draft,
has_attachments, attachment_count, attachments_summary,
body, body_length, body_source,
extracted_at, extractor_version, ok, error)
VALUES
(%(mailbox)s, %(message_id)s, %(graph_id)s, %(conversation_id)s, %(folder_path)s,
%(subject)s, %(sender_email)s, %(sender_name)s, %(to_addrs)s, %(cc_addrs)s, %(bcc_addrs)s,
%(sent_at)s, %(received_at)s, %(modified_at)s, %(is_read)s, %(is_draft)s,
%(has_attachments)s, %(attachment_count)s, %(attachments_summary)s,
%(body)s, %(body_length)s, %(body_source)s,
%(extracted_at)s, %(extractor_version)s, %(ok)s, %(error)s)
ON CONFLICT (mailbox, message_id) DO UPDATE SET
graph_id = EXCLUDED.graph_id,
conversation_id = EXCLUDED.conversation_id,
folder_path = EXCLUDED.folder_path,
subject = EXCLUDED.subject,
sender_email = EXCLUDED.sender_email,
sender_name = EXCLUDED.sender_name,
to_addrs = EXCLUDED.to_addrs,
cc_addrs = EXCLUDED.cc_addrs,
bcc_addrs = EXCLUDED.bcc_addrs,
sent_at = EXCLUDED.sent_at,
received_at = EXCLUDED.received_at,
modified_at = EXCLUDED.modified_at,
is_read = EXCLUDED.is_read,
is_draft = EXCLUDED.is_draft,
has_attachments = EXCLUDED.has_attachments,
attachment_count = EXCLUDED.attachment_count,
attachments_summary = EXCLUDED.attachments_summary,
body = EXCLUDED.body,
body_length = EXCLUDED.body_length,
body_source = EXCLUDED.body_source,
extracted_at = EXCLUDED.extracted_at,
extractor_version = EXCLUDED.extractor_version,
ok = EXCLUDED.ok,
error = EXCLUDED.error
"""
def _flush(pg: psycopg.Connection, rows: list[dict]) -> None:
    """Upsert *rows* into ``emails`` and commit them as one transaction.

    Text columns are cleaned in place with ``_clean_for_pg`` first (the
    dicts in *rows* are modified). If the upsert or the commit fails, the
    transaction is rolled back before the exception is re-raised, so the
    connection stays usable for whatever the caller does next.

    Raises:
        Exception: whatever the database driver raised for the failed batch.
    """
    text_columns = ("subject", "sender_email", "sender_name", "to_addrs",
                    "cc_addrs", "bcc_addrs", "attachments_summary", "body",
                    "error", "folder_path")
    for r in rows:
        for k in text_columns:
            if r.get(k):
                r[k] = _clean_for_pg(r[k])
    try:
        with pg.cursor() as cur:
            cur.executemany(UPSERT_SQL, rows)
        pg.commit()
    except Exception:
        # Without a rollback the connection stays in an aborted transaction
        # and every later statement on it fails as well.
        pg.rollback()
        raise
def discover_mailboxes(db) -> list[str]:
    """Return the sorted names of all mailbox collections in *db*.

    Collections listed in ``NON_MAILBOX_COLLECTIONS`` are left out, and so
    are MongoDB's own ``system.*`` collections (for example ``system.views``
    or ``system.profile``), which would otherwise be indexed as if they were
    mailboxes.
    """
    return [
        name
        for name in sorted(db.list_collection_names())
        if name not in NON_MAILBOX_COLLECTIONS
        and not name.startswith("system.")
    ]
def main() -> int:
    """Command-line entry point: index all (or one) mailbox into PostgreSQL.

    Parses the arguments, makes sure the PG schema exists, connects to
    MongoDB, runs ``process_mailbox`` for every selected mailbox and prints
    a summary. A mailbox that fails is reported and counted as one error;
    the remaining mailboxes are still processed.

    Returns:
        Process exit code: ``0`` when no error was counted, ``1`` otherwise.
    """
    ap = argparse.ArgumentParser(description="enrich_fulltext_emails v1.3")
    ap.add_argument("--mailbox", default="",
                    help="Jedna konkretni schranka. Bez argumentu projede vsechny.")
    ap.add_argument("--limit", type=int,
                    help="Limit emailu na schranku (test)")
    ap.add_argument("--index-reset", action="store_true",
                    help="Pred zpracovanim schranky vymaze vsechny jeji emaily z PG "
                         "(force re-extract). Bez --mailbox SMAZE CELY index.")
    args = ap.parse_args()

    t0 = time.time()
    print("=== enrich_fulltext_emails v1.3 ===")
    print(f"Start: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

    print("\nPripojuji se k PostgreSQL...")
    pg = psycopg.connect(PG_DSN, connect_timeout=10)
    mongo = None
    results = []
    try:
        with pg.cursor() as cur:
            cur.execute(SCHEMA_SQL)
        pg.commit()
        print(" Schema OK.")

        print("Pripojuji se k MongoDB...")
        mongo = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
        mongo.admin.command("ping")
        db = mongo[MONGO_DB]
        print(" MongoDB OK.")

        if args.mailbox:
            mailboxes = [args.mailbox]
        else:
            mailboxes = discover_mailboxes(db)
        print(f"\nSchranky ke zpracovani ({len(mailboxes)}):")
        for mb in mailboxes:
            print(f" - {mb}")
        if args.index_reset and not args.mailbox:
            print(f"\n!!! --index-reset bez --mailbox => SMAZE CELY INDEX ({len(mailboxes)} schranek) !!!")

        for mb in mailboxes:
            try:
                results.append(process_mailbox(pg, db[mb], mb,
                                               limit=args.limit,
                                               index_reset=args.index_reset))
            except Exception as e:
                traceback.print_exc()
                print(f" FATAL pri zpracovani {mb}: {e}")
                # A failed statement leaves the PG transaction aborted; roll
                # it back so the next mailbox starts from a clean state.
                try:
                    pg.rollback()
                except Exception:
                    traceback.print_exc()
                results.append({"mailbox": mb, "processed": 0, "ok": 0,
                                "errors": 1, "skipped": 0, "empty_body": 0})
    finally:
        # Release both connections even when setup or processing raised.
        if mongo is not None:
            mongo.close()
        pg.close()

    print("\n" + "="*60)
    print("=== SHRNUTI ===")
    grand = {"processed": 0, "ok": 0, "errors": 0, "skipped": 0, "empty_body": 0}
    for r in results:
        print(f" {r['mailbox']:40} processed={r['processed']:>5} ok={r['ok']:>5} "
              f"errors={r['errors']:>3} skipped={r['skipped']:>6} empty={r['empty_body']:>4}")
        for k in grand:
            grand[k] += r.get(k, 0)
    print(f" {'TOTAL':40} processed={grand['processed']:>5} ok={grand['ok']:>5} "
          f"errors={grand['errors']:>3} skipped={grand['skipped']:>6} empty={grand['empty_body']:>4}")
    print(f"\nCelkem trvalo: {time.time() - t0:.1f} s")
    print(f"Konec: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    # exit code: 0 only when every mailbox went through without an error
    return 1 if grand["errors"] > 0 else 0
# Script entry point: the exit status is main()'s return value, 130 when the
# run is interrupted by the user and 1 on any unexpected exception.
if __name__ == "__main__":
    try:
        raise SystemExit(main())
    except KeyboardInterrupt:
        print("\nPreruseno uzivatelem")
        # An interrupted run did not finish, so it must not report success;
        # 130 (128 + SIGINT) is the conventional status for Ctrl-C.
        sys.exit(130)
    except Exception:
        traceback.print_exc()
        sys.exit(1)