# janssen/Python-runner/1_parse_emails_graph_v1.4.py
# 2026-06-05 21:21:30 +02:00

"""
parse_emails_graph_v1.4.py
Nazev: parse_emails_graph_v1.4.py
Verze: 1.4
Datum: 2026-06-03
Autor: vladimir.buzalka
Popis:
Cte vsechny emaily z libovolne schranky primo pres Microsoft Graph API
a importuje je jako dokumenty do MongoDB.
Ze kazde zpravy extrahuje vsechny dostupne vlastnosti:
- predmet, odesilatel, prijemci (To/CC/BCC s typy)
- cas doruceni, odeslani, vytvoreni, modifikace (UTC)
- telo HTML (max 2 MB) + textovy preview
- prilohy (metadata: jmeno, velikost, MIME typ, inline flag, graph_att_id)
- internet headers (SPF, DKIM, Received, X-*, ...)
- MAPI-ekvivalenty: dulezitost, priznak, konverzacni vlakno,
kategorie, In-Reply-To, References, ...
- navic: isRead, isDraft, folder_path, inferenceClassification
Prochazi VSECHNY slozky schranky rekurzivne (Inbox, Sent, Deleted,
archivni slozky, ...).
DB: emaily
Kolekce: <mailbox> (napr. ordinace@buzalkova.cz)
_id: Internet Message-ID (nebo "graphid:<id>" jako fallback)
POZOR: Skript pouze CIST ze schranky — zadny zapis do schranky!
Spousteni:
# Prvni import (vsechno):
python parse_emails_graph_v1.3.py --mailbox ordinace@buzalkova.cz
# Test na prvnich 50:
python parse_emails_graph_v1.3.py --mailbox ordinace@buzalkova.cz --limit 50 --no-indexes
# Jen jedna slozka:
python parse_emails_graph_v1.3.py --mailbox ordinace@buzalkova.cz --folder Inbox
# Pokracovani po preruseni (pouze nove):
python parse_emails_graph_v1.3.py --mailbox ordinace@buzalkova.cz --mode new-only
# Pravidelny sync (aktualizuje is_read, flag, slozku; importuje nove):
python parse_emails_graph_v1.3.py --mailbox ordinace@buzalkova.cz --mode sync
# Jina schranka:
python parse_emails_graph_v1.3.py --mailbox vladimir.buzalka@buzalka.cz
Rezimy (--mode):
full Plny upsert vsech poli pro kazdou zpravu (vychozi)
new-only Preskoci zpravy ktere uz jsou v MongoDB, importuje jen nove
sync Existujici: aktualizuje jen is_read/flag_status/categories/
modified_at/folder_path. Nove zpravy importuje cely.
Idealni pro pravidelne spousteni.
Zavislosti:
msal, requests, pymongo, python-dateutil
Python 3.10+
Struktura dokumentu v MongoDB:
_id Internet Message-ID (nebo graphid: fallback)
graph_id Graph API message ID
subject predmet zpravy
normalized_subject predmet bez RE:/FW:/AW: prefixu
importance 0=nizka 1=normalni 2=vysoka
flag_status 0=bez priznaku 1=oznaceno 2=dokonceno
is_read bool — aktualni stav precteni ve schrance
is_draft bool
has_attachments bool
attachment_count int
inference_classification focused / other
categories [str]
conversation_id Graph conversationId
conversation_index base64 conversationIndex
conversation_topic tema vlakna (z internet headers Thread-Topic)
in_reply_to Message-ID predchozi zpravy
internet_references [Message-ID]
received_at datetime UTC
sent_at datetime UTC
created_at datetime UTC
modified_at datetime UTC
folder_id Graph parentFolderId
folder_path cela cesta slozky (napr. Inbox/Subfolder)
sender.email emailova adresa odesilatele
sender.name zobrazovane jmeno
to retezec To (joined)
cc retezec CC
bcc retezec BCC
recipients [{type, email, name}]
body_html HTML telo (pokud contentType=='html', max 2 MB)
body_text plain-text telo (pokud contentType=='text', max 2 MB)
body_preview textovy nahled z Graph bodyPreview (max 255 znaku)
attachments [{filename, size_bytes, mime_type, is_inline, graph_att_id}]
headers dict internet headers
parsed_at datetime UTC
Indexy:
received_at, sent_at, sender.email, graph_id (unique),
conversation_id, folder_path, has_attachments, categories,
importance, flag_status, is_read,
text_search (subject + body_preview + to + cc)
Historie verzi:
1.0 2026-06-02 Inicialni verze
1.1 2026-06-02 Pridany rezimy --mode full/new-only/sync;
odstranen --skip-existing (nahrazen --mode new-only)
1.2 2026-06-02 $expand attachments s $select (bez contentBytes — rychlejsi);
prilohy ukladaji graph_att_id pro prime stazeni bez name-matchingu
1.3 2026-06-02 --mailbox jako povinny parametr — univerzalni pouziti pro
libovolnou schranku; kolekce v MongoDB = nazev schranky
1.4 2026-06-03 Plain-text emaily (contentType=='text') se ukladaji do
noveho pole body_text (max 2 MB), drive se truncovalo na
2000 znaku do body_preview a zbytek se zahazoval.
body_preview ted obsahuje vzdy puvodni Graph bodyPreview.
Pro existujici emaily z v1.3 lze pouzit
refetch_text_bodies_v1.0.py.
"""
import os
import sys
import re
import logging
import argparse
import base64
from pathlib import Path
from datetime import datetime, timezone
from typing import Optional

import msal
import requests
from dateutil import parser as dtparser
from pymongo import MongoClient, UpdateOne, ASCENDING, TEXT

if hasattr(sys.stdout, "reconfigure"):
    sys.stdout.reconfigure(encoding="utf-8", errors="replace")

# ─── CONFIGURATION ────────────────────────────────────────────────────────────
GRAPH_TENANT_ID = "7d269944-37a4-43a1-8140-c7517dc426e9"
GRAPH_CLIENT_ID = "4b222bfd-78c9-4239-a53f-43006b3ed07f"
# Read the client secret from the environment instead of hard-coding it in the source
GRAPH_CLIENT_SECRET = os.environ.get("GRAPH_CLIENT_SECRET", "")
GRAPH_URL = "https://graph.microsoft.com/v1.0"
MONGO_URI = "mongodb://192.168.1.76:27017"
MONGO_DB = "emaily"
BATCH_SIZE = 100
PAGE_SIZE = 50
LOG_FILE = Path(__file__).parent / "parse_emails_errors.log"
SCRIPT_VERSION = "1.4"
# The mailbox is set at run time from the --mailbox argument
GRAPH_MAILBOX: str = ""
# ──────────────────────────────────────────────────────────────────────────────

logging.basicConfig(
    filename=str(LOG_FILE),
    level=logging.ERROR,
    format="%(asctime)s | %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    encoding="utf-8",
)

IMPORTANCE_MAP = {"low": 0, "normal": 1, "high": 2}
FLAG_STATUS_MAP = {"notFlagged": 0, "flagged": 1, "complete": 2}
# Reply/forward subject prefixes stripped by normalize_subject(); a colon is required
RE_SUBJECT = re.compile(r"^(RE|FW|AW|SV|VS|TR|WG|odpov[eě][dď]|fwd?)\s*:\s*", re.IGNORECASE)
# $expand attachments without contentBytes: only the metadata we need
ATT_EXPAND = "attachments($select=id,name,contentType,size,isInline)"
MSG_SELECT = (
    "id,internetMessageId,subject,bodyPreview,body,"
    "importance,isRead,isDraft,hasAttachments,"
    "receivedDateTime,sentDateTime,createdDateTime,lastModifiedDateTime,"
    "sender,from,toRecipients,ccRecipients,bccRecipients,replyTo,"
    "conversationId,conversationIndex,parentFolderId,"
    "categories,flag,inferenceClassification,internetMessageHeaders"
)
MSG_SELECT_SYNC = (
    "id,internetMessageId,isRead,isDraft,flag,categories,"
    "lastModifiedDateTime,parentFolderId,importance"
)

# ─── Graph API helpers ────────────────────────────────────────────────────────
_graph_token: Optional[str] = None


def get_token() -> str:
    """Acquire an app-only Graph token (client credentials) and cache it globally."""
    global _graph_token
    app = msal.ConfidentialClientApplication(
        GRAPH_CLIENT_ID,
        authority=f"https://login.microsoftonline.com/{GRAPH_TENANT_ID}",
        client_credential=GRAPH_CLIENT_SECRET,
    )
    result = app.acquire_token_for_client(scopes=["https://graph.microsoft.com/.default"])
    if "access_token" not in result:
        raise RuntimeError(f"Graph auth failed: {result}")
    _graph_token = result["access_token"]
    return _graph_token


def graph_get(url: str, params: Optional[dict] = None) -> dict:
    """GET a Graph API URL; on HTTP 401 refresh the token and retry once."""
    if not _graph_token:
        get_token()
    for attempt in range(2):
        r = requests.get(url, headers={"Authorization": f"Bearer {_graph_token}"},
                         params=params, timeout=30)
        if r.status_code == 401:
            get_token()
            continue
        r.raise_for_status()
        return r.json()
    raise RuntimeError(f"Graph GET failed after retry: {url}")


def get_all_folders(parent_id: Optional[str] = None, parent_path: str = "") -> list[dict]:
    """Recursively load all folders of the mailbox. Returns [{id, path}]."""
    if parent_id is None:
        url = f"{GRAPH_URL}/users/{GRAPH_MAILBOX}/mailFolders"
    else:
        url = f"{GRAPH_URL}/users/{GRAPH_MAILBOX}/mailFolders/{parent_id}/childFolders"
    folders = []
    params = {"$top": 100, "$select": "id,displayName,childFolderCount"}
    while url:
        data = graph_get(url, params)
        for f in data.get("value", []):
            path = f"{parent_path}/{f['displayName']}".lstrip("/")
            folders.append({"id": f["id"], "path": path})
            if f.get("childFolderCount", 0) > 0:
                folders.extend(get_all_folders(f["id"], path))
        url = data.get("@odata.nextLink")
        params = None  # the nextLink URL already carries the query parameters
    return folders


def iter_folder_messages(folder_id: str, select: str = MSG_SELECT, expand_attachments: bool = True):
    """Generator: yields the messages of one folder, page by page."""
    url = f"{GRAPH_URL}/users/{GRAPH_MAILBOX}/mailFolders/{folder_id}/messages"
    params = {"$top": PAGE_SIZE, "$select": select}
    if expand_attachments:
        params["$expand"] = ATT_EXPAND
    while url:
        data = graph_get(url, params)
        for msg in data.get("value", []):
            yield msg
        url = data.get("@odata.nextLink")
        params = None
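

# ─── Sketch: shared @odata.nextLink paging (hypothetical, not called) ─────────
# get_all_folders() and iter_folder_messages() repeat the same loop: request a
# page, read "value", follow "@odata.nextLink" and drop the query parameters
# after the first request. A possible shared generator; `fetch` is an assumed
# stand-in for graph_get(), which also lets the loop be tried offline.
from typing import Callable, Iterator, Optional


def iter_odata_pages(fetch: Callable[[str, Optional[dict]], dict],
                     url: str, params: Optional[dict] = None) -> Iterator[dict]:
    """Yield every item of a paged OData collection."""
    while url:
        data = fetch(url, params)
        yield from data.get("value", [])
        url = data.get("@odata.nextLink")
        params = None  # only the first request needs explicit parameters

# Possible use: for msg in iter_odata_pages(graph_get, url, params): ...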


# ─── Helper functions ─────────────────────────────────────────────────────────
def parse_date(raw) -> Optional[datetime]:
    """Parse a timestamp into a naive UTC datetime (None if missing or unparsable).

    Naive input datetimes are assumed to be in UTC already.
    """
    if raw is None:
        return None
    if isinstance(raw, datetime):
        if raw.tzinfo:
            return raw.astimezone(timezone.utc).replace(tzinfo=None)
        return raw
    try:
        dt = dtparser.parse(str(raw))
        if dt.tzinfo:
            return dt.astimezone(timezone.utc).replace(tzinfo=None)
        return dt
    except Exception:
        return None


def normalize_subject(subject: str) -> str:
    """Strip repeated reply/forward prefixes (RE:, FW:, AW:, ...) from a subject."""
    s = subject.strip()
    while True:
        m = RE_SUBJECT.match(s)
        if not m:
            break
        s = s[m.end():].strip()
    return s
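

def parse_headers(raw_headers: list) -> dict:
    """Convert Graph internetMessageHeaders into a dict.

    Keys are lower-cased with '-' replaced by '_' (In-Reply-To -> in_reply_to);
    a header that occurs more than once becomes a list of its values.
    """
    result = {}
    for h in raw_headers:
        k = h["name"].lower().replace("-", "_")
        v = h["value"]
        if k in result:
            existing = result[k]
            result[k] = existing + [v] if isinstance(existing, list) else [existing, v]
        else:
            result[k] = v
    return result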
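

# ─── Sketch: first value of a repeated header (hypothetical, not called) ──────
# parse_headers() turns repeated headers (e.g. several Received lines) into a
# list, so extract_message() repeats "if isinstance(x, list): x = x[0]" for
# In-Reply-To and Thread-Topic. A small helper could centralise that; the name
# header_first is an illustrative assumption.
def header_first(headers: dict, key: str, default: str = "") -> str:
    """Return the first value of a (possibly repeated) parsed header."""
    value = headers.get(key, default)
    if isinstance(value, list):
        return value[0] if value else default
    return value

# Possible use: in_reply_to = header_first(headers, "in_reply_to")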


def format_recipients(lst: list) -> str:
    """Join Graph recipients into one 'Name <address>; ...' string."""
    return "; ".join(
        f'{r["emailAddress"].get("name", "")} <{r["emailAddress"].get("address", "")}>'.strip()
        for r in lst
    )
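

# ─── Sketch: structured recipient list (hypothetical, not called) ─────────────
# extract_message() builds `recipients` with three near-identical list
# comprehensions (to/cc/bcc). Assuming the Graph recipient shape
# {"emailAddress": {"address": ..., "name": ...}}, one helper could cover all
# three; the name recipient_entries is an illustrative assumption.
def recipient_entries(kind: str, lst: list) -> list[dict]:
    """Convert Graph recipients into [{type, email, name}] dicts."""
    out = []
    for r in lst or []:
        ea = r.get("emailAddress") or {}
        out.append({"type": kind, "email": ea.get("address", ""), "name": ea.get("name", "")})
    return out

# Possible use:
#     recipients = (recipient_entries("to", to_list) + recipient_entries("cc", cc_list)
#                   + recipient_entries("bcc", bcc_list))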


# ─── Message extraction ───────────────────────────────────────────────────────
def extract_message(msg: dict, folder_path: str) -> Optional[dict]:
    """Full extraction; used in full mode and for new messages in sync/new-only mode."""
    try:
        mid = (msg.get("internetMessageId") or "").strip() or f"graphid:{msg['id']}"
        subject = msg.get("subject") or ""
        body_html = None
        body_text = None
        body_preview = msg.get("bodyPreview") or ""
        body = msg.get("body") or {}
        # The limit is 2 * 1024 * 1024 characters (len() of a str), roughly 2 MB
        _MAX_BODY = 2 * 1024 * 1024
        if body.get("contentType") == "html":
            content = body.get("content") or ""
            body_html = content[:_MAX_BODY]
        elif body.get("contentType") == "text":
            content = body.get("content") or ""
            # v1.4: store the FULL plain text in body_text (previously it was cut
            # to 2000 characters in body_preview and the rest was discarded)
            body_text = content[:_MAX_BODY]
        sender_ea = (msg.get("from") or msg.get("sender") or {}).get("emailAddress", {})
        to_list = msg.get("toRecipients") or []
        cc_list = msg.get("ccRecipients") or []
        bcc_list = msg.get("bccRecipients") or []
        recipients = (
            [{"type": "to", "email": r["emailAddress"].get("address", ""),
              "name": r["emailAddress"].get("name", "")} for r in to_list]
            + [{"type": "cc", "email": r["emailAddress"].get("address", ""),
                "name": r["emailAddress"].get("name", "")} for r in cc_list]
            + [{"type": "bcc", "email": r["emailAddress"].get("address", ""),
                "name": r["emailAddress"].get("name", "")} for r in bcc_list]
        )
        importance = IMPORTANCE_MAP.get(msg.get("importance", "normal"), 1)
        flag_status = FLAG_STATUS_MAP.get((msg.get("flag") or {}).get("flagStatus", "notFlagged"), 0)
        raw_headers = msg.get("internetMessageHeaders") or []
        headers = parse_headers(raw_headers)
        # Repeated headers come back from parse_headers() as lists; keep the first value
        in_reply_to = headers.get("in_reply_to", "")
        if isinstance(in_reply_to, list):
            in_reply_to = in_reply_to[0]
        refs_raw = headers.get("references", "")
        if isinstance(refs_raw, list):
            refs_raw = " ".join(refs_raw)
        internet_refs = [r.strip() for r in refs_raw.split() if r.strip()] if refs_raw else []
        conv_topic = headers.get("thread_topic", "")
        if isinstance(conv_topic, list):
            conv_topic = conv_topic[0]
        # Normalise conversationIndex to canonical base64; keep the raw value if decoding fails
        conv_index = ""
        ci_raw = msg.get("conversationIndex")
        if ci_raw:
            try:
                conv_index = base64.b64encode(base64.b64decode(ci_raw)).decode()
            except Exception:
                conv_index = ci_raw
        attachments = []
        for att in msg.get("attachments") or []:
            fname = att.get("name") or ""
            if not fname:
                continue  # attachments without a name are skipped
            attachments.append({
                "filename": fname,
                "size_bytes": att.get("size", 0),
                "mime_type": att.get("contentType", "application/octet-stream"),
                "is_inline": att.get("isInline", False),
                "graph_att_id": att.get("id"),
            })
        return {
            "_id": mid,
            "graph_id": msg["id"],
            "subject": subject,
            "normalized_subject": normalize_subject(subject),
            "importance": importance,
            "flag_status": flag_status,
            "is_read": msg.get("isRead", False),
            "is_draft": msg.get("isDraft", False),
            "has_attachments": msg.get("hasAttachments", False),
            "attachment_count": len(attachments),
            "inference_classification": msg.get("inferenceClassification", ""),
            "categories": msg.get("categories") or [],
            "conversation_id": msg.get("conversationId", ""),
            "conversation_index": conv_index,
            "conversation_topic": conv_topic,
            "in_reply_to": in_reply_to,
            "internet_references": internet_refs,
            "received_at": parse_date(msg.get("receivedDateTime")),
            "sent_at": parse_date(msg.get("sentDateTime")),
            "created_at": parse_date(msg.get("createdDateTime")),
            "modified_at": parse_date(msg.get("lastModifiedDateTime")),
            "folder_id": msg.get("parentFolderId", ""),
            "folder_path": folder_path,
            "sender": {
                "email": sender_ea.get("address", ""),
                "name": sender_ea.get("name", ""),
            },
            "to": format_recipients(to_list),
            "cc": format_recipients(cc_list),
            "bcc": format_recipients(bcc_list),
            "recipients": recipients,
            "body_html": body_html,
            "body_text": body_text,
            "body_preview": body_preview,
            "attachments": attachments,
            "headers": headers,
            "parsed_at": datetime.now(timezone.utc).replace(tzinfo=None),
        }
    except Exception as e:
        logging.error("extract_message failed [%s]: %s", msg.get("id", "?"), e)
        return None
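

# ─── Sketch: byte-based body limit (hypothetical, not called) ─────────────────
# _MAX_BODY in extract_message() limits the body to 2 * 1024 * 1024 characters,
# not bytes; with Czech diacritics (2 bytes each in UTF-8) the stored string can
# exceed 2 MB. If a true byte limit is wanted, one option is to cut the UTF-8
# encoding and drop a multi-byte character split at the end. The name
# truncate_utf8 is an illustrative assumption.
def truncate_utf8(text: str, max_bytes: int) -> str:
    """Return the longest prefix of `text` whose UTF-8 encoding fits in max_bytes."""
    data = text.encode("utf-8")
    if len(data) <= max_bytes:
        return text
    # errors="ignore" discards the incomplete byte sequence left by the cut
    return data[:max_bytes].decode("utf-8", errors="ignore")

# Possible use: body_html = truncate_utf8(content, _MAX_BODY)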


def extract_sync_fields(msg: dict, folder_path: str) -> dict:
    """Only the mutable fields; used in sync mode for messages already in MongoDB."""
    return {
        "is_read": msg.get("isRead", False),
        "is_draft": msg.get("isDraft", False),
        "flag_status": FLAG_STATUS_MAP.get((msg.get("flag") or {}).get("flagStatus", "notFlagged"), 0),
        "importance": IMPORTANCE_MAP.get(msg.get("importance", "normal"), 1),
        "categories": msg.get("categories") or [],
        "modified_at": parse_date(msg.get("lastModifiedDateTime")),
        "folder_id": msg.get("parentFolderId", ""),
        "folder_path": folder_path,
        "parsed_at": datetime.now(timezone.utc).replace(tzinfo=None),
    }


# ─── MongoDB indexes ──────────────────────────────────────────────────────────
def create_indexes(col):
    print("  Creating indexes...")
    col.create_index([("received_at", ASCENDING)])
    col.create_index([("sent_at", ASCENDING)])
    col.create_index([("sender.email", ASCENDING)])
    col.create_index([("graph_id", ASCENDING)], unique=True, sparse=True)
    col.create_index([("conversation_id", ASCENDING)])
    col.create_index([("folder_path", ASCENDING)])
    col.create_index([("has_attachments", ASCENDING)])
    col.create_index([("categories", ASCENDING)])
    col.create_index([("importance", ASCENDING)])
    col.create_index([("flag_status", ASCENDING)])
    col.create_index([("is_read", ASCENDING)])
    col.create_index([
        ("subject", TEXT),
        ("body_preview", TEXT),
        ("to", TEXT),
        ("cc", TEXT),
    ], name="text_search", default_language="none")
    print("  Indexes done.")


# ─── MAIN ─────────────────────────────────────────────────────────────────────
def main():
    global GRAPH_MAILBOX
    ap = argparse.ArgumentParser(description=f"parse_emails_graph v{SCRIPT_VERSION}")
    ap.add_argument("--mailbox", required=True,
                    help="Mailbox to read (e.g. ordinace@buzalkova.cz)")
    ap.add_argument("--mode", default="full", choices=["full", "new-only", "sync"],
                    help="full=full upsert (default) | new-only=only new messages | "
                         "sync=update only the mutable fields of existing messages, "
                         "import new ones in full")
    ap.add_argument("--limit", type=int, default=0,
                    help="Process at most N messages (0 = all)")
    ap.add_argument("--folder", default="",
                    help="Process only folders whose path contains this text, "
                         "case-insensitive (e.g. Inbox)")
    ap.add_argument("--no-indexes", action="store_true",
                    help="Do not create indexes at the end")
    args = ap.parse_args()
    GRAPH_MAILBOX = args.mailbox
    mongo_col = args.mailbox
    start = datetime.now()
    print(f"=== parse_emails_graph v{SCRIPT_VERSION} ===")
    print(f"Start:   {start.strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"Mailbox: {GRAPH_MAILBOX}")
    print(f"MongoDB: {MONGO_URI} -> {MONGO_DB}.{mongo_col}")
    print(f"Mode:    {args.mode}")
    print("\nConnecting to Graph API...")
    try:
        get_token()
        print("  Graph API OK")
    except Exception as e:
        print(f"  ERROR: {e}")
        sys.exit(1)
    client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
    try:
        client.admin.command("ping")
        print("  MongoDB OK")
    except Exception as e:
        print(f"  ERROR: MongoDB is not reachable: {e}")
        sys.exit(1)
    col = client[MONGO_DB][mongo_col]
    existing: set = set()
    if args.mode in ("new-only", "sync"):
        print("  Loading existing records from MongoDB...")
        # Stream only _id instead of col.distinct("_id"): distinct returns one
        # result document, which must stay under the 16 MB BSON limit
        existing = {d["_id"] for d in col.find({}, {"_id": 1})}
        print(f"  {len(existing)} already imported")
    print("\nLoading folder list...")
    all_folders = get_all_folders()
    if args.folder:
        all_folders = [f for f in all_folders if args.folder.lower() in f["path"].lower()]
    print(f"  Folders to process: {len(all_folders)}")
    for f in all_folders:
        print(f"    {f['path']}")
    is_sync = args.mode == "sync"
    msg_select = MSG_SELECT_SYNC if is_sync else MSG_SELECT
    expand_att = not is_sync
    batch = []
    ok_count = 0
    sync_count = 0
    err_count = 0
    skip_count = 0
    total_i = 0

    def flush():
        """Write the pending UpdateOne operations to MongoDB and empty the batch."""
        if not batch:
            return
        try:
            col.bulk_write(batch, ordered=False)
        except Exception as e:
            logging.error("bulk_write: %s", e)
            print(f"  ERROR bulk_write: {e}")
        batch.clear()

    print()
    for folder in all_folders:
        print(f"--- Folder: {folder['path']} ---")
        folder_count = 0
        for msg in iter_folder_messages(folder["id"], select=msg_select, expand_attachments=expand_att):
            if args.limit and total_i >= args.limit:
                break
            mid = (msg.get("internetMessageId") or "").strip() or f"graphid:{msg['id']}"
            total_i += 1
            folder_count += 1
            if args.mode == "new-only" and mid in existing:
                skip_count += 1
                continue
            if is_sync and mid in existing:
                fields = extract_sync_fields(msg, folder["path"])
                batch.append(UpdateOne({"_id": mid}, {"$set": fields}))
                sync_count += 1
                print(f"  {total_i:>6} SYN {mid[:80]}")
            else:
                if is_sync:
                    # The sync listing carries only MSG_SELECT_SYNC fields, so a new
                    # message is fetched again in full (with attachment metadata)
                    full_url = f"{GRAPH_URL}/users/{GRAPH_MAILBOX}/messages/{msg['id']}"
                    full_params = {"$select": MSG_SELECT, "$expand": ATT_EXPAND}
                    try:
                        msg = graph_get(full_url, full_params)
                    except Exception as e:
                        logging.error("full fetch failed [%s]: %s", msg.get("id", "?"), e)
                        err_count += 1
                        print(f"  {total_i:>6} ERR {mid[:80]}")
                        continue
                doc = extract_message(msg, folder["path"])
                if doc is None:
                    err_count += 1
                    print(f"  {total_i:>6} ERR {mid[:80]}")
                else:
                    batch.append(UpdateOne({"_id": doc["_id"]}, {"$set": doc}, upsert=True))
                    ok_count += 1
                    subject_str = (doc.get("subject") or "")[:60]
                    sender_str = (doc.get("sender", {}).get("email") or "")[:40]
                    print(f"  {total_i:>6} OK  {subject_str:<60} {sender_str}")
            if len(batch) >= BATCH_SIZE:
                flush()
            if total_i % 500 == 0:
                elapsed = (datetime.now() - start).total_seconds()
                rate = total_i / elapsed if elapsed > 0 else 0
                print(f"  {'─' * 80}")
                print(f"  Progress: ok={ok_count} sync={sync_count} skip={skip_count} "
                      f"err={err_count}  {rate:.1f} msg/s")
                print(f"  {'─' * 80}")
        flush()
        print(f"  {folder_count} messages from folder {folder['path']}")
        if args.limit and total_i >= args.limit:
            break
    elapsed_total = (datetime.now() - start).total_seconds()
    print(f"\n{'=' * 52}")
    print(f"Result: ok={ok_count} | sync={sync_count} | skip={skip_count} | err={err_count}")
    print(f"Total time: {int(elapsed_total // 3600)}h {int((elapsed_total % 3600) // 60)}m "
          f"{int(elapsed_total % 60)}s")
    print(f"Documents in collection: {col.count_documents({})}")
    if not args.no_indexes:
        print()
        create_indexes(col)
    print(f"\nEnd: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    if err_count:
        print(f"Errors logged to: {LOG_FILE}")
    client.close()


if __name__ == "__main__":
    main()