"""
parse_emails_graph_v1.0.py

Name:    parse_emails_graph_v1.0.py
Version: 1.0
Date:    2026-06-02
Author:  vladimir.buzalka

Description:
    Reads every e-mail in the ordinace@buzalkova.cz mailbox directly through
    the Microsoft Graph API and imports the messages as documents into
    MongoDB.

    All available properties are extracted from each message:
      - subject, sender, recipients (To/CC/BCC with their type)
      - received, sent, created and modified timestamps (UTC)
      - HTML body (max 2 MB) + text preview
      - attachments (metadata: name, size, MIME type, inline flag)
      - internet headers (SPF, DKIM, Received, X-*, ...)
      - MAPI equivalents: importance, flag, conversation thread,
        categories, In-Reply-To, References, ...
      - additionally: isRead, isDraft, folder_path, inferenceClassification

    ALL folders of the mailbox are walked recursively (Inbox, Sent, Deleted,
    archive folders, ...).

    DB:         emaily
    Collection: ordinace@buzalkova.cz
    _id:        Internet Message-ID (or "graphid:<Graph message id>" as a
                fallback)

    Safe to interrupt and re-run:
      - upsert by _id — duplicates are overwritten automatically
      - --skip-existing loads the list of already imported _id values from
        MongoDB and skips them

    NOTE: the script only READS from the mailbox — it never writes to it!

Usage:
    python parse_emails_graph_v1.0.py                   # full import
    python parse_emails_graph_v1.0.py --limit 50        # test on the first 50
    python parse_emails_graph_v1.0.py --skip-existing   # resume after an interruption
    python parse_emails_graph_v1.0.py --folder Inbox    # a single folder only
    python parse_emails_graph_v1.0.py --no-indexes      # do not build indexes at the end

Dependencies: msal, requests, pymongo, python-dateutil
Python 3.10+

Structure of a MongoDB document:
    _id                       Internet Message-ID (or graphid:<id> fallback)
    graph_id                  Graph API message ID (for possible further operations)
    subject                   message subject
    normalized_subject        subject without RE:/FW:/AW: prefixes
    importance                0=low  1=normal  2=high
    flag_status               0=not flagged  1=flagged  2=complete
    is_read                   bool — current read state in the mailbox
    is_draft                  bool
    has_attachments           bool
    attachment_count          int
    inference_classification  focused / other (Outlook AI sorting)
    categories                [str]
    conversation_id           Graph conversationId
    conversation_index        base64 conversationIndex
    conversation_topic        thread topic (from the Thread-Topic internet header)
    in_reply_to               Message-ID of the previous message
    internet_references       [Message-ID] — the whole history of the thread
    received_at               datetime UTC
    sent_at                   datetime UTC
    created_at                datetime UTC — time the record was created in M365
    modified_at               datetime UTC — time of the last modification
    folder_id                 Graph parentFolderId
    folder_path               full folder path (e.g. Inbox/Subfolder)
    sender.email              e-mail address of the sender
    sender.name               display name of the sender
    to                        To string (joined)
    cc                        CC string
    bcc                       BCC string
    recipients                [{type, email, name}] — to/cc/bcc with their type
    body_html                 HTML body (max 2 MB)
    body_preview              text preview (max 255 characters from Graph)
    attachments               [{filename, size_bytes, mime_type, content_id, is_inline}]
    headers                   dict of internet headers (lowercase_with_underscores)
    parsed_at                 datetime UTC — time of parsing

Indexes:
    received_at, sent_at, sender.email, graph_id (unique), conversation_id,
    folder_path, has_attachments, categories, importance, flag_status,
    is_read, text_search (subject + body_preview + to + cc)

Version history:
    1.0  2026-06-02  Initial version — Graph API as the source
"""

import argparse
import base64
import logging
import os
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

import msal
import requests
from dateutil import parser as dtparser
from pymongo import ASCENDING, TEXT, MongoClient, UpdateOne

# Console output contains Czech diacritics and box-drawing characters; make
# sure printing them never raises on consoles with a narrow code page.
if hasattr(sys.stdout, "reconfigure"):
    sys.stdout.reconfigure(encoding="utf-8", errors="replace")

# ─── CONFIGURATION ────────────────────────────────────────────────────────────
GRAPH_TENANT_ID = "7d269944-37a4-43a1-8140-c7517dc426e9"
GRAPH_CLIENT_ID = "4b222bfd-78c9-4239-a53f-43006b3ed07f"
# SECURITY: a client secret committed to source control must be treated as
# compromised — rotate it and provide the new value through the
# GRAPH_CLIENT_SECRET environment variable. The literal is kept only as a
# fallback so existing deployments keep working until that has been done.
GRAPH_CLIENT_SECRET = (
    os.environ.get("GRAPH_CLIENT_SECRET")
    or "Txg8Q~MjhocuopxsJyJBhPmDfMxZ2r5WpTFj1dfk"
)
GRAPH_MAILBOX = "ordinace@buzalkova.cz"
GRAPH_URL = "https://graph.microsoft.com/v1.0"

MONGO_URI = "mongodb://192.168.1.76:27017"
MONGO_DB = "emaily"
MONGO_COL = "ordinace@buzalkova.cz"

BATCH_SIZE = 100   # number of upserts sent to MongoDB in one bulk_write
PAGE_SIZE = 50     # $top used when paging messages of a folder

LOG_FILE = Path(__file__).parent / "parse_emails_errors.log"
SCRIPT_VERSION = "1.0"
# ──────────────────────────────────────────────────────────────────────────────

# Only errors are written to the log file; progress is reported with print().
logging.basicConfig(
    filename=str(LOG_FILE),
    level=logging.ERROR,
    format="%(asctime)s | %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    encoding="utf-8",
)

# Graph string values -> numeric codes stored in MongoDB.
IMPORTANCE_MAP = {"low": 0, "normal": 1, "high": 2}
FLAG_STATUS_MAP = {"notFlagged": 0, "flagged": 1, "complete": 2}
# One reply/forward prefix at the start of a subject. The colon is mandatory:
# accepting bare whitespace as the separator would strip ordinary leading
# words ("VS 123456 platba", "Re start ...") from the subject.
RE_SUBJECT = re.compile(
    r"^(RE|FW|AW|SV|VS|TR|WG|odpov[eě][dď]|fwd?)\s*:\s*",
    re.IGNORECASE,
)

# Message properties requested from Graph for every message.
MSG_SELECT = (
    "id,internetMessageId,subject,bodyPreview,body,"
    "importance,isRead,isDraft,hasAttachments,"
    "receivedDateTime,sentDateTime,createdDateTime,lastModifiedDateTime,"
    "sender,from,toRecipients,ccRecipients,bccRecipients,replyTo,"
    "conversationId,conversationIndex,parentFolderId,"
    "categories,flag,inferenceClassification,internetMessageHeaders"
)

# ─── Graph API helpers ────────────────────────────────────────────────────────

# Bearer token shared by all Graph requests; filled in by get_token().
_graph_token: Optional[str] = None

# HTTP statuses treated as "retry later" and the limits of that retrying.
_THROTTLE_STATUSES = frozenset({429, 503, 504})
_MAX_THROTTLE_RETRIES = 5
_MAX_RETRY_DELAY = 120


def get_token() -> str:
    """Acquire an app-only Graph access token and cache it in ``_graph_token``.

    Returns:
        The access token string.

    Raises:
        RuntimeError: if the token response contains no ``access_token``.
    """
    global _graph_token
    app = msal.ConfidentialClientApplication(
        GRAPH_CLIENT_ID,
        authority=f"https://login.microsoftonline.com/{GRAPH_TENANT_ID}",
        client_credential=GRAPH_CLIENT_SECRET,
    )
    result = app.acquire_token_for_client(
        scopes=["https://graph.microsoft.com/.default"]
    )
    if "access_token" not in result:
        raise RuntimeError(f"Graph auth failed: {result}")
    _graph_token = result["access_token"]
    return _graph_token


def _retry_delay(response, attempt: int) -> int:
    """Return the number of seconds to wait before retrying *response*.

    Uses the integer ``Retry-After`` header when present, otherwise an
    exponential back-off based on *attempt*; the result is capped at
    ``_MAX_RETRY_DELAY``.
    """
    try:
        delay = int(response.headers.get("Retry-After"))
    except (TypeError, ValueError):
        # Header missing or not a plain number of seconds.
        delay = 2 ** attempt
    return min(max(delay, 0), _MAX_RETRY_DELAY)


def graph_get(url: str, params: Optional[dict] = None) -> dict:
    """GET *url* from Graph and return the decoded JSON body.

    A 401 response triggers one token refresh and a retry. Responses with a
    status in ``_THROTTLE_STATUSES`` are retried up to
    ``_MAX_THROTTLE_RETRIES`` times after waiting for ``_retry_delay``.

    Args:
        url: Absolute Graph URL (also used for ``@odata.nextLink`` pages).
        params: Optional query parameters.

    Raises:
        RuntimeError: if the request is still rejected with 401 after the
            token has been refreshed.
        requests.HTTPError: for any other unsuccessful status.
    """
    import time  # local import: the module's import block does not provide it

    if not _graph_token:
        get_token()

    token_refreshed = False
    throttle_retries = 0
    while True:
        r = requests.get(
            url,
            headers={"Authorization": f"Bearer {_graph_token}"},
            params=params,
            timeout=30,
        )
        if r.status_code == 401:
            if token_refreshed:
                raise RuntimeError(f"Graph GET failed after retry: {url}")
            get_token()
            token_refreshed = True
            continue
        if (r.status_code in _THROTTLE_STATUSES
                and throttle_retries < _MAX_THROTTLE_RETRIES):
            throttle_retries += 1
            time.sleep(_retry_delay(r, throttle_retries))
            continue
        r.raise_for_status()
        return r.json()


def get_all_folders(parent_id: Optional[str] = None,
                    parent_path: str = "") -> list[dict]:
    """Recursively load all folders of the mailbox.

    Args:
        parent_id: Graph id of the folder whose children are listed;
            ``None`` lists the top-level mail folders.
        parent_path: Path of the parent folder, used as a prefix.

    Returns:
        A list of ``{"id": ..., "path": ...}`` dicts, each parent followed by
        its descendants.
    """
    if parent_id is None:
        url = f"{GRAPH_URL}/users/{GRAPH_MAILBOX}/mailFolders"
    else:
        url = f"{GRAPH_URL}/users/{GRAPH_MAILBOX}/mailFolders/{parent_id}/childFolders"

    folders: list[dict] = []
    params: Optional[dict] = {
        "$top": 100,
        "$select": "id,displayName,childFolderCount",
    }
    while url:
        data = graph_get(url, params)
        for folder in data.get("value", []):
            name = folder["displayName"]
            # Join explicitly instead of lstrip("/") so that a folder name
            # which itself starts with "/" is kept unchanged.
            path = f"{parent_path}/{name}" if parent_path else name
            folders.append({"id": folder["id"], "path": path})
            if folder.get("childFolderCount", 0) > 0:
                folders.extend(get_all_folders(folder["id"], path))
        url = data.get("@odata.nextLink")
        # The nextLink already carries the query string.
        params = None
    return folders


def iter_folder_messages(folder_id: str):
    """Generator: yield the messages of a folder, fetched page by page.

    NOTE(review): ``$expand=attachments`` without a nested ``$select``
    presumably returns the attachment content as well, although only the
    metadata is used downstream — confirm and consider narrowing it.
    """
    url = f"{GRAPH_URL}/users/{GRAPH_MAILBOX}/mailFolders/{folder_id}/messages"
    params: Optional[dict] = {
        "$top": PAGE_SIZE,
        "$select": MSG_SELECT,
        "$expand": "attachments",
    }
    while url:
        data = graph_get(url, params)
        yield from data.get("value", [])
        url = data.get("@odata.nextLink")
        # The nextLink already carries the query string.
        params = None


# ─── Utility functions ────────────────────────────────────────────────────────

def _to_naive_utc(dt: datetime) -> datetime:
    """Convert an aware datetime to naive UTC; return naive input unchanged."""
    if dt.tzinfo:
        return dt.astimezone(timezone.utc).replace(tzinfo=None)
    return dt


def parse_date(raw) -> Optional[datetime]:
    """Convert *raw* to a naive datetime expressed in UTC.

    Args:
        raw: ``None``, a ``datetime`` or anything whose ``str()`` can be
            parsed by dateutil.

    Returns:
        A naive datetime (aware values are converted to UTC first, naive
        values are returned unchanged), or ``None`` when *raw* is ``None`` or
        cannot be parsed.
    """
    if raw is None:
        return None
    if isinstance(raw, datetime):
        return _to_naive_utc(raw)
    try:
        return _to_naive_utc(dtparser.parse(str(raw)))
    except (ValueError, OverflowError, TypeError):
        # Unparseable text or a value outside the datetime range.
        return None


def normalize_subject(subject: str) -> str:
    """Return *subject* without its leading reply/forward prefixes.

    Prefixes matched by ``RE_SUBJECT`` are removed repeatedly, so
    "RE: FW: Hello" becomes "Hello". ``None`` is treated as an empty subject.
    """
    s = (subject or "").strip()
    while True:
        m = RE_SUBJECT.match(s)
        if not m:
            return s
        s = s[m.end():].strip()


def parse_headers(raw_headers: list) -> dict:
    """Turn Graph ``internetMessageHeaders`` into a dict.

    Keys are the lower-cased header names with "-" and "." replaced by "_"
    and a leading "$" removed, which keeps them usable as MongoDB field
    names. A header that occurs once maps to its value; a repeated header
    maps to the list of its values in their original order. Entries without
    a usable name are skipped.
    """
    result: dict = {}
    for h in raw_headers or []:
        name = h.get("name")
        if not name:
            continue
        k = name.lower().replace("-", "_").replace(".", "_").lstrip("$")
        if not k:
            continue
        v = h.get("value")
        if k not in result:
            result[k] = v
        elif isinstance(result[k], list):
            result[k].append(v)
        else:
            result[k] = [result[k], v]
    return result


def format_recipients(lst: list) -> str:
    """Join Graph recipients into a single "Name <address>; ..." string.

    A missing or null ``emailAddress``, name or address is treated as empty,
    so a recipient without a name is rendered as "<address>".
    """
    parts = []
    for r in lst or []:
        ea = r.get("emailAddress") or {}
        name = ea.get("name") or ""
        address = ea.get("address") or ""
        parts.append(f"{name} <{address}>".strip())
    return "; ".join(parts)
# ─── Main extraction ──────────────────────────────────────────────────────────

# Upper bound (in characters) of the stored HTML body and of the preview that
# is taken from a plain-text body.
_MAX_BODY_HTML_CHARS = 2 * 1024 * 1024
_MAX_TEXT_PREVIEW_CHARS = 2000


def _clean_recipients(raw) -> list:
    """Normalise a Graph recipient list.

    Every entry of the result has the shape
    ``{"emailAddress": {"name": str, "address": str}}``; a missing or null
    list, entry, ``emailAddress``, name or address becomes empty.
    """
    cleaned = []
    for entry in raw or []:
        ea = (entry or {}).get("emailAddress") or {}
        cleaned.append({
            "emailAddress": {
                "name": ea.get("name") or "",
                "address": ea.get("address") or "",
            }
        })
    return cleaned


def _typed_recipients(kind: str, cleaned: list) -> list:
    """Build the ``{type, email, name}`` records for one recipient kind."""
    return [
        {
            "type": kind,
            "email": r["emailAddress"]["address"],
            "name": r["emailAddress"]["name"],
        }
        for r in cleaned
    ]


def _first(value):
    """Return the first element of a list, or *value* itself otherwise."""
    return value[0] if isinstance(value, list) else value


def extract_message(msg: dict, folder_path: str) -> Optional[dict]:
    """Convert one Graph message into the document stored in MongoDB.

    Args:
        msg: Message as returned by Graph (with ``attachments`` expanded).
        folder_path: Path of the folder the message was read from.

    Returns:
        The document dict, or ``None`` when the conversion failed; the
        failure is written to the error log together with the Graph id.
    """
    try:
        # _id: Internet Message-ID, with the Graph id as a fallback
        mid = (msg.get("internetMessageId") or "").strip()
        if not mid:
            mid = f"graphid:{msg['id']}"

        subject = msg.get("subject") or ""
        norm_subject = normalize_subject(subject)

        # body — "or {}" because Graph may return an explicit null
        body_html = None
        body_preview = msg.get("bodyPreview") or ""
        body = msg.get("body") or {}
        content_type = body.get("contentType")
        if content_type == "html":
            body_html = (body.get("content") or "")[:_MAX_BODY_HTML_CHARS]
        elif content_type == "text":
            # Plain-text messages: the preview is taken from the body itself.
            body_preview = (body.get("content") or "")[:_MAX_TEXT_PREVIEW_CHARS]

        # sender — "from" is preferred, "sender" is the fallback
        sender_ea = (
            (msg.get("from") or msg.get("sender") or {}).get("emailAddress")
            or {}
        )
        sender_email = sender_ea.get("address") or ""
        sender_name = sender_ea.get("name") or ""

        # recipients
        to_list = _clean_recipients(msg.get("toRecipients"))
        cc_list = _clean_recipients(msg.get("ccRecipients"))
        bcc_list = _clean_recipients(msg.get("bccRecipients"))
        recipients = (
            _typed_recipients("to", to_list)
            + _typed_recipients("cc", cc_list)
            + _typed_recipients("bcc", bcc_list)
        )

        # flags
        importance = IMPORTANCE_MAP.get(msg.get("importance", "normal"), 1)
        flag_status = FLAG_STATUS_MAP.get(
            (msg.get("flag") or {}).get("flagStatus", "notFlagged"), 0
        )

        # internet headers — a repeated header arrives as a list
        headers = parse_headers(msg.get("internetMessageHeaders") or [])
        in_reply_to = _first(headers.get("in_reply_to", ""))
        refs_raw = headers.get("references", "")
        if isinstance(refs_raw, list):
            refs_raw = " ".join(refs_raw)
        internet_refs = refs_raw.split() if refs_raw else []
        conv_topic = _first(headers.get("thread_topic", ""))

        # conversation index: re-encoded when it is valid base64,
        # otherwise stored exactly as received
        conv_index = ""
        ci_raw = msg.get("conversationIndex")
        if ci_raw:
            try:
                conv_index = base64.b64encode(base64.b64decode(ci_raw)).decode()
            except (ValueError, TypeError):
                conv_index = ci_raw

        # attachments (metadata only, no content); unnamed ones are skipped
        attachments = []
        for att in msg.get("attachments") or []:
            fname = att.get("name") or ""
            if not fname:
                continue
            attachments.append({
                "filename": fname,
                "size_bytes": att.get("size") or 0,
                # "or" so that an explicit null also gets the default type
                "mime_type": att.get("contentType") or "application/octet-stream",
                "content_id": att.get("contentId"),
                "is_inline": bool(att.get("isInline")),
            })

        return {
            "_id": mid,
            "graph_id": msg["id"],
            "subject": subject,
            "normalized_subject": norm_subject,
            "importance": importance,
            "flag_status": flag_status,
            "is_read": msg.get("isRead", False),
            "is_draft": msg.get("isDraft", False),
            "has_attachments": msg.get("hasAttachments", False),
            "attachment_count": len(attachments),
            "inference_classification": msg.get("inferenceClassification", ""),
            "categories": msg.get("categories") or [],
            "conversation_id": msg.get("conversationId", ""),
            "conversation_index": conv_index,
            "conversation_topic": conv_topic,
            "in_reply_to": in_reply_to,
            "internet_references": internet_refs,
            "received_at": parse_date(msg.get("receivedDateTime")),
            "sent_at": parse_date(msg.get("sentDateTime")),
            "created_at": parse_date(msg.get("createdDateTime")),
            "modified_at": parse_date(msg.get("lastModifiedDateTime")),
            "folder_id": msg.get("parentFolderId", ""),
            "folder_path": folder_path,
            "sender": {
                "email": sender_email,
                "name": sender_name,
            },
            "to": format_recipients(to_list),
            "cc": format_recipients(cc_list),
            "bcc": format_recipients(bcc_list),
            "recipients": recipients,
            "body_html": body_html,
            "body_preview": body_preview,
            "attachments": attachments,
            "headers": headers,
            "parsed_at": datetime.now(timezone.utc).replace(tzinfo=None),
        }
    except Exception as e:
        # Per-message boundary: one malformed message must not stop the
        # import. The traceback goes to the log so the cause can be found.
        logging.error(
            "extract_message failed [%s]: %s", msg.get("id", "?"), e,
            exc_info=True,
        )
        return None
# ─── MongoDB indexes ──────────────────────────────────────────────────────────

def create_indexes(col):
    """Create the query indexes and the text-search index on *col*.

    Args:
        col: The pymongo collection holding the imported messages.
    """
    print(" Vytvarim indexy...")
    for field in ("received_at", "sent_at", "sender.email"):
        col.create_index([(field, ASCENDING)])
    col.create_index([("graph_id", ASCENDING)], unique=True, sparse=True)
    for field in (
        "conversation_id",
        "folder_path",
        "has_attachments",
        "categories",
        "importance",
        "flag_status",
        "is_read",
    ):
        col.create_index([(field, ASCENDING)])
    col.create_index(
        [
            ("subject", TEXT),
            ("body_preview", TEXT),
            ("to", TEXT),
            ("cc", TEXT),
        ],
        name="text_search",
        default_language="none",
    )
    print(" Indexy hotovy.")


# ─── MAIN ─────────────────────────────────────────────────────────────────────

def main():
    """Command-line entry point: import the mailbox into MongoDB.

    Parses the arguments, connects to Graph and MongoDB, walks the selected
    folders, upserts every message in batches and finally (unless
    ``--no-indexes`` is given) creates the indexes. Exits with status 1 when
    Graph authentication or the MongoDB connection fails.
    """
    # Local import: pymongo is already a dependency of this file, but the
    # module-level import does not pull in the error class.
    from pymongo.errors import BulkWriteError

    ap = argparse.ArgumentParser(description=f"parse_emails_graph v{SCRIPT_VERSION}")
    ap.add_argument("--limit", type=int, default=0,
                    help="Zpracovat max N zprav (0 = vse)")
    ap.add_argument("--skip-existing", action="store_true",
                    help="Preskocit zpravy ktere jiz jsou v MongoDB")
    ap.add_argument("--folder", default="",
                    help="Zpracovat jen slozku se zadanym nazvem (napr. Inbox)")
    ap.add_argument("--no-indexes", action="store_true",
                    help="Nevytvorit indexy na konci")
    args = ap.parse_args()

    start = datetime.now()
    print(f"=== parse_emails_graph v{SCRIPT_VERSION} ===")
    print(f"Start: {start.strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"Schránka: {GRAPH_MAILBOX}")
    print(f"MongoDB: {MONGO_URI} -> {MONGO_DB}.{MONGO_COL}")

    # Graph token
    print("\nPřipojuji se k Graph API...")
    try:
        get_token()
        print(" Graph API OK")
    except Exception as e:
        print(f" CHYBA: {e}")
        sys.exit(1)

    # MongoDB
    client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
    try:
        client.admin.command("ping")
        print(" MongoDB OK")
    except Exception as e:
        print(f" CHYBA: MongoDB neni dostupna -- {e}")
        client.close()
        sys.exit(1)

    try:
        col = client[MONGO_DB][MONGO_COL]

        # Skip existing
        existing: set = set()
        if args.skip_existing:
            print(" Nacitam existujici zaznamy z MongoDB...")
            # A projected find() streams the ids; distinct() returns them in
            # a single document and is therefore bound by the 16 MB limit.
            existing = {d["_id"] for d in col.find({}, {"_id": 1})}
            print(f" {len(existing)} jiz importovano")

        # Folders
        print("\nNacitam seznam slozek...")
        all_folders = get_all_folders()
        if args.folder:
            wanted = args.folder.lower()
            all_folders = [f for f in all_folders if wanted in f["path"].lower()]
        print(f" Slozek ke zpracovani: {len(all_folders)}")
        for f in all_folders:
            print(f" {f['path']}")

        # Import
        batch = []
        ok_count = 0
        err_count = 0
        skip_count = 0
        folder_err_count = 0
        total_i = 0

        def flush():
            """Write the pending batch and correct the counters on failure."""
            nonlocal ok_count, err_count
            if not batch:
                return
            try:
                col.bulk_write(batch, ordered=False)
            except BulkWriteError as e:
                # Unordered write: only the listed operations failed. Log a
                # short line per failure — the exception text itself embeds
                # the complete documents.
                write_errors = e.details.get("writeErrors", [])
                for we in write_errors:
                    logging.error(
                        "bulk_write: index=%s code=%s %s",
                        we.get("index"), we.get("code"), we.get("errmsg"),
                    )
                failed = len(write_errors)
                ok_count -= failed
                err_count += failed
                print(f" CHYBA bulk_write: {failed} zapisu selhalo")
            except Exception as e:
                # Nothing is known about the individual operations here, so
                # the whole batch is counted as failed.
                logging.error("bulk_write: %s", e)
                print(f" CHYBA bulk_write: {e}")
                ok_count -= len(batch)
                err_count += len(batch)
            batch.clear()

        print()
        for folder in all_folders:
            print(f"--- Složka: {folder['path']} ---")
            folder_count = 0
            try:
                for msg in iter_folder_messages(folder["id"]):
                    if args.limit and total_i >= args.limit:
                        break

                    mid = (
                        (msg.get("internetMessageId") or "").strip()
                        or f"graphid:{msg['id']}"
                    )
                    if mid in existing:
                        skip_count += 1
                        total_i += 1
                        continue

                    doc = extract_message(msg, folder["path"])
                    total_i += 1
                    folder_count += 1

                    if doc is None:
                        err_count += 1
                    else:
                        batch.append(
                            UpdateOne({"_id": doc["_id"]}, {"$set": doc}, upsert=True)
                        )
                        ok_count += 1
                        if len(batch) >= BATCH_SIZE:
                            flush()

                    status = "ERR " if doc is None else "OK "
                    subject_str = (doc.get("subject") or "")[:60] if doc else "?"
                    sender_str = (doc.get("sender", {}).get("email") or "")[:40] if doc else "?"
                    print(f" {total_i:>6} {status} {subject_str:<60} {sender_str}")

                    if total_i % 500 == 0:
                        elapsed = (datetime.now() - start).total_seconds()
                        rate = total_i / elapsed if elapsed > 0 else 0
                        print(f" {'─'*80}")
                        print(f" Průběh: ok={ok_count} skip={skip_count} err={err_count} {rate:.1f} msg/s")
                        print(f" {'─'*80}")
            except (requests.RequestException, RuntimeError) as e:
                # A Graph failure in one folder must not abort the whole
                # import: report it and carry on with the next folder.
                folder_err_count += 1
                logging.error("folder failed [%s]: %s", folder["path"], e)
                print(f" CHYBA slozky {folder['path']}: {e}")

            # Always write what has been collected for this folder.
            flush()
            print(f" → {folder_count} zprav ze slozky {folder['path']}")

            if args.limit and total_i >= args.limit:
                break

        elapsed_total = (datetime.now() - start).total_seconds()
        print(f"\n{'='*52}")
        print(f"Vysledek: ok={ok_count} | skip={skip_count} | err={err_count}")
        if folder_err_count:
            print(f"Slozek s chybou: {folder_err_count}")
        print(f"Celkovy cas: {int(elapsed_total//3600)}h {int((elapsed_total%3600)//60)}m {int(elapsed_total%60)}s")
        print(f"Dokumentu v kolekci: {col.count_documents({})}")

        if not args.no_indexes:
            print()
            create_indexes(col)

        print(f"\nKonec: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        if err_count or folder_err_count:
            print(f"Chyby logovany do: {LOG_FILE}")
    finally:
        # Release the connection on success and on any failure above.
        client.close()


if __name__ == "__main__":
    main()