528 lines
20 KiB
Python
528 lines
20 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
================================================================================
|
|
Nazev: mailstore_ingest_v1.0.py
|
|
Verze: 1.0
|
|
Datum: 2026-06-11
|
|
Autor: Vladimir Buzalka (asistovano Claude)
|
|
Popis: Backfill stare historie z MailStore archivu do MongoDB `emaily`.
|
|
Dobere do existujici kolekce schranky JEN zpravy, ktere tam jeste
|
|
nejsou - dedup podle internet Message-ID (= _id v Mongu).
|
|
|
|
Cilove schema dokumentu = stejne jako Graph import, takze navazujici
|
|
enrich_fulltext_emails + MCP `emaily` search funguji bez uprav.
|
|
|
|
Strategie:
|
|
1. Nacti SET vsech Message-ID (_id) co uz v Mongu pro schranku jsou.
|
|
2. Projdi slozky schranky (API GetChildFolders).
|
|
3. Per slozka davkove stahni hlavicky (UID, DATE, MESSAGE-ID) - rychle.
|
|
4. Kandidat = Message-ID neni v setu AND rok(DATE) >= --since.
|
|
5. Pro kandidaty stahni cele telo (RFC822), naparsuj, upsert do Mongo.
|
|
|
|
Filtr data je client-side z DATE headeru (IMAP SEARCH je u MailStore 78s/k nicemu).
|
|
|
|
Spusteni:
|
|
# KOLIK by se dobralo (nic nezapise) - delej VZDY prvni:
|
|
python mailstore_ingest_v1.0.py "vladimir.buzalka@buzalka.cz" --since 2020 --dry-run
|
|
# ostry beh:
|
|
python mailstore_ingest_v1.0.py "vladimir.buzalka@buzalka.cz" --since 2020
|
|
# test na jedne slozce / s limitem:
|
|
python mailstore_ingest_v1.0.py "vladimir.buzalka@buzalka.cz" --since 2020 \
|
|
--folder "vladimir.buzalka@buzalka.cz/Exchange vladimir.buzalka/Sent Items" --limit 50
|
|
================================================================================
|
|
"""
|
|
|
|
from __future__ import annotations

import argparse
import email
import imaplib
import json
import os
import re
import ssl
import sys
import time
import urllib.parse
import urllib.request
from base64 import b64encode
from datetime import datetime, timezone
from email.header import decode_header
from email.utils import getaddresses, parsedate_to_datetime

from pymongo import MongoClient, UpdateOne
|
|
|
|
# --- configuration -----------------------------------------------------------
# Every connection setting can be overridden through an environment variable,
# so deployments need not edit (or commit) credentials; the defaults reproduce
# the original hard-coded setup.
# NOTE(review): the MAILSTORE_PASS default is a plaintext secret in source
# control - TODO drop the default once every deployment sets the variable.
MS_HOST = os.environ.get("MAILSTORE_HOST", "192.168.1.53")
IMAP_PORT = int(os.environ.get("MAILSTORE_IMAP_PORT", "143"))
API_PORT = int(os.environ.get("MAILSTORE_API_PORT", "8463"))
MS_USER = os.environ.get("MAILSTORE_USER", "admin")
MS_PASS = os.environ.get("MAILSTORE_PASS", "*$N(B)vMUym!%")

MONGO_URI = os.environ.get("MONGO_URI", "mongodb://192.168.1.76:27017")
MONGO_DB = os.environ.get("MONGO_DB", "emaily")

HEADER_BATCH = 500  # headers FETCHed per request (smaller batch = gentler on MailStore IMAP for huge folders)
UPSERT_BATCH = 100  # documents written to Mongo per bulk_write
|
|
|
|
# --- API (only GetChildFolders, used to list folders) ------------------------
def _insecure_tls_context() -> ssl.SSLContext:
    """Build a TLS context with certificate and host-name checks disabled.

    The API is addressed by the LAN IP in ``MS_HOST``; presumably the server
    uses a self-signed certificate, so verification is switched off.
    """
    context = ssl.create_default_context()
    context.check_hostname = False  # must be off before CERT_NONE is allowed
    context.verify_mode = ssl.CERT_NONE
    return context


_API_BASE = "https://%s:%s/api" % (MS_HOST, API_PORT)
_API_AUTH = "Basic %s" % b64encode((MS_USER + ":" + MS_PASS).encode()).decode()
_CTX = _insecure_tls_context()
|
|
|
|
|
|
def api_result(method: str, params: dict | None = None):
    """Invoke a MailStore API method and return the ``result`` of its JSON reply.

    The parameters are sent form-encoded in a POST to
    ``<_API_BASE>/invoke/<method>`` with Basic auth (30 s timeout).

    Raises:
        RuntimeError: when the reply's ``statusCode`` is not ``"succeeded"``;
            the message contains the method name and the API error message.
    """
    body = urllib.parse.urlencode(params or {}).encode()
    request = urllib.request.Request(
        "%s/invoke/%s" % (_API_BASE, method),
        data=body,
        method="POST",
        headers={
            "Authorization": _API_AUTH,
            "Content-Type": "application/x-www-form-urlencoded",
        },
    )
    with urllib.request.urlopen(request, context=_CTX, timeout=30) as response:
        # utf-8-sig: tolerate a leading BOM in the JSON reply
        payload = json.loads(response.read().decode("utf-8-sig"))
    if payload.get("statusCode") == "succeeded":
        return payload.get("result")
    error = payload.get("error") or {}
    raise RuntimeError(f"{method}: {error.get('message')}")
|
|
|
|
|
|
def collect_folders(mailbox: str) -> list[str]:
    """Return the full paths of all folders below *mailbox*, depth-first.

    Uses the MailStore API ``GetChildFolders`` (up to 20 levels deep) and
    lists each folder's ``fullName`` before those of its subfolders, keeping
    the order in which the API returns siblings.
    """
    tree = api_result("GetChildFolders", {"folder": mailbox, "maxLevels": 20})
    paths: list[str] = []
    stack = list(reversed(tree.get("childFolders") or []))
    while stack:
        node = stack.pop()
        paths.append(node["fullName"])
        # push children reversed so the first child is visited next (pre-order)
        stack.extend(reversed(node.get("childFolders") or []))
    return paths
|
|
|
|
|
|
# --- IMAP --------------------------------------------------------------------
|
|
|
|
def imap_connect(retries: int = 6, delay: float = 5.0) -> imaplib.IMAP4:
    """Connect to MailStore IMAP (STARTTLS + login), retrying on failure.

    MailStore sometimes drops the connection even during the handshake
    (CAPABILITY => EOF), so each failure is logged and retried after a short
    sleep; a transient outage must not abort the whole run.

    Args:
        retries: total number of attempts; values below 1 count as 1.
        delay: seconds to sleep between attempts (never after the last one).

    Returns:
        A logged-in ``imaplib.IMAP4`` connection.

    Raises:
        The exception of the last failed attempt (``imaplib.IMAP4.error``,
        ``imaplib.IMAP4.abort`` or ``OSError``).
    """
    attempts = max(1, retries)  # retries < 1 used to end in `raise None`
    last = None
    for attempt in range(1, attempts + 1):
        conn = None
        try:
            ctx = ssl.create_default_context()
            ctx.check_hostname = False
            ctx.verify_mode = ssl.CERT_NONE
            conn = imaplib.IMAP4(MS_HOST, IMAP_PORT)
            conn.starttls(ssl_context=ctx)
            conn.login(MS_USER, MS_PASS)
            return conn
        except (imaplib.IMAP4.abort, imaplib.IMAP4.error, OSError) as ex:
            last = ex
            # STARTTLS/login failed on an opened connection: close its socket
            # so repeated retries do not leak file descriptors.
            if conn is not None:
                try:
                    conn.shutdown()
                except OSError:
                    pass
            if attempt < attempts:
                print(" ! imap_connect pokus %d/%d selhal: %s -> cekam %.0fs"
                      % (attempt, attempts, ex, delay), flush=True)
                time.sleep(delay)
            else:
                print(" ! imap_connect pokus %d/%d selhal: %s"
                      % (attempt, attempts, ex), flush=True)
    raise last
|
|
|
|
|
|
# Parsers for the metadata bytes of an untagged FETCH response
# (the part before the header literal):
_SEQ_RX, _UID_RX = (
    re.compile(rb"^(\d+)\s"),   # leading message sequence number
    re.compile(rb"UID (\d+)"),  # value of the UID data item
)
|
|
|
|
|
|
def _safe_decode(b: bytes, enc) -> str:
    """Decode *b* without ever failing on a bogus charset label.

    Tries the declared charset *enc* first, then UTF-8, then Latin-1, always
    with ``errors="replace"``. Unknown or non-standard labels (e.g.
    ``'unknown-8bit'``), non-text codecs, non-string labels and labels with
    an embedded NUL are skipped instead of aborting the run.
    """
    for candidate in (enc, "utf-8", "latin-1"):
        if not candidate:
            continue
        try:
            return b.decode(candidate, errors="replace")
        # LookupError: unknown / non-text codec; TypeError: label is not a
        # str; ValueError: label contains an embedded NUL character.
        except (LookupError, TypeError, ValueError):
            continue
    # unreachable in practice (latin-1 accepts every byte); safety net only
    return b.decode("utf-8", errors="replace")
|
|
|
|
|
|
def dec(s) -> str:
    """Decode an RFC 2047 header value into a single stripped line.

    Chunks returned as bytes by ``decode_header`` are decoded with
    ``_safe_decode`` (tolerant of bogus charsets); CR and LF become spaces.
    Empty or ``None`` input yields ``""``.
    """
    if not s:
        return ""
    joined = "".join(
        _safe_decode(chunk, charset) if isinstance(chunk, bytes) else chunk
        for chunk, charset in decode_header(s)
    )
    return joined.translate(str.maketrans("\r\n", "  ")).strip()
|
|
|
|
|
|
def parse_date(raw) -> datetime | None:
    """Parse an RFC 2822 ``Date`` header into a naive UTC ``datetime``.

    Offset-aware values are converted to UTC and stripped of tzinfo; a value
    without a usable offset (``-0000``) comes back naive and unconverted.
    Empty input, or anything that fails to parse or convert, gives ``None``.
    """
    if not raw:
        return None
    try:
        stamp = parsedate_to_datetime(raw)
        return (stamp if not stamp.tzinfo
                else stamp.astimezone(timezone.utc).replace(tzinfo=None))
    except Exception:  # malformed headers fail in many ways -> "no date"
        return None
|
|
|
|
|
|
def encode_mutf7(s: str) -> str:
    """Encode an IMAP folder name as modified UTF-7 (RFC 3501, section 5.1.3).

    MailStore does not support UTF8=ACCEPT, so folder names with diacritics
    (e.g. "Dorucena posta") must be sent as mUTF-7. Printable US-ASCII
    characters stand for themselves ("&" becomes "&-"); every other run of
    characters is UTF-16BE encoded, base64'd without padding and with "/"
    replaced by ",", then wrapped in "&...-". The result is pure ASCII, so
    it passes through imaplib's ASCII command encoding.
    """
    chunks = []
    for run in re.findall(r"[\x20-\x7e]+|[^\x20-\x7e]+", s):
        if "\x20" <= run[0] <= "\x7e":
            chunks.append(run.replace("&", "&-"))
            continue
        packed = b64encode(run.encode("utf-16-be")).decode("ascii")
        chunks.append("&" + packed.rstrip("=").replace("/", ",") + "-")
    return "".join(chunks)
|
|
|
|
|
|
def imap_select(M: imaplib.IMAP4, folder: str):
    """Open *folder* read-only (imaplib issues EXAMINE) and return select()'s result.

    The name is mUTF-7 encoded (for diacritics) and sent as an IMAP quoted
    string. RFC 3501 requires backslash and double quote to be escaped
    inside a quoted string; without escaping, a folder name containing them
    would produce a malformed command.
    """
    quoted = encode_mutf7(folder).replace("\\", "\\\\").replace('"', '\\"')
    return M.select(f'"{quoted}"', readonly=True)
|
|
|
|
|
|
def _parse_header_fetch(msgs: list) -> list[tuple]:
    """Turn one imaplib header-FETCH response into (seq, uid, msgid, date) tuples.

    imaplib returns every message that carries a literal as a
    ``(metadata, literal)`` tuple; text sent after the literal follows as a
    separate ``bytes`` element.
    """
    out = []
    for idx, item in enumerate(msgs):
        if not isinstance(item, tuple):
            continue
        meta = item[0] or b""
        hdr = item[1] or b""
        mseq = _SEQ_RX.match(meta)
        muid = _UID_RX.search(meta)
        if muid is None and idx + 1 < len(msgs):
            # A server may put UID after the header literal (b' UID 42)');
            # it then arrives as the next, non-tuple element. Skip elements
            # that start a response for another message.
            tail = msgs[idx + 1]
            if isinstance(tail, bytes) and not _SEQ_RX.match(tail):
                muid = _UID_RX.search(tail)
        h = email.message_from_bytes(hdr)
        # str(): a header containing raw 8-bit bytes is returned as an
        # email.header.Header object, which has no .strip().
        mid = str(h.get("Message-ID") or "").strip()
        out.append((int(mseq.group(1)) if mseq else 0,
                    int(muid.group(1)) if muid else 0,
                    mid,
                    parse_date(h.get("Date"))))
    return out


def scan_folder_headers(M: imaplib.IMAP4, folder: str):
    """Fetch ``(seq, uid, message_id, date)`` for every message in *folder*.

    Selects *folder* read-only and FETCHes UID plus the Message-ID and Date
    header fields in batches of ``HEADER_BATCH`` sequence numbers.

    Returns:
        ``(None, [])`` when SELECT is not answered OK, ``(0, [])`` for an
        empty folder, otherwise ``(total, items)``: *total* is the message
        count reported by SELECT, *items* a list of
        ``(seq, uid, message_id, date)`` tuples (seq/uid fall back to 0 when
        unparsable; date is ``parse_date``'s result).

    Raises:
        imaplib.IMAP4.error: when a FETCH batch is not answered OK, so the
            caller does not treat a partially scanned folder as finished.
    """
    typ, data = imap_select(M, folder)
    if typ != "OK":
        return None, []
    total = int(data[0]) if data and data[0] else 0
    if total == 0:
        return 0, []
    items = []
    for lo in range(1, total + 1, HEADER_BATCH):
        hi = min(lo + HEADER_BATCH - 1, total)
        typ, msgs = M.fetch(f"{lo}:{hi}",
                            "(UID BODY.PEEK[HEADER.FIELDS (MESSAGE-ID DATE)])")
        if typ != "OK":
            raise imaplib.IMAP4.error(f"FETCH {lo}:{hi} -> {typ}")
        items.extend(_parse_header_fetch(msgs or []))
    return total, items
|
|
|
|
|
|
def fetch_full(M: imaplib.IMAP4, seq: int) -> bytes | None:
    """Download the complete raw message (RFC822) at sequence number *seq*.

    Returns the message bytes, or ``None`` when the server does not answer
    OK or the first response element carries no literal.
    """
    status, response = M.fetch(str(seq), "(RFC822)")
    first = response[0] if response else None
    if status == "OK" and isinstance(first, tuple):
        return first[1]
    return None
|
|
|
|
|
|
# --- mapovani EML -> Mongo dokument -----------------------------------------
|
|
|
|
def relativize(folder: str, mailbox: str) -> str:
    """Map a MailStore folder path to the Graph-style ``folder_path``.

    ``<mailbox>/<archive root, e.g. "Exchange X">/Sent Items`` becomes
    ``Sent Items``: the mailbox prefix and the level right below it are
    dropped. ``<mailbox>/<single level>`` keeps that level; a path not
    under *mailbox* (or without any "/") yields only its last segment.
    """
    segments = folder.split("/")
    if len(segments) < 2 or segments[0] != mailbox:
        return segments[-1]
    below_mailbox = segments[1:]
    return "/".join(below_mailbox[1:] or below_mailbox)
|
|
|
|
|
|
def parse_addr_one(raw) -> dict:
    """Parse the first address of a single-address header (e.g. ``From``).

    Returns ``{"email": ..., "name": ...}``. The address is lower-cased, or
    ``None`` when absent. The name is the decoded display name, falling back
    to the bare address and finally ``None``. Empty *raw* gives both keys
    ``None``.
    """
    pairs = getaddresses([raw]) if raw else []
    if not pairs:
        return {"email": None, "name": None}
    display_name, address = pairs[0]
    address = address or ""
    return {
        "email": address.lower() or None,
        "name": dec(display_name) or address or None,
    }
|
|
|
|
|
|
def parse_recipients(msg) -> list[dict]:
    """Collect To/Cc/Bcc recipients of *msg* as a flat list of dicts.

    Each entry is ``{"type": "to"|"cc"|"bcc", "email": <lower-cased>,
    "name": <decoded display name or the address>}``, in header order
    To, Cc, Bcc. Entries without an address are skipped.
    """
    header_kinds = (("to", "To"), ("cc", "Cc"), ("bcc", "Bcc"))
    result = []
    for kind, header_name in header_kinds:
        raw = msg.get(header_name)
        if raw:
            result.extend(
                {"type": kind, "email": addr.lower(), "name": dec(name) or addr}
                for name, addr in getaddresses([raw])
                if addr
            )
    return result
|
|
|
|
|
|
def extract_bodies(msg):
    """Return ``(body_text, body_html, attachments)`` for a parsed message.

    Walks every non-multipart MIME part. A part is an attachment when its
    Content-Disposition type is ``attachment``, or when it has a filename
    and is neither text/plain nor text/html. Among the remaining parts, the
    first text/plain and the first text/html become the bodies, decoded with
    their declared charset. Each attachment is described by filename, decoded
    payload size, MIME type and whether its disposition type is ``inline``.
    """
    body_text = body_html = ""
    atts = []
    for part in msg.walk():
        if part.is_multipart():
            continue
        ct = part.get_content_type()
        # Disposition *type* only, lower-cased (None when the header is
        # missing). A substring test on the raw header would be fooled by
        # filenames such as "inline.pdf" and miss "Attachment".
        disp = part.get_content_disposition()
        payload = part.get_payload(decode=True)
        is_att = disp == "attachment" or (
            part.get_filename() and ct not in ("text/plain", "text/html"))
        if is_att:
            atts.append({
                "filename": dec(part.get_filename()) or "(bez nazvu)",
                "size_bytes": len(payload or b""),
                "mime_type": ct,
                "is_inline": disp == "inline",
            })
        elif ct == "text/plain" and not body_text:
            body_text = _safe_decode(payload or b"", part.get_content_charset())
        elif ct == "text/html" and not body_html:
            body_html = _safe_decode(payload or b"", part.get_content_charset())
    return body_text, body_html, atts
|
|
|
|
|
|
def build_doc(raw: bytes, uid: int, folder: str, mailbox: str) -> dict | None:
    """Turn a raw RFC822 message into a Mongo document in the Graph-import schema.

    Returns ``None`` when the message has no Message-ID, because the
    Message-ID is the document ``_id`` and the dedup key. Timestamps are naive
    UTC. ``received_at`` reuses the Date header, presumably because the
    archive copy has no separate receive time.
    """
    msg = email.message_from_bytes(raw)
    # str(): a Message-ID with raw 8-bit bytes is returned as an
    # email.header.Header object, which has no .strip().
    mid = str(msg.get("Message-ID") or "").strip()
    if not mid:
        return None
    dt = parse_date(msg.get("Date"))
    body_text, body_html, atts = extract_bodies(msg)
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    preview = (body_text or "")[:255]
    return {
        "_id": mid,
        "source": "mailstore",
        "mailstore_uid": uid,
        "mailstore_folder": folder,
        # graph_id deliberately OMITTED: the collection has a unique+sparse
        # index on graph_id, and an explicit None would collide (sparse only
        # skips documents where the field is MISSING).
        "conversation_id": None,
        "folder_path": relativize(folder, mailbox),
        "subject": dec(msg.get("Subject")),
        "sender": parse_addr_one(msg.get("From")),
        "recipients": parse_recipients(msg),
        "to": dec(msg.get("To")),
        "cc": dec(msg.get("Cc")),
        "bcc": dec(msg.get("Bcc")),
        "sent_at": dt,
        "received_at": dt,
        "modified_at": now,
        "created_at": now,
        "parsed_at": now,
        "is_read": True,
        "is_draft": "draft" in folder.lower() or "koncept" in folder.lower(),
        "has_attachments": bool(atts),
        "attachment_count": len(atts),
        "attachments": atts,
        "body_html": body_html or None,
        "body_text": body_text or None,
        "body_preview": preview,
    }
|
|
|
|
|
|
# --- hlavni ------------------------------------------------------------------
|
|
|
|
def main() -> int:
    """Command-line entry point: backfill one MailStore mailbox into MongoDB.

    Loads every ``_id`` (Message-ID) already present in the Mongo collection
    named after the mailbox. It then scans the mailbox folders over IMAP and
    upserts (``$setOnInsert``) messages whose Message-ID is missing and whose
    Date year lies inside ``--since``/``--until``.

    Returns:
        0 on a normal finish. 2 after 4 consecutive folder-level IMAP
        failures, when MailStore IMAP is presumed wedged, so that an external
        orchestrator can restart MailStore and resume via ``--checkpoint``.
    """
    ap = argparse.ArgumentParser(description="MailStore -> Mongo backfill (dedup dle Message-ID)")
    ap.add_argument("mailbox", help="Schranka (top-level slozka MailStore = Mongo kolekce)")
    ap.add_argument("--since", type=int, default=None,
                    help="Ber jen zpravy s rokem >= SINCE (napr. 2020)")
    ap.add_argument("--until", type=int, default=None,
                    help="Ber jen zpravy s rokem <= UNTIL")
    ap.add_argument("--folder", default=None, help="Jen jedna konkretni slozka (plna cesta)")
    ap.add_argument("--limit", type=int, default=None, help="Max zprav k ingestu (test)")
    ap.add_argument("--max-folders", type=int, default=None, help="Max slozek (diagnostika)")
    ap.add_argument("--dry-run", action="store_true",
                    help="Jen spocitej kolik by se dobralo, NIC nezapisuj")
    ap.add_argument("--log-file", default=None,
                    help="Presmeruj vystup do souboru (line-buffered). Pro detached beh "
                         "v kontejneru bez shell redirectu (ten by docker exec cleanup zabil).")
    ap.add_argument("--checkpoint", default=None,
                    help="Soubor s hotovymi slozkami (jedna cesta na radek). Hotove slozky "
                         "se pri dalsim behu preskoci BEZ FETCH -> rychle navazani po wedgi "
                         "MailStore IMAP. Idempotentni.")
    args = ap.parse_args()

    # Own log file, so a detached `docker exec -d python ...` can run without
    # a shell wrapper (docker exec cleanup kills `sh -c '... &'`).
    if args.log_file:
        log_fh = open(args.log_file, "a", buffering=1, encoding="utf-8")
        sys.stdout = log_fh
        sys.stderr = log_fh

    t0 = time.time()
    print(f"=== MailStore ingest v1.0 | schranka: {args.mailbox} ===")
    print(f"Filtr: rok >= {args.since or '-'}{' a <= ' + str(args.until) if args.until else ''}"
          f"{' [DRY-RUN]' if args.dry_run else ''}")

    # Mongo + set of already known Message-IDs
    mongo = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
    mongo.admin.command("ping")
    coll = mongo[MONGO_DB][args.mailbox]
    print("Nacitam existujici Message-ID z Mongo...", flush=True)
    # distinct('_id') exceeds the 16 MB cap on large collections -> batched cursor
    known = {d["_id"] for d in coll.find({}, {"_id": 1}).batch_size(5000)}
    print(f" v Mongu uz mam: {len(known):,} zprav")

    # folders to scan
    if args.folder:
        folders = [args.folder]
    else:
        folders = collect_folders(args.mailbox)
    print(f"Slozek ke kontrole: {len(folders)}")

    # checkpoint of finished folders (skipped without any FETCH)
    done_folders: set[str] = set()
    cp_fh = None
    if args.checkpoint and not args.dry_run:
        try:
            with open(args.checkpoint, "r", encoding="utf-8") as cf:
                done_folders = {ln.strip() for ln in cf if ln.strip()}
        except FileNotFoundError:
            pass  # first run: nothing finished yet
        cp_fh = open(args.checkpoint, "a", buffering=1, encoding="utf-8")
        print(f" checkpoint: {len(done_folders)} slozek uz hotovo (preskocim)")

    M = imap_connect()

    grand_seen = grand_cand = grand_ingested = grand_errors = 0
    queue: list[UpdateOne] = []

    def flush():
        """Write queued upserts to Mongo (skipped in dry-run) and clear the queue."""
        nonlocal queue
        if queue and not args.dry_run:
            coll.bulk_write(queue, ordered=False)
        queue = []

    def wedge_exit(aborts: int) -> int:
        """Flush pending writes, close the checkpoint and return rc=2."""
        flush()
        print("!!! %d po sobe jdoucich abortu -> MailStore wedge, koncim rc=2 pro restart"
              % aborts, flush=True)
        if cp_fh:
            cp_fh.close()
        return 2

    def reconnect(old):
        """Best-effort LOGOUT of a (probably broken) connection, then reconnect."""
        try:
            old.logout()
        except Exception:
            pass  # the old connection is typically already dead
        return imap_connect()

    # consecutive failing folders = MailStore IMAP wedged -> rc=2 for the orchestrator
    consec_aborts = 0
    for fidx, folder in enumerate(folders):
        if args.max_folders and fidx >= args.max_folders:
            print(f" (--max-folders {args.max_folders} dosazeno)")
            break
        if folder in done_folders:
            continue
        try:
            total, items = scan_folder_headers(M, folder)
        except Exception as ex:
            # One bad folder must not stop the run: log it, reconnect (the
            # IMAP connection may be broken) and move on.
            print(f" [{relativize(folder, args.mailbox)[:45]:45}] CHYBA: {type(ex).__name__}: {str(ex)[:80]}", flush=True)
            consec_aborts += 1
            if consec_aborts >= 4:
                # Wedged MailStore IMAP (login works, FETCH hits EOF at once):
                # stop the cascade of false skips and exit non-zero so the
                # orchestrator restarts MailStore and resumes from checkpoint.
                return wedge_exit(consec_aborts)
            M = reconnect(M)
            continue
        if not total:
            continue

        # candidates: has a Message-ID, not yet in Mongo, year inside the window
        cands = []
        for seq, uid, mid, dt in items:
            if not mid or mid in known:
                continue
            yr = dt.year if dt else None
            if args.since and (yr is None or yr < args.since):
                continue
            if args.until and (yr is None or yr > args.until):
                continue
            cands.append((seq, uid, mid))
        grand_seen += total
        grand_cand += len(cands)
        rel = relativize(folder, args.mailbox)
        print(f" [{rel[:45]:45}] zprav={total:>6} k dobrani={len(cands):>6}", flush=True)

        if args.dry_run:
            continue

        hit_limit = False  # --limit stopped this folder before all candidates
        try:
            for seq, uid, _mid in cands:
                if args.limit and grand_ingested >= args.limit:
                    hit_limit = True
                    break
                try:
                    raw = fetch_full(M, seq)
                    if not raw:
                        continue
                    doc = build_doc(raw, uid, folder, args.mailbox)
                    if not doc:
                        continue
                except imaplib.IMAP4.abort:
                    # connection died (MailStore wedge): leave the folder
                    # unfinished so the next run repeats it
                    raise
                except Exception as ex:
                    # one broken message must not stop the run - skip it
                    grand_errors += 1
                    print(f" ! zprava seq={seq} CHYBA: {type(ex).__name__}: {str(ex)[:60]}", flush=True)
                    continue
                queue.append(UpdateOne({"_id": doc["_id"]}, {"$setOnInsert": doc}, upsert=True))
                known.add(doc["_id"])
                grand_ingested += 1
                if len(queue) >= UPSERT_BATCH:
                    flush()
            flush()
        except imaplib.IMAP4.abort as ex:
            flush()
            print(f" [{rel[:45]:45}] IMAP abort behem fetch: {str(ex)[:50]} -> reconnect", flush=True)
            consec_aborts += 1
            if consec_aborts >= 4:
                return wedge_exit(consec_aborts)
            M = reconnect(M)
            continue  # folder NOT finished -> the next run repeats it

        consec_aborts = 0
        # Checkpoint only folders whose candidates were all processed; one cut
        # short by --limit must be rescanned, or its remaining messages would
        # be skipped forever.
        if not hit_limit:
            if cp_fh:
                cp_fh.write(folder + "\n")
            done_folders.add(folder)
        if args.limit and grand_ingested >= args.limit:
            print(f" (dosazen limit {args.limit})")
            break

    # Best-effort LOGOUT: MailStore may already have dropped the connection.
    try:
        M.logout()
    except (imaplib.IMAP4.error, OSError):
        pass
    flush()
    if cp_fh:
        cp_fh.close()
    mongo.close()

    print("-" * 64)
    print(f"Zprav proskenovano: {grand_seen:,}")
    print(f"K dobrani (chybi, v okne): {grand_cand:,}")
    if args.dry_run:
        print(">>> DRY-RUN: nic nezapsano. Pro ostry beh spust bez --dry-run.")
    else:
        print(f"Zapsano do Mongo: {grand_ingested:,}")
        if grand_errors:
            print(f"Preskoceno zprav s chybou: {grand_errors:,}")
    print(f"Trvalo: {time.time()-t0:.1f}s")
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # main()'s return value becomes the exit code; Ctrl+C ends with code 1
    # and a short note on stderr instead of a traceback.
    try:
        exit_code = main()
    except KeyboardInterrupt:
        print("\nPreruseno", file=sys.stderr)
        exit_code = 1
    sys.exit(exit_code)
|