""" ============================================================================== Skript: refetch_text_bodies_v1.0.py Verze: 1.0 Datum: 2026-06-03 Autor: vladimir.buzalka Popis: ONETIME oprava — parse_emails_graph_v1.3 ukladal plain-text emaily jen jako prvnich 2000 znaku do `body_preview`. Plne telo se zahazovalo. Tento skript: 1) Najde v Mongo emaily kde body_html IS NULL/missing/empty a soucasne maji graph_id (lze refetch) 2) Pro kazdy GET /users/{mailbox}/messages/{graph_id}?$select=body,bodyPreview 3) Pokud body.contentType == 'text' -> ulozi PLNY obsah do noveho pole body_text (max 2 MB - stejny limit jako body_html) 4) Pokud body.contentType == 'html' (Graph mezitim prepnul) -> ulozi do body_html 5) Aktualizuje body_preview na realny 255-znakovy bodyPreview z Graphu Bezpecne preusitelne a opakovatelne - skript znovu refetchne jen ty kde stale chybi body_html i body_text. Spusteni: python refetch_text_bodies_v1.0.py # vsechny schranky python refetch_text_bodies_v1.0.py --mailbox vladimir.buzalka@buzalka.cz python refetch_text_bodies_v1.0.py --limit 100 # test ============================================================================== """ from __future__ import annotations import argparse import logging import sys import time from datetime import datetime, timezone from pathlib import Path from typing import Optional import msal import requests from pymongo import MongoClient, UpdateOne if hasattr(sys.stdout, "reconfigure"): sys.stdout.reconfigure(encoding="utf-8", errors="replace") # --- konfigurace ------------------------------------------------------------ GRAPH_TENANT_ID = "7d269944-37a4-43a1-8140-c7517dc426e9" GRAPH_CLIENT_ID = "4b222bfd-78c9-4239-a53f-43006b3ed07f" GRAPH_CLIENT_SECRET = "Txg8Q~MjhocuopxsJyJBhPmDfMxZ2r5WpTFj1dfk" GRAPH_URL = "https://graph.microsoft.com/v1.0" MONGO_URI = "mongodb://192.168.1.76:27017" MONGO_DB = "emaily" SKIP_COLLECTIONS = {"attachments_index"} MAX_BODY_BYTES = 2 * 1024 * 1024 # 2 MB - stejny limit jako body_html v parseru BATCH_SIZE = 50 LOG_FILE = Path(__file__).parent / "refetch_text_bodies_errors.log" logging.basicConfig( filename=str(LOG_FILE), level=logging.ERROR, format="%(asctime)s | %(message)s", datefmt="%Y-%m-%d %H:%M:%S", encoding="utf-8", ) # --- Graph auth ------------------------------------------------------------- _token: Optional[str] = None def get_token() -> str: global _token app = msal.ConfidentialClientApplication( GRAPH_CLIENT_ID, authority=f"https://login.microsoftonline.com/{GRAPH_TENANT_ID}", client_credential=GRAPH_CLIENT_SECRET, ) res = app.acquire_token_for_client(scopes=["https://graph.microsoft.com/.default"]) if "access_token" not in res: raise RuntimeError(f"Graph auth failed: {res}") _token = res["access_token"] return _token def graph_get(url: str, params: dict = None) -> Optional[dict]: global _token if not _token: get_token() for attempt in range(3): try: r = requests.get( url, headers={"Authorization": f"Bearer {_token}"}, params=params, timeout=30, ) if r.status_code == 401: get_token() continue if r.status_code == 404: return None # zprava uz neexistuje na strane Outlook if r.status_code == 429: wait = int(r.headers.get("Retry-After", "5")) print(f" [429] throttled, cekam {wait}s", flush=True) time.sleep(wait) continue r.raise_for_status() return r.json() except requests.RequestException as e: if attempt == 2: raise time.sleep(2) return None # --- hlavni smycka ---------------------------------------------------------- # emaily kde chybi obe tela (body_html i body_text) - tj. jeste nezpracovane EMPTY_BODY_FILTER = { "$and": [ {"$or": [ {"body_html": None}, {"body_html": {"$exists": False}}, {"body_html": ""}, ]}, {"$or": [ {"body_text": None}, {"body_text": {"$exists": False}}, {"body_text": ""}, ]}, {"graph_id": {"$exists": True, "$ne": None, "$ne": ""}}, ] } def process_mailbox(col, mailbox: str, limit: Optional[int]) -> dict: total = col.count_documents(EMPTY_BODY_FILTER) print(f"[{mailbox}] kandidatu k refetchi: {total}" + (f" (limit {limit})" if limit else "")) if total == 0: return {"mailbox": mailbox, "candidates": 0, "refetched": 0, "text": 0, "html": 0, "still_empty": 0, "errors": 0, "missing": 0} cursor = col.find(EMPTY_BODY_FILTER, {"_id": 1, "graph_id": 1}, no_cursor_timeout=True) if limit: cursor = cursor.limit(limit) n = refetched = txt = html = still_empty = err = missing = 0 bulk: list[UpdateOne] = [] try: for doc in cursor: n += 1 mid = doc["_id"] gid = doc["graph_id"] url = f"{GRAPH_URL}/users/{mailbox}/messages/{gid}" params = {"$select": "body,bodyPreview"} try: data = graph_get(url, params) except Exception as e: err += 1 logging.error("[%s] graph_get %s: %s", mailbox, gid, e) continue if data is None: missing += 1 continue body = data.get("body") or {} ctype = body.get("contentType") content = body.get("content") or "" preview = data.get("bodyPreview") or "" update: dict = {"refetched_at": datetime.now(timezone.utc).replace(tzinfo=None)} if not content: still_empty += 1 update["body_refetch_status"] = "graph_empty" elif ctype == "html": update["body_html"] = (content[:MAX_BODY_BYTES] if len(content) > MAX_BODY_BYTES else content) update["body_refetch_status"] = "html" html += 1 refetched += 1 elif ctype == "text": update["body_text"] = (content[:MAX_BODY_BYTES] if len(content) > MAX_BODY_BYTES else content) update["body_refetch_status"] = "text" txt += 1 refetched += 1 else: update["body_refetch_status"] = f"unknown_ctype:{ctype}" still_empty += 1 if preview: update["body_preview"] = preview[:300] bulk.append(UpdateOne({"_id": mid}, {"$set": update})) if len(bulk) >= BATCH_SIZE: col.bulk_write(bulk, ordered=False) bulk.clear() if n % 100 == 0 or n == 1: print(f" [{n:>5}/{total}] refetched={refetched} " f"text={txt} html={html} still_empty={still_empty} " f"missing={missing} err={err}", flush=True) finally: cursor.close() if bulk: col.bulk_write(bulk, ordered=False) print(f" [{n}/{total}] DONE refetched={refetched} text={txt} html={html} " f"still_empty={still_empty} missing={missing} err={err}") return {"mailbox": mailbox, "candidates": total, "refetched": refetched, "text": txt, "html": html, "still_empty": still_empty, "errors": err, "missing": missing} def main() -> int: ap = argparse.ArgumentParser() ap.add_argument("--mailbox", help="Jedna konkretni schranka (default: vsechny)") ap.add_argument("--limit", type=int, help="Limit emailu na schranku (test)") args = ap.parse_args() t0 = time.time() print("Pripojuji se k MongoDB...") mongo = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000) mongo.admin.command("ping") db = mongo[MONGO_DB] print("Token Graph API...") get_token() print("OK\n") if args.mailbox: mailboxes = [args.mailbox] else: mailboxes = [c for c in db.list_collection_names() if c not in SKIP_COLLECTIONS] print(f"Schranky ({len(mailboxes)}): {mailboxes}\n") results = [] for mb in mailboxes: results.append(process_mailbox(db[mb], mb, limit=args.limit)) print() print("=== SHRNUTI ===") for r in results: print(f" {r['mailbox']}: candidates={r['candidates']} " f"refetched={r['refetched']} text={r['text']} html={r['html']} " f"still_empty={r['still_empty']} missing={r['missing']} errors={r['errors']}") print(f"\nCelkem trvalo: {time.time() - t0:.1f} s") return 0 if __name__ == "__main__": try: raise SystemExit(main()) except KeyboardInterrupt: print("\nPreruseno uzivatelem") except Exception: import traceback traceback.print_exc() sys.exit(1)