Files
janssen/Python-runner/1b_parse_emails_graph_delta_v1.1.py
2026-06-14 08:25:15 +02:00

524 lines
20 KiB
Python

"""
==============================================================================
Skript: 1b_parse_emails_graph_delta_v1.1.py
Verze: 1.1
Datum: 2026-06-10
Autor: vladimir.buzalka
Zmeny v1.1 (2026-06-10):
- Bugfix: NON_MAILBOX_COLLECTIONS rozsireno o "jnj_messages" a
"jnj_sync_state" (pomocne kolekce JNJ folder trackingu). Predtim je
discover_mailboxes bral jako schranky -> Graph 404 na
/users/jnj_messages/mailFolders -> cely krok 1b FAIL(1) pri kazdem behu.
Popis:
Inkrementalni sync emailu pres Microsoft Graph DELTA QUERY.
Sourozenec `1_parse_emails_graph_v1.4.py` — kazdy resi jiny use case:
1_parse_emails_graph_v1.4.py = prvni plny import schranky
1b_parse_emails_graph_delta_v1.1.py = pravidelny sync (zmeny od minula)
Delta query je server-side change tracking — Graph si pamatuje "zalozku"
(deltaLink) a vraci jen to, co se od ni zmenilo:
- nove zpravy
- zmeny existujicich (isRead, flag, presun do jine slozky, kategorie)
- SMAZANE zpravy (@removed) — definitivne smazane, nikoli v kosi
Pro mail v "Deleted Items" delta nic specialniho nedela — je to porad
normalni zprava, jen s folder_path="Deleted Items". @removed prijde az
kdyz uzivatel vysype kos / Shift+Del.
State:
Kolekce `emaily.sync_state`, _id = "<mailbox>|<folder_id>".
{
mailbox, folder_id, folder_path,
delta_link, # plny URL s $deltatoken na pristi beh
last_run_at,
cumulative_new, cumulative_sync, cumulative_removed
}
Permanentne smazane zpravy:
Skript je NEMAZE z Mongo. Pouze nastavi:
permanently_deleted: True
permanently_deleted_at: <UTC datetime detekce>
Dohledani: col.find({"permanently_deleted": True})
Reuse:
Funkce extract_message / extract_sync_fields se nactou primo z modulu
1_parse_emails_graph_v1.4.py (importlib, file-based), aby se logika
extrakce nikdy nerozesla.
Spousteni:
python 1b_parse_emails_graph_delta_v1.1.py # VSECHNY schranky (mimo SKIP_MAILBOXES)
python 1b_parse_emails_graph_delta_v1.1.py --mailbox ordinace@buzalkova.cz # jedna schranka
python 1b_parse_emails_graph_delta_v1.1.py --mailbox ordinace@buzalkova.cz --folder Inbox
python 1b_parse_emails_graph_delta_v1.1.py --reset # zahodit deltaLinky a najet znova
python 1b_parse_emails_graph_delta_v1.1.py --dry-run # nic neulozit
SKIP_MAILBOXES (hardcoded):
vbuzalka@its.jnj.com — JNJ tenant, nemame Graph API pristup. Pro tuto
schranku je nutny samostatny skript (lokalni .msg).
Zavislosti:
msal, requests, pymongo, python-dateutil
Python 3.10+
==============================================================================
"""
from __future__ import annotations

import argparse
import importlib.util
import logging
import os
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

import msal
import requests
from pymongo import MongoClient, ASCENDING
# Switch console output to UTF-8 where the stream supports reconfiguration;
# characters that cannot be encoded are replaced instead of raising.
if hasattr(sys.stdout, "reconfigure"):
    sys.stdout.reconfigure(encoding="utf-8", errors="replace")
# ─── CONFIGURATION ────────────────────────────────────────────────────────────
# SECURITY: these credentials are committed in plain text. Each value can be
# overridden with an environment variable of the same name; the literals are
# kept only as a backward-compatible fallback. The client secret should be
# rotated and the literal removed once the environment is configured.
GRAPH_TENANT_ID = os.environ.get(
    "GRAPH_TENANT_ID", "7d269944-37a4-43a1-8140-c7517dc426e9"
)
GRAPH_CLIENT_ID = os.environ.get(
    "GRAPH_CLIENT_ID", "4b222bfd-78c9-4239-a53f-43006b3ed07f"
)
GRAPH_CLIENT_SECRET = os.environ.get(
    "GRAPH_CLIENT_SECRET", "Txg8Q~MjhocuopxsJyJBhPmDfMxZ2r5WpTFj1dfk"
)
GRAPH_URL = "https://graph.microsoft.com/v1.0"

MONGO_URI = os.environ.get("MONGO_URI", "mongodb://192.168.1.76:27017")
MONGO_DB = "emaily"
SYNC_STATE_COL = "sync_state"

PAGE_SIZE = 100  # the delta endpoint typically returns at most 100 items per page
LOG_FILE = Path(__file__).parent / "delta_errors.log"
SCRIPT_VERSION = "1.1"

# Collections in `emaily` that are NOT mailboxes.
# (jnj_messages + jnj_sync_state are helper collections of the JNJ folder
# tracking; without this exclusion discover_mailboxes treats them as
# mailboxes -> Graph 404 -> FAIL.)
NON_MAILBOX_COLLECTIONS = {
    "attachments_index",
    "sync_state",
    "jnj_messages",
    "jnj_sync_state",
}

# Mailboxes for which we have NO Graph API access — skipped during a regular
# run. They need a separate script (e.g. a local .msg parser).
SKIP_MAILBOXES = {
    "vbuzalka@its.jnj.com",  # JNJ tenant — no Graph credentials
}
# Root logger: only ERROR and above are written, to LOG_FILE, as
# "<timestamp> | <message>" in UTF-8.
logging.basicConfig(
    level=logging.ERROR,
    filename=str(LOG_FILE),
    encoding="utf-8",
    format="%(asctime)s | %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
# Fields requested from the delta endpoint (same as MSG_SELECT in v1.4, except
# internetMessageHeaders, which delta cannot return for every item — for new
# messages they are pulled by a separate full fetch).
DELTA_SELECT = (
    "id,internetMessageId,subject,bodyPreview,body,"
    "importance,isRead,isDraft,hasAttachments,"
    "receivedDateTime,sentDateTime,createdDateTime,lastModifiedDateTime,"
    "sender,from,toRecipients,ccRecipients,bccRecipients,replyTo,"
    "conversationId,conversationIndex,parentFolderId,"
    "categories,flag,inferenceClassification"
)

# Full load of a new message (including headers and attachments): the delta
# field list plus internetMessageHeaders, with the same expand as v1.4.
FULL_FETCH_SELECT = DELTA_SELECT + ",internetMessageHeaders"
FULL_FETCH_EXPAND = "attachments($select=id,name,contentType,size,isInline)"
# ─── Reuse of the extract logic from v1.4 ─────────────────────────────────────
# The sibling script is loaded straight from its file via importlib, so both
# scripts share a single implementation of the extraction functions.
_HERE = Path(__file__).parent
_V14_PATH = _HERE / "1_parse_emails_graph_v1.4.py"

if not _V14_PATH.exists():
    print(f"CHYBA: chybi sourozenec {_V14_PATH.name} — extract logiku nelze nacist", file=sys.stderr)
    sys.exit(1)

_spec = importlib.util.spec_from_file_location("v14_parse", _V14_PATH)
_v14 = importlib.util.module_from_spec(_spec)
_spec.loader.exec_module(_v14)

extract_message = _v14.extract_message
extract_sync_fields = _v14.extract_sync_fields
# GRAPH_MAILBOX is module-level in v1.4 — extraction does not need it, but for
# consistency it is set per mailbox in sync_mailbox().
# ─── Graph API ────────────────────────────────────────────────────────────────
# Bearer token shared by every Graph request; filled in by get_token().
_graph_token: Optional[str] = None


def get_token() -> str:
    """Acquire a Graph access token through MSAL and cache it module-wide.

    A new ``ConfidentialClientApplication`` is built on every call and the
    token is requested for the ``.default`` Graph scope.

    Returns:
        The access token, which is also stored in ``_graph_token``.

    Raises:
        RuntimeError: if the MSAL result contains no ``access_token``.
    """
    global _graph_token
    authority_url = f"https://login.microsoftonline.com/{GRAPH_TENANT_ID}"
    client_app = msal.ConfidentialClientApplication(
        GRAPH_CLIENT_ID,
        authority=authority_url,
        client_credential=GRAPH_CLIENT_SECRET,
    )
    auth_result = client_app.acquire_token_for_client(
        scopes=["https://graph.microsoft.com/.default"]
    )
    if "access_token" not in auth_result:
        raise RuntimeError(f"Graph auth failed: {auth_result}")
    _graph_token = auth_result["access_token"]
    return _graph_token
class DeltaExpired(Exception):
    """The stored deltaLink expired (HTTP 410); a full delta must be started again."""
def graph_get(url: str, params: Optional[dict] = None, allow_410: bool = False) -> dict:
    """Send a GET request to Microsoft Graph and return the decoded JSON body.

    At most three attempts are made. A 401 refreshes the token and retries; a
    429 sleeps for the number of seconds given in ``Retry-After`` and retries.

    Args:
        url: Absolute request URL (an endpoint, a nextLink or a deltaLink).
        params: Optional query parameters; pass ``None`` for links that
            already carry their query string.
        allow_410: When true, HTTP 410 is reported as ``DeltaExpired`` instead
            of a generic HTTP error.

    Returns:
        The parsed JSON response.

    Raises:
        DeltaExpired: on HTTP 410 when ``allow_410`` is true.
        requests.HTTPError: on any other non-success status.
        RuntimeError: when all attempts ended in 401 or 429.
    """
    if not _graph_token:
        get_token()
    for _attempt in range(3):
        r = requests.get(
            url,
            # _graph_token is re-read on every attempt so a refreshed token is used
            headers={"Authorization": f"Bearer {_graph_token}"},
            params=params,
            timeout=60,
        )
        if r.status_code == 401:
            get_token()
            continue
        if r.status_code == 410 and allow_410:
            raise DeltaExpired(url)
        if r.status_code == 429:
            # Rate limit — respect Retry-After. The header may legally be an
            # HTTP-date rather than a number of seconds; fall back to the
            # default wait instead of crashing on int().
            try:
                wait = int(r.headers.get("Retry-After", "5"))
            except ValueError:
                wait = 5
            wait = max(wait, 0)  # time.sleep() rejects negative values
            print(f" [429] cekam {wait}s ...")
            time.sleep(wait)
            continue
        r.raise_for_status()
        return r.json()
    raise RuntimeError(f"Graph GET failed after retries: {url}")
def get_all_folders(mailbox: str, parent_id: str = None, parent_path: str = "") -> list[dict]:
    """List the mail folders of a mailbox, descending into child folders.

    Args:
        mailbox: Mailbox address used in the ``/users/{mailbox}`` URL.
        parent_id: Folder whose children are listed; ``None`` lists the
            top-level folders.
        parent_path: Path of the parent folder, used as a prefix for the
            paths of the folders found.

    Returns:
        A list of ``{"id": ..., "path": ...}`` dicts; each folder is followed
        directly by its descendants.
    """
    root_url = f"{GRAPH_URL}/users/{mailbox}/mailFolders"
    page_url = root_url if parent_id is None else f"{root_url}/{parent_id}/childFolders"
    query = {"$top": 100, "$select": "id,displayName,childFolderCount"}

    collected = []
    while page_url:
        page = graph_get(page_url, query)
        for entry in page.get("value", []):
            full_path = f"{parent_path}/{entry['displayName']}".lstrip("/")
            collected.append({"id": entry["id"], "path": full_path})
            if entry.get("childFolderCount", 0) > 0:
                collected.extend(get_all_folders(mailbox, entry["id"], full_path))
        page_url = page.get("@odata.nextLink")
        # follow-up pages are requested without the initial query parameters
        query = None
    return collected
def fetch_full_message(mailbox: str, msg_id: str) -> Optional[dict]:
    """Download one complete message, including headers and attachment metadata.

    Used for new messages discovered by the delta query.

    Returns:
        The message JSON, or ``None`` when Graph answered with an HTTP error
        (the error is written to the log).
    """
    message_url = f"{GRAPH_URL}/users/{mailbox}/messages/{msg_id}"
    query = {"$select": FULL_FETCH_SELECT, "$expand": FULL_FETCH_EXPAND}
    try:
        message = graph_get(message_url, query)
    except requests.HTTPError as exc:
        logging.error("fetch_full_message %s: %s", msg_id, exc)
        return None
    return message
# ─── Delta iteration ──────────────────────────────────────────────────────────
def iter_folder_delta(mailbox: str, folder_id: str, delta_link: Optional[str], limit: int = 0):
    """Generate the delta changes of one folder as ``(item, final_delta_link)``.

    For every change ``(item, None)`` is yielded, where ``item`` is the raw
    dict from the response (a changed message or an ``@removed`` entry). When
    the response carries a deltaLink, one last ``(None, deltaLink)`` tuple is
    yielded and the generator ends.

    Args:
        mailbox: Mailbox address used in the request URL.
        folder_id: Graph id of the folder.
        delta_link: Stored deltaLink to resume from; a falsy value starts a
            fresh delta.
        limit: When non-zero, stop after this many items. In that case no
            deltaLink is yielded, so the caller has nothing new to persist.

    Raises:
        DeltaExpired: propagated from ``graph_get`` on HTTP 410 — the caller
            should run again with ``delta_link=None``.
    """
    if delta_link:
        page_url, query = delta_link, None
    else:
        page_url = f"{GRAPH_URL}/users/{mailbox}/mailFolders/{folder_id}/messages/delta"
        query = {"$select": DELTA_SELECT, "$top": PAGE_SIZE}

    yielded = 0
    while page_url:
        page = graph_get(page_url, query, allow_410=True)
        query = None  # follow-up links are requested without extra parameters
        for entry in page.get("value", []):
            yield entry, None
            yielded += 1
            if limit and yielded >= limit:
                # --limit is a test aid: nothing is handed over for saving, so
                # the next run simply starts from the previous state again
                return
        closing_link = page.get("@odata.deltaLink")
        if closing_link:
            # end of the delta round — hand over the final deltaLink
            yield None, closing_link
            return
        page_url = page.get("@odata.nextLink")
# ─── Per-folder sync ──────────────────────────────────────────────────────────
def sync_folder(col, sync_col, mailbox: str, folder: dict, dry_run: bool, limit: int) -> dict:
    """Synchronise one folder through the delta query and return its statistics.

    The stored deltaLink (if any) is read from ``sync_col``. If Graph reports
    it as expired (410), the folder is retried once from a fresh delta. The
    new deltaLink and cumulative counters are written back only when the delta
    round completed and ``dry_run`` is false.

    Args:
        col: Mongo collection of the mailbox.
        sync_col: Mongo collection holding the per-folder sync state.
        mailbox: Mailbox address.
        folder: Dict with the keys ``id`` and ``path``.
        dry_run: When true, nothing is written to or deleted from Mongo.
        limit: Maximum number of items to process (0 = unlimited).

    Returns:
        Dict with the counters ``new``, ``sync``, ``removed`` and ``errors``.
    """
    fid = folder["id"]
    fpath = folder["path"]
    state_id = f"{mailbox}|{fid}"
    state = sync_col.find_one({"_id": state_id})
    delta_link = state.get("delta_link") if state else None

    # A loop instead of recursion: an expired deltaLink triggers exactly one
    # restart from a fresh delta, with no dependence on the stored state having
    # been deleted (which must not happen in dry-run).
    while True:
        is_first_run = delta_link is None
        label = "FRESH" if is_first_run else "DELTA"
        print(f"\n[{label}] {fpath}")
        stats = {"new": 0, "sync": 0, "removed": 0, "errors": 0}
        final_delta = None
        try:
            for item, fin in iter_folder_delta(mailbox, fid, delta_link, limit=limit):
                if fin:
                    final_delta = fin
                    break
                try:
                    process_item(col, mailbox, fpath, item, stats, dry_run)
                except Exception as e:
                    # one bad item must not abort the whole folder
                    stats["errors"] += 1
                    logging.error("process_item %s: %s", item.get("id", "?"), e)
        except DeltaExpired:
            if is_first_run:
                # Even a fresh delta answered 410 — retrying again would loop
                # forever, so report the folder as failed and move on.
                print(f" [410] i fresh delta vratila 410 — slozku preskakuji")
                logging.error("sync_folder %s: 410 on fresh delta", state_id)
                stats["errors"] += 1
                return stats
            print(f" [410] deltaLink expiroval — restart od fresh delta")
            if not dry_run:
                # dry-run promises no changes in Mongo, so the state is only
                # dropped on a real run
                sync_col.delete_one({"_id": state_id})
            delta_link = None
        else:
            break

    print(f" new={stats['new']} sync={stats['sync']} removed={stats['removed']} err={stats['errors']}")

    # Save the sync state only when the final deltaLink arrived and this is
    # not a dry run.
    if final_delta and not dry_run:
        sync_col.update_one(
            {"_id": state_id},
            {
                "$set": {
                    "mailbox": mailbox,
                    "folder_id": fid,
                    "folder_path": fpath,
                    "delta_link": final_delta,
                    "last_run_at": datetime.now(timezone.utc).replace(tzinfo=None),
                },
                "$inc": {
                    "cumulative_new": stats["new"],
                    "cumulative_sync": stats["sync"],
                    "cumulative_removed": stats["removed"],
                    "run_count": 1,
                },
            },
            upsert=True,
        )
    elif not final_delta:
        # No deltaLink arrived (e.g. because of --limit) — the state is left
        # untouched; the next run continues from the stored deltaLink or
        # starts from a fresh delta.
        if not is_first_run:
            print(f" [pozn] delta neukoncena — pristi beh pojede od ulozeneho deltaLinku")
    return stats
def process_item(col, mailbox: str, folder_path: str, item: dict, stats: dict, dry_run: bool):
    """Process one entry of a delta response and update ``stats`` in place.

    Three cases are distinguished:
      1. ``@removed`` entry — the stored document is flagged with
         ``permanently_deleted`` / ``permanently_deleted_at`` (never deleted).
      2. The ``graph_id`` already exists in Mongo — only the sync fields are
         updated from the delta payload.
      3. Unknown ``graph_id`` — the full message is fetched and upserted.

    Entries without an ``id`` are ignored. In dry-run mode the action is only
    printed, but the counters are still incremented.

    Args:
        col: Mongo collection of the mailbox.
        mailbox: Mailbox address (needed for the full fetch).
        folder_path: Path of the folder the delta belongs to.
        item: Raw entry from the delta response.
        stats: Counter dict with ``new``, ``sync``, ``removed``, ``errors``.
        dry_run: When true, nothing is written to Mongo.
    """
    graph_id = item.get("id")
    if not graph_id:
        return

    # 1) Removed message (@removed)
    if "@removed" in item or item.get("@removed.reason"):
        if dry_run:
            print(f" REMOVED graph_id={graph_id[:30]}...")
        else:
            col.update_one(
                {"graph_id": graph_id},
                {"$set": {
                    "permanently_deleted": True,
                    "permanently_deleted_at": datetime.now(timezone.utc).replace(tzinfo=None),
                }},
            )
        stats["removed"] += 1
        return

    # 2) New or changed message — decided by whether graph_id exists in Mongo
    # NOTE(review): the flags set in step 1 are never cleared here; if a moved
    # message can come back under the same Mongo _id, it would stay flagged as
    # permanently deleted — confirm against extract_message in v1.4.
    existing = col.find_one({"graph_id": graph_id}, {"_id": 1})
    if existing:
        # Existing message — update only the sync fields (the delta payload has them)
        fields = extract_sync_fields(item, folder_path)
        if dry_run:
            # "subject" may be present with a null value, so .get()'s default
            # is not enough to guarantee a string for slicing
            print(f" SYNC {(item.get('subject') or '')[:60]}")
        else:
            col.update_one({"_id": existing["_id"]}, {"$set": fields})
        stats["sync"] += 1
        return

    # New message — body, attachments and headers come from a full fetch
    full = fetch_full_message(mailbox, graph_id)
    if full is None:
        stats["errors"] += 1
        return
    doc = extract_message(full, folder_path)
    if doc is None:
        stats["errors"] += 1
        return
    if dry_run:
        print(f" NEW {(doc.get('subject') or '')[:60]}")
    else:
        col.update_one({"_id": doc["_id"]}, {"$set": doc}, upsert=True)
    stats["new"] += 1
# ─── Indexes for sync_state ───────────────────────────────────────────────────
def ensure_sync_state_indexes(sync_col):
    """Create the indexes on the sync-state collection.

    One compound index on (mailbox, folder_id) and one on last_run_at.
    """
    by_mailbox_and_folder = [("mailbox", ASCENDING), ("folder_id", ASCENDING)]
    by_last_run = [("last_run_at", ASCENDING)]
    sync_col.create_index(by_mailbox_and_folder)
    sync_col.create_index(by_last_run)
def ensure_perm_deleted_index(col):
    """Create a sparse index on ``permanently_deleted`` in a mailbox collection."""
    deleted_flag_key = [("permanently_deleted", ASCENDING)]
    col.create_index(deleted_flag_key, sparse=True)
# ─── Main ─────────────────────────────────────────────────────────────────────
def discover_mailboxes(db) -> list[str]:
    """Return the mailboxes to sync, derived from the collections in `emaily`.

    Every collection counts as a mailbox except those listed in
    NON_MAILBOX_COLLECTIONS, MongoDB's internal ``system.*`` collections, and
    the mailboxes in SKIP_MAILBOXES (a notice is printed for the latter).

    Args:
        db: Mongo database handle.

    Returns:
        Mailbox names in sorted order.
    """
    out = []
    for name in sorted(db.list_collection_names()):
        # "system." is MongoDB's reserved prefix (e.g. system.views,
        # system.profile); such a collection would otherwise be sent to Graph
        # as a mailbox and fail with 404.
        if name in NON_MAILBOX_COLLECTIONS or name.startswith("system."):
            continue
        if name in SKIP_MAILBOXES:
            print(f" [skip] {name} — v SKIP_MAILBOXES (neni Graph pristup)")
            continue
        out.append(name)
    return out
def sync_mailbox(client, mailbox: str, args) -> dict:
    """Synchronise one mailbox and return its totals.

    Indexes are ensured and ``--reset`` is applied only on a real run; in
    dry-run mode nothing in Mongo is created or deleted.

    Args:
        client: Connected MongoClient.
        mailbox: Mailbox address (= name of the Mongo collection).
        args: Parsed CLI arguments; ``dry_run``, ``reset``, ``folder`` and
            ``limit`` are read.

    Returns:
        Dict with the counters ``new``, ``sync``, ``removed`` and ``errors``.
        A failure to list the folders is reported as a single error.
    """
    # Extraction does not need this, but v1.4 keeps the mailbox at module
    # level, so it is set for consistency.
    _v14.GRAPH_MAILBOX = mailbox
    print(f"\n========== {mailbox} ==========")
    col = client[MONGO_DB][mailbox]
    sync_col = client[MONGO_DB][SYNC_STATE_COL]

    if not args.dry_run:
        ensure_sync_state_indexes(sync_col)
        ensure_perm_deleted_index(col)

    if args.reset:
        if args.dry_run:
            # dry-run must not touch Mongo: only report what would be removed.
            # The run below therefore still uses the stored deltaLinks.
            n = sync_col.count_documents({"mailbox": mailbox})
            print(f" --reset (dry-run): smazalo by se {n} deltaLinku pro {mailbox}")
        else:
            n = sync_col.delete_many({"mailbox": mailbox}).deleted_count
            print(f" --reset: smazano {n} deltaLinku pro {mailbox}")

    print("Nacitam seznam slozek...")
    try:
        folders = get_all_folders(mailbox)
    except requests.HTTPError as e:
        print(f" CHYBA: nelze nacist slozky pro {mailbox}: {e}")
        logging.error("get_all_folders %s: %s", mailbox, e)
        return {"new": 0, "sync": 0, "removed": 0, "errors": 1}

    if args.folder:
        # case-insensitive substring match on the folder path
        folders = [f for f in folders if args.folder.lower() in f["path"].lower()]
    print(f" Slozek ke zpracovani: {len(folders)}")

    totals = {"new": 0, "sync": 0, "removed": 0, "errors": 0}
    for folder in folders:
        s = sync_folder(col, sync_col, mailbox, folder, args.dry_run, args.limit)
        for k in totals:
            totals[k] += s[k]
    print(f" -> mailbox total: new={totals['new']} sync={totals['sync']} removed={totals['removed']} err={totals['errors']}")
    return totals
def main():
    """Command-line entry point: parse arguments, sync the mailboxes, print a summary.

    Returns:
        1 when any error was counted, otherwise 0. Exits with status 2 when
        the mailbox given via --mailbox is listed in SKIP_MAILBOXES.
    """
    parser = argparse.ArgumentParser(description=f"parse_emails_graph delta sync v{SCRIPT_VERSION}")
    parser.add_argument("--mailbox", default="",
                        help="E-mail schranky (= kolekce v Mongo). "
                             "Bez argumentu projede vsechny schranky z `emaily` (mimo SKIP_MAILBOXES).")
    parser.add_argument("--folder", default="", help="Filtruje slozky obsahujici tento retezec (default: vsechny)")
    parser.add_argument("--limit", type=int, default=0, help="Max polozek na slozku (test)")
    parser.add_argument("--reset", action="store_true",
                        help="Smaze deltaLinky pro vybrane schranky — pristi beh zacne od fresh delta")
    parser.add_argument("--dry-run", action="store_true", help="Nic neulozi do Mongo, jen vypise co by se stalo")
    args = parser.parse_args()

    print(f"=== Delta sync v{SCRIPT_VERSION} ===")
    if args.dry_run:
        print(" DRY-RUN — zadne zmeny v Mongo")

    print("Pripojuji se k MongoDB...")
    client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
    client.admin.command("ping")  # fail right away when the server is unreachable
    db = client[MONGO_DB]

    if not args.mailbox:
        mailboxes = discover_mailboxes(db)
        print(f" Schranky ke zpracovani: {len(mailboxes)}")
        for name in mailboxes:
            print(f" {name}")
    elif args.mailbox in SKIP_MAILBOXES:
        print(f" CHYBA: {args.mailbox} je v SKIP_MAILBOXES — neni Graph pristup.")
        sys.exit(2)
    else:
        mailboxes = [args.mailbox]

    print("Token Graph API...")
    get_token()
    print(" OK")

    started = time.time()
    grand = {"new": 0, "sync": 0, "removed": 0, "errors": 0}
    per_mailbox = []
    for mb in mailboxes:
        try:
            result = sync_mailbox(client, mb, args)
        except Exception as exc:
            # a failure in one mailbox must not stop the remaining ones
            print(f" FATAL pri sync {mb}: {exc}")
            logging.error("sync_mailbox %s: %s", mb, exc)
            result = {"new": 0, "sync": 0, "removed": 0, "errors": 1}
        per_mailbox.append((mb, result))
        for key in grand:
            grand[key] += result[key]
    dt = time.time() - started

    print(f"\n=== SHRNUTI ===")
    for mb, s in per_mailbox:
        print(f" {mb:40} new={s['new']:>5} sync={s['sync']:>5} removed={s['removed']:>4} err={s['errors']:>3}")
    print(f" {'TOTAL':40} new={grand['new']:>5} sync={grand['sync']:>5} removed={grand['removed']:>4} err={grand['errors']:>3}")
    print(f" trvalo: {dt:.1f} s")

    if grand["errors"] > 0:
        return 1
    return 0
# Run the sync when executed as a script; main()'s return value is the exit status.
if __name__ == "__main__":
    raise SystemExit(main() or 0)