Files
janssen/Python-runner/2_refetch_text_bodies_v1.0.py
2026-06-05 21:21:30 +02:00

271 lines
9.0 KiB
Python

"""
==============================================================================
Skript: refetch_text_bodies_v1.0.py
Verze: 1.0
Datum: 2026-06-03
Autor: vladimir.buzalka
Popis:
ONETIME oprava — parse_emails_graph_v1.3 ukladal plain-text emaily jen jako
prvnich 2000 znaku do `body_preview`. Plne telo se zahazovalo.
Tento skript:
1) Najde v Mongo emaily kde body_html IS NULL/missing/empty
a soucasne maji graph_id (lze refetch)
2) Pro kazdy GET /users/{mailbox}/messages/{graph_id}?$select=body,bodyPreview
3) Pokud body.contentType == 'text' -> ulozi PLNY obsah do noveho pole
body_text (max 2 MB - stejny limit jako body_html)
4) Pokud body.contentType == 'html' (Graph mezitim prepnul) -> ulozi do body_html
5) Aktualizuje body_preview na realny 255-znakovy bodyPreview z Graphu
Bezpecne prerusitelne a opakovatelne - skript znovu refetchne jen ty kde
stale chybi body_html i body_text.
Spusteni:
python refetch_text_bodies_v1.0.py # vsechny schranky
python refetch_text_bodies_v1.0.py --mailbox vladimir.buzalka@buzalka.cz
python refetch_text_bodies_v1.0.py --limit 100 # test
==============================================================================
"""
from __future__ import annotations

import argparse
import logging
import os
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

import msal
import requests
from pymongo import MongoClient, UpdateOne
# Switch stdout to UTF-8 and replace unencodable characters instead of raising,
# on Python versions / streams that support reconfigure().
if hasattr(sys.stdout, "reconfigure"):
    sys.stdout.reconfigure(encoding="utf-8", errors="replace")

# --- configuration ----------------------------------------------------------
# Every credential / connection setting can be overridden through an
# environment variable of the same name; the literal is only the fallback so
# that existing invocations keep working unchanged.
#
# SECURITY NOTE(review): the fallback client secret below is committed in
# plain text. It should be rotated and the fallback removed once the
# environment variable is provisioned wherever this script runs.
GRAPH_TENANT_ID = os.environ.get(
    "GRAPH_TENANT_ID", "7d269944-37a4-43a1-8140-c7517dc426e9"
)
GRAPH_CLIENT_ID = os.environ.get(
    "GRAPH_CLIENT_ID", "4b222bfd-78c9-4239-a53f-43006b3ed07f"
)
GRAPH_CLIENT_SECRET = os.environ.get(
    "GRAPH_CLIENT_SECRET", "Txg8Q~MjhocuopxsJyJBhPmDfMxZ2r5WpTFj1dfk"
)
GRAPH_URL = "https://graph.microsoft.com/v1.0"

MONGO_URI = os.environ.get("MONGO_URI", "mongodb://192.168.1.76:27017")
MONGO_DB = os.environ.get("MONGO_DB", "emaily")

# Collections in MONGO_DB that are not mailboxes and must not be processed.
SKIP_COLLECTIONS = {"attachments_index"}

# 2 MB - same limit the parser uses for body_html.
MAX_BODY_BYTES = 2 * 1024 * 1024

# Number of Mongo updates accumulated before one bulk_write round-trip.
BATCH_SIZE = 50

# Errors are appended to a log file that lives next to this script.
LOG_FILE = Path(__file__).parent / "refetch_text_bodies_errors.log"

# Only ERROR and above are recorded; progress goes to stdout via print().
logging.basicConfig(
    filename=str(LOG_FILE),
    level=logging.ERROR,
    format="%(asctime)s | %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    encoding="utf-8",
)
# --- Graph auth -------------------------------------------------------------
# Most recently acquired bearer token, shared by every Graph request in this
# module. None until get_token() has succeeded once.
_token: Optional[str] = None


def get_token() -> str:
    """Acquire an app-only Microsoft Graph access token and cache it.

    A new MSAL confidential-client application is built on every call and
    asked for a token with the ``.default`` Graph scope. On success the
    token is stored in the module-level ``_token`` and also returned.

    Returns:
        The access token string.

    Raises:
        RuntimeError: if the MSAL response carries no ``access_token``; the
            full response is included in the message.
    """
    global _token

    authority_url = f"https://login.microsoftonline.com/{GRAPH_TENANT_ID}"
    client = msal.ConfidentialClientApplication(
        GRAPH_CLIENT_ID,
        authority=authority_url,
        client_credential=GRAPH_CLIENT_SECRET,
    )
    result = client.acquire_token_for_client(
        scopes=["https://graph.microsoft.com/.default"]
    )

    if "access_token" in result:
        _token = result["access_token"]
        return _token
    raise RuntimeError(f"Graph auth failed: {result}")
def graph_get(url: str, params: Optional[dict] = None) -> Optional[dict]:
    """GET a Microsoft Graph resource and return its decoded JSON body.

    Up to three attempts are made. A 401 triggers a token refresh, a 429
    sleeps for the ``Retry-After`` interval, and a transport/HTTP error
    sleeps two seconds; each of these consumes one attempt.

    Args:
        url: Absolute Graph URL.
        params: Optional query-string parameters.

    Returns:
        The parsed JSON document, or ``None`` when - and only when - Graph
        answers 404 (the resource no longer exists).

    Raises:
        requests.RequestException: the last attempt failed with a transport
            or HTTP error.
        RuntimeError: every attempt was consumed by 401/429 responses.
            Returning ``None`` here would be indistinguishable from a 404
            for the caller, so the condition is surfaced as an error.
    """
    max_attempts = 3
    if not _token:
        get_token()

    last_status = None
    for attempt in range(max_attempts):
        try:
            resp = requests.get(
                url,
                # _token is re-read on every attempt so a refresh is picked up.
                headers={"Authorization": f"Bearer {_token}"},
                params=params,
                timeout=30,
            )
            last_status = resp.status_code

            if resp.status_code == 401:
                # Token expired or rejected: fetch a new one and retry.
                get_token()
                continue
            if resp.status_code == 404:
                return None  # message no longer exists on the Outlook side
            if resp.status_code == 429:
                # Retry-After is normally an integer number of seconds; fall
                # back to 5 s if it is absent or not parseable as an int.
                try:
                    wait = max(0, int(resp.headers.get("Retry-After", "5")))
                except ValueError:
                    wait = 5
                print(f" [429] throttled, cekam {wait}s", flush=True)
                time.sleep(wait)
                continue

            resp.raise_for_status()
            return resp.json()
        except requests.RequestException:
            if attempt == max_attempts - 1:
                raise
            time.sleep(2)

    raise RuntimeError(
        f"Graph request gave up after {max_attempts} attempts "
        f"(last status {last_status}): {url}"
    )
# --- main loop --------------------------------------------------------------
# Emails where BOTH bodies (body_html and body_text) are missing - i.e. not
# yet processed - and which carry a usable graph_id so they can be refetched.
#
# The graph_id clause uses $nin rather than two "$ne" keys: a Python dict
# literal keeps only the last duplicate key, so {"$ne": None, "$ne": ""}
# silently degrades to {"$ne": ""} and lets graph_id == null through.
EMPTY_BODY_FILTER = {
    "$and": [
        {"$or": [
            {"body_html": None},
            {"body_html": {"$exists": False}},
            {"body_html": ""},
        ]},
        {"$or": [
            {"body_text": None},
            {"body_text": {"$exists": False}},
            {"body_text": ""},
        ]},
        {"graph_id": {"$exists": True, "$nin": [None, ""]}},
    ]
}
def _body_update(data: dict) -> tuple[dict, str]:
    """Turn one Graph message payload into a Mongo ``$set`` document.

    Returns:
        ``(update, outcome)`` where ``outcome`` is ``"html"``, ``"text"`` or
        ``"still_empty"`` (no content, or an unrecognised contentType).
    """
    body = data.get("body") or {}
    ctype = body.get("contentType")
    content = body.get("content") or ""
    preview = data.get("bodyPreview") or ""

    # Timestamp is taken in UTC and stored without tzinfo.
    update: dict = {
        "refetched_at": datetime.now(timezone.utc).replace(tzinfo=None)
    }
    if not content:
        outcome = "still_empty"
        update["body_refetch_status"] = "graph_empty"
    elif ctype in ("html", "text"):
        outcome = ctype
        # Slicing is a no-op for shorter strings. Note the cap is applied to
        # characters of the str, not to encoded bytes.
        update[f"body_{ctype}"] = content[:MAX_BODY_BYTES]
        update["body_refetch_status"] = ctype
    else:
        outcome = "still_empty"
        update["body_refetch_status"] = f"unknown_ctype:{ctype}"

    if preview:
        update["body_preview"] = preview[:300]
    return update, outcome


def process_mailbox(col, mailbox: str, limit: Optional[int]) -> dict:
    """Refetch missing message bodies for one mailbox collection.

    Every document matching ``EMPTY_BODY_FILTER`` is looked up in Graph by
    its ``graph_id``; the body is written to ``body_html`` or ``body_text``
    depending on the returned contentType, together with ``refetched_at``,
    ``body_refetch_status`` and (when present) ``body_preview``. Writes are
    sent in unordered bulks of ``BATCH_SIZE``.

    Args:
        col: The pymongo collection holding the mailbox's emails.
        mailbox: Mailbox address, used in the Graph URL and in messages.
        limit: Maximum number of candidates to process; falsy = no limit.

    Returns:
        A dict with keys ``mailbox``, ``candidates``, ``refetched``,
        ``text``, ``html``, ``still_empty``, ``errors`` and ``missing``.
        ``candidates`` is the number of matching documents regardless of
        ``limit``.
    """
    total = col.count_documents(EMPTY_BODY_FILTER)
    print(f"[{mailbox}] kandidatu k refetchi: {total}"
          + (f" (limit {limit})" if limit else ""))

    stats = {"mailbox": mailbox, "candidates": total, "refetched": 0,
             "text": 0, "html": 0, "still_empty": 0, "errors": 0, "missing": 0}
    if total == 0:
        return stats

    # Denominator for progress output: honour --limit instead of always
    # showing the full candidate count.
    expected = min(total, limit) if limit else total

    cursor = col.find(EMPTY_BODY_FILTER, {"_id": 1, "graph_id": 1},
                      no_cursor_timeout=True)
    # The projected documents are tiny, so without an explicit batch size one
    # server batch can hold a very large number of them and take hours of
    # Graph calls to drain. Small batches keep getMore round-trips frequent;
    # no_cursor_timeout alone does not protect against the server-side
    # session idle timeout.
    cursor = cursor.batch_size(BATCH_SIZE)
    if limit:
        cursor = cursor.limit(limit)

    def counters() -> str:
        # Shared tail of the progress line and the final DONE line.
        return (f"refetched={stats['refetched']} "
                f"text={stats['text']} html={stats['html']} "
                f"still_empty={stats['still_empty']} "
                f"missing={stats['missing']} err={stats['errors']}")

    seen = 0
    pending: list[UpdateOne] = []
    try:
        for doc in cursor:
            seen += 1
            gid = doc["graph_id"]
            try:
                data = graph_get(
                    f"{GRAPH_URL}/users/{mailbox}/messages/{gid}",
                    {"$select": "body,bodyPreview"},
                )
            except Exception as exc:
                # Best effort: log, count, and move on to the next message.
                stats["errors"] += 1
                logging.error("[%s] graph_get %s: %s", mailbox, gid, exc)
                continue
            if data is None:
                # graph_get returns None when the message is gone (404).
                stats["missing"] += 1
                continue

            update, outcome = _body_update(data)
            stats[outcome] += 1
            if outcome != "still_empty":
                stats["refetched"] += 1

            pending.append(UpdateOne({"_id": doc["_id"]}, {"$set": update}))
            if len(pending) >= BATCH_SIZE:
                col.bulk_write(pending, ordered=False)
                pending.clear()

            if seen % 100 == 0 or seen == 1:
                print(f" [{seen:>5}/{expected}] {counters()}", flush=True)
    finally:
        cursor.close()
        # Flush whatever was already fetched, also when the loop is left via
        # an exception or Ctrl-C, so completed Graph calls are not wasted.
        if pending:
            col.bulk_write(pending, ordered=False)

    print(f" [{seen}/{expected}] DONE {counters()}")
    return stats
def main() -> int:
    """Command-line entry point: refetch bodies for one or all mailboxes.

    Parses ``--mailbox`` and ``--limit``, verifies the MongoDB connection
    with a ping, acquires a Graph token, runs ``process_mailbox`` for each
    selected collection and prints a per-mailbox summary.

    Returns:
        0 on completion. Connection, auth and processing failures propagate
        as exceptions.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--mailbox",
                        help="Jedna konkretni schranka (default: vsechny)")
    parser.add_argument("--limit", type=int,
                        help="Limit emailu na schranku (test)")
    args = parser.parse_args()

    started = time.time()
    print("Pripojuji se k MongoDB...")
    mongo = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
    try:
        # Fail fast if the server is unreachable.
        mongo.admin.command("ping")
        db = mongo[MONGO_DB]

        print("Token Graph API...")
        get_token()
        print("OK\n")

        if args.mailbox:
            mailboxes = [args.mailbox]
        else:
            # Each remaining collection is treated as one mailbox; MongoDB's
            # internal "system.*" collections are never mailboxes.
            mailboxes = [
                name for name in db.list_collection_names()
                if name not in SKIP_COLLECTIONS
                and not name.startswith("system.")
            ]
        print(f"Schranky ({len(mailboxes)}): {mailboxes}\n")

        results = []
        for mb in mailboxes:
            results.append(process_mailbox(db[mb], mb, limit=args.limit))
            print()
    finally:
        # Release the connection pool even when processing fails.
        mongo.close()

    print("=== SHRNUTI ===")
    for r in results:
        print(f" {r['mailbox']}: candidates={r['candidates']} "
              f"refetched={r['refetched']} text={r['text']} html={r['html']} "
              f"still_empty={r['still_empty']} missing={r['missing']} "
              f"errors={r['errors']}")
    print(f"\nCelkem trvalo: {time.time() - started:.1f} s")
    return 0
# Script entry point: main()'s return value becomes the exit status. Ctrl-C
# prints a short notice; any other exception prints its traceback and exits
# with status 1.
if __name__ == "__main__":
    import traceback

    try:
        sys.exit(main())
    except KeyboardInterrupt:
        print("\nPreruseno uzivatelem")
    except Exception:
        traceback.print_exc()
        raise SystemExit(1)