"""
parse_emails_graph_v1.1.py

Name:    parse_emails_graph_v1.1.py
Version: 1.1
Date:    2026-06-02
Author:  vladimir.buzalka

Description:
    Reads every e-mail from the ordinace@buzalkova.cz mailbox directly through
    the Microsoft Graph API and imports the messages as documents into MongoDB.

    For each message all available properties are extracted:
      - subject, sender, recipients (To/CC/BCC with their type)
      - received, sent, created and modified timestamps (UTC)
      - HTML body (max 2 MB) + text preview
      - attachments (metadata only: name, size, MIME type, inline flag)
      - internet headers (SPF, DKIM, Received, X-*, ...)
      - MAPI equivalents: importance, flag, conversation thread, categories,
        In-Reply-To, References, ...
      - additionally: isRead, isDraft, folder_path, inferenceClassification

    ALL folders of the mailbox are walked recursively (Inbox, Sent, Deleted,
    archive folders, ...).

    DB:          emaily
    Collection:  ordinace@buzalkova.cz
    _id:         Internet Message-ID (or "graphid:<graph id>" as a fallback)

    NOTE: The script only READS from the mailbox -- it never writes to it!

Usage:
    # First import (everything):
    python parse_emails_graph_v1.1.py

    # Test on the first 50 messages:
    python parse_emails_graph_v1.1.py --limit 50 --no-indexes

    # A single folder only:
    python parse_emails_graph_v1.1.py --folder Inbox

    # Resume after an interruption (new messages only):
    python parse_emails_graph_v1.1.py --mode new-only

    # Regular sync (updates is_read, flag, folder; imports new messages):
    python parse_emails_graph_v1.1.py --mode sync

    # Full re-import of all data:
    python parse_emails_graph_v1.1.py --mode full

Modes (--mode):
    full      Full upsert of all fields for every message (default).
    new-only  Skips messages that are already in MongoDB, imports only new ones.
    sync      Existing messages: updates only is_read/flag_status/categories/
              modified_at/folder_path. New messages are imported in full.
              Intended for regular scheduled runs.

Environment:
    GRAPH_CLIENT_SECRET  Optional override of the client secret configured below.

Dependencies:
    msal, requests, pymongo, python-dateutil
    Python 3.10+

MongoDB document structure:
    _id                       Internet Message-ID (or graphid: fallback)
    graph_id                  Graph API message ID
    subject                   message subject
    normalized_subject        subject without RE:/FW:/AW: prefixes
    importance                0=low 1=normal 2=high
    flag_status               0=not flagged 1=flagged 2=complete
    is_read                   bool -- current read state in the mailbox
    is_draft                  bool
    has_attachments           bool
    attachment_count          int
    inference_classification  focused / other
    categories                [str]
    conversation_id           Graph conversationId
    conversation_index        base64 conversationIndex
    conversation_topic        thread topic (internet header Thread-Topic)
    in_reply_to               Message-ID of the previous message
    internet_references       [Message-ID]
    received_at               datetime UTC
    sent_at                   datetime UTC
    created_at                datetime UTC
    modified_at               datetime UTC
    folder_id                 Graph parentFolderId
    folder_path               full folder path (e.g. Inbox/Subfolder)
    sender.email              sender e-mail address
    sender.name               sender display name
    to                        To string (joined)
    cc                        CC string
    bcc                       BCC string
    recipients                [{type, email, name}]
    body_html                 HTML body (max 2 MB)
    body_preview              text preview (max 255 characters)
    attachments               [{filename, size_bytes, mime_type, content_id,
                                is_inline}]
    headers                   dict of internet headers
    parsed_at                 datetime UTC

Indexes:
    received_at, sent_at, sender.email, graph_id (unique), conversation_id,
    folder_path, has_attachments, categories, importance, flag_status, is_read,
    text_search (subject + body_preview + to + cc)

Version history:
    1.0  2026-06-02  Initial version
    1.1  2026-06-02  Added --mode full/new-only/sync; removed --skip-existing
                     (replaced by --mode new-only)
"""

import argparse
import base64
import logging
import os
import re
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

import msal
import requests
from dateutil import parser as dtparser
from pymongo import ASCENDING, TEXT, MongoClient, UpdateOne

# Console output contains non-ASCII characters; never crash on a narrow codepage.
if hasattr(sys.stdout, "reconfigure"):
    sys.stdout.reconfigure(encoding="utf-8", errors="replace")

# ─── CONFIGURATION ────────────────────────────────────────────────────────────
GRAPH_TENANT_ID = "7d269944-37a4-43a1-8140-c7517dc426e9"
GRAPH_CLIENT_ID = "4b222bfd-78c9-4239-a53f-43006b3ed07f"
# NOTE(security): a client secret committed to source control should be rotated
# and supplied through the GRAPH_CLIENT_SECRET environment variable. The literal
# is kept only as a backward-compatible fallback.
GRAPH_CLIENT_SECRET = os.environ.get(
    "GRAPH_CLIENT_SECRET", "Txg8Q~MjhocuopxsJyJBhPmDfMxZ2r5WpTFj1dfk"
)
GRAPH_MAILBOX = "ordinace@buzalkova.cz"
GRAPH_URL = "https://graph.microsoft.com/v1.0"

MONGO_URI = "mongodb://192.168.1.76:27017"
MONGO_DB = "emaily"
MONGO_COL = "ordinace@buzalkova.cz"

BATCH_SIZE = 100  # number of write operations per MongoDB bulk_write
PAGE_SIZE = 50    # messages requested per Graph page ($top)

# Upper bound for the stored HTML body (2 Mi characters).
MAX_BODY_HTML_CHARS = 2 * 1024 * 1024

# HTTP statuses treated as transient (throttling / service unavailable) and the
# bounds applied when retrying them.
GRAPH_RETRY_STATUSES = (429, 503, 504)
GRAPH_MAX_RETRIES = 5
GRAPH_MAX_RETRY_WAIT = 120.0  # seconds

LOG_FILE = Path(__file__).parent / "parse_emails_errors.log"
SCRIPT_VERSION = "1.1"
# ──────────────────────────────────────────────────────────────────────────────

logging.basicConfig(
    filename=str(LOG_FILE),
    level=logging.ERROR,
    format="%(asctime)s | %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    encoding="utf-8",
)

IMPORTANCE_MAP = {"low": 0, "normal": 1, "high": 2}
FLAG_STATUS_MAP = {"notFlagged": 0, "flagged": 1, "complete": 2}

# Reply/forward prefixes stripped (repeatedly) by normalize_subject().
RE_SUBJECT = re.compile(r"^(RE|FW|AW|SV|VS|TR|WG|odpov[eě]d[ťt]|fwd?)[:\s]+", re.IGNORECASE)

MSG_SELECT = (
    "id,internetMessageId,subject,bodyPreview,body,"
    "importance,isRead,isDraft,hasAttachments,"
    "receivedDateTime,sentDateTime,createdDateTime,lastModifiedDateTime,"
    "sender,from,toRecipients,ccRecipients,bccRecipients,replyTo,"
    "conversationId,conversationIndex,parentFolderId,"
    "categories,flag,inferenceClassification,internetMessageHeaders"
)

# Sync mode only needs the mutable fields -- a faster fetch.
MSG_SELECT_SYNC = (
    "id,internetMessageId,isRead,isDraft,flag,categories,"
    "lastModifiedDateTime,parentFolderId,importance"
)

# ─── Graph API helpers ────────────────────────────────────────────────────────

# Cached bearer token; (re)filled by get_token().
_graph_token: Optional[str] = None


def get_token() -> str:
    """Acquire an app-only Graph access token and cache it in ``_graph_token``.

    Returns:
        The access token string.

    Raises:
        RuntimeError: if MSAL does not return an ``access_token``.
    """
    global _graph_token
    app = msal.ConfidentialClientApplication(
        GRAPH_CLIENT_ID,
        authority=f"https://login.microsoftonline.com/{GRAPH_TENANT_ID}",
        client_credential=GRAPH_CLIENT_SECRET,
    )
    result = app.acquire_token_for_client(scopes=["https://graph.microsoft.com/.default"])
    if "access_token" not in result:
        raise RuntimeError(f"Graph auth failed: {result}")
    _graph_token = result["access_token"]
    return _graph_token


def _retry_delay(retry_after: Optional[str], attempt: int) -> float:
    """Return how many seconds to wait before retrying a transient failure.

    Uses the numeric ``Retry-After`` header value when present and valid,
    otherwise exponential backoff (2**attempt). The result is clamped to
    the range 1 .. GRAPH_MAX_RETRY_WAIT seconds.
    """
    fallback = float(2 ** attempt)
    try:
        delay = float(retry_after)
    except (TypeError, ValueError):
        delay = fallback
    if not delay >= 0:  # also rejects NaN
        delay = fallback
    return min(max(delay, 1.0), GRAPH_MAX_RETRY_WAIT)


def graph_get(url: str, params: Optional[dict] = None) -> dict:
    """Perform an authenticated GET against Graph and return the decoded JSON.

    A 401 triggers one token refresh and retry. Throttling / transient
    responses (429, 503, 504) are retried up to GRAPH_MAX_RETRIES times,
    waiting as instructed by the ``Retry-After`` header.

    Args:
        url: absolute request URL (also used for ``@odata.nextLink`` pages).
        params: optional query parameters.

    Raises:
        RuntimeError: if the request is still unauthorized after a token refresh.
        requests.HTTPError: for any other non-success status (after retries).
    """
    if not _graph_token:
        get_token()

    token_refreshed = False
    transient_attempts = 0
    while True:
        r = requests.get(
            url,
            headers={"Authorization": f"Bearer {_graph_token}"},
            params=params,
            timeout=30,
        )
        if r.status_code == 401:
            if token_refreshed:
                raise RuntimeError(f"Graph GET failed after retry: {url}")
            get_token()
            token_refreshed = True
            continue
        if r.status_code in GRAPH_RETRY_STATUSES and transient_attempts < GRAPH_MAX_RETRIES:
            transient_attempts += 1
            time.sleep(_retry_delay(r.headers.get("Retry-After"), transient_attempts))
            continue
        r.raise_for_status()
        return r.json()


def get_all_folders(parent_id: Optional[str] = None, parent_path: str = "") -> list[dict]:
    """Recursively load all mail folders of the mailbox.

    Args:
        parent_id: folder whose children are listed; ``None`` lists the
            top-level folders.
        parent_path: path of the parent folder, used as prefix for child paths.

    Returns:
        A list of ``{"id": ..., "path": ...}`` dicts, parents before children.
    """
    if parent_id is None:
        url = f"{GRAPH_URL}/users/{GRAPH_MAILBOX}/mailFolders"
    else:
        url = f"{GRAPH_URL}/users/{GRAPH_MAILBOX}/mailFolders/{parent_id}/childFolders"

    folders = []
    params = {"$top": 100, "$select": "id,displayName,childFolderCount"}
    while url:
        data = graph_get(url, params)
        for f in data.get("value", []):
            path = f"{parent_path}/{f['displayName']}".lstrip("/")
            folders.append({"id": f["id"], "path": path})
            if f.get("childFolderCount", 0) > 0:
                folders.extend(get_all_folders(f["id"], path))
        url = data.get("@odata.nextLink")
        # The nextLink already carries the query string.
        params = None
    return folders


def iter_folder_messages(folder_id: str, select: str = MSG_SELECT, expand_attachments: bool = True):
    """Yield the messages of one folder, following Graph paging.

    Args:
        folder_id: Graph id of the folder.
        select: value of the ``$select`` query option.
        expand_attachments: when true, ``$expand=attachments`` is requested.
    """
    url = f"{GRAPH_URL}/users/{GRAPH_MAILBOX}/mailFolders/{folder_id}/messages"
    params = {"$top": PAGE_SIZE, "$select": select}
    if expand_attachments:
        params["$expand"] = "attachments"
    while url:
        data = graph_get(url, params)
        yield from data.get("value", [])
        url = data.get("@odata.nextLink")
        # The nextLink already carries the query string.
        params = None


# ─── Helper functions ─────────────────────────────────────────────────────────

def parse_date(raw) -> Optional[datetime]:
    """Convert a datetime or date string to a naive datetime expressed in UTC.

    Timezone-aware values are converted to UTC and stripped of tzinfo; naive
    values are returned unchanged. Returns ``None`` for ``None`` or for input
    that cannot be parsed.
    """
    if raw is None:
        return None
    if isinstance(raw, datetime):
        dt = raw
    else:
        try:
            dt = dtparser.parse(str(raw))
        except Exception:
            # Deliberately best-effort: an unparsable timestamp must not
            # prevent the message from being imported.
            return None
    if dt.tzinfo:
        return dt.astimezone(timezone.utc).replace(tzinfo=None)
    return dt


def normalize_subject(subject: str) -> str:
    """Strip all leading reply/forward prefixes (RE:, FW:, AW:, ...) from *subject*."""
    s = subject.strip()
    while m := RE_SUBJECT.match(s):
        s = s[m.end():].strip()
    return s


def parse_headers(raw_headers: list) -> dict:
    """Turn Graph ``internetMessageHeaders`` into a dict.

    Keys are lower-cased with ``-`` replaced by ``_``. A header that occurs
    once maps to its string value; a repeated header maps to a list of values
    in order of appearance.
    """
    result = {}
    for h in raw_headers:
        k = h["name"].lower().replace("-", "_")
        v = h["value"]
        if k not in result:
            result[k] = v
            continue
        existing = result[k]
        result[k] = existing + [v] if isinstance(existing, list) else [existing, v]
    return result


def _address_parts(recipient: Optional[dict]) -> tuple[str, str]:
    """Return ``(address, name)`` of a Graph recipient; missing/null parts become ''."""
    ea = (recipient or {}).get("emailAddress") or {}
    return ea.get("address") or "", ea.get("name") or ""


def format_recipients(lst: list) -> str:
    """Join recipients as ``Name <address>`` separated by ``; ``.

    ``None`` or an empty list yields an empty string.
    """
    formatted = []
    for r in lst or []:
        address, name = _address_parts(r)
        formatted.append(f"{name} <{address}>".strip())
    return "; ".join(formatted)


def _recipient_rows(kind: str, lst: Optional[list]) -> list[dict]:
    """Build the ``recipients`` entries of one kind ("to", "cc" or "bcc")."""
    rows = []
    for r in lst or []:
        address, name = _address_parts(r)
        rows.append({"type": kind, "email": address, "name": name})
    return rows


def _first(value):
    """Return the first element of a repeated header value, or the value itself."""
    return value[0] if isinstance(value, list) else value


# ─── Message extraction ───────────────────────────────────────────────────────

def extract_message(msg: dict, folder_path: str) -> Optional[dict]:
    """Build the full MongoDB document for one Graph message.

    Used for mode ``full`` and for new messages in ``sync`` / ``new-only``.

    Args:
        msg: message resource as returned by Graph (MSG_SELECT fields, with
            attachments expanded).
        folder_path: path of the folder the message was read from.

    Returns:
        The document dict, or ``None`` when extraction failed (the error is
        written to the log file).
    """
    try:
        mid = (msg.get("internetMessageId") or "").strip() or f"graphid:{msg['id']}"
        subject = msg.get("subject") or ""

        body_html = None
        body_preview = msg.get("bodyPreview") or ""
        body = msg.get("body") or {}
        content_type = body.get("contentType")
        if content_type == "html":
            body_html = (body.get("content") or "")[:MAX_BODY_HTML_CHARS]
        elif content_type == "text":
            # Plain-text messages have no HTML body; keep a longer preview instead.
            body_preview = (body.get("content") or "")[:2000]

        sender_ea = (msg.get("from") or msg.get("sender") or {}).get("emailAddress") or {}

        to_list = msg.get("toRecipients") or []
        cc_list = msg.get("ccRecipients") or []
        bcc_list = msg.get("bccRecipients") or []
        recipients = (
            _recipient_rows("to", to_list)
            + _recipient_rows("cc", cc_list)
            + _recipient_rows("bcc", bcc_list)
        )

        importance = IMPORTANCE_MAP.get(msg.get("importance", "normal"), 1)
        flag_status = FLAG_STATUS_MAP.get((msg.get("flag") or {}).get("flagStatus", "notFlagged"), 0)

        headers = parse_headers(msg.get("internetMessageHeaders") or [])

        in_reply_to = _first(headers.get("in_reply_to", ""))

        refs_raw = headers.get("references", "")
        if isinstance(refs_raw, list):
            refs_raw = " ".join(refs_raw)
        internet_refs = refs_raw.split() if refs_raw else []

        conv_topic = _first(headers.get("thread_topic", ""))

        conv_index = ""
        ci_raw = msg.get("conversationIndex")
        if ci_raw:
            try:
                # Round-trip to normalise the base64 representation.
                conv_index = base64.b64encode(base64.b64decode(ci_raw)).decode()
            except Exception:
                conv_index = ci_raw

        attachments = []
        for att in msg.get("attachments") or []:
            fname = att.get("name") or ""
            if not fname:
                # Attachments without a name are not recorded.
                continue
            attachments.append({
                "filename": fname,
                "size_bytes": att.get("size", 0),
                "mime_type": att.get("contentType", "application/octet-stream"),
                "content_id": att.get("contentId"),
                "is_inline": att.get("isInline", False),
            })

        return {
            "_id": mid,
            "graph_id": msg["id"],
            "subject": subject,
            "normalized_subject": normalize_subject(subject),
            "importance": importance,
            "flag_status": flag_status,
            "is_read": msg.get("isRead", False),
            "is_draft": msg.get("isDraft", False),
            "has_attachments": msg.get("hasAttachments", False),
            "attachment_count": len(attachments),
            "inference_classification": msg.get("inferenceClassification", ""),
            "categories": msg.get("categories") or [],
            "conversation_id": msg.get("conversationId", ""),
            "conversation_index": conv_index,
            "conversation_topic": conv_topic,
            "in_reply_to": in_reply_to,
            "internet_references": internet_refs,
            "received_at": parse_date(msg.get("receivedDateTime")),
            "sent_at": parse_date(msg.get("sentDateTime")),
            "created_at": parse_date(msg.get("createdDateTime")),
            "modified_at": parse_date(msg.get("lastModifiedDateTime")),
            "folder_id": msg.get("parentFolderId", ""),
            "folder_path": folder_path,
            "sender": {
                "email": sender_ea.get("address", ""),
                "name": sender_ea.get("name", ""),
            },
            "to": format_recipients(to_list),
            "cc": format_recipients(cc_list),
            "bcc": format_recipients(bcc_list),
            "recipients": recipients,
            "body_html": body_html,
            "body_preview": body_preview,
            "attachments": attachments,
            "headers": headers,
            "parsed_at": datetime.now(timezone.utc).replace(tzinfo=None),
        }
    except Exception as e:
        # One malformed message must not stop the import; log it and move on.
        logging.error("extract_message failed [%s]: %s", msg.get("id", "?"), e)
        return None


def extract_sync_fields(msg: dict, folder_path: str) -> dict:
    """Return only the mutable fields -- used in sync mode for existing messages."""
    return {
        "is_read": msg.get("isRead", False),
        "is_draft": msg.get("isDraft", False),
        "flag_status": FLAG_STATUS_MAP.get((msg.get("flag") or {}).get("flagStatus", "notFlagged"), 0),
        "importance": IMPORTANCE_MAP.get(msg.get("importance", "normal"), 1),
        "categories": msg.get("categories") or [],
        "modified_at": parse_date(msg.get("lastModifiedDateTime")),
        "folder_id": msg.get("parentFolderId", ""),
        "folder_path": folder_path,
        "parsed_at": datetime.now(timezone.utc).replace(tzinfo=None),
    }


# ─── MongoDB indexes ──────────────────────────────────────────────────────────

def create_indexes(col):
    """Create the single-field indexes and the ``text_search`` text index on *col*."""
    print(" Vytvarim indexy...")
    col.create_index([("received_at", ASCENDING)])
    col.create_index([("sent_at", ASCENDING)])
    col.create_index([("sender.email", ASCENDING)])
    col.create_index([("graph_id", ASCENDING)], unique=True, sparse=True)
    col.create_index([("conversation_id", ASCENDING)])
    col.create_index([("folder_path", ASCENDING)])
    col.create_index([("has_attachments", ASCENDING)])
    col.create_index([("categories", ASCENDING)])
    col.create_index([("importance", ASCENDING)])
    col.create_index([("flag_status", ASCENDING)])
    col.create_index([("is_read", ASCENDING)])
    col.create_index(
        [
            ("subject", TEXT),
            ("body_preview", TEXT),
            ("to", TEXT),
            ("cc", TEXT),
        ],
        name="text_search",
        default_language="none",
    )
    print(" Indexy hotovy.")


# ─── MAIN ─────────────────────────────────────────────────────────────────────

def main():
    """Command-line entry point: walk the mailbox folders and upsert into MongoDB."""
    ap = argparse.ArgumentParser(description=f"parse_emails_graph v{SCRIPT_VERSION}")
    ap.add_argument("--mode", default="full", choices=["full", "new-only", "sync"],
                    help="full=plny upsert (vychozi) | new-only=jen nove zpravy | "
                         "sync=existujici aktualizuje jen menitelna pole, nove importuje cely")
    ap.add_argument("--limit", type=int, default=0,
                    help="Zpracovat max N zprav (0 = vse)")
    ap.add_argument("--folder", default="",
                    help="Zpracovat jen slozku se zadanym nazvem (napr. Inbox)")
    ap.add_argument("--no-indexes", action="store_true",
                    help="Nevytvorit indexy na konci")
    args = ap.parse_args()

    start = datetime.now()
    print(f"=== parse_emails_graph v{SCRIPT_VERSION} ===")
    print(f"Start: {start.strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"Schránka: {GRAPH_MAILBOX}")
    print(f"MongoDB: {MONGO_URI} -> {MONGO_DB}.{MONGO_COL}")
    print(f"Režim: {args.mode}")

    print("\nPřipojuji se k Graph API...")
    try:
        get_token()
        print(" Graph API OK")
    except Exception as e:
        print(f" CHYBA: {e}")
        sys.exit(1)

    client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
    try:
        client.admin.command("ping")
        print(" MongoDB OK")
    except Exception as e:
        print(f" CHYBA: MongoDB neni dostupna -- {e}")
        sys.exit(1)

    try:
        col = client[MONGO_DB][MONGO_COL]

        # Existing _id values (needed for new-only and sync).
        existing: set = set()
        if args.mode in ("new-only", "sync"):
            print(" Nacitam existujici zaznamy z MongoDB...")
            # Stream the ids through a cursor: distinct() returns a single
            # document and fails once the result exceeds the 16 MB BSON limit.
            existing = {d["_id"] for d in col.find({}, {"_id": 1})}
            print(f" {len(existing)} jiz importovano")

        print("\nNacitam seznam slozek...")
        all_folders = get_all_folders()
        if args.folder:
            wanted = args.folder.lower()
            all_folders = [f for f in all_folders if wanted in f["path"].lower()]
        print(f" Slozek ke zpracovani: {len(all_folders)}")
        for f in all_folders:
            print(f" {f['path']}")

        # In sync mode only the mutable fields are fetched.
        is_sync = args.mode == "sync"
        msg_select = MSG_SELECT_SYNC if is_sync else MSG_SELECT
        expand_att = not is_sync

        batch = []
        ok_count = 0
        sync_count = 0
        err_count = 0
        skip_count = 0
        total_i = 0

        def flush():
            """Write the pending operations to MongoDB and empty the batch."""
            if not batch:
                return
            try:
                col.bulk_write(batch, ordered=False)
            except Exception as e:
                logging.error("bulk_write: %s", e)
                print(f" CHYBA bulk_write: {e}")
            batch.clear()

        print()
        try:
            for folder in all_folders:
                print(f"--- Složka: {folder['path']} ---")
                folder_count = 0
                messages = iter_folder_messages(
                    folder["id"], select=msg_select, expand_attachments=expand_att
                )
                for msg in messages:
                    if args.limit and total_i >= args.limit:
                        break

                    mid = (msg.get("internetMessageId") or "").strip() or f"graphid:{msg['id']}"
                    total_i += 1
                    folder_count += 1

                    if args.mode == "new-only" and mid in existing:
                        skip_count += 1
                        continue

                    if is_sync and mid in existing:
                        # Existing message in sync mode -- mutable fields only.
                        fields = extract_sync_fields(msg, folder["path"])
                        batch.append(UpdateOne({"_id": mid}, {"$set": fields}))
                        sync_count += 1
                        status = "SYN "
                        print(f" {total_i:>6} {status} {mid[:80]}")
                    else:
                        # Full extract (new in new-only, new in sync, everything in full).
                        # A new message found in sync mode was fetched with the
                        # reduced field set, so it has to be fetched again in full.
                        if is_sync:
                            full_url = f"{GRAPH_URL}/users/{GRAPH_MAILBOX}/messages/{msg['id']}"
                            full_params = {"$select": MSG_SELECT, "$expand": "attachments"}
                            try:
                                msg = graph_get(full_url, full_params)
                            except Exception as e:
                                logging.error("full fetch failed [%s]: %s", msg.get("id", "?"), e)
                                err_count += 1
                                continue

                        doc = extract_message(msg, folder["path"])
                        if doc is None:
                            err_count += 1
                            status = "ERR "
                            print(f" {total_i:>6} {status} {mid[:80]}")
                        else:
                            batch.append(UpdateOne({"_id": doc["_id"]}, {"$set": doc}, upsert=True))
                            ok_count += 1
                            status = "OK "
                            subject_str = (doc.get("subject") or "")[:60]
                            sender_str = (doc.get("sender", {}).get("email") or "")[:40]
                            print(f" {total_i:>6} {status} {subject_str:<60} {sender_str}")

                    if len(batch) >= BATCH_SIZE:
                        flush()

                    if total_i % 500 == 0:
                        elapsed = (datetime.now() - start).total_seconds()
                        rate = total_i / elapsed if elapsed > 0 else 0
                        print(f" {'─'*80}")
                        print(f" Průběh: ok={ok_count} sync={sync_count} skip={skip_count} err={err_count} {rate:.1f} msg/s")
                        print(f" {'─'*80}")

                flush()
                print(f" → {folder_count} zprav ze slozky {folder['path']}")

                if args.limit and total_i >= args.limit:
                    break
        finally:
            # Persist whatever is pending even when the run is interrupted
            # (Ctrl-C, network failure), so a later --mode new-only run can resume.
            flush()

        elapsed_total = (datetime.now() - start).total_seconds()
        doc_total = col.count_documents({})
        print(f"\n{'='*52}")
        print(f"Vysledek: ok={ok_count} | sync={sync_count} | skip={skip_count} | err={err_count}")
        print(f"Celkovy cas: {int(elapsed_total//3600)}h {int((elapsed_total%3600)//60)}m {int(elapsed_total%60)}s")
        print(f"Dokumentu v kolekci: {doc_total}")

        if not args.no_indexes:
            print()
            create_indexes(col)

        print(f"\nKonec: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        if err_count:
            print(f"Chyby logovany do: {LOG_FILE}")
    finally:
        client.close()


if __name__ == "__main__":
    main()