456 lines
16 KiB
Python
456 lines
16 KiB
Python
"""
|
|
==============================================================================
|
|
Skript: enrich_fulltext_emails_v1.1.py
|
|
Verze: 1.1
|
|
Datum: 2026-06-03
|
|
Autor: vladimir.buzalka
|
|
|
|
Popis:
|
|
Vytahne plny text z emailu ulozenych v MongoDB (db: emaily) a ulozi ho do
|
|
PostgreSQL (db: MongoEmaily, tabulka: emails) s GIN tsvector indexem.
|
|
|
|
Emaily se NESTAHUJI znovu - tela uz jsou v Mongo z parse_emails_graph_v1.4
|
|
(a refetch_text_bodies_v1.0 pro stare plain-text emaily).
|
|
Tento skript jen vybere prvni dostupne telo a posle text do PG na fulltext.
|
|
|
|
Zmeny proti v1.0:
|
|
- Fallback poradi rozsireno: body_html -> body_text (novy v parse_emails_graph_v1.4)
|
|
-> body_preview -> empty. Drive bylo body_html -> body_preview.
|
|
- body_source umi novou hodnotu "text" (plne plain-text telo, max 2 MB).
|
|
- EXTRACTOR_VERSION=1.1 -> vsechny existujici emaily v PG se preparsuji.
|
|
|
|
Zdroj:
|
|
MongoDB 192.168.1.76 db=emaily kolekce=<mailbox>
|
|
(krome attachments_index)
|
|
|
|
Cil:
|
|
PostgreSQL 192.168.1.76 db=MongoEmaily tabulka=emails
|
|
tsvector config 'soubory' (sdileny - simple + unaccent)
|
|
|
|
Inkrementalita:
|
|
Pokud (mailbox, message_id) jiz existuje a extractor_version je aktualni
|
|
a modified_at v Mongo neni novejsi -> skip. Pri zmene verze extractoru
|
|
se vse preparsuje.
|
|
|
|
Spusteni:
|
|
python enrich_fulltext_emails_v1.1.py # vsechny schranky
|
|
python enrich_fulltext_emails_v1.1.py --mailbox vbuzalka@its.jnj.com
|
|
python enrich_fulltext_emails_v1.1.py --limit 500 # test
|
|
==============================================================================
|
|
"""
|
|
|
|
from __future__ import annotations

import argparse
import os
import re
import sys
import time
import traceback
from datetime import datetime, timezone
from typing import Optional

import psycopg
from bs4 import BeautifulSoup
from pymongo import MongoClient
|
|
|
|
# --- configuration ------------------------------------------------------------

# Connection settings may be overridden through environment variables, so the
# script does not have to be edited (and credentials committed) per machine.
# NOTE(review): the fallback DSN still embeds a plaintext password; prefer
# setting EMAILY_PG_DSN (or libpq's PGPASSWORD / ~/.pgpass) and removing it here.
MONGO_URI = os.environ.get("EMAILY_MONGO_URI", "mongodb://192.168.1.76:27017")
MONGO_DB = "emaily"

PG_DSN = os.environ.get(
    "EMAILY_PG_DSN",
    "host=192.168.1.76 port=5432 dbname=MongoEmaily "
    "user=vladimir.buzalka password=Vlado7309208104++",
)

# Written into every row; a row whose stored version differs is re-extracted.
EXTRACTOR_VERSION = "1.1"

MAX_TEXT_BYTES = 5 * 1024 * 1024  # cap on stored body text: 5 MB of UTF-8
SKIP_COLLECTIONS = {"attachments_index"}  # Mongo collections that are not mailboxes

BATCH_SIZE = 100  # rows per PostgreSQL upsert + commit
|
|
|
|
|
|
# --- SCHEMA -----------------------------------------------------------------

# Idempotent setup script run once per start. Note that
# CREATE TABLE IF NOT EXISTS does not alter an already-existing table, so
# column changes here do not reach an existing database.
# The generated tsv column indexes only the first 800000 characters of the
# concatenated subject/addresses/attachment names/body.
SCHEMA_SQL = """
CREATE EXTENSION IF NOT EXISTS unaccent;
CREATE EXTENSION IF NOT EXISTS pg_trgm;

DO $$
BEGIN
    IF NOT EXISTS (SELECT 1 FROM pg_ts_config WHERE cfgname = 'soubory') THEN
        CREATE TEXT SEARCH CONFIGURATION soubory ( COPY = simple );
        ALTER TEXT SEARCH CONFIGURATION soubory
            ALTER MAPPING FOR hword, hword_part, word
            WITH unaccent, simple;
    END IF;
END$$;

CREATE TABLE IF NOT EXISTS emails (
    id                  BIGSERIAL PRIMARY KEY,
    mailbox             TEXT NOT NULL,
    message_id          TEXT NOT NULL,
    graph_id            TEXT,
    conversation_id     TEXT,
    folder_path         TEXT,
    subject             TEXT,
    sender_email        TEXT,
    sender_name         TEXT,
    to_addrs            TEXT,
    cc_addrs            TEXT,
    bcc_addrs           TEXT,
    sent_at             TIMESTAMPTZ,
    received_at         TIMESTAMPTZ,
    modified_at         TIMESTAMPTZ,
    is_read             BOOLEAN,
    is_draft            BOOLEAN,
    has_attachments     BOOLEAN,
    attachment_count    INT,
    attachments_summary TEXT,
    body                TEXT,
    body_length         INT,
    body_source         TEXT,  -- 'html' | 'text' | 'preview' | 'empty'
    tsv tsvector GENERATED ALWAYS AS (
        to_tsvector('soubory'::regconfig,
            left(
                coalesce(subject, '') || ' ' ||
                coalesce(sender_email, '') || ' ' ||
                coalesce(sender_name, '') || ' ' ||
                coalesce(to_addrs, '') || ' ' ||
                coalesce(cc_addrs, '') || ' ' ||
                coalesce(attachments_summary, '') || ' ' ||
                coalesce(body, ''),
                800000)
        )
    ) STORED,
    extracted_at        TIMESTAMPTZ DEFAULT now(),
    extractor_version   TEXT,
    ok                  BOOLEAN,
    error               TEXT,
    UNIQUE (mailbox, message_id)
);

CREATE INDEX IF NOT EXISTS emails_tsv_gin ON emails USING gin(tsv);
CREATE INDEX IF NOT EXISTS emails_subject_trgm ON emails USING gin(subject gin_trgm_ops);
CREATE INDEX IF NOT EXISTS emails_sender_email_idx ON emails(sender_email);
CREATE INDEX IF NOT EXISTS emails_mailbox_idx ON emails(mailbox);
CREATE INDEX IF NOT EXISTS emails_received_idx ON emails(received_at DESC);
CREATE INDEX IF NOT EXISTS emails_conv_idx ON emails(conversation_id);
"""
|
|
|
|
|
|
# --- HELPERY ----------------------------------------------------------------
|
|
|
|
_CTRL_RX = re.compile(r"[\x00-\x08\x0b\x0c\x0e-\x1f]")
|
|
_WS_RX = re.compile(r"[ \t]+")
|
|
_NL_RX = re.compile(r"\n{3,}")
|
|
|
|
|
|
def _clean_for_pg(s: str) -> str:
    """Return *s* with the control characters matched by _CTRL_RX removed.

    Falsy input (None or "") yields "".
    """
    return _CTRL_RX.sub("", s) if s else ""
|
|
|
|
|
|
def _truncate(s: str) -> str:
    """Clean *s* for PostgreSQL and cap it at MAX_TEXT_BYTES bytes of UTF-8.

    None/empty input gives "". Text that fits is returned unchanged (after
    cleaning); longer text is cut on the byte boundary.
    """
    cleaned = _clean_for_pg(s or "")
    if not cleaned:
        return ""
    encoded = cleaned.encode("utf-8", errors="replace")
    if len(encoded) > MAX_TEXT_BYTES:
        # The byte cut may split a multi-byte character; errors="ignore"
        # drops the dangling partial sequence instead of raising.
        return encoded[:MAX_TEXT_BYTES].decode("utf-8", errors="ignore")
    return cleaned
|
|
|
|
|
|
def html_to_text(html: str) -> str:
    """Extract plain text from an HTML e-mail body.

    <script>, <style> and <head> elements are removed. Text nodes are joined
    with newlines; every resulting line is stripped, internal runs of
    spaces/tabs are collapsed to a single space, and empty lines are dropped,
    so the result contains no blank lines.

    Returns "" for empty input.
    """
    if not html:
        return ""
    try:
        soup = BeautifulSoup(html, "lxml")
    except Exception:
        # e.g. lxml not installed -> fall back to the stdlib-based parser
        soup = BeautifulSoup(html, "html.parser")
    for tag in soup(["script", "style", "head"]):
        tag.decompose()
    raw = soup.get_text(separator="\n")
    lines = (_WS_RX.sub(" ", ln).strip() for ln in raw.split("\n"))
    # Empty lines are filtered out here, so no run of 3+ newlines can remain;
    # a separate "collapse blank lines" pass would never match anything.
    return "\n".join(ln for ln in lines if ln)
|
|
|
|
|
|
def fmt_recipients(recipients: list, kind: str) -> str:
    """Format recipients of one *kind* for the to_addrs/cc_addrs/bcc_addrs column.

    Output looks like 'Name <email>; Name2 <email2>'. Entries that are not
    dicts, or whose "type" differs from *kind*, are ignored. An entry with only
    an email or only a name contributes just that part; one with neither is
    dropped. Empty/None *recipients* gives "".
    """
    if not recipients:
        return ""

    def _label(entry: dict) -> str:
        name = (entry.get("name") or "").strip()
        email = (entry.get("email") or "").strip()
        if name and email:
            return f"{name} <{email}>"
        return email or name

    labels = (
        _label(r)
        for r in recipients
        if isinstance(r, dict) and r.get("type") == kind
    )
    return "; ".join(lbl for lbl in labels if lbl)
|
|
|
|
|
|
def fmt_attachments(attachments: list) -> str:
    """Join attachment names with ' | ' for the attachments_summary column.

    Only the first 20 list items are inspected (non-dict items still count
    toward those 20). The name is taken from "name", falling back to
    "filename"; items with neither are skipped. Empty/None input gives "".
    """
    if not attachments:
        return ""
    head = attachments[:20]
    names = [
        a.get("name") or a.get("filename")
        for a in head
        if isinstance(a, dict)
    ]
    return " | ".join(nm for nm in names if nm)
|
|
|
|
|
|
def _short(s, n=60):
|
|
if not s:
|
|
return ""
|
|
s = str(s).replace("\n", " ").strip()
|
|
return s if len(s) <= n else s[:n] + "..."
|
|
|
|
|
|
def _now() -> datetime:
|
|
return datetime.now(tz=timezone.utc)
|
|
|
|
|
|
# --- MAIN LOOP ----------------------------------------------------------------


def _as_utc(value):
    """Attach UTC to a naive datetime; return any other value unchanged.

    PyMongo returns naive datetimes (representing UTC) unless the client is
    created with tz_aware=True. Without this, a naive value sent to a
    TIMESTAMPTZ column is interpreted in the PG session time zone, and
    comparing it with the tz-aware datetimes psycopg returns raises TypeError.
    NOTE(review): assumes the upstream parser stored UTC in Mongo - confirm.
    """
    if isinstance(value, datetime) and value.tzinfo is None:
        return value.replace(tzinfo=timezone.utc)
    return value


def _is_up_to_date(prev, mongo_mtime) -> bool:
    """Return True when the existing PG row can be skipped.

    *prev* is (extractor_version, modified_at, ok) from PG, or None when the
    e-mail is not in PG yet. A row is current when it was extracted
    successfully by this EXTRACTOR_VERSION and Mongo's modified_at is missing
    or not newer than the stored one.
    """
    if not prev:
        return False
    version, pg_mtime, was_ok = prev
    if version != EXTRACTOR_VERSION or not was_ok:
        return False
    if mongo_mtime is None:
        return True
    if not pg_mtime:
        return False
    try:
        return _as_utc(pg_mtime) >= _as_utc(mongo_mtime)
    except TypeError:
        # Incomparable types (e.g. modified_at stored as a string in Mongo):
        # re-extract rather than abort the whole run.
        return False


def _pick_body(doc: dict) -> tuple[str, str]:
    """Return (text, body_source) using the v1.1 fallback order.

    body_html (converted by html_to_text, used only if that yields text)
    -> body_text -> body_preview -> ("", "empty").
    """
    html = doc.get("body_html") or ""
    if html:
        text = html_to_text(html)
        if text:
            return text, "html"
    plain = doc.get("body_text") or ""
    if plain:
        return plain, "text"
    preview = doc.get("body_preview") or ""
    if preview:
        return preview, "preview"
    return "", "empty"


def _base_row(doc: dict, mailbox: str, msg_id: str, mongo_mtime) -> dict:
    """Build the PG row for one Mongo document, with the body not filled in yet."""
    sender = doc.get("sender") or {}
    if not isinstance(sender, dict):
        sender = {}
    recipients = doc.get("recipients") or []
    return {
        "mailbox": mailbox,
        "message_id": msg_id,
        "graph_id": doc.get("graph_id"),
        "conversation_id": doc.get("conversation_id"),
        "folder_path": doc.get("folder_path"),
        "subject": doc.get("subject") or "",
        "sender_email": sender.get("email"),
        "sender_name": sender.get("name"),
        "to_addrs": fmt_recipients(recipients, "to"),
        "cc_addrs": fmt_recipients(recipients, "cc"),
        "bcc_addrs": fmt_recipients(recipients, "bcc"),
        "sent_at": _as_utc(doc.get("sent_at")),
        "received_at": _as_utc(doc.get("received_at")),
        "modified_at": _as_utc(mongo_mtime),
        "is_read": doc.get("is_read"),
        "is_draft": doc.get("is_draft"),
        "has_attachments": doc.get("has_attachments"),
        "attachment_count": doc.get("attachment_count"),
        "attachments_summary": fmt_attachments(doc.get("attachments") or []),
        "body": None,
        "body_length": 0,
        "body_source": "empty",
        "extracted_at": _now(),
        "extractor_version": EXTRACTOR_VERSION,
        "ok": False,
        "error": None,
    }


def process_mailbox(pg: psycopg.Connection, mongo_coll, mailbox: str,
                    limit: Optional[int] = None) -> dict:
    """Extract full text for the e-mails of one Mongo collection and upsert them into PG.

    Documents whose PG row is current (see _is_up_to_date) are skipped. Rows
    are sent to _flush() in batches of BATCH_SIZE. A body-extraction error is
    recorded in the row (ok=False, error=...) instead of aborting the run.

    Args:
        pg: open PostgreSQL connection (batches are committed by _flush).
        mongo_coll: PyMongo collection holding the mailbox's e-mails.
        mailbox: mailbox name, stored in the ``mailbox`` column.
        limit: optional cap on documents read from Mongo (falsy = no cap).

    Returns:
        dict with keys mailbox, processed, ok, errors, skipped, empty_body.
    """
    # Existing PG rows for the incremental check:
    # message_id -> (extractor_version, modified_at, ok)
    with pg.cursor() as cur:
        cur.execute(
            "SELECT message_id, extractor_version, modified_at, ok "
            "FROM emails WHERE mailbox = %s",
            (mailbox,),
        )
        existing = {row[0]: (row[1], row[2], row[3]) for row in cur.fetchall()}

    proj = {
        "_id": 1, "graph_id": 1, "conversation_id": 1, "folder_path": 1,
        "subject": 1, "sender": 1, "recipients": 1,
        "sent_at": 1, "received_at": 1, "modified_at": 1,
        "is_read": 1, "is_draft": 1,
        "has_attachments": 1, "attachment_count": 1, "attachments": 1,
        "body_html": 1, "body_text": 1, "body_preview": 1,
    }
    cursor = mongo_coll.find({}, proj, no_cursor_timeout=True)
    if limit:
        cursor = cursor.limit(limit)

    total_pending = limit or mongo_coll.estimated_document_count()
    print(f"[{mailbox}] kandidatu: ~{total_pending}")

    processed = ok = errors = skipped = empty_body = 0
    queue: list[dict] = []
    n = 0

    try:
        for doc in cursor:
            n += 1
            # str(): message_id is a TEXT column and the lookup keys in
            # `existing` are strings, so a non-str _id must be stringified.
            msg_id = str(doc.get("_id") or "")
            mongo_mtime = doc.get("modified_at")
            if _is_up_to_date(existing.get(msg_id), mongo_mtime):
                skipped += 1
                continue

            row = _base_row(doc, mailbox, msg_id, mongo_mtime)
            status = "OK "
            try:
                text, row["body_source"] = _pick_body(doc)
                if row["body_source"] == "empty":
                    empty_body += 1
                body = _truncate(text)
                row["body"] = body if body else None
                row["body_length"] = len(body)
                row["ok"] = True
                ok += 1
            except Exception as e:
                row["error"] = f"{type(e).__name__}: {e}"[:500]
                status = "ERR"
                errors += 1

            queue.append(row)
            processed += 1

            if n % 200 == 0 or n == 1:
                subj = _short(row["subject"], 50)
                print(f" [{n:>5}] {status} {row['body_source']:<7} "
                      f"{row['body_length']:>7}ch | {subj}", flush=True)

            if len(queue) >= BATCH_SIZE:
                _flush(pg, queue)
                queue.clear()
    finally:
        cursor.close()

    if queue:
        _flush(pg, queue)

    return {"mailbox": mailbox, "processed": processed, "ok": ok,
            "errors": errors, "skipped": skipped, "empty_body": empty_body}
|
|
|
|
|
|
# Columns written by _flush(), in INSERT order. Kept in one place so the
# INSERT column list, the VALUES placeholders and the UPDATE SET list cannot
# drift apart. Row dicts passed to _flush() must contain every key.
_UPSERT_COLUMNS = (
    "mailbox", "message_id", "graph_id", "conversation_id", "folder_path",
    "subject", "sender_email", "sender_name", "to_addrs", "cc_addrs", "bcc_addrs",
    "sent_at", "received_at", "modified_at", "is_read", "is_draft",
    "has_attachments", "attachment_count", "attachments_summary",
    "body", "body_length", "body_source",
    "extracted_at", "extractor_version", "ok", "error",
)
# Conflict target (matches UNIQUE (mailbox, message_id)); these columns are
# never overwritten, every other column is replaced on conflict.
_UPSERT_KEY = ("mailbox", "message_id")

UPSERT_SQL = (
    "INSERT INTO emails\n"
    "    (" + ", ".join(_UPSERT_COLUMNS) + ")\n"
    "VALUES\n"
    "    (" + ", ".join(f"%({c})s" for c in _UPSERT_COLUMNS) + ")\n"
    "ON CONFLICT (" + ", ".join(_UPSERT_KEY) + ") DO UPDATE SET\n    "
    + ",\n    ".join(
        f"{c} = EXCLUDED.{c}" for c in _UPSERT_COLUMNS if c not in _UPSERT_KEY
    )
    + "\n"
)
|
|
|
|
|
|
def _flush(pg: psycopg.Connection, rows: list[dict]) -> None:
    """Upsert *rows* with UPSERT_SQL and commit.

    Text columns are scrubbed with _clean_for_pg in place, so the row dicts
    are mutated. If the upsert fails, the transaction is rolled back before
    the error is re-raised, so *pg* is not left in an aborted transaction.
    """
    text_cols = ("subject", "sender_email", "sender_name", "to_addrs", "cc_addrs",
                 "bcc_addrs", "attachments_summary", "body", "error", "folder_path")
    for r in rows:
        for k in text_cols:
            v = r.get(k)
            # Only non-empty strings: the regex would raise TypeError on other types.
            if v and isinstance(v, str):
                r[k] = _clean_for_pg(v)
    try:
        with pg.cursor() as cur:
            cur.executemany(UPSERT_SQL, rows)
        pg.commit()
    except Exception:
        try:
            pg.rollback()
        except psycopg.Error:
            pass  # connection may already be unusable; surface the original error
        raise
|
|
|
|
|
|
def main() -> int:
    """CLI entry point: ensure the PG schema, then enrich each selected mailbox.

    Options: --mailbox (a single collection; default: every collection not in
    SKIP_COLLECTIONS) and --limit (max documents per mailbox, for testing).
    Both database connections are closed even when processing fails.
    Returns 0 on success.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--mailbox", help="Jedna konkretni schranka (default: vsechny)")
    ap.add_argument("--limit", type=int, help="Limit emailu na schranku (test)")
    args = ap.parse_args()

    t0 = time.time()
    print("Pripojuji se k PostgreSQL...")
    # The MongoEmaily database must already exist (create it externally via
    # psql or DBeaver), because CREATE DATABASE cannot run inside a transaction.
    pg = psycopg.connect(PG_DSN, connect_timeout=10)
    mongo = None
    try:
        with pg.cursor() as cur:
            cur.execute(SCHEMA_SQL)
        pg.commit()
        print("Schema OK.")

        print("Pripojuji se k MongoDB...")
        mongo = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
        mongo.admin.command("ping")  # fail fast if the server is unreachable
        db = mongo[MONGO_DB]

        if args.mailbox:
            mailboxes = [args.mailbox]
        else:
            mailboxes = [c for c in db.list_collection_names() if c not in SKIP_COLLECTIONS]
        print(f"Schranky ({len(mailboxes)}): {mailboxes}")

        results = [process_mailbox(pg, db[mb], mb, limit=args.limit) for mb in mailboxes]
    finally:
        pg.close()
        if mongo is not None:
            mongo.close()

    print("\n=== SHRNUTI ===")
    for r in results:
        print(f" {r['mailbox']}: processed={r['processed']} ok={r['ok']} "
              f"errors={r['errors']} skipped={r['skipped']} empty={r['empty_body']}")
    print(f"\nCelkem trvalo: {time.time() - t0:.1f} s")
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    try:
        raise SystemExit(main())
    except KeyboardInterrupt:
        print("\nPreruseno uzivatelem")
        # 130 = 128 + SIGINT: conventional status for Ctrl-C, so shells and
        # schedulers do not treat an interrupted run as a success.
        sys.exit(130)
    except Exception:
        traceback.print_exc()
        sys.exit(1)
|