625 lines
25 KiB
Python
625 lines
25 KiB
Python
"""
|
|
parse_emails_graph_v1.4.py
|
|
Nazev: parse_emails_graph_v1.4.py
|
|
Verze: 1.4
|
|
Datum: 2026-06-03
|
|
Autor: vladimir.buzalka
|
|
|
|
Popis:
|
|
Cte vsechny emaily z libovolne schranky primo pres Microsoft Graph API
|
|
a importuje je jako dokumenty do MongoDB.
|
|
Ze kazde zpravy extrahuje vsechny dostupne vlastnosti:
|
|
|
|
- predmet, odesilatel, prijemci (To/CC/BCC s typy)
|
|
- cas doruceni, odeslani, vytvoreni, modifikace (UTC)
|
|
- telo HTML (max 2 MB) + textovy preview
|
|
- prilohy (metadata: jmeno, velikost, MIME typ, inline flag, graph_att_id)
|
|
- internet headers (SPF, DKIM, Received, X-*, ...)
|
|
- MAPI-ekvivalenty: dulezitost, priznak, konverzacni vlakno,
|
|
kategorie, In-Reply-To, References, ...
|
|
- navic: isRead, isDraft, folder_path, inferenceClassification
|
|
|
|
Prochazi VSECHNY slozky schranky rekurzivne (Inbox, Sent, Deleted,
|
|
archivni slozky, ...).
|
|
|
|
DB: emaily
|
|
Kolekce: <mailbox> (napr. ordinace@buzalkova.cz)
|
|
_id: Internet Message-ID (nebo "graphid:<id>" jako fallback)
|
|
|
|
POZOR: Skript pouze CIST ze schranky — zadny zapis do schranky!
|
|
|
|
Spousteni:
|
|
# Prvni import (vsechno):
|
|
    python parse_emails_graph_v1.4.py --mailbox ordinace@buzalkova.cz
|
|
|
|
# Test na prvnich 50:
|
|
    python parse_emails_graph_v1.4.py --mailbox ordinace@buzalkova.cz --limit 50 --no-indexes
|
|
|
|
# Jen jedna slozka:
|
|
    python parse_emails_graph_v1.4.py --mailbox ordinace@buzalkova.cz --folder Inbox
|
|
|
|
# Pokracovani po preruseni (pouze nove):
|
|
    python parse_emails_graph_v1.4.py --mailbox ordinace@buzalkova.cz --mode new-only
|
|
|
|
# Pravidelny sync (aktualizuje is_read, flag, slozku; importuje nove):
|
|
    python parse_emails_graph_v1.4.py --mailbox ordinace@buzalkova.cz --mode sync
|
|
|
|
# Jina schranka:
|
|
    python parse_emails_graph_v1.4.py --mailbox vladimir.buzalka@buzalka.cz
|
|
|
|
Rezimy (--mode):
|
|
full Plny upsert vsech poli pro kazdou zpravu (vychozi)
|
|
new-only Preskoci zpravy ktere uz jsou v MongoDB, importuje jen nove
|
|
sync Existujici: aktualizuje jen is_read/flag_status/categories/
|
|
modified_at/folder_path. Nove zpravy importuje cely.
|
|
Idealni pro pravidelne spousteni.
|
|
|
|
Zavislosti:
|
|
msal, requests, pymongo, python-dateutil
|
|
Python 3.10+
|
|
|
|
Struktura dokumentu v MongoDB:
|
|
_id Internet Message-ID (nebo graphid: fallback)
|
|
graph_id Graph API message ID
|
|
subject predmet zpravy
|
|
normalized_subject predmet bez RE:/FW:/AW: prefixu
|
|
importance 0=nizka 1=normalni 2=vysoka
|
|
flag_status 0=bez priznaku 1=oznaceno 2=dokonceno
|
|
is_read bool — aktualni stav precteni ve schrance
|
|
is_draft bool
|
|
has_attachments bool
|
|
attachment_count int
|
|
inference_classification focused / other
|
|
categories [str]
|
|
conversation_id Graph conversationId
|
|
conversation_index base64 conversationIndex
|
|
conversation_topic tema vlakna (z internet headers Thread-Topic)
|
|
in_reply_to Message-ID predchozi zpravy
|
|
internet_references [Message-ID]
|
|
received_at datetime UTC
|
|
sent_at datetime UTC
|
|
created_at datetime UTC
|
|
modified_at datetime UTC
|
|
folder_id Graph parentFolderId
|
|
folder_path cela cesta slozky (napr. Inbox/Subfolder)
|
|
sender.email emailova adresa odesilatele
|
|
sender.name zobrazovane jmeno
|
|
to retezec To (joined)
|
|
cc retezec CC
|
|
bcc retezec BCC
|
|
recipients [{type, email, name}]
|
|
body_html HTML telo (pokud contentType=='html', max 2 MB)
|
|
body_text plain-text telo (pokud contentType=='text', max 2 MB)
|
|
body_preview textovy nahled z Graph bodyPreview (max 255 znaku)
|
|
attachments [{filename, size_bytes, mime_type, is_inline, graph_att_id}]
|
|
headers dict internet headers
|
|
parsed_at datetime UTC
|
|
|
|
Indexy:
|
|
received_at, sent_at, sender.email, graph_id (unique),
|
|
conversation_id, folder_path, has_attachments, categories,
|
|
importance, flag_status, is_read,
|
|
text_search (subject + body_preview + to + cc)
|
|
|
|
Historie verzi:
|
|
1.0 2026-06-02 Inicialni verze
|
|
1.1 2026-06-02 Pridany rezimy --mode full/new-only/sync;
|
|
odstranen --skip-existing (nahrazen --mode new-only)
|
|
1.2 2026-06-02 $expand attachments s $select (bez contentBytes — rychlejsi);
|
|
prilohy ukladaji graph_att_id pro prime stazeni bez name-matchingu
|
|
1.3 2026-06-02 --mailbox jako povinny parametr — univerzalni pouziti pro
|
|
libovolnou schranku; kolekce v MongoDB = nazev schranky
|
|
1.4 2026-06-03 Plain-text emaily (contentType=='text') se ukladaji do
|
|
noveho pole body_text (max 2 MB), drive se truncovalo na
|
|
2000 znaku do body_preview a zbytek se zahazoval.
|
|
body_preview ted obsahuje vzdy puvodni Graph bodyPreview.
|
|
Pro existujici emaily z v1.3 lze pouzit
|
|
refetch_text_bodies_v1.0.py.
|
|
"""
|
|
|
|
import argparse
import base64
import logging
import os
import re
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

import msal
import requests
from dateutil import parser as dtparser
from pymongo import ASCENDING, TEXT, MongoClient, UpdateOne
|
|
|
|
# Switch console output to UTF-8 (unencodable characters are replaced) when the
# current stdout object supports reconfiguration; otherwise leave it untouched.
try:
    _stdout_reconfigure = sys.stdout.reconfigure
except AttributeError:
    pass
else:
    _stdout_reconfigure(encoding="utf-8", errors="replace")
|
|
|
|
# ─── CONFIGURATION ────────────────────────────────────────────────────────────
# Azure AD application used for the client-credentials flow against Microsoft Graph.
# Each credential can be supplied through an environment variable of the same name;
# the literal is only a backward-compatible fallback for existing deployments.
GRAPH_TENANT_ID = os.environ.get("GRAPH_TENANT_ID") or "7d269944-37a4-43a1-8140-c7517dc426e9"
GRAPH_CLIENT_ID = os.environ.get("GRAPH_CLIENT_ID") or "4b222bfd-78c9-4239-a53f-43006b3ed07f"
# SECURITY: a client secret committed to source control must be treated as leaked.
# Rotate it in Azure AD, provide the new value via the GRAPH_CLIENT_SECRET
# environment variable and then remove this fallback literal.
GRAPH_CLIENT_SECRET = os.environ.get("GRAPH_CLIENT_SECRET") or "Txg8Q~MjhocuopxsJyJBhPmDfMxZ2r5WpTFj1dfk"
GRAPH_URL = "https://graph.microsoft.com/v1.0"

# MongoDB target; the connection string can be overridden with MONGO_URI.
MONGO_URI = os.environ.get("MONGO_URI") or "mongodb://192.168.1.76:27017"
MONGO_DB = "emaily"
BATCH_SIZE = 100   # number of write operations buffered before a bulk_write
PAGE_SIZE = 50     # $top used when paging messages from Graph
LOG_FILE = Path(__file__).parent / "parse_emails_errors.log"
SCRIPT_VERSION = "1.4"

# The mailbox is set at run time from the --mailbox argument.
GRAPH_MAILBOX: str = ""
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
|
|
|
# Only ERROR-level records are persisted; they are appended to LOG_FILE as UTF-8
# text, each line prefixed with a "YYYY-MM-DD HH:MM:SS" timestamp.
logging.basicConfig(
    level=logging.ERROR,
    filename=str(LOG_FILE),
    encoding="utf-8",
    datefmt="%Y-%m-%d %H:%M:%S",
    format="%(asctime)s | %(message)s",
)
|
|
|
|
# Graph enum strings -> small integer codes stored in MongoDB.
IMPORTANCE_MAP = {label: code for code, label in enumerate(("low", "normal", "high"))}
FLAG_STATUS_MAP = {label: code for code, label in enumerate(("notFlagged", "flagged", "complete"))}

# Reply/forward prefixes (several languages) stripped when normalising a subject.
_SUBJECT_PREFIXES = ("RE", "FW", "AW", "SV", "VS", "TR", "WG", "odpov[eě]d[ťt]", "fwd?")
RE_SUBJECT = re.compile(r"^(" + "|".join(_SUBJECT_PREFIXES) + r")[:\s]+", re.IGNORECASE)

# $expand for attachments without contentBytes -- only the metadata that is stored.
_ATT_FIELDS = ("id", "name", "contentType", "size", "isInline")
ATT_EXPAND = "attachments($select=" + ",".join(_ATT_FIELDS) + ")"

# Properties requested for a full message import.
_MSG_FIELDS = (
    "id", "internetMessageId", "subject", "bodyPreview", "body",
    "importance", "isRead", "isDraft", "hasAttachments",
    "receivedDateTime", "sentDateTime", "createdDateTime", "lastModifiedDateTime",
    "sender", "from", "toRecipients", "ccRecipients", "bccRecipients", "replyTo",
    "conversationId", "conversationIndex", "parentFolderId",
    "categories", "flag", "inferenceClassification", "internetMessageHeaders",
)
MSG_SELECT = ",".join(_MSG_FIELDS)

# Reduced property set used when paging messages in sync mode.
_MSG_SYNC_FIELDS = (
    "id", "internetMessageId", "isRead", "isDraft", "flag", "categories",
    "lastModifiedDateTime", "parentFolderId", "importance",
)
MSG_SELECT_SYNC = ",".join(_MSG_SYNC_FIELDS)
|
|
|
|
|
|
# ─── Graph API helpers ────────────────────────────────────────────────────────
|
|
|
|
# Most recently acquired Graph access token; filled in by get_token().
_graph_token: Optional[str] = None


def get_token() -> str:
    """Acquire a new app-only access token for Microsoft Graph.

    Builds an MSAL confidential client from the module-level tenant, client id
    and client secret, requests a token for the Graph ``.default`` scope and
    stores it in the module-level ``_graph_token``.

    Returns:
        The access token string.

    Raises:
        RuntimeError: if the MSAL result does not contain ``access_token``.
    """
    global _graph_token
    authority_url = f"https://login.microsoftonline.com/{GRAPH_TENANT_ID}"
    msal_app = msal.ConfidentialClientApplication(
        GRAPH_CLIENT_ID,
        authority=authority_url,
        client_credential=GRAPH_CLIENT_SECRET,
    )
    response = msal_app.acquire_token_for_client(scopes=["https://graph.microsoft.com/.default"])
    if "access_token" in response:
        _graph_token = response["access_token"]
        return _graph_token
    raise RuntimeError(f"Graph auth failed: {response}")
|
|
|
|
|
|
def graph_get(url: str, params: Optional[dict] = None) -> dict:
    """Perform an authenticated GET against Microsoft Graph and return the JSON body.

    A token is acquired lazily on first use. A single HTTP 401 triggers a token
    refresh followed by one retry. Throttling / transient responses (429, 503,
    504) are retried a bounded number of times so that a long-running import is
    not aborted by temporary rate limiting: the wait is taken from the
    ``Retry-After`` header when it holds a number of seconds, otherwise an
    exponential backoff is used.

    Args:
        url: Absolute request URL (an ``@odata.nextLink`` is passed as-is).
        params: Optional query parameters; ``None`` for none.

    Returns:
        The decoded JSON response.

    Raises:
        RuntimeError: the request is still answered with 401 after a token refresh.
        requests.HTTPError: any other non-success status, including a throttling
            status that persists after all retries.
    """
    global _graph_token
    max_throttle_retries = 5
    max_delay_seconds = 120.0
    retryable_statuses = (429, 503, 504)

    if not _graph_token:
        get_token()

    token_refreshed = False
    throttle_attempts = 0
    while True:
        r = requests.get(
            url,
            headers={"Authorization": f"Bearer {_graph_token}"},
            params=params,
            timeout=30,
        )

        if r.status_code == 401:
            if token_refreshed:
                # A fresh token was rejected as well -- give up.
                break
            get_token()
            token_refreshed = True
            continue

        if r.status_code in retryable_statuses and throttle_attempts < max_throttle_retries:
            throttle_attempts += 1
            try:
                delay = float(r.headers.get("Retry-After", ""))
            except ValueError:
                # Header missing or not a plain number of seconds: back off exponentially.
                delay = float(2 ** throttle_attempts)
            time.sleep(min(max(delay, 0.0), max_delay_seconds))
            continue

        r.raise_for_status()
        return r.json()

    raise RuntimeError(f"Graph GET failed after retry: {url}")
|
|
|
|
|
|
def get_all_folders(parent_id: str = None, parent_path: str = "") -> list[dict]:
    """Recursively list the mail folders of the configured mailbox.

    With ``parent_id`` omitted the top-level folders are listed, otherwise the
    child folders of ``parent_id``. Every folder is followed directly by its
    own descendants (depth-first order).

    Args:
        parent_id: Graph id of the folder whose children are listed, or None.
        parent_path: Path of the parent folder, used as prefix for child paths.

    Returns:
        A list of ``{"id": <folder id>, "path": <slash-separated path>}`` dicts.
    """
    root = f"{GRAPH_URL}/users/{GRAPH_MAILBOX}/mailFolders"
    next_url = root if parent_id is None else f"{root}/{parent_id}/childFolders"
    query = {"$top": 100, "$select": "id,displayName,childFolderCount"}

    collected = []
    while next_url:
        page = graph_get(next_url, query)
        for entry in page.get("value", []):
            full_path = f"{parent_path}/{entry['displayName']}".lstrip("/")
            collected.append({"id": entry["id"], "path": full_path})
            if entry.get("childFolderCount", 0) > 0:
                collected += get_all_folders(entry["id"], full_path)
        # The nextLink already carries the query string, so params are dropped.
        next_url, query = page.get("@odata.nextLink"), None
    return collected
|
|
|
|
|
|
def iter_folder_messages(folder_id: str, select: str = MSG_SELECT, expand_attachments: bool = True):
    """Yield every message of one mail folder, fetching it page by page.

    Args:
        folder_id: Graph id of the folder to read.
        select: Comma-separated ``$select`` property list.
        expand_attachments: When true, attachment metadata is requested via
            ``$expand`` (ATT_EXPAND).

    Yields:
        The raw message dicts from each page's ``value`` array.
    """
    query = {"$top": PAGE_SIZE, "$select": select}
    if expand_attachments:
        query["$expand"] = ATT_EXPAND

    next_url = f"{GRAPH_URL}/users/{GRAPH_MAILBOX}/mailFolders/{folder_id}/messages"
    while next_url:
        page = graph_get(next_url, query)
        yield from page.get("value", [])
        next_url = page.get("@odata.nextLink")
        # Follow-up pages are addressed by the nextLink alone.
        query = None
|
|
|
|
|
|
# ─── Pomocné funkce ───────────────────────────────────────────────────────────
|
|
|
|
def parse_date(raw) -> Optional[datetime]:
    """Convert a timestamp to a naive ``datetime`` expressed in UTC.

    Args:
        raw: ``None``, a ``datetime``, or any value whose ``str()`` can be
            parsed by ``dateutil``.

    Returns:
        ``None`` for ``None`` input or unparseable text. Timezone-aware values
        are converted to UTC and returned without tzinfo; naive values are
        returned unchanged (they are not shifted in any way).
    """
    def _as_naive_utc(moment: datetime) -> datetime:
        if not moment.tzinfo:
            return moment
        return moment.astimezone(timezone.utc).replace(tzinfo=None)

    if raw is None:
        return None
    if isinstance(raw, datetime):
        return _as_naive_utc(raw)
    try:
        return _as_naive_utc(dtparser.parse(str(raw)))
    except Exception:
        # Best effort: anything that cannot be parsed is stored as "no date".
        return None
|
|
|
|
|
|
def normalize_subject(subject: str) -> str:
    """Return *subject* with all leading reply/forward prefixes removed.

    Prefixes matched by ``RE_SUBJECT`` are stripped repeatedly, so stacked
    prefixes are removed one after another; surrounding whitespace is trimmed
    after every step.
    """
    remainder = subject.strip()
    while (prefix := RE_SUBJECT.match(remainder)) is not None:
        remainder = remainder[prefix.end():].strip()
    return remainder
|
|
|
|
|
|
def parse_headers(raw_headers: list) -> dict:
    """Fold a list of ``{"name", "value"}`` header entries into a dict.

    Keys are the header names lower-cased with ``-`` replaced by ``_``.
    A header that occurs once maps to its value; a header that occurs several
    times maps to a list of its values in order of appearance.
    """
    collected = {}
    for header in raw_headers:
        key = header["name"].lower().replace("-", "_")
        value = header["value"]
        if key not in collected:
            collected[key] = value
            continue
        previous = collected[key]
        if isinstance(previous, list):
            collected[key] = previous + [value]
        else:
            collected[key] = [previous, value]
    return collected
|
|
|
|
|
|
def format_recipients(lst: list) -> str:
    """Render Graph recipient entries as one ``"Name <address>; ..."`` string.

    Each entry must carry an ``emailAddress`` dict; a missing name or address
    is rendered as an empty string and surrounding whitespace is trimmed.
    """
    rendered = []
    for recipient in lst:
        address_info = recipient["emailAddress"]
        display_name = address_info.get("name", "")
        address = address_info.get("address", "")
        rendered.append(f"{display_name} <{address}>".strip())
    return "; ".join(rendered)
|
|
|
|
|
|
# ─── Extrakce zprávy ─────────────────────────────────────────────────────────
|
|
|
|
def _typed_recipients(kind: str, entries: list) -> list:
    """Turn Graph recipient entries into ``{type, email, name}`` records tagged *kind*."""
    return [
        {
            "type": kind,
            "email": entry["emailAddress"].get("address", ""),
            "name": entry["emailAddress"].get("name", ""),
        }
        for entry in entries
    ]


def _single_header(headers: dict, key: str):
    """Return ``headers[key]`` ('' when absent); for a repeated header its first value."""
    value = headers.get(key, "")
    return value[0] if isinstance(value, list) else value


def extract_message(msg: dict, folder_path: str) -> Optional[dict]:
    """Build the full MongoDB document for one Graph message.

    Used for mode ``full`` and for new messages in ``sync`` / ``new-only``.

    Args:
        msg: Message dict as returned by Graph (MSG_SELECT properties plus
            expanded attachment metadata).
        folder_path: Path of the folder the message was read from.

    Returns:
        The document to store, keyed by the Internet Message-ID (or
        ``graphid:<id>`` when the message has none). ``None`` when anything
        fails during extraction; the failure is written to the error log.
    """
    try:
        message_key = (msg.get("internetMessageId") or "").strip() or f"graphid:{msg['id']}"
        subject = msg.get("subject") or ""

        # Body: kept in body_html or body_text depending on contentType,
        # cut to at most 2 * 1024 * 1024 characters.
        body_limit = 2 * 1024 * 1024
        html_body = None
        text_body = None
        preview = msg.get("bodyPreview") or ""
        body = msg.get("body", {})
        content_type = body.get("contentType")
        if content_type == "html":
            html_body = (body.get("content") or "")[:body_limit]
        elif content_type == "text":
            # v1.4: the whole plain text goes to body_text (earlier versions kept
            # only the first 2000 characters in body_preview).
            text_body = (body.get("content") or "")[:body_limit]

        sender_address = (msg.get("from") or msg.get("sender") or {}).get("emailAddress", {})
        to_entries = msg.get("toRecipients", [])
        cc_entries = msg.get("ccRecipients", [])
        bcc_entries = msg.get("bccRecipients", [])
        recipient_records = (
            _typed_recipients("to", to_entries)
            + _typed_recipients("cc", cc_entries)
            + _typed_recipients("bcc", bcc_entries)
        )

        importance_code = IMPORTANCE_MAP.get(msg.get("importance", "normal"), 1)
        flag_name = (msg.get("flag") or {}).get("flagStatus", "notFlagged")
        flag_code = FLAG_STATUS_MAP.get(flag_name, 0)

        header_map = parse_headers(msg.get("internetMessageHeaders") or [])
        reply_target = _single_header(header_map, "in_reply_to")

        references_value = header_map.get("references", "")
        if isinstance(references_value, list):
            references_value = " ".join(references_value)
        reference_ids = references_value.split() if references_value else []

        thread_topic = _single_header(header_map, "thread_topic")

        # conversationIndex is re-encoded through a base64 round trip; when it
        # cannot be decoded the raw value is stored instead.
        index_value = ""
        raw_index = msg.get("conversationIndex")
        if raw_index:
            try:
                index_value = base64.b64encode(base64.b64decode(raw_index)).decode()
            except Exception:
                index_value = raw_index

        # Attachment metadata only; entries without a name are ignored.
        attachment_records = [
            {
                "filename": item.get("name"),
                "size_bytes": item.get("size", 0),
                "mime_type": item.get("contentType", "application/octet-stream"),
                "is_inline": item.get("isInline", False),
                "graph_att_id": item.get("id"),
            }
            for item in msg.get("attachments") or []
            if item.get("name")
        ]

        document = {
            "_id": message_key,
            "graph_id": msg["id"],

            "subject": subject,
            "normalized_subject": normalize_subject(subject),
            "importance": importance_code,
            "flag_status": flag_code,
            "is_read": msg.get("isRead", False),
            "is_draft": msg.get("isDraft", False),
            "has_attachments": msg.get("hasAttachments", False),
            "attachment_count": len(attachment_records),
            "inference_classification": msg.get("inferenceClassification", ""),
            "categories": msg.get("categories") or [],

            "conversation_id": msg.get("conversationId", ""),
            "conversation_index": index_value,
            "conversation_topic": thread_topic,
            "in_reply_to": reply_target,
            "internet_references": reference_ids,

            "received_at": parse_date(msg.get("receivedDateTime")),
            "sent_at": parse_date(msg.get("sentDateTime")),
            "created_at": parse_date(msg.get("createdDateTime")),
            "modified_at": parse_date(msg.get("lastModifiedDateTime")),

            "folder_id": msg.get("parentFolderId", ""),
            "folder_path": folder_path,

            "sender": {
                "email": sender_address.get("address", ""),
                "name": sender_address.get("name", ""),
            },
            "to": format_recipients(to_entries),
            "cc": format_recipients(cc_entries),
            "bcc": format_recipients(bcc_entries),
            "recipients": recipient_records,

            "body_html": html_body,
            "body_text": text_body,
            "body_preview": preview,

            "attachments": attachment_records,
            "headers": header_map,

            "parsed_at": datetime.now(timezone.utc).replace(tzinfo=None),
        }
        return document

    except Exception as e:
        logging.error("extract_message failed [%s]: %s", msg.get("id", "?"), e)
        return None
|
|
|
|
|
|
def extract_sync_fields(msg: dict, folder_path: str) -> dict:
    """Return only the mutable fields of a message.

    Used in sync mode for messages that already exist in MongoDB: read/draft
    state, flag, importance, categories, modification time and folder location,
    plus a fresh ``parsed_at`` timestamp (naive UTC).
    """
    flag_name = (msg.get("flag") or {}).get("flagStatus", "notFlagged")
    importance_name = msg.get("importance", "normal")
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)

    fields = {}
    fields["is_read"] = msg.get("isRead", False)
    fields["is_draft"] = msg.get("isDraft", False)
    fields["flag_status"] = FLAG_STATUS_MAP.get(flag_name, 0)
    fields["importance"] = IMPORTANCE_MAP.get(importance_name, 1)
    fields["categories"] = msg.get("categories") or []
    fields["modified_at"] = parse_date(msg.get("lastModifiedDateTime"))
    fields["folder_id"] = msg.get("parentFolderId", "")
    fields["folder_path"] = folder_path
    fields["parsed_at"] = now_utc
    return fields
|
|
|
|
|
|
# ─── MongoDB indexy ───────────────────────────────────────────────────────────
|
|
|
|
def create_indexes(col):
    """Create the query indexes on the given MongoDB collection.

    Ascending single-field indexes are created in a fixed order (``graph_id``
    as unique + sparse), followed by one text index named ``text_search`` over
    subject, body_preview, to and cc with ``default_language="none"``.
    """
    print(" Vytvarim indexy...")

    ascending_indexes = (
        ("received_at", {}),
        ("sent_at", {}),
        ("sender.email", {}),
        ("graph_id", {"unique": True, "sparse": True}),
        ("conversation_id", {}),
        ("folder_path", {}),
        ("has_attachments", {}),
        ("categories", {}),
        ("importance", {}),
        ("flag_status", {}),
        ("is_read", {}),
    )
    for field_name, options in ascending_indexes:
        col.create_index([(field_name, ASCENDING)], **options)

    text_fields = ("subject", "body_preview", "to", "cc")
    col.create_index(
        [(field_name, TEXT) for field_name in text_fields],
        name="text_search",
        default_language="none",
    )

    print(" Indexy hotovy.")
|
|
|
|
|
|
# ─── MAIN ─────────────────────────────────────────────────────────────────────
|
|
|
|
def _import_mailbox(args, col, start) -> None:
    """Walk the mailbox folders and write the messages into *col*.

    Args:
        args: Parsed command-line arguments (mode, limit, folder, no_indexes).
        col: Target MongoDB collection.
        start: Start time of the run, used for rate and duration output.
    """
    existing: set = set()
    if args.mode in ("new-only", "sync"):
        print(" Nacitam existujici zaznamy z MongoDB...")
        # A projected cursor is used instead of distinct("_id"): distinct returns
        # all values in one reply, which fails once it exceeds the BSON size limit.
        existing = {d["_id"] for d in col.find({}, {"_id": 1})}
        print(f" {len(existing)} jiz importovano")

    print("\nNacitam seznam slozek...")
    all_folders = get_all_folders()
    if args.folder:
        # Case-insensitive substring match against the full folder path.
        all_folders = [f for f in all_folders if args.folder.lower() in f["path"].lower()]
    print(f" Slozek ke zpracovani: {len(all_folders)}")
    for f in all_folders:
        print(f" {f['path']}")

    is_sync = args.mode == "sync"
    # Sync mode pages with the reduced property set and fetches the full
    # message only for ids that are not yet stored.
    msg_select = MSG_SELECT_SYNC if is_sync else MSG_SELECT
    expand_att = not is_sync

    batch = []
    ok_count = 0
    sync_count = 0
    err_count = 0
    skip_count = 0
    total_i = 0

    def flush():
        """Send the buffered operations to MongoDB; failures are logged, not raised."""
        if not batch:
            return
        try:
            col.bulk_write(batch, ordered=False)
        except Exception as e:
            logging.error("bulk_write: %s", e)
            print(f" CHYBA bulk_write: {e}")
        batch.clear()

    print()
    for folder in all_folders:
        print(f"--- Složka: {folder['path']} ---")
        folder_count = 0

        for msg in iter_folder_messages(folder["id"], select=msg_select, expand_attachments=expand_att):
            if args.limit and total_i >= args.limit:
                break

            mid = (msg.get("internetMessageId") or "").strip() or f"graphid:{msg['id']}"
            total_i += 1
            folder_count += 1

            if args.mode == "new-only" and mid in existing:
                skip_count += 1
                continue

            if is_sync and mid in existing:
                fields = extract_sync_fields(msg, folder["path"])
                batch.append(UpdateOne({"_id": mid}, {"$set": fields}))
                sync_count += 1
                print(f" {total_i:>6} SYN {mid[:80]}")
            else:
                if is_sync:
                    # New message seen during sync: fetch it again with all properties.
                    full_url = f"{GRAPH_URL}/users/{GRAPH_MAILBOX}/messages/{msg['id']}"
                    full_params = {"$select": MSG_SELECT, "$expand": ATT_EXPAND}
                    try:
                        msg = graph_get(full_url, full_params)
                    except Exception as e:
                        logging.error("full fetch failed [%s]: %s", msg.get("id", "?"), e)
                        err_count += 1
                        continue

                doc = extract_message(msg, folder["path"])
                if doc is None:
                    err_count += 1
                    print(f" {total_i:>6} ERR {mid[:80]}")
                else:
                    batch.append(UpdateOne({"_id": doc["_id"]}, {"$set": doc}, upsert=True))
                    ok_count += 1
                    subject_str = (doc.get("subject") or "")[:60]
                    sender_str = (doc.get("sender", {}).get("email") or "")[:40]
                    print(f" {total_i:>6} OK {subject_str:<60} {sender_str}")

            if len(batch) >= BATCH_SIZE:
                flush()

            if total_i % 500 == 0:
                elapsed = (datetime.now() - start).total_seconds()
                rate = total_i / elapsed if elapsed > 0 else 0
                print(f" {'─'*80}")
                print(f" Průběh: ok={ok_count} sync={sync_count} skip={skip_count} err={err_count} {rate:.1f} msg/s")
                print(f" {'─'*80}")

        flush()
        print(f" → {folder_count} zprav ze slozky {folder['path']}")

        if args.limit and total_i >= args.limit:
            break

    elapsed_total = (datetime.now() - start).total_seconds()
    print(f"\n{'='*52}")
    print(f"Vysledek: ok={ok_count} | sync={sync_count} | skip={skip_count} | err={err_count}")
    print(f"Celkovy cas: {int(elapsed_total//3600)}h {int((elapsed_total%3600)//60)}m {int(elapsed_total%60)}s")
    print(f"Dokumentu v kolekci: {col.count_documents({})}")

    if not args.no_indexes:
        print()
        create_indexes(col)

    print(f"\nKonec: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    if err_count:
        print(f"Chyby logovany do: {LOG_FILE}")


def main():
    """Command-line entry point.

    Parses the arguments, sets the module-level ``GRAPH_MAILBOX``, checks that
    Graph and MongoDB are reachable (exiting with status 1 when either is not)
    and runs the import into the collection named after the mailbox. The
    MongoDB client is closed on every exit path.
    """
    global GRAPH_MAILBOX

    ap = argparse.ArgumentParser(description=f"parse_emails_graph v{SCRIPT_VERSION}")
    ap.add_argument("--mailbox", required=True,
                    help="Emailova schranka (napr. ordinace@buzalkova.cz)")
    ap.add_argument("--mode", default="full", choices=["full", "new-only", "sync"],
                    help="full=plny upsert (vychozi) | new-only=jen nove zpravy | "
                         "sync=existujici aktualizuje jen menitelna pole, nove importuje cely")
    ap.add_argument("--limit", type=int, default=0,
                    help="Zpracovat max N zprav (0 = vse)")
    ap.add_argument("--folder", default="",
                    help="Zpracovat jen slozku se zadanym nazvem (napr. Inbox)")
    ap.add_argument("--no-indexes", action="store_true",
                    help="Nevytvorit indexy na konci")
    args = ap.parse_args()

    GRAPH_MAILBOX = args.mailbox
    mongo_col = args.mailbox

    start = datetime.now()
    print(f"=== parse_emails_graph v{SCRIPT_VERSION} ===")
    print(f"Start: {start.strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"Schránka: {GRAPH_MAILBOX}")
    print(f"MongoDB: {MONGO_URI} -> {MONGO_DB}.{mongo_col}")
    print(f"Režim: {args.mode}")

    print("\nPřipojuji se k Graph API...")
    try:
        get_token()
        print(" Graph API OK")
    except Exception as e:
        print(f" CHYBA: {e}")
        sys.exit(1)

    client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
    try:
        try:
            client.admin.command("ping")
            print(" MongoDB OK")
        except Exception as e:
            print(f" CHYBA: MongoDB neni dostupna -- {e}")
            sys.exit(1)

        _import_mailbox(args, client[MONGO_DB][mongo_col], start)
    finally:
        # Runs for normal completion, sys.exit and unexpected errors alike.
        client.close()
|
|
|
|
|
|
# Run the importer only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|