""" ============================================================================== Skript: 1b_parse_emails_graph_delta_v1.0.py Verze: 1.0 Datum: 2026-06-04 Autor: vladimir.buzalka Popis: Inkrementalni sync emailu pres Microsoft Graph DELTA QUERY. Sourozenec `1_parse_emails_graph_v1.4.py` — kazdy resi jiny use case: 1_parse_emails_graph_v1.4.py = prvni plny import schranky 1b_parse_emails_graph_delta_v1.0.py = pravidelny sync (zmeny od minula) Delta query je server-side change tracking — Graph si pamatuje "zalozku" (deltaLink) a vraci jen to, co se od ni zmenilo: - nove zpravy - zmeny existujicich (isRead, flag, presun do jine slozky, kategorie) - SMAZANE zpravy (@removed) — definitivne smazane, nikoli v kosi Pro mail v "Deleted Items" delta nic specialniho nedela — je to porad normalni zprava, jen s folder_path="Deleted Items". @removed prijde az kdyz uzivatel vysype kos / Shift+Del. State: Kolekce `emaily.sync_state`, _id = "|". { mailbox, folder_id, folder_path, delta_link, # plny URL s $deltatoken na pristi beh last_run_at, cumulative_new, cumulative_sync, cumulative_removed } Permanentne smazane zpravy: Skript je NEMAZE z Mongo. Pouze nastavi: permanently_deleted: True permanently_deleted_at: Dohledani: col.find({"permanently_deleted": True}) Reuse: Funkce extract_message / extract_sync_fields se nactou primo z modulu 1_parse_emails_graph_v1.4.py (importlib, file-based), aby se logika extrahce nikdy nerozesla. Spousteni: python 1b_parse_emails_graph_delta_v1.0.py # VSECHNY schranky (mimo SKIP_MAILBOXES) python 1b_parse_emails_graph_delta_v1.0.py --mailbox ordinace@buzalkova.cz # jedna schranka python 1b_parse_emails_graph_delta_v1.0.py --mailbox ordinace@buzalkova.cz --folder Inbox python 1b_parse_emails_graph_delta_v1.0.py --reset # zahodit deltaLinky a najet znova python 1b_parse_emails_graph_delta_v1.0.py --dry-run # nic neulozit SKIP_MAILBOXES (hardcoded): vbuzalka@its.jnj.com — JNJ tenant, nemame Graph API pristup. 
Pro tuto schranku je nutny samostatny skript (lokalni .msg). Zavislosti: msal, requests, pymongo, python-dateutil Python 3.10+ ============================================================================== """ from __future__ import annotations import argparse import importlib.util import logging import sys import time from datetime import datetime, timezone from pathlib import Path from typing import Optional import msal import requests from pymongo import MongoClient, ASCENDING if hasattr(sys.stdout, "reconfigure"): sys.stdout.reconfigure(encoding="utf-8", errors="replace") # ─── KONFIGURACE ────────────────────────────────────────────────────────────── GRAPH_TENANT_ID = "7d269944-37a4-43a1-8140-c7517dc426e9" GRAPH_CLIENT_ID = "4b222bfd-78c9-4239-a53f-43006b3ed07f" GRAPH_CLIENT_SECRET = "Txg8Q~MjhocuopxsJyJBhPmDfMxZ2r5WpTFj1dfk" GRAPH_URL = "https://graph.microsoft.com/v1.0" MONGO_URI = "mongodb://192.168.1.76:27017" MONGO_DB = "emaily" SYNC_STATE_COL = "sync_state" PAGE_SIZE = 100 # delta endpoint typicky vraci max 100/stranka LOG_FILE = Path(__file__).parent / "delta_errors.log" SCRIPT_VERSION = "1.0" # Kolekce v `emaily` ktere NEJSOU mailboxy: NON_MAILBOX_COLLECTIONS = {"attachments_index", "sync_state"} # Schranky, kde NEMAME Graph API pristup — pri bezneho behu se preskoci. # Pro tyto je nutny separatni skript (napr. lokalni .msg parser). SKIP_MAILBOXES = { "vbuzalka@its.jnj.com", # JNJ tenant — nemame Graph credentials } logging.basicConfig( filename=str(LOG_FILE), level=logging.ERROR, format="%(asctime)s | %(message)s", datefmt="%Y-%m-%d %H:%M:%S", encoding="utf-8", ) # Co tahnout z delta endpointu (stejne jako MSG_SELECT v v1.4, mimo internetMessageHeaders # ktere delta neumi vratit pro vsechny polozky — pro nove zpravy si je dotahneme # samostatnym fetchem). 
DELTA_SELECT = (
    "id,internetMessageId,subject,bodyPreview,body,"
    "importance,isRead,isDraft,hasAttachments,"
    "receivedDateTime,sentDateTime,createdDateTime,lastModifiedDateTime,"
    "sender,from,toRecipients,ccRecipients,bccRecipients,replyTo,"
    "conversationId,conversationIndex,parentFolderId,"
    "categories,flag,inferenceClassification"
)

# A full fetch of a new message (including headers + attachments) uses the same
# $select + $expand as v1.4.
FULL_FETCH_SELECT = (
    "id,internetMessageId,subject,bodyPreview,body,"
    "importance,isRead,isDraft,hasAttachments,"
    "receivedDateTime,sentDateTime,createdDateTime,lastModifiedDateTime,"
    "sender,from,toRecipients,ccRecipients,bccRecipients,replyTo,"
    "conversationId,conversationIndex,parentFolderId,"
    "categories,flag,inferenceClassification,internetMessageHeaders"
)
FULL_FETCH_EXPAND = "attachments($select=id,name,contentType,size,isInline)"


# ─── Reuse of the extract logic from v1.4 ─────────────────────────────────────

_HERE = Path(__file__).parent
_V14_PATH = _HERE / "1_parse_emails_graph_v1.4.py"
if not _V14_PATH.exists():
    print(f"CHYBA: chybi sourozenec {_V14_PATH.name} — extract logiku nelze nacist", file=sys.stderr)
    sys.exit(1)

# Load the sibling file as a module under its own name ("v14_parse").
_spec = importlib.util.spec_from_file_location("v14_parse", _V14_PATH)
_v14 = importlib.util.module_from_spec(_spec)
_spec.loader.exec_module(_v14)

extract_message = _v14.extract_message
extract_sync_fields = _v14.extract_sync_fields
# GRAPH_MAILBOX is module-level in v1.4 — extract does not need it, but for
# consistency sync_mailbox() sets it for every mailbox it processes.


# ─── Graph API ────────────────────────────────────────────────────────────────

_graph_token: Optional[str] = None

# Total requests per graph_get() call; 401 refresh, throttling and transient
# failures each consume one attempt.
_GRAPH_MAX_ATTEMPTS = 3
# Server-side statuses that are worth retrying (transient gateway/service errors).
_GRAPH_TRANSIENT_STATUS = frozenset({500, 502, 503, 504})


def get_token() -> str:
    """Acquire a new app-only (client-credentials) Graph token and store it module-wide.

    A new ConfidentialClientApplication is built on every call, so each call asks
    the identity platform for a token instead of reusing one from an in-memory
    cache. Keep it that way: graph_get() calls this after a 401 to get a new token.

    Raises:
        RuntimeError: when MSAL returns no access_token.
    """
    global _graph_token
    app = msal.ConfidentialClientApplication(
        GRAPH_CLIENT_ID,
        authority=f"https://login.microsoftonline.com/{GRAPH_TENANT_ID}",
        client_credential=GRAPH_CLIENT_SECRET,
    )
    result = app.acquire_token_for_client(scopes=["https://graph.microsoft.com/.default"])
    if "access_token" not in result:
        raise RuntimeError(f"Graph auth failed: {result}")
    _graph_token = result["access_token"]
    return _graph_token


class DeltaExpired(Exception):
    """The deltaLink expired (HTTP 410) — a fresh full delta has to be started."""


def _retry_after_seconds(response, default: int) -> int:
    """Return the Retry-After header in whole seconds.

    Falls back to *default* when the header is missing or is not an integer
    (e.g. the HTTP-date form). Negative values are clamped to 0.
    """
    raw = response.headers.get("Retry-After")
    if raw is None:
        return default
    try:
        return max(0, int(raw))
    except ValueError:
        return default


def graph_get(url: str, params: Optional[dict] = None, allow_410: bool = False) -> dict:
    """GET a Graph URL and return the decoded JSON body.

    Retries, up to _GRAPH_MAX_ATTEMPTS requests in total:
      * 401 — re-acquire the token via get_token() and try again;
      * 429 — wait for Retry-After (default 5 s) and try again;
      * 500/502/503/504, connection errors and timeouts — wait (Retry-After when
        present, otherwise 1, 2, ... s) and try again. On the last attempt the
        original error surfaces unchanged (HTTPError / requests exception).

    Raises:
        DeltaExpired: on HTTP 410 when allow_410 is True.
        requests.HTTPError: for any other non-2xx status.
        requests.ConnectionError / requests.Timeout: when the last attempt fails on the network.
        RuntimeError: when all attempts were consumed by 401/429 handling
            (or token acquisition itself fails).
    """
    global _graph_token
    if not _graph_token:
        get_token()
    last_attempt = _GRAPH_MAX_ATTEMPTS - 1
    for attempt in range(_GRAPH_MAX_ATTEMPTS):
        try:
            r = requests.get(
                url,
                headers={"Authorization": f"Bearer {_graph_token}"},
                params=params,
                timeout=60,
            )
        except (requests.ConnectionError, requests.Timeout) as e:
            if attempt == last_attempt:
                raise
            wait = 2 ** attempt
            print(f"  [net] {type(e).__name__}, cekam {wait}s ...")
            time.sleep(wait)
            continue
        if r.status_code == 401:
            get_token()
            continue
        if r.status_code == 410 and allow_410:
            raise DeltaExpired(url)
        if r.status_code == 429:
            # Rate limit — honour Retry-After.
            wait = _retry_after_seconds(r, default=5)
            print(f"  [429] cekam {wait}s ...")
            time.sleep(wait)
            continue
        if r.status_code in _GRAPH_TRANSIENT_STATUS and attempt < last_attempt:
            wait = _retry_after_seconds(r, default=2 ** attempt)
            print(f"  [{r.status_code}] cekam {wait}s ...")
            time.sleep(wait)
            continue
        r.raise_for_status()
        return r.json()
    raise RuntimeError(f"Graph GET failed after retries: {url}")


def get_all_folders(mailbox: str, parent_id: Optional[str] = None, parent_path: str = "") -> list[dict]:
    """Recursively list the mailbox's mail folders.

    Returns a flat list of {"id": ..., "path": "Parent/Child"} dicts, each folder
    followed by its descendants (depth-first). Follows @odata.nextLink paging.
    """
    if parent_id is None:
        url = f"{GRAPH_URL}/users/{mailbox}/mailFolders"
    else:
        url = f"{GRAPH_URL}/users/{mailbox}/mailFolders/{parent_id}/childFolders"
    folders = []
    params = {"$top": 100, "$select": "id,displayName,childFolderCount"}
    while url:
        data = graph_get(url, params)
        for f in data.get("value", []):
            path = f"{parent_path}/{f['displayName']}".lstrip("/")
            folders.append({"id": f["id"], "path": path})
            if f.get("childFolderCount", 0) > 0:
                folders.extend(get_all_folders(mailbox, f["id"], path))
        url = data.get("@odata.nextLink")
        params = None  # nextLink is used as returned by Graph
    return folders


def fetch_full_message(mailbox: str, msg_id: str) -> Optional[dict]:
    """Fetch a whole message including headers and attachment metadata.

    Used for new messages seen in the delta. Returns None (and logs) on an HTTP
    error; other exceptions from graph_get() propagate.
    """
    url = f"{GRAPH_URL}/users/{mailbox}/messages/{msg_id}"
    params = {"$select": FULL_FETCH_SELECT, "$expand": FULL_FETCH_EXPAND}
    try:
        return graph_get(url, params)
    except requests.HTTPError as e:
        logging.error("fetch_full_message %s: %s", msg_id, e)
        return None
# ─── Delta iteration ──────────────────────────────────────────────────────────

def iter_folder_delta(mailbox: str, folder_id: str, delta_link: Optional[str], limit: int = 0):
    """
    Generator over one folder's delta: yields (item, final_delta_link) tuples.

    For every change the tuple is (item, None). `item` is the raw Graph dict:
    either a message or a removal marker ({'id': ..., '@removed': {...}}).
    When Graph returns @odata.deltaLink, one last tuple (None, delta_link) is
    yielded and the generator stops.

    Starts from `delta_link` when given. Otherwise it starts a fresh
    /mailFolders/{folder_id}/messages/delta enumeration.

    With `limit` > 0 the generator stops after that many items WITHOUT yielding a
    deltaLink, so the caller saves no state (test mode).

    Raises DeltaExpired on HTTP 410 (expired deltaLink); the caller should start
    again with delta_link=None (fresh full delta).
    """
    if delta_link:
        url = delta_link
        params = None
    else:
        url = f"{GRAPH_URL}/users/{mailbox}/mailFolders/{folder_id}/messages/delta"
        params = {"$select": DELTA_SELECT, "$top": PAGE_SIZE}

    n = 0
    while url:
        data = graph_get(url, params, allow_410=True)
        params = None  # follow-up URLs (nextLink) are used exactly as Graph returns them
        for item in data.get("value", []):
            yield item, None
            n += 1
            if limit and n >= limit:
                # --limit is a test mode: stop without a deltaLink. Nothing gets
                # saved, so the next run starts again from the previously stored state.
                return
        final_link = data.get("@odata.deltaLink")
        if final_link:
            # End of the change set — hand over the bookmark for the next run.
            yield None, final_link
            return
        url = data.get("@odata.nextLink")


# ─── Per-folder sync ──────────────────────────────────────────────────────────

def sync_folder(col, sync_col, mailbox: str, folder: dict, dry_run: bool, limit: int,
                force_fresh: bool = False, _restarted: bool = False) -> dict:
    """
    Run one delta pass over a single folder and return its statistics.

    Reads the stored deltaLink from `sync_col` (_id "<mailbox>|<folder_id>")
    unless `force_fresh` is set. Every delta item is applied via process_item().
    When the pass reaches Graph's final deltaLink and this is not a dry run,
    the link is saved together with cumulative counters.

    Args:
        col: Mongo collection with the mailbox's messages.
        sync_col: Mongo collection with the per-folder delta state.
        mailbox: mailbox address (Graph user).
        folder: {"id": ..., "path": ...} as returned by get_all_folders().
        dry_run: when True nothing is written to Mongo (not even state cleanup).
        limit: max items to process (0 = unlimited); a limited pass saves no state.
        force_fresh: ignore any stored deltaLink and start a fresh full delta
            (used for --reset, including --reset --dry-run).
        _restarted: internal; set on the automatic restart after HTTP 410 so that
            a fresh pass which also expires cannot recurse forever.

    Returns:
        {"new", "sync", "removed", "errors"} counts.

    Raises:
        DeltaExpired: when the restarted fresh pass also gets HTTP 410.
    """
    fid = folder["id"]
    fpath = folder["path"]
    state_id = f"{mailbox}|{fid}"

    if force_fresh:
        delta_link = None
    else:
        state = sync_col.find_one({"_id": state_id})
        delta_link = state.get("delta_link") if state else None
    is_first_run = delta_link is None

    label = "FRESH" if is_first_run else "DELTA"
    print(f"\n[{label}] {fpath}")

    stats = {"new": 0, "sync": 0, "removed": 0, "errors": 0}
    final_delta = None
    try:
        for item, fin in iter_folder_delta(mailbox, fid, delta_link, limit=limit):
            if fin:
                final_delta = fin
                break
            try:
                process_item(col, mailbox, fpath, item, stats, dry_run)
            except Exception as e:
                # Best effort per item: count, log and continue with the rest.
                stats["errors"] += 1
                logging.error("process_item %s: %s", item.get("id", "?"), e)
    except DeltaExpired:
        if _restarted:
            raise
        print(" [410] deltaLink expiroval — restart od fresh delta")
        if not dry_run:
            sync_col.delete_one({"_id": state_id})
        # Restart ignoring the stored state (in dry-run it was not deleted).
        return sync_folder(col, sync_col, mailbox, folder, dry_run, limit,
                           force_fresh=True, _restarted=True)

    print(f"  new={stats['new']} sync={stats['sync']} removed={stats['removed']} err={stats['errors']}")

    # NOTE(review): the deltaLink is saved even when some items failed
    # (stats["errors"] > 0). Those items are then not re-delivered by the next
    # delta run unless they change again; they are only recorded in LOG_FILE.
    if final_delta and not dry_run:
        sync_col.update_one(
            {"_id": state_id},
            {
                "$set": {
                    "mailbox": mailbox,
                    "folder_id": fid,
                    "folder_path": fpath,
                    "delta_link": final_delta,
                    "last_run_at": datetime.now(timezone.utc).replace(tzinfo=None),
                },
                "$inc": {
                    "cumulative_new": stats["new"],
                    "cumulative_sync": stats["sync"],
                    "cumulative_removed": stats["removed"],
                    "run_count": 1,
                },
            },
            upsert=True,
        )
    elif not final_delta:
        # No deltaLink arrived (e.g. --limit) — state is left untouched, so the
        # next run continues from the old deltaLink or starts fresh.
        if not is_first_run:
            print("  [pozn] delta neukoncena — pristi beh pojede od ulozeneho deltaLinku")

    return stats


# Fields set on a permanently deleted message (see process_item).
_DELETED_FLAG_FIELDS = ("permanently_deleted", "permanently_deleted_at")


def present_update(fields: dict) -> dict:
    """Build a Mongo update that $sets *fields* and clears the permanent-deletion flag.

    A message the delta reports as present in a folder is not permanently
    deleted, so a flag left over from an earlier @removed is unset. Keys already
    present in *fields* are not unset, because Mongo rejects the same path in
    both $set and $unset.
    """
    update = {"$set": fields}
    to_clear = {k: "" for k in _DELETED_FLAG_FIELDS if k not in fields}
    if to_clear:
        update["$unset"] = to_clear
    return update


def process_item(col, mailbox: str, folder_path: str, item: dict, stats: dict, dry_run: bool):
    """Apply one delta item to the mailbox collection and update *stats* in place.

    * @removed marker -> flag the document with that graph_id as permanently
      deleted (the document is kept) and count "removed".
    * known graph_id  -> update the sync fields from the delta payload, count "sync".
    * unknown id      -> fetch the full message, upsert it by its `_id`, count "new".
      If the fetch or the extraction yields nothing, count "errors".
    Items without an id are ignored. In dry-run nothing is written, only printed.
    """
    graph_id = item.get("id")

    # 1) Removed message (@removed)
    if "@removed" in item or item.get("@removed.reason"):
        if not graph_id:
            return
        if dry_run:
            print(f"  REMOVED graph_id={graph_id[:30]}...")
        else:
            # NOTE(review): in a per-folder delta this can also mean "moved out of
            # this folder"; a later present_update() for the same document clears it.
            col.update_one(
                {"graph_id": graph_id},
                {"$set": {
                    "permanently_deleted": True,
                    "permanently_deleted_at": datetime.now(timezone.utc).replace(tzinfo=None),
                }},
            )
        stats["removed"] += 1
        return

    # 2) New or changed message — decided by whether graph_id exists in Mongo
    if not graph_id:
        return

    existing = col.find_one({"graph_id": graph_id}, {"_id": 1})
    if existing:
        # Existing message — update only the sync fields (the delta payload has them).
        fields = extract_sync_fields(item, folder_path)
        if dry_run:
            print(f"  SYNC {(item.get('subject') or '')[:60]}")
        else:
            col.update_one({"_id": existing["_id"]}, present_update(fields))
        stats["sync"] += 1
        return

    # New message — fetch the full version for body + attachments + headers.
    full = fetch_full_message(mailbox, graph_id)
    if full is None:
        stats["errors"] += 1
        return
    doc = extract_message(full, folder_path)
    if doc is None:
        stats["errors"] += 1
        return
    if dry_run:
        print(f"  NEW {(doc.get('subject') or '')[:60]}")
    else:
        col.update_one({"_id": doc["_id"]}, present_update(doc), upsert=True)
    stats["new"] += 1


# ─── Indexes ──────────────────────────────────────────────────────────────────

def ensure_sync_state_indexes(sync_col):
    """Create the lookup indexes on the sync_state collection (no-op if they exist)."""
    sync_col.create_index([("mailbox", ASCENDING), ("folder_id", ASCENDING)])
    sync_col.create_index([("last_run_at", ASCENDING)])


def ensure_perm_deleted_index(col):
    """Create a sparse index on `permanently_deleted` in a mailbox collection."""
    col.create_index([("permanently_deleted", ASCENDING)], sparse=True)


# ─── Main ─────────────────────────────────────────────────────────────────────

def discover_mailboxes(db) -> list[str]:
    """Return the mailboxes to sync: collections of `emaily`, sorted by name.

    Excludes NON_MAILBOX_COLLECTIONS, MongoDB's reserved `system.*` collections
    and SKIP_MAILBOXES (the skipped ones are reported).
    """
    out = []
    for name in sorted(db.list_collection_names()):
        if name in NON_MAILBOX_COLLECTIONS or name.startswith("system."):
            continue
        if name in SKIP_MAILBOXES:
            print(f"  [skip] {name} — v SKIP_MAILBOXES (neni Graph pristup)")
            continue
        out.append(name)
    return out


def sync_mailbox(client, mailbox: str, args) -> dict:
    """Sync one mailbox (every folder, optionally filtered by --folder).

    Returns the summed {"new", "sync", "removed", "errors"} totals. A folder that
    fails with a Graph/network error is logged and counted as one error; the
    remaining folders are still processed.
    """
    _v14.GRAPH_MAILBOX = mailbox
    print(f"\n========== {mailbox} ==========")

    col = client[MONGO_DB][mailbox]
    sync_col = client[MONGO_DB][SYNC_STATE_COL]
    if not args.dry_run:
        ensure_sync_state_indexes(sync_col)
        ensure_perm_deleted_index(col)

    if args.reset:
        if args.dry_run:
            # Dry-run must not touch Mongo; sync_folder(force_fresh=True) previews a fresh pass.
            n = sync_col.count_documents({"mailbox": mailbox})
            print(f"  --reset (dry-run): smazalo by se {n} deltaLinku pro {mailbox}")
        else:
            n = sync_col.delete_many({"mailbox": mailbox}).deleted_count
            print(f"  --reset: smazano {n} deltaLinku pro {mailbox}")

    print("Nacitam seznam slozek...")
    try:
        folders = get_all_folders(mailbox)
    except requests.HTTPError as e:
        print(f"  CHYBA: nelze nacist slozky pro {mailbox}: {e}")
        logging.error("get_all_folders %s: %s", mailbox, e)
        return {"new": 0, "sync": 0, "removed": 0, "errors": 1}

    if args.folder:
        needle = args.folder.lower()
        folders = [f for f in folders if needle in f["path"].lower()]
    print(f"  Slozek ke zpracovani: {len(folders)}")

    totals = {"new": 0, "sync": 0, "removed": 0, "errors": 0}
    for folder in folders:
        try:
            s = sync_folder(col, sync_col, mailbox, folder, args.dry_run, args.limit,
                            force_fresh=args.reset)
        except (requests.RequestException, RuntimeError, DeltaExpired) as e:
            print(f"  CHYBA: slozka {folder['path']}: {e}")
            logging.error("sync_folder %s|%s: %s", mailbox, folder["path"], e)
            s = {"new": 0, "sync": 0, "removed": 0, "errors": 1}
        for k in totals:
            totals[k] += s[k]

    print(f"  -> mailbox total: new={totals['new']} sync={totals['sync']} removed={totals['removed']} err={totals['errors']}")
    return totals


def _parse_args() -> argparse.Namespace:
    """Define and parse the command-line options."""
    ap = argparse.ArgumentParser(description=f"parse_emails_graph delta sync v{SCRIPT_VERSION}")
    ap.add_argument("--mailbox", default="",
                    help="E-mail schranky (= kolekce v Mongo). "
                         "Bez argumentu projede vsechny schranky z `emaily` (mimo SKIP_MAILBOXES).")
    ap.add_argument("--folder", default="",
                    help="Filtruje slozky obsahujici tento retezec (default: vsechny)")
    ap.add_argument("--limit", type=int, default=0,
                    help="Max polozek na slozku (test)")
    ap.add_argument("--reset", action="store_true",
                    help="Smaze deltaLinky pro vybrane schranky — pristi beh zacne od fresh delta")
    ap.add_argument("--dry-run", action="store_true",
                    help="Nic neulozi do Mongo, jen vypise co by se stalo")
    return ap.parse_args()


def _run(client, args) -> int:
    """Resolve the mailboxes, sync each one, print the summary; return the exit code."""
    db = client[MONGO_DB]

    if args.mailbox:
        if args.mailbox in SKIP_MAILBOXES:
            print(f"  CHYBA: {args.mailbox} je v SKIP_MAILBOXES — neni Graph pristup.")
            sys.exit(2)
        mailboxes = [args.mailbox]
    else:
        mailboxes = discover_mailboxes(db)
    print(f"  Schranky ke zpracovani: {len(mailboxes)}")
    for m in mailboxes:
        print(f"    {m}")

    print("Token Graph API...")
    get_token()
    print("  OK")

    t0 = time.time()
    grand = {"new": 0, "sync": 0, "removed": 0, "errors": 0}
    per_mailbox = []
    for mb in mailboxes:
        try:
            s = sync_mailbox(client, mb, args)
        except Exception as e:
            # Top-level boundary: one broken mailbox must not stop the others.
            print(f"  FATAL pri sync {mb}: {e}")
            logging.error("sync_mailbox %s: %s", mb, e)
            s = {"new": 0, "sync": 0, "removed": 0, "errors": 1}
        per_mailbox.append((mb, s))
        for k in grand:
            grand[k] += s[k]

    dt = time.time() - t0
    print("\n=== SHRNUTI ===")
    for mb, s in per_mailbox:
        print(f"  {mb:40} new={s['new']:>5} sync={s['sync']:>5} removed={s['removed']:>4} err={s['errors']:>3}")
    print(f"  {'TOTAL':40} new={grand['new']:>5} sync={grand['sync']:>5} removed={grand['removed']:>4} err={grand['errors']:>3}")
    print(f"  trvalo: {dt:.1f} s")

    return 1 if grand["errors"] > 0 else 0


def main() -> int:
    """Entry point: parse args, connect to MongoDB, run the sync; returns 1 if any error occurred."""
    args = _parse_args()

    print(f"=== Delta sync v{SCRIPT_VERSION} ===")
    if args.dry_run:
        print("  DRY-RUN — zadne zmeny v Mongo")

    print("Pripojuji se k MongoDB...")
    client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
    try:
        client.admin.command("ping")
        return _run(client, args)
    finally:
        client.close()


if __name__ == "__main__":
    sys.exit(main() or 0)