Files
janssen/Python-runner/Trash/enrich_fulltext_emails_v1.1.py
T
2026-06-05 21:21:30 +02:00

456 lines
16 KiB
Python

"""
==============================================================================
Skript: enrich_fulltext_emails_v1.1.py
Verze: 1.1
Datum: 2026-06-03
Autor: vladimir.buzalka
Popis:
Vytahne plny text z emailu ulozenych v MongoDB (db: emaily) a ulozi ho do
PostgreSQL (db: MongoEmaily, tabulka: emails) s GIN tsvector indexem.
Emaily se NESTAHUJI znovu - tela uz jsou v Mongo z parse_emails_graph_v1.4
(a refetch_text_bodies_v1.0 pro stare plain-text emaily).
Tento skript jen vybere prvni dostupne telo a posle text do PG na fulltext.
Zmeny proti v1.0:
- Fallback poradi rozsireno: body_html -> body_text (novy v parse_emails_graph_v1.4)
-> body_preview -> empty. Drive bylo body_html -> body_preview.
- body_source umi novou hodnotu "text" (plne plain-text telo, max 2 MB).
- EXTRACTOR_VERSION=1.1 -> vsechny existujici emaily v PG se preparsuji.
Zdroj:
MongoDB 192.168.1.76 db=emaily kolekce=<mailbox>
(krome attachments_index)
Cil:
PostgreSQL 192.168.1.76 db=MongoEmaily tabulka=emails
tsvector config 'soubory' (sdileny - simple + unaccent)
Inkrementalita:
Pokud (mailbox, message_id) jiz existuje a extractor_version je aktualni
a modified_at v Mongo neni novejsi -> skip. Pri zmene verze extractoru
se vse preparsuje.
Spusteni:
python enrich_fulltext_emails_v1.1.py # vsechny schranky
python enrich_fulltext_emails_v1.1.py --mailbox vbuzalka@its.jnj.com
python enrich_fulltext_emails_v1.1.py --limit 500 # test
==============================================================================
"""
from __future__ import annotations

import argparse
import os
import re
import sys
import time
import traceback
from datetime import datetime, timezone
from typing import Optional

import psycopg
from bs4 import BeautifulSoup
from pymongo import MongoClient
# --- configuration ----------------------------------------------------------
# Both connection strings can be overridden through environment variables so
# that credentials do not have to live in the source file. When a variable is
# unset or empty, the original literal is used, so existing invocations keep
# working unchanged.
MONGO_URI = os.environ.get("ENRICH_MONGO_URI") or "mongodb://192.168.1.76:27017"
MONGO_DB = "emaily"
# NOTE(review): the fallback DSN still embeds a plaintext password; prefer
# setting ENRICH_PG_DSN and removing the literal once deployments are migrated.
PG_DSN = os.environ.get("ENRICH_PG_DSN") or (
    "host=192.168.1.76 port=5432 dbname=MongoEmaily "
    "user=vladimir.buzalka password=Vlado7309208104++"
)
# Rows stored with a different extractor_version are re-extracted.
EXTRACTOR_VERSION = "1.1"
# Upper bound for the stored body text, measured in UTF-8 bytes (5 MB).
MAX_TEXT_BYTES = 5 * 1024 * 1024
# Mongo collections that are not mailboxes and must not be processed.
SKIP_COLLECTIONS = {"attachments_index"}
# Number of rows accumulated before one upsert batch is sent to PostgreSQL.
BATCH_SIZE = 100
# --- SCHEMA -----------------------------------------------------------------
# DDL that main() executes once at startup. Every statement is guarded with
# IF NOT EXISTS (or an explicit existence check), so re-running it is harmless:
#   * extensions unaccent and pg_trgm,
#   * text search configuration 'soubory' (copy of 'simple' whose word
#     mappings go through unaccent first),
#   * table emails with a stored generated tsvector column `tsv` built from
#     subject, sender, to/cc recipients, attachment names and body
#     (bcc_addrs is not part of it); only the first 800000 characters of the
#     concatenation are indexed,
#   * GIN / btree indexes used for searching and filtering.
# NOTE(review): the inline SQL comment on body_source lists
# 'html' | 'preview' | 'empty', but process_mailbox() also writes 'text'.
SCHEMA_SQL = """
CREATE EXTENSION IF NOT EXISTS unaccent;
CREATE EXTENSION IF NOT EXISTS pg_trgm;
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_ts_config WHERE cfgname = 'soubory') THEN
CREATE TEXT SEARCH CONFIGURATION soubory ( COPY = simple );
ALTER TEXT SEARCH CONFIGURATION soubory
ALTER MAPPING FOR hword, hword_part, word
WITH unaccent, simple;
END IF;
END$$;
CREATE TABLE IF NOT EXISTS emails (
id BIGSERIAL PRIMARY KEY,
mailbox TEXT NOT NULL,
message_id TEXT NOT NULL,
graph_id TEXT,
conversation_id TEXT,
folder_path TEXT,
subject TEXT,
sender_email TEXT,
sender_name TEXT,
to_addrs TEXT,
cc_addrs TEXT,
bcc_addrs TEXT,
sent_at TIMESTAMPTZ,
received_at TIMESTAMPTZ,
modified_at TIMESTAMPTZ,
is_read BOOLEAN,
is_draft BOOLEAN,
has_attachments BOOLEAN,
attachment_count INT,
attachments_summary TEXT,
body TEXT,
body_length INT,
body_source TEXT, -- 'html' | 'preview' | 'empty'
tsv tsvector GENERATED ALWAYS AS (
to_tsvector('soubory'::regconfig,
left(
coalesce(subject, '') || ' ' ||
coalesce(sender_email, '') || ' ' ||
coalesce(sender_name, '') || ' ' ||
coalesce(to_addrs, '') || ' ' ||
coalesce(cc_addrs, '') || ' ' ||
coalesce(attachments_summary, '') || ' ' ||
coalesce(body, ''),
800000)
)
) STORED,
extracted_at TIMESTAMPTZ DEFAULT now(),
extractor_version TEXT,
ok BOOLEAN,
error TEXT,
UNIQUE (mailbox, message_id)
);
CREATE INDEX IF NOT EXISTS emails_tsv_gin ON emails USING gin(tsv);
CREATE INDEX IF NOT EXISTS emails_subject_trgm ON emails USING gin(subject gin_trgm_ops);
CREATE INDEX IF NOT EXISTS emails_sender_email_idx ON emails(sender_email);
CREATE INDEX IF NOT EXISTS emails_mailbox_idx ON emails(mailbox);
CREATE INDEX IF NOT EXISTS emails_received_idx ON emails(received_at DESC);
CREATE INDEX IF NOT EXISTS emails_conv_idx ON emails(conversation_id);
"""
# --- HELPERS ----------------------------------------------------------------
# Control characters in the range \x00-\x1f except tab (\x09), line feed
# (\x0a) and carriage return (\x0d); these are removed before text is stored.
_CTRL_RX = re.compile(r"[\x00-\x08\x0b\x0c\x0e-\x1f]")
# One or more consecutive spaces/tabs (collapsed to a single space).
_WS_RX = re.compile(r"[ \t]+")
# Three or more consecutive newlines (collapsed to one blank line).
_NL_RX = re.compile(r"\n{3,}")
def _clean_for_pg(s: str) -> str:
if not s:
return ""
return _CTRL_RX.sub("", s)
def _truncate(s: str) -> str:
    """Clean *s* and cap it at ``MAX_TEXT_BYTES`` bytes of UTF-8.

    The text is first passed through ``_clean_for_pg``. If its UTF-8 encoding
    fits the limit, the cleaned string is returned as is; otherwise the byte
    string is cut at the limit and decoded back.
    """
    cleaned = _clean_for_pg(s or "")
    if cleaned:
        encoded = cleaned.encode("utf-8", errors="replace")
        if len(encoded) > MAX_TEXT_BYTES:
            # The cut may land inside a multi-byte sequence; "ignore" drops
            # the incomplete trailing bytes instead of raising.
            return encoded[:MAX_TEXT_BYTES].decode("utf-8", errors="ignore")
        return cleaned
    return ""
def html_to_text(html: str) -> str:
    """Extract plain text from an HTML email body.

    ``<script>``, ``<style>`` and ``<head>`` elements are dropped, runs of
    spaces/tabs inside a line are collapsed to one space, and lines that end
    up empty are discarded. Falsy input yields ``""``.
    """
    if not html:
        return ""
    try:
        soup = BeautifulSoup(html, "lxml")
    except Exception:
        # Fall back to the parser bundled with Python when lxml cannot be used.
        soup = BeautifulSoup(html, "html.parser")
    for node in soup.find_all(["script", "style", "head"]):
        node.decompose()
    raw = soup.get_text(separator="\n")
    # Whitespace normalisation: tidy each line, keep only the non-empty ones.
    kept = []
    for line in raw.split("\n"):
        line = _WS_RX.sub(" ", line).strip()
        if line:
            kept.append(line)
    # Empty lines were dropped above, so the joined text contains no newline
    # runs; the substitution is kept for parity but has nothing to match.
    return _NL_RX.sub("\n\n", "\n".join(kept))
def fmt_recipients(recipients: list, kind: str) -> str:
    """Format recipients of one type for the to_addrs/cc_addrs/bcc_addrs columns.

    Only dict entries whose ``"type"`` equals *kind* are used. Each becomes
    ``"Name <email>"``, or just the email or just the name when the other one
    is missing; entries with neither are skipped. Items are joined with
    ``"; "``. A falsy *recipients* yields ``""``.
    """
    parts = []
    for entry in recipients or []:
        if not (isinstance(entry, dict) and entry.get("type") == kind):
            continue
        name = (entry.get("name") or "").strip()
        email = (entry.get("email") or "").strip()
        if email:
            parts.append(f"{name} <{email}>" if name else email)
        elif name:
            parts.append(name)
    return "; ".join(parts)
def fmt_attachments(attachments: list) -> str:
    """Join attachment file names into one ``" | "``-separated string.

    Only the first 20 entries of *attachments* are inspected. Non-dict entries
    and entries without a ``"name"``/``"filename"`` value are skipped. A falsy
    *attachments* yields ``""``.
    """
    if not attachments:
        return ""
    names = (
        att.get("name") or att.get("filename") or ""
        for att in attachments[:20]
        if isinstance(att, dict)
    )
    return " | ".join(label for label in names if label)
def _short(s, n=60):
if not s:
return ""
s = str(s).replace("\n", " ").strip()
return s if len(s) <= n else s[:n] + "..."
def _now() -> datetime:
return datetime.now(tz=timezone.utc)
# --- MAIN LOOP --------------------------------------------------------------
def _as_utc(value):
    """Return a naive datetime tagged as UTC; pass anything else through.

    pymongo returns naive datetimes that represent UTC unless the client is
    created with ``tz_aware=True``. Tagging them keeps them comparable with
    the timezone-aware values read from TIMESTAMPTZ columns and stops
    PostgreSQL from interpreting them in the session time zone on insert.
    """
    if isinstance(value, datetime) and value.tzinfo is None:
        return value.replace(tzinfo=timezone.utc)
    return value


def _is_up_to_date(prev, mongo_mtime) -> bool:
    """Tell whether the stored PG row makes re-extraction unnecessary.

    *prev* is ``(extractor_version, modified_at, ok)`` of the existing row or
    ``None``. The row is current when it was written successfully by the
    current extractor version and Mongo's ``modified_at`` is absent or not
    newer than the stored one.
    """
    if not prev:
        return False
    version, pg_mtime, was_ok = prev
    if version != EXTRACTOR_VERSION or not was_ok:
        return False
    if mongo_mtime is None:
        return True
    if not pg_mtime:
        return False
    try:
        return _as_utc(pg_mtime) >= mongo_mtime
    except TypeError:
        # The two values are not comparable (e.g. Mongo holds a string);
        # re-extract rather than abort the whole run.
        return False


def _select_body(doc) -> tuple[str, str]:
    """Pick the first available body; return ``(text, body_source)``.

    Fallback order (v1.1): body_html -> body_text -> body_preview -> empty.
    """
    html = doc.get("body_html") or ""
    text = html_to_text(html) if html else ""
    if text:
        return text, "html"
    for field, source in (("body_text", "text"), ("body_preview", "preview")):
        value = doc.get(field) or ""
        if value:
            return value, source
    return "", "empty"


def process_mailbox(pg: psycopg.Connection, mongo_coll, mailbox: str,
                    limit: Optional[int] = None) -> dict:
    """Copy one mailbox collection from Mongo into the PG ``emails`` table.

    Args:
        pg: open PostgreSQL connection; batches are written through ``_flush``.
        mongo_coll: Mongo collection holding the mailbox's messages.
        mailbox: mailbox name stored in the ``mailbox`` column.
        limit: optional cap on the number of Mongo documents read (it also
            counts documents that end up skipped).

    Returns:
        Dict with the keys ``mailbox``, ``processed``, ``ok``, ``errors``,
        ``skipped`` and ``empty_body``.

    A document is skipped when its PG row is already up to date (see
    ``_is_up_to_date``). A failure while extracting one body is recorded in
    that row's ``error`` column and does not stop the run.
    """
    # Existing PG rows for this mailbox, loaded once for the incremental check.
    with pg.cursor() as cur:
        cur.execute(
            "SELECT message_id, extractor_version, modified_at, ok "
            "FROM emails WHERE mailbox = %s",
            (mailbox,),
        )
        existing = {row[0]: (row[1], row[2], row[3]) for row in cur.fetchall()}

    proj = {
        "_id": 1, "graph_id": 1, "conversation_id": 1, "folder_path": 1,
        "subject": 1, "sender": 1, "recipients": 1,
        "sent_at": 1, "received_at": 1, "modified_at": 1,
        "is_read": 1, "is_draft": 1,
        "has_attachments": 1, "attachment_count": 1, "attachments": 1,
        "body_html": 1, "body_text": 1, "body_preview": 1,
    }
    cursor = mongo_coll.find({}, proj, no_cursor_timeout=True)
    if limit:
        cursor = cursor.limit(limit)
    total_pending = limit or mongo_coll.estimated_document_count()
    print(f"[{mailbox}] kandidatu: ~{total_pending}")

    processed = ok = errors = skipped = empty_body = 0
    queue: list[dict] = []
    try:
        for n, doc in enumerate(cursor, start=1):
            # NOTE(review): assumes _id is the string message id used as
            # message_id in PG - confirm against the importer.
            msg_id = doc.get("_id") or ""
            mongo_mtime = _as_utc(doc.get("modified_at"))
            if _is_up_to_date(existing.get(msg_id), mongo_mtime):
                skipped += 1
                continue

            sender = doc.get("sender") or {}
            recipients = doc.get("recipients") or []
            attachments = doc.get("attachments") or []
            row = {
                "mailbox": mailbox,
                "message_id": msg_id,
                "graph_id": doc.get("graph_id"),
                "conversation_id": doc.get("conversation_id"),
                "folder_path": doc.get("folder_path"),
                "subject": doc.get("subject") or "",
                "sender_email": sender.get("email"),
                "sender_name": sender.get("name"),
                "to_addrs": fmt_recipients(recipients, "to"),
                "cc_addrs": fmt_recipients(recipients, "cc"),
                "bcc_addrs": fmt_recipients(recipients, "bcc"),
                "sent_at": _as_utc(doc.get("sent_at")),
                "received_at": _as_utc(doc.get("received_at")),
                "modified_at": mongo_mtime,
                "is_read": doc.get("is_read"),
                "is_draft": doc.get("is_draft"),
                "has_attachments": doc.get("has_attachments"),
                "attachment_count": doc.get("attachment_count"),
                "attachments_summary": fmt_attachments(attachments),
                "body": None,
                "body_length": 0,
                "body_source": "empty",
                "extracted_at": _now(),
                "extractor_version": EXTRACTOR_VERSION,
                "ok": False,
                "error": None,
            }

            status = "OK "
            try:
                text, source = _select_body(doc)
                row["body_source"] = source
                if source == "empty":
                    empty_body += 1
                body = _truncate(text)
                row["body"] = body if body else None
                row["body_length"] = len(body)
                row["ok"] = True
                ok += 1
            except Exception as e:
                # Keep the row (ok=False) so the failure is visible in PG.
                row["error"] = f"{type(e).__name__}: {e}"[:500]
                status = "ERR"
                errors += 1

            queue.append(row)
            processed += 1

            # Progress line for the first document and every 200th one.
            if n % 200 == 0 or n == 1:
                subj = _short(row["subject"], 50)
                print(f" [{n:>5}] {status} {row['body_source']:<7} "
                      f"{row['body_length']:>7}ch | {subj}", flush=True)

            if len(queue) >= BATCH_SIZE:
                _flush(pg, queue)
                queue.clear()
    finally:
        # The cursor was opened with no_cursor_timeout, so close it explicitly.
        cursor.close()

    if queue:
        _flush(pg, queue)

    return {"mailbox": mailbox, "processed": processed, "ok": ok,
            "errors": errors, "skipped": skipped, "empty_body": empty_body}
# Insert-or-update statement used by _flush(). Parameters are named
# placeholders matching the keys of the row dicts built in process_mailbox().
# On a (mailbox, message_id) conflict every non-key column is overwritten with
# the newly extracted values.
UPSERT_SQL = """
INSERT INTO emails
(mailbox, message_id, graph_id, conversation_id, folder_path,
subject, sender_email, sender_name, to_addrs, cc_addrs, bcc_addrs,
sent_at, received_at, modified_at, is_read, is_draft,
has_attachments, attachment_count, attachments_summary,
body, body_length, body_source,
extracted_at, extractor_version, ok, error)
VALUES
(%(mailbox)s, %(message_id)s, %(graph_id)s, %(conversation_id)s, %(folder_path)s,
%(subject)s, %(sender_email)s, %(sender_name)s, %(to_addrs)s, %(cc_addrs)s, %(bcc_addrs)s,
%(sent_at)s, %(received_at)s, %(modified_at)s, %(is_read)s, %(is_draft)s,
%(has_attachments)s, %(attachment_count)s, %(attachments_summary)s,
%(body)s, %(body_length)s, %(body_source)s,
%(extracted_at)s, %(extractor_version)s, %(ok)s, %(error)s)
ON CONFLICT (mailbox, message_id) DO UPDATE SET
graph_id = EXCLUDED.graph_id,
conversation_id = EXCLUDED.conversation_id,
folder_path = EXCLUDED.folder_path,
subject = EXCLUDED.subject,
sender_email = EXCLUDED.sender_email,
sender_name = EXCLUDED.sender_name,
to_addrs = EXCLUDED.to_addrs,
cc_addrs = EXCLUDED.cc_addrs,
bcc_addrs = EXCLUDED.bcc_addrs,
sent_at = EXCLUDED.sent_at,
received_at = EXCLUDED.received_at,
modified_at = EXCLUDED.modified_at,
is_read = EXCLUDED.is_read,
is_draft = EXCLUDED.is_draft,
has_attachments = EXCLUDED.has_attachments,
attachment_count = EXCLUDED.attachment_count,
attachments_summary = EXCLUDED.attachments_summary,
body = EXCLUDED.body,
body_length = EXCLUDED.body_length,
body_source = EXCLUDED.body_source,
extracted_at = EXCLUDED.extracted_at,
extractor_version = EXCLUDED.extractor_version,
ok = EXCLUDED.ok,
error = EXCLUDED.error
"""
def _flush(pg: psycopg.Connection, rows: list[dict]) -> None:
    """Write *rows* to PostgreSQL with ``UPSERT_SQL`` and commit.

    Before sending, the non-empty text columns of every row are passed
    through ``_clean_for_pg``; the row dicts are modified in place.
    """
    text_columns = (
        "subject", "sender_email", "sender_name", "to_addrs", "cc_addrs",
        "bcc_addrs", "attachments_summary", "body", "error", "folder_path",
    )
    for row in rows:
        for column in text_columns:
            value = row.get(column)
            if value:
                row[column] = _clean_for_pg(value)
    with pg.cursor() as cur:
        cur.executemany(UPSERT_SQL, rows)
    pg.commit()
def main() -> int:
    """Command-line entry point: sync every (or one) mailbox into PostgreSQL.

    Options:
        --mailbox  process only this collection (default: all collections
                   except those in ``SKIP_COLLECTIONS``)
        --limit    maximum number of Mongo documents read per mailbox

    Returns:
        0 on success. Connection and processing errors propagate to the
        caller; both database connections are closed in either case.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--mailbox", help="Jedna konkretni schranka (default: vsechny)")
    ap.add_argument("--limit", type=int, help="Limit emailu na schranku (test)")
    args = ap.parse_args()
    t0 = time.time()

    print("Pripojuji se k PostgreSQL...")
    # The MongoEmaily database must already exist (create it externally via
    # psql or DBeaver), because CREATE DATABASE cannot run inside a transaction.
    pg = psycopg.connect(PG_DSN, connect_timeout=10)
    mongo = None
    try:
        with pg.cursor() as cur:
            cur.execute(SCHEMA_SQL)
        pg.commit()
        print("Schema OK.")

        print("Pripojuji se k MongoDB...")
        mongo = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
        # Fail fast if the server is unreachable instead of on the first query.
        mongo.admin.command("ping")
        db = mongo[MONGO_DB]
        if args.mailbox:
            mailboxes = [args.mailbox]
        else:
            mailboxes = [c for c in db.list_collection_names()
                         if c not in SKIP_COLLECTIONS]
        print(f"Schranky ({len(mailboxes)}): {mailboxes}")

        results = [process_mailbox(pg, db[mb], mb, limit=args.limit)
                   for mb in mailboxes]
    finally:
        # Release both connections even when schema setup or a mailbox fails.
        if mongo is not None:
            mongo.close()
        pg.close()

    print("\n=== SHRNUTI ===")
    for r in results:
        print(f" {r['mailbox']}: processed={r['processed']} ok={r['ok']} "
              f"errors={r['errors']} skipped={r['skipped']} empty={r['empty_body']}")
    print(f"\nCelkem trvalo: {time.time() - t0:.1f} s")
    return 0
# Script entry point: the process exit status is main()'s return value.
if __name__ == "__main__":
    try:
        sys.exit(main())
    except KeyboardInterrupt:
        # Ctrl-C: report it and fall through to a normal interpreter exit.
        print("\nPreruseno uzivatelem")
    except Exception:
        # Any other failure: print the traceback and exit with status 1.
        traceback.print_exc()
        sys.exit(1)