This commit is contained in:
2026-06-13 21:45:28 +02:00
parent 6bcb721eb4
commit f94573ea6e
473 changed files with 7805 additions and 31 deletions
+129 -29
View File
@@ -62,7 +62,7 @@ MS_PASS = "*$N(B)vMUym!%"
# MongoDB connection string and the database name used by this script.
MONGO_URI = "mongodb://192.168.1.76:27017"
MONGO_DB = "emaily"
# NOTE(review): HEADER_BATCH is assigned twice; the later assignment (500) is
# the one that takes effect.
HEADER_BATCH = 2000  # how many headers to FETCH at once
HEADER_BATCH = 500  # how many headers to FETCH at once (smaller batch = gentler on MailStore IMAP for huge folders)
UPSERT_BATCH = 100  # how many documents to write to Mongo at once
# --- API (jen GetChildFolders na seznam slozek) -----------------------------
@@ -101,26 +101,51 @@ def collect_folders(mailbox: str) -> list[str]:
# --- IMAP --------------------------------------------------------------------
def imap_connect() -> imaplib.IMAP4:
    """Open an IMAP session: connect, upgrade via STARTTLS, then log in.

    The TLS context is created with hostname checking and certificate
    verification switched off. Connection parameters and credentials come
    from the module-level MS_HOST / IMAP_PORT / MS_USER / MS_PASS constants.

    Returns:
        The logged-in ``imaplib.IMAP4`` connection.
    """
    tls_context = ssl.create_default_context()
    # check_hostname must be cleared before verify_mode may be set to CERT_NONE.
    tls_context.check_hostname = False
    tls_context.verify_mode = ssl.CERT_NONE

    connection = imaplib.IMAP4(MS_HOST, IMAP_PORT)
    connection.starttls(ssl_context=tls_context)
    connection.login(MS_USER, MS_PASS)
    return connection
def imap_connect(retries: int = 6, delay: float = 5.0) -> imaplib.IMAP4:
    """Connect to IMAP (STARTTLS + login), retrying on transient failures.

    MailStore occasionally drops the connection even during the handshake
    (CAPABILITY => EOF), so a failed attempt is retried after a short sleep
    instead of letting a transient outage bring down the whole run.

    Args:
        retries: Maximum number of connection attempts; must be >= 1.
        delay: Seconds to sleep between two attempts.

    Returns:
        The logged-in ``imaplib.IMAP4`` connection.

    Raises:
        ValueError: If ``retries`` is smaller than 1.
        imaplib.IMAP4.error, OSError: The error of the final attempt when
            every attempt failed.
    """
    if retries < 1:
        # Previously this fell through to `raise None` (an opaque TypeError).
        raise ValueError("retries must be >= 1, got %r" % (retries,))

    # The TLS context does not change between attempts, so build it once.
    ctx = ssl.create_default_context()
    ctx.check_hostname = False  # must be cleared before CERT_NONE is allowed
    ctx.verify_mode = ssl.CERT_NONE

    for attempt in range(1, retries + 1):
        conn = None
        try:
            conn = imaplib.IMAP4(MS_HOST, IMAP_PORT)
            conn.starttls(ssl_context=ctx)
            conn.login(MS_USER, MS_PASS)
            return conn
        except (imaplib.IMAP4.error, OSError) as ex:
            # IMAP4.abort is a subclass of IMAP4.error, so it is covered here.
            if conn is not None:
                # STARTTLS/login failed on an open socket -> do not leak it.
                try:
                    conn.shutdown()
                except OSError:
                    pass
            if attempt == retries:
                # Last attempt: give up immediately, no pointless sleep.
                print("  ! imap_connect pokus %d/%d selhal: %s -> vzdavam"
                      % (attempt, retries, ex), flush=True)
                raise
            print("  ! imap_connect pokus %d/%d selhal: %s -> cekam %.0fs"
                  % (attempt, retries, ex, delay), flush=True)
            time.sleep(delay)
# Captures a run of digits at the very start of a bytes line, followed by
# whitespace (presumably the message sequence number of a FETCH response --
# confirm against the caller).
_SEQ_RX = re.compile(rb"^(\d+)\s")
# Captures the digits that follow the literal b"UID " anywhere in a bytes line.
_UID_RX = re.compile(rb"UID (\d+)")
def _safe_decode(b: bytes, enc) -> str:
"""Dekoduj bytes; nestandardni/nezname charsety (napr. 'unknown-8bit')
nesmi shodit beh -> fallback na utf-8, pak latin-1."""
for e in (enc, "utf-8", "latin-1"):
if not e:
continue
try:
return b.decode(e, errors="replace")
except (LookupError, TypeError):
continue
return b.decode("utf-8", errors="replace")
def dec(s) -> str:
    """Decode a possibly encoded header value into one single-line string.

    The value is split by ``decode_header`` (presumably
    ``email.header.decode_header`` -- the import is outside this view) into
    (fragment, charset) pairs. Bytes fragments are decoded through
    ``_safe_decode`` so an unknown or bogus charset cannot raise; str
    fragments are used as they are.

    Args:
        s: Raw header value; any falsy value yields "".

    Returns:
        The joined fragments with CR/LF replaced by spaces and surrounding
        whitespace stripped.
    """
    if not s:
        return ""
    # Each fragment is appended exactly once, always via _safe_decode; the
    # raw `.decode(enc or "utf-8")` variant raised LookupError on charsets
    # such as 'unknown-8bit'.
    pieces = [
        _safe_decode(fragment, charset) if isinstance(fragment, bytes) else fragment
        for fragment, charset in decode_header(s)
    ]
    return "".join(pieces).replace("\r", " ").replace("\n", " ").strip()
@@ -254,9 +279,9 @@ def extract_bodies(msg):
"is_inline": "inline" in disp,
})
elif ct == "text/plain" and not body_text:
body_text = (payload or b"").decode(part.get_content_charset() or "utf-8", errors="replace")
body_text = _safe_decode(payload or b"", part.get_content_charset())
elif ct == "text/html" and not body_html:
body_html = (payload or b"").decode(part.get_content_charset() or "utf-8", errors="replace")
body_html = _safe_decode(payload or b"", part.get_content_charset())
return body_text, body_html, atts
@@ -314,8 +339,22 @@ def main() -> int:
ap.add_argument("--max-folders", type=int, default=None, help="Max slozek (diagnostika)")
ap.add_argument("--dry-run", action="store_true",
help="Jen spocitej kolik by se dobralo, NIC nezapisuj")
ap.add_argument("--log-file", default=None,
help="Presmeruj vystup do souboru (line-buffered). Pro detached beh "
"v kontejneru bez shell redirectu (ten by docker exec cleanup zabil).")
ap.add_argument("--checkpoint", default=None,
help="Soubor s hotovymi slozkami (jedna cesta na radek). Hotove slozky "
"se pri dalsim behu preskoci BEZ FETCH -> rychle navazani po wedgi "
"MailStore IMAP. Idempotentni.")
args = ap.parse_args()
# Vlastni log do souboru - aby detached `docker exec -d python ...` mohl bezet
# bez shell wrapperu (sh -c '... &' docker exec cleanup zabije).
if args.log_file:
_f = open(args.log_file, "a", buffering=1, encoding="utf-8")
sys.stdout = _f
sys.stderr = _f
t0 = time.time()
print(f"=== MailStore ingest v1.0 | schranka: {args.mailbox} ===")
print(f"Filtr: rok >= {args.since or '-'}{' a <= ' + str(args.until) if args.until else ''}"
@@ -326,7 +365,8 @@ def main() -> int:
mongo.admin.command("ping")
coll = mongo[MONGO_DB][args.mailbox]
print("Nacitam existujici Message-ID z Mongo...", flush=True)
known = set(coll.distinct("_id"))
# distinct('_id') prekroci 16MB cap u velkych kolekci -> kurzor po davkach
known = {d["_id"] for d in coll.find({}, {"_id": 1}).batch_size(5000)}
print(f" v Mongu uz mam: {len(known):,} zprav")
# slozky
@@ -336,9 +376,21 @@ def main() -> int:
folders = collect_folders(args.mailbox)
print(f"Slozek ke kontrole: {len(folders)}")
# checkpoint hotovych slozek (preskoci se bez FETCH)
done_folders: set[str] = set()
cp_fh = None
if args.checkpoint and not args.dry_run:
try:
with open(args.checkpoint, "r", encoding="utf-8") as _cf:
done_folders = {ln.strip() for ln in _cf if ln.strip()}
except FileNotFoundError:
pass
cp_fh = open(args.checkpoint, "a", buffering=1, encoding="utf-8")
print(f" checkpoint: {len(done_folders)} slozek uz hotovo (preskocim)")
M = imap_connect()
grand_seen = grand_cand = grand_ingested = 0
grand_seen = grand_cand = grand_ingested = grand_errors = 0
queue: list[UpdateOne] = []
def flush():
@@ -348,16 +400,30 @@ def main() -> int:
queue = []
nonlocal_M = {"M": M}
consec_aborts = 0 # po sobe jdouci aborty = MailStore zwedgoval -> exit(1) pro orchestrator
for fidx, folder in enumerate(folders):
if args.max_folders and fidx >= args.max_folders:
print(f" (--max-folders {args.max_folders} dosazeno)")
break
if folder in done_folders:
continue
try:
total, items = scan_folder_headers(nonlocal_M["M"], folder)
except Exception as ex:
# jedna chybna slozka nesmi shodit cely beh - zaloguj a pokracuj.
# Pri chybe IMAP spojeni (abort) se prepoj.
print(f" [{relativize(folder, args.mailbox)[:45]:45}] CHYBA: {type(ex).__name__}: {str(ex)[:80]}", flush=True)
consec_aborts += 1
if consec_aborts >= 4:
# MailStore IMAP je zwedgovany (login projde, ale FETCH hned EOF) ->
# nedet nastavanou kaskadu falesnych preskoku, skonci NEnulovym kodem,
# at orchestrator restartne sluzbu MailStore a navaze z checkpointu.
flush()
print("!!! %d po sobe jdoucich abortu -> MailStore wedge, koncim rc=2 pro restart"
% consec_aborts, flush=True)
if cp_fh:
cp_fh.close()
return 2
try:
nonlocal_M["M"].logout()
except Exception:
@@ -386,21 +452,53 @@ def main() -> int:
if args.dry_run:
continue
for seq, uid, mid in cands:
if args.limit and grand_ingested >= args.limit:
break
raw = fetch_full(M, seq)
if not raw:
continue
doc = build_doc(raw, uid, folder, args.mailbox)
if not doc:
continue
queue.append(UpdateOne({"_id": doc["_id"]}, {"$setOnInsert": doc}, upsert=True))
known.add(doc["_id"])
grand_ingested += 1
if len(queue) >= UPSERT_BATCH:
flush()
flush()
try:
for seq, uid, mid in cands:
if args.limit and grand_ingested >= args.limit:
break
try:
raw = fetch_full(M, seq)
if not raw:
continue
doc = build_doc(raw, uid, folder, args.mailbox)
if not doc:
continue
except imaplib.IMAP4.abort:
# spojeni umrelo (MailStore wedge) -> ven, prepoj, slozku zopakuje
# az dalsi run (NEoznacit hotovou)
raise
except Exception as ex:
# jedna vadna zprava nesmi shodit beh - preskoc a pokracuj
grand_errors += 1
print(f" ! zprava seq={seq} CHYBA: {type(ex).__name__}: {str(ex)[:60]}", flush=True)
continue
queue.append(UpdateOne({"_id": doc["_id"]}, {"$setOnInsert": doc}, upsert=True))
known.add(doc["_id"])
grand_ingested += 1
if len(queue) >= UPSERT_BATCH:
flush()
flush()
except imaplib.IMAP4.abort as ex:
flush()
print(f" [{rel[:45]:45}] IMAP abort behem fetch: {str(ex)[:50]} -> reconnect", flush=True)
consec_aborts += 1
if consec_aborts >= 4:
print("!!! %d po sobe jdoucich abortu -> MailStore wedge, koncim rc=2 pro restart"
% consec_aborts, flush=True)
if cp_fh:
cp_fh.close()
return 2
try:
nonlocal_M["M"].logout()
except Exception:
pass
nonlocal_M["M"] = imap_connect()
continue # slozka NENI hotova -> zopakuje ji dalsi run
# slozka uspesne dokoncena -> zapis do checkpointu
consec_aborts = 0
if cp_fh:
cp_fh.write(folder + "\n")
done_folders.add(folder)
if args.limit and grand_ingested >= args.limit:
print(f" (dosazen limit {args.limit})")
break
@@ -415,6 +513,8 @@ def main() -> int:
print(">>> DRY-RUN: nic nezapsano. Pro ostry beh spust bez --dry-run.")
else:
print(f"Zapsano do Mongo: {grand_ingested:,}")
if grand_errors:
print(f"Preskoceno zprav s chybou: {grand_errors:,}")
print(f"Trvalo: {time.time()-t0:.1f}s")
return 0