Files
janssen/mailstore/mailstore_ingest_v1.0.py
T
2026-06-11 21:49:04 +02:00

428 lines
15 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
================================================================================
Nazev: mailstore_ingest_v1.0.py
Verze: 1.0
Datum: 2026-06-11
Autor: Vladimir Buzalka (asistovano Claude)
Popis: Backfill stare historie z MailStore archivu do MongoDB `emaily`.
Dobere do existujici kolekce schranky JEN zpravy, ktere tam jeste
nejsou - dedup podle internet Message-ID (= _id v Mongu).
Cilove schema dokumentu = stejne jako Graph import, takze navazujici
enrich_fulltext_emails + MCP `emaily` search funguji bez uprav.
Strategie:
1. Nacti SET vsech Message-ID (_id) co uz v Mongu pro schranku jsou.
2. Projdi slozky schranky (API GetChildFolders).
3. Per slozka davkove stahni hlavicky (UID, DATE, MESSAGE-ID) - rychle.
4. Kandidat = Message-ID neni v setu AND rok(DATE) >= --since.
5. Pro kandidaty stahni cele telo (RFC822), naparsuj, upsert do Mongo.
Filtr data je client-side z DATE headeru (IMAP SEARCH je u MailStme 78s/k nicemu).
Spusteni:
# KOLIK by se dobralo (nic nezapise) - delej VZDY prvni:
python mailstore_ingest_v1.0.py "vladimir.buzalka@buzalka.cz" --since 2020 --dry-run
# ostry beh:
python mailstore_ingest_v1.0.py "vladimir.buzalka@buzalka.cz" --since 2020
# test na jedne slozce / s limitem:
python mailstore_ingest_v1.0.py "vladimir.buzalka@buzalka.cz" --since 2020 \
--folder "vladimir.buzalka@buzalka.cz/Exchange vladimir.buzalka/Sent Items" --limit 50
================================================================================
"""
from __future__ import annotations
import argparse
import email
import imaplib
import json
import re
import ssl
import sys
import time
import urllib.parse
import urllib.request
from base64 import b64encode
from datetime import datetime, timezone
from email.header import decode_header
from email.utils import getaddresses, parsedate_to_datetime
from pymongo import MongoClient, UpdateOne
# --- konfigurace ------------------------------------------------------------
MS_HOST = "192.168.1.53"
IMAP_PORT = 143
API_PORT = 8463
MS_USER = "admin"
MS_PASS = "*$N(B)vMUym!%"
MONGO_URI = "mongodb://192.168.1.76:27017"
MONGO_DB = "emaily"
HEADER_BATCH = 2000 # kolik hlavicek FETCHovat naraz
UPSERT_BATCH = 100 # kolik dokumentu zapsat naraz do Mongo
# --- API (jen GetChildFolders na seznam slozek) -----------------------------
_API_BASE = f"https://{MS_HOST}:{API_PORT}/api"
_API_AUTH = "Basic " + b64encode(f"{MS_USER}:{MS_PASS}".encode()).decode()
_CTX = ssl.create_default_context()
_CTX.check_hostname = False
_CTX.verify_mode = ssl.CERT_NONE
def api_result(method: str, params: dict | None = None):
data = urllib.parse.urlencode(params or {}).encode()
req = urllib.request.Request(f"{_API_BASE}/invoke/{method}", data=data, method="POST",
headers={"Authorization": _API_AUTH,
"Content-Type": "application/x-www-form-urlencoded"})
with urllib.request.urlopen(req, context=_CTX, timeout=30) as resp:
r = json.loads(resp.read().decode("utf-8-sig"))
if r.get("statusCode") != "succeeded":
raise RuntimeError(f"{method}: {(r.get('error') or {}).get('message')}")
return r.get("result")
def collect_folders(mailbox: str) -> list[str]:
"""Vrati seznam plnych cest vsech slozek schranky (rekurzivne)."""
tree = api_result("GetChildFolders", {"folder": mailbox, "maxLevels": 20})
out: list[str] = []
def walk(node):
for ch in node.get("childFolders") or []:
out.append(ch["fullName"])
walk(ch)
walk(tree)
return out
# --- IMAP --------------------------------------------------------------------
def imap_connect() -> imaplib.IMAP4:
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
M = imaplib.IMAP4(MS_HOST, IMAP_PORT)
M.starttls(ssl_context=ctx)
M.login(MS_USER, MS_PASS)
return M
_SEQ_RX = re.compile(rb"^(\d+)\s")
_UID_RX = re.compile(rb"UID (\d+)")
def dec(s) -> str:
if not s:
return ""
out = []
for txt, enc in decode_header(s):
out.append(txt.decode(enc or "utf-8", errors="replace") if isinstance(txt, bytes) else txt)
return "".join(out).replace("\r", " ").replace("\n", " ").strip()
def parse_date(raw) -> datetime | None:
if not raw:
return None
try:
dt = parsedate_to_datetime(raw)
if dt.tzinfo:
dt = dt.astimezone(timezone.utc).replace(tzinfo=None)
return dt
except Exception:
return None
def encode_mutf7(s: str) -> str:
"""Nazev IMAP slozky -> modified UTF-7 (RFC 3501). MailStore neumi
UTF8=ACCEPT, takze slozky s diakritikou (Dorucena posta) musi byt mUTF-7.
Vysledek je cisty ASCII -> bezpecne projde imaplib (ascii encoding)."""
res = []
i, n = 0, len(s)
while i < n:
ch = s[i]
o = ord(ch)
if 0x20 <= o <= 0x7e:
res.append("&-" if ch == "&" else ch)
i += 1
else:
j = i
while j < n and not (0x20 <= ord(s[j]) <= 0x7e):
j += 1
import base64 as _b64
b = s[i:j].encode("utf-16-be")
enc = _b64.b64encode(b).decode("ascii").rstrip("=").replace("/", ",")
res.append("&" + enc + "-")
i = j
return "".join(res)
def imap_select(M: imaplib.IMAP4, folder: str):
"""SELECT slozky s mUTF-7 enkodovanim nazvu (kvuli diakritice)."""
return M.select(f'"{encode_mutf7(folder)}"', readonly=True)
def scan_folder_headers(M: imaplib.IMAP4, folder: str):
"""Davkove stahne (seq, uid, msgid, date) vsech zprav slozky."""
typ, data = imap_select(M, folder)
if typ != "OK":
return None, []
total = int(data[0]) if data and data[0] else 0
if total == 0:
return 0, []
items = []
lo = 1
while lo <= total:
hi = min(lo + HEADER_BATCH - 1, total)
typ, msgs = M.fetch(f"{lo}:{hi}",
"(UID BODY.PEEK[HEADER.FIELDS (MESSAGE-ID DATE)])")
for it in msgs:
if not isinstance(it, tuple):
continue
meta, hdr = it[0], it[1]
mseq = _SEQ_RX.match(meta or b"")
muid = _UID_RX.search(meta or b"")
h = email.message_from_bytes(hdr or b"")
mid = (h.get("Message-ID") or "").strip()
items.append((int(mseq.group(1)) if mseq else 0,
int(muid.group(1)) if muid else 0,
mid, parse_date(h.get("Date"))))
lo = hi + 1
return total, items
def fetch_full(M: imaplib.IMAP4, seq: int) -> bytes | None:
typ, data = M.fetch(str(seq), "(RFC822)")
if typ != "OK" or not data or not isinstance(data[0], tuple):
return None
return data[0][1]
# --- mapovani EML -> Mongo dokument -----------------------------------------
def relativize(folder: str, mailbox: str) -> str:
"""schranka/Exchange X/Sent Items -> Sent Items (jako Graph folder_path)."""
parts = folder.split("/")
# odstran prefix schranky a 'Exchange ...' uroven
if len(parts) >= 2 and parts[0] == mailbox:
rest = parts[2:] if len(parts) > 2 else parts[1:]
return "/".join(rest) if rest else parts[-1]
return parts[-1]
def parse_addr_one(raw) -> dict:
if not raw:
return {"email": None, "name": None}
pairs = getaddresses([raw])
if not pairs:
return {"email": None, "name": None}
name, addr = pairs[0]
return {"email": (addr or "").lower() or None, "name": dec(name) or (addr or None)}
def parse_recipients(msg) -> list[dict]:
out = []
for kind, hdr in (("to", "To"), ("cc", "Cc"), ("bcc", "Bcc")):
val = msg.get(hdr)
if not val:
continue
for name, addr in getaddresses([val]):
if addr:
out.append({"type": kind, "email": addr.lower(),
"name": dec(name) or addr})
return out
def extract_bodies(msg):
body_text = body_html = ""
atts = []
for part in msg.walk():
if part.is_multipart():
continue
ct = part.get_content_type()
disp = str(part.get("Content-Disposition") or "")
payload = part.get_payload(decode=True)
is_att = "attachment" in disp or (part.get_filename() and ct not in ("text/plain", "text/html"))
if is_att:
atts.append({
"filename": dec(part.get_filename()) or "(bez nazvu)",
"size_bytes": len(payload or b""),
"mime_type": ct,
"is_inline": "inline" in disp,
})
elif ct == "text/plain" and not body_text:
body_text = (payload or b"").decode(part.get_content_charset() or "utf-8", errors="replace")
elif ct == "text/html" and not body_html:
body_html = (payload or b"").decode(part.get_content_charset() or "utf-8", errors="replace")
return body_text, body_html, atts
def build_doc(raw: bytes, uid: int, folder: str, mailbox: str) -> dict | None:
msg = email.message_from_bytes(raw)
mid = (msg.get("Message-ID") or "").strip()
if not mid:
return None
dt = parse_date(msg.get("Date"))
body_text, body_html, atts = extract_bodies(msg)
now = datetime.now(timezone.utc).replace(tzinfo=None)
preview = (body_text or "")[:255]
return {
"_id": mid,
"source": "mailstore",
"mailstore_uid": uid,
"mailstore_folder": folder,
# graph_id zamerne VYNECHANO: kolekce ma unique+sparse index na graph_id,
# explicitni None by kolidoval (sparse ignoruje jen CHYBEJICI pole).
"conversation_id": None,
"folder_path": relativize(folder, mailbox),
"subject": dec(msg.get("Subject")),
"sender": parse_addr_one(msg.get("From")),
"recipients": parse_recipients(msg),
"to": dec(msg.get("To")),
"cc": dec(msg.get("Cc")),
"bcc": dec(msg.get("Bcc")),
"sent_at": dt,
"received_at": dt,
"modified_at": now,
"created_at": now,
"parsed_at": now,
"is_read": True,
"is_draft": "draft" in folder.lower() or "koncept" in folder.lower(),
"has_attachments": bool(atts),
"attachment_count": len(atts),
"attachments": atts,
"body_html": body_html or None,
"body_text": body_text or None,
"body_preview": preview,
}
# --- hlavni ------------------------------------------------------------------
def main() -> int:
ap = argparse.ArgumentParser(description="MailStore -> Mongo backfill (dedup dle Message-ID)")
ap.add_argument("mailbox", help="Schranka (top-level slozka MailStore = Mongo kolekce)")
ap.add_argument("--since", type=int, default=None,
help="Ber jen zpravy s rokem >= SINCE (napr. 2020)")
ap.add_argument("--until", type=int, default=None,
help="Ber jen zpravy s rokem <= UNTIL")
ap.add_argument("--folder", default=None, help="Jen jedna konkretni slozka (plna cesta)")
ap.add_argument("--limit", type=int, default=None, help="Max zprav k ingestu (test)")
ap.add_argument("--max-folders", type=int, default=None, help="Max slozek (diagnostika)")
ap.add_argument("--dry-run", action="store_true",
help="Jen spocitej kolik by se dobralo, NIC nezapisuj")
args = ap.parse_args()
t0 = time.time()
print(f"=== MailStore ingest v1.0 | schranka: {args.mailbox} ===")
print(f"Filtr: rok >= {args.since or '-'}{' a <= ' + str(args.until) if args.until else ''}"
f"{' [DRY-RUN]' if args.dry_run else ''}")
# Mongo + set znamych Message-ID
mongo = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
mongo.admin.command("ping")
coll = mongo[MONGO_DB][args.mailbox]
print("Nacitam existujici Message-ID z Mongo...", flush=True)
known = set(coll.distinct("_id"))
print(f" v Mongu uz mam: {len(known):,} zprav")
# slozky
if args.folder:
folders = [args.folder]
else:
folders = collect_folders(args.mailbox)
print(f"Slozek ke kontrole: {len(folders)}")
M = imap_connect()
grand_seen = grand_cand = grand_ingested = 0
queue: list[UpdateOne] = []
def flush():
nonlocal queue
if queue and not args.dry_run:
coll.bulk_write(queue, ordered=False)
queue = []
nonlocal_M = {"M": M}
for fidx, folder in enumerate(folders):
if args.max_folders and fidx >= args.max_folders:
print(f" (--max-folders {args.max_folders} dosazeno)")
break
try:
total, items = scan_folder_headers(nonlocal_M["M"], folder)
except Exception as ex:
# jedna chybna slozka nesmi shodit cely beh - zaloguj a pokracuj.
# Pri chybe IMAP spojeni (abort) se prepoj.
print(f" [{relativize(folder, args.mailbox)[:45]:45}] CHYBA: {type(ex).__name__}: {str(ex)[:80]}", flush=True)
try:
nonlocal_M["M"].logout()
except Exception:
pass
nonlocal_M["M"] = imap_connect()
continue
M = nonlocal_M["M"]
if not total:
continue
# kandidati: rok ok, neni v known, ma msgid
cands = []
for seq, uid, mid, dt in items:
if not mid or mid in known:
continue
yr = dt.year if dt else None
if args.since and (yr is None or yr < args.since):
continue
if args.until and (yr is None or yr > args.until):
continue
cands.append((seq, uid, mid))
grand_seen += total
grand_cand += len(cands)
rel = relativize(folder, args.mailbox)
print(f" [{rel[:45]:45}] zprav={total:>6} k dobrani={len(cands):>6}", flush=True)
if args.dry_run:
continue
for seq, uid, mid in cands:
if args.limit and grand_ingested >= args.limit:
break
raw = fetch_full(M, seq)
if not raw:
continue
doc = build_doc(raw, uid, folder, args.mailbox)
if not doc:
continue
queue.append(UpdateOne({"_id": doc["_id"]}, {"$setOnInsert": doc}, upsert=True))
known.add(doc["_id"])
grand_ingested += 1
if len(queue) >= UPSERT_BATCH:
flush()
flush()
if args.limit and grand_ingested >= args.limit:
print(f" (dosazen limit {args.limit})")
break
M.logout()
flush()
print("-" * 64)
print(f"Zprav proskenovano: {grand_seen:,}")
print(f"K dobrani (chybi, v okne): {grand_cand:,}")
if args.dry_run:
print(">>> DRY-RUN: nic nezapsano. Pro ostry beh spust bez --dry-run.")
else:
print(f"Zapsano do Mongo: {grand_ingested:,}")
print(f"Trvalo: {time.time()-t0:.1f}s")
return 0
if __name__ == "__main__":
try:
sys.exit(main())
except KeyboardInterrupt:
print("\nPreruseno", file=sys.stderr)
sys.exit(1)