This commit is contained in:
2026-06-14 08:25:15 +02:00
parent f94573ea6e
commit ed6455787a
7 changed files with 876 additions and 20 deletions
+48 -7
View File
@@ -20,6 +20,7 @@ Spusteni:
from __future__ import annotations
import re
import sys
import traceback
from datetime import datetime, timezone, timedelta
@@ -178,6 +179,8 @@ def search(
mailbox: Optional[Union[str, list]] = None,
since: Optional[str] = None,
until: Optional[str] = None,
days: Optional[int] = None,
inflect: bool = False,
folder_contains: Optional[str] = None,
sender_contains: Optional[str] = None,
has_attachments: Optional[bool] = None,
@@ -186,15 +189,23 @@ def search(
"""PRIMARY TOOL — fulltext search across all indexed emails.
Index includes: subject, sender (email + name), recipients (to/cc),
attachment filenames, AND full body text.
attachment filenames, AND full body text. Diacritics are stripped at index
time, so search is already accent-insensitive (recept == řecept == RECEPT).
query: websearch_to_tsquery syntax:
query: websearch_to_tsquery syntax (only when inflect=False):
invoice payment -> AND
"lot expiration" -> phrase
SAE OR "serious adverse" -> OR
urgent -newsletter -> exclude
mailbox: one mailbox string or list (e.g. "vbuzalka@its.jnj.com"). None = all.
since/until: ISO date "YYYY-MM-DD" on received_at
days: convenience window — only the last N days (overrides `since`). "za posledních X dní".
inflect: Czech declension. The index uses no stemmer, so a plain search for
"recept" misses "recepty/receptu/receptem/...". With inflect=True each word
in `query` is prefix-matched (recept -> recept:*) and AND-ed, catching the
other grammatical cases. Set this for Czech-word searches. Trade-off: a
prefix also matches unrelated longer words (recept:* also hits "receptor").
In this mode the query is treated as plain words (operators/quotes ignored).
folder_contains: substring match against folder_path (case-insensitive)
sender_contains: substring match against sender_email OR sender_name (case-insensitive)
has_attachments: True / False / None (any)
@@ -207,11 +218,23 @@ def search(
mboxes = normalize_mailbox(mailbox)
since_dt = parse_since(since)
until_dt = parse_since(until)
if days and days > 0:
since_dt = datetime.now(timezone.utc) - timedelta(days=days)
limit = min(max(1, limit), 100)
sql = """
# Build the tsquery. inflect=True → prefix-match each word (Czech cases)
# via to_tsquery; otherwise use websearch_to_tsquery for full operator support.
tsq_func = "websearch_to_tsquery"
tsq_text = query
if inflect:
tokens = re.findall(r"\w+", query, flags=re.UNICODE)
if tokens:
tsq_func = "to_tsquery"
tsq_text = " & ".join(f"{t}:*" for t in tokens)
sql = f"""
WITH q AS (
SELECT websearch_to_tsquery('soubory'::regconfig, %(query)s) AS tsq
SELECT {tsq_func}('soubory'::regconfig, %(query)s) AS tsq
)
SELECT
e.id, e.mailbox, e.message_id, e.conversation_id, e.folder_path,
@@ -573,14 +596,26 @@ def find_attachment(
def top_senders(
mailbox: Optional[Union[str, list]] = None,
since: Optional[str] = None,
days: Optional[int] = None,
folder_contains: Optional[str] = None,
limit: int = 20,
) -> dict:
"""Top senders by volume (count of received emails). Optionally limit by mailbox or date window.
Use for "who emails me most" or "top senders this month".
"""Unique senders grouped by sender_email, counted, sorted by count DESC.
Use for "who emails me most" / "top senders this month".
mailbox: one mailbox string or list. None = all.
since: ISO date "YYYY-MM-DD" lower bound on received_at.
days: convenience window — count only the last N days (overrides `since`
when both given). Use for "za posledních X dní".
folder_contains: substring match against folder_path (case-insensitive).
Pass "Inbox" to count ONLY received/incoming mail and exclude the
mailbox owner's own Sent Items, Drafts, etc. Default None = all folders.
"""
try:
mboxes = normalize_mailbox(mailbox)
since_dt = parse_since(since)
if days and days > 0:
since_dt = datetime.now(timezone.utc) - timedelta(days=days)
limit = min(max(1, limit), 100)
sql = """
SELECT sender_email, count(*) AS c, max(received_at) AS last_at
@@ -588,12 +623,18 @@ def top_senders(
WHERE ok = TRUE AND sender_email IS NOT NULL
AND (%(mboxes)s::text[] IS NULL OR mailbox = ANY(%(mboxes)s::text[]))
AND (%(since)s::timestamptz IS NULL OR received_at >= %(since)s::timestamptz)
AND (%(folder)s::text IS NULL OR folder_path ILIKE %(folder_like)s)
GROUP BY sender_email
ORDER BY c DESC
LIMIT %(limit)s
"""
with pg_conn() as pg, pg.cursor() as cur:
cur.execute(sql, {"mboxes": mboxes, "since": since_dt, "limit": limit})
cur.execute(sql, {
"mboxes": mboxes, "since": since_dt,
"folder": folder_contains,
"folder_like": f"%{folder_contains}%" if folder_contains else None,
"limit": limit,
})
rows = [{"sender_email": s, "count": c, "last_at": serialize(t)}
for s, c, t in cur.fetchall()]
return {"count": len(rows), "results": rows}
+2 -2
View File
@@ -55,10 +55,10 @@ if _REQ_FILE.exists():
# Definice pipeline (step_id, label, executable filename)
STEPS = [
("1b", "Graph delta sync", "1b_parse_emails_graph_delta_v1.0.py"),
("1b", "Graph delta sync", "1b_parse_emails_graph_delta_v1.1.py"),
("3", "Download attachments", "3_download_attachments_v1.5.py"),
("4", "Unwrap S/MIME", "4_unwrap_smime_v1.0.py"),
("5", "Enrich fulltext (PG)", "5_enrich_fulltext_emails_v1.3.py"),
("5", "Enrich fulltext (PG)", "5_enrich_fulltext_emails_v1.4.py"),
]
@@ -0,0 +1,143 @@
# Zmeny v1.1 (2026-06-10)
- Bugfix: NON_MAILBOX_COLLECTIONS += jnj_messages, jnj_sync_state (phantom kolekce JNJ folder trackingu zpusobovaly Graph 404 -> FAIL kroku 1b).
# 1b_parse_emails_graph_delta_v1.1.py
**Inkrementální sync přes Microsoft Graph delta query.** Sourozenec [`1_parse_emails_graph_v1.4.py`](1_parse_emails_graph_v1.4.md) — každý řeší jiný use case:
| Skript | Použití |
|---|---|
| `1_parse_emails_graph_v1.4.py` | **První plný import** schránky (vše od začátku) |
| `1b_parse_emails_graph_delta_v1.1.py` | **Pravidelný sync** — jen co se od minula změnilo |
## Jak funguje
Graph API vystavuje `messages/delta` endpoint, který si pamatuje **záložku** (`deltaLink` s tokenem). Při dalším volání s touto záložkou vrátí jen:
- **nové zprávy**
- **změny** existujících (`isRead`, vlajka, přesun do jiné složky, kategorie)
- **smazané** zprávy (`@removed`)
Delta běží **per složka**. Skript drží stav v Mongo kolekci `emaily.sync_state`:
```json
{
"_id": "ordinace@buzalkova.cz|<folder_id>",
"mailbox": "ordinace@buzalkova.cz",
"folder_id": "AAA...",
"folder_path": "Inbox",
"delta_link": "https://graph.microsoft.com/.../delta?$deltatoken=...",
"last_run_at": "2026-06-04T10:00:00Z",
"cumulative_new": 1234, "cumulative_sync": 5678, "cumulative_removed": 12, "run_count": 42
}
```
První běh = fresh delta (Graph vrátí všechno + dá `deltaLink`). Každý další = jen změny od poslední záložky.
## Co se stane se smazanými zprávami
Když delta vrátí `@removed` pro zprávu, skript ji **nemaže** z Mongo. Pouze nastaví:
```json
{ "permanently_deleted": true, "permanently_deleted_at": "2026-06-04T10:00:00Z" }
```
Dohledatelné: `col.find({"permanently_deleted": true})`.
**`@removed` přijde jen pro definitivně smazané** zprávy (uživatel vysypal koš / Shift+Del). Mail v `Deleted Items` je pořád normální zpráva, jen má `folder_path = "Deleted Items"`.
## Extrakce zprávy
Funkce `extract_message` a `extract_sync_fields` se načítají přímo z modulu `1_parse_emails_graph_v1.4.py` (přes `importlib`) — extrakční logika je jediná na celý projekt, nemůže se rozejít.
## Nové vs změněné — jak skript pozná
Pro každou položku z delta odpovědi:
1. **Má `@removed`?** → označit `permanently_deleted` v Mongo, hotovo.
2. **`graph_id` už je v Mongo?** → existující změna — pošle se jen `extract_sync_fields` (is_read, flag, folder, …) přes `$set`.
3. **`graph_id` v Mongo není?** → nová zpráva — udělá se druhý GET `/messages/{id}?$expand=attachments` (delta nepodporuje `$expand`), aby přišla těla, hlavičky i přílohy, a uloží se přes `extract_message` jako klasický nový dokument.
## Argumenty
| Argument | Povinný | Hodnoty | Default | Popis |
|---|---|---|---|---|
| `--mailbox` | **ne** | e-mail | (všechny) | Schránka = kolekce v Mongo. **Bez argumentu projede všechny** kolekce v `emaily` mimo `SKIP_MAILBOXES` a systémové (`attachments_index`, `sync_state`, `jnj_messages`, `jnj_sync_state`) |
| `--folder` | ne | substring | (všechny) | Filtr složek (např. `Inbox` zahrne i `Inbox/Archive`) |
| `--limit N` | ne | int | 0 (bez limitu) | Max položek na složku (test) |
| `--reset` | ne | flag | false | Smaže všechny `deltaLink`y pro vybrané schránky → další běh začne od fresh delta |
| `--dry-run` | ne | flag | false | Nic neuloží do Mongo, jen vypíše co by se stalo |
## SKIP_MAILBOXES (hardcoded ve skriptu)
| Schránka | Důvod |
|---|---|
| `vbuzalka@its.jnj.com` | JNJ tenant, nemáme Graph API přístup. Pro tuto schránku je nutný samostatný skript (lokální `.msg` parser nebo jiný zdroj). |
Při `--mailbox vbuzalka@its.jnj.com` skript skončí s exit kódem 2. Při běhu bez `--mailbox` se schránka tiše přeskočí s hlášením `[skip]`.
## Varianty volání
```bash
# VŠECHNY schránky najednou (mimo SKIP_MAILBOXES) — pro cron / pravidelný sync:
docker exec -it python-runner python /scripts/1b_parse_emails_graph_delta_v1.1.py
# Jedna schránka — první běh (fresh delta — projde všechno, uloží deltaLinky):
docker exec -it python-runner python /scripts/1b_parse_emails_graph_delta_v1.1.py --mailbox ordinace@buzalkova.cz
# Pravidelný sync jedné schránky (jen změny od minulého běhu):
docker exec -it python-runner python /scripts/1b_parse_emails_graph_delta_v1.1.py --mailbox ordinace@buzalkova.cz
# Dry-run — uvidíš co by se stalo, nic se neuloží:
docker exec -it python-runner python /scripts/1b_parse_emails_graph_delta_v1.1.py --mailbox ordinace@buzalkova.cz --dry-run
# Test jen na složce Inbox, max 20 položek:
docker exec -it python-runner python /scripts/1b_parse_emails_graph_delta_v1.1.py --mailbox ordinace@buzalkova.cz --folder Inbox --limit 20
# Reset — zahodí deltaLinky a najede znova od plné delta:
docker exec -it python-runner python /scripts/1b_parse_emails_graph_delta_v1.1.py --mailbox ordinace@buzalkova.cz --reset
# Cron / na pozadí (každých 5 min):
docker exec -d python-runner bash -c "python /scripts/1b_parse_emails_graph_delta_v1.1.py --mailbox ordinace@buzalkova.cz > /scripts/delta_sync.log 2>&1"
```
## Co dělat na začátek
1. **První import** schránky pořád přes `1_parse_emails_graph_v1.4.py` (existující data zůstanou).
2. **První běh** `1b_…delta_v1.1.py` — fresh delta projde znovu všechny zprávy a hlavně uloží `deltaLink`y do `sync_state`. To může chvíli trvat (podobně jako `--mode new-only` na v1.4).
3. **Další běhy** = už jen rychlé, vrací 0-X změn za interval.
## Otevřené body k otestování
- Jak rychle běží první (fresh) delta na velké schránce (`vladimir.buzalka@buzalka.cz` ~80k mailů)
- Co Graph vrátí pro nově vytvořené složky (mělo by fungovat — appendnou se do `folders` při dalším `get_all_folders`)
- Chování při `--limit` (drží se starý deltaLink → příští běh dokončí zbytek)
## HTTP 410 — expirovaný deltaLink
DeltaLinky drží Graph cca 30 dní. Pokud nebudeš schránku syncovat měsíc, skript dostane 410, **smaže starý state** a sám zopakuje běh jako fresh delta. Žádný manuální zásah není potřeba.
## Závislosti
Stejné jako `1_parse_emails_graph_v1.4.py` (msal, requests, pymongo, dateutil) — žádné nové.
## Sledování průběhu
```bash
docker exec -it python-runner tail -f /scripts/delta_sync.log
docker exec -it python-runner tail -f /scripts/delta_errors.log
```
## Stav sync_state v Mongo
```python
# Přehled posledních synců:
db.sync_state.find().sort("last_run_at", -1)
# Zahodit deltaLinky pro jednu schránku (= efekt --reset):
db.sync_state.delete_many({"mailbox": "ordinace@buzalkova.cz"})
# Najít všechny permanentně smazané v jedné schránce:
db["ordinace@buzalkova.cz"].find({"permanently_deleted": True}, {"subject": 1, "permanently_deleted_at": 1})
```
@@ -0,0 +1,523 @@
"""
==============================================================================
Skript: 1b_parse_emails_graph_delta_v1.1.py
Verze: 1.1
Datum: 2026-06-10
Autor: vladimir.buzalka
Zmeny v1.1 (2026-06-10):
- Bugfix: NON_MAILBOX_COLLECTIONS rozsireno o "jnj_messages" a
"jnj_sync_state" (pomocne kolekce JNJ folder trackingu). Predtim je
discover_mailboxes bral jako schranky -> Graph 404 na
/users/jnj_messages/mailFolders -> cely krok 1b FAIL(1) pri kazdem behu.
Popis:
Inkrementalni sync emailu pres Microsoft Graph DELTA QUERY.
Sourozenec `1_parse_emails_graph_v1.4.py` — kazdy resi jiny use case:
1_parse_emails_graph_v1.4.py = prvni plny import schranky
1b_parse_emails_graph_delta_v1.1.py = pravidelny sync (zmeny od minula)
Delta query je server-side change tracking — Graph si pamatuje "zalozku"
(deltaLink) a vraci jen to, co se od ni zmenilo:
- nove zpravy
- zmeny existujicich (isRead, flag, presun do jine slozky, kategorie)
- SMAZANE zpravy (@removed) — definitivne smazane, nikoli v kosi
Pro mail v "Deleted Items" delta nic specialniho nedela — je to porad
normalni zprava, jen s folder_path="Deleted Items". @removed prijde az
kdyz uzivatel vysype kos / Shift+Del.
State:
Kolekce `emaily.sync_state`, _id = "<mailbox>|<folder_id>".
{
mailbox, folder_id, folder_path,
delta_link, # plny URL s $deltatoken na pristi beh
last_run_at,
cumulative_new, cumulative_sync, cumulative_removed
}
Permanentne smazane zpravy:
Skript je NEMAZE z Mongo. Pouze nastavi:
permanently_deleted: True
permanently_deleted_at: <UTC datetime detekce>
Dohledani: col.find({"permanently_deleted": True})
Reuse:
Funkce extract_message / extract_sync_fields se nactou primo z modulu
1_parse_emails_graph_v1.4.py (importlib, file-based), aby se logika
extrahce nikdy nerozesla.
Spousteni:
python 1b_parse_emails_graph_delta_v1.1.py # VSECHNY schranky (mimo SKIP_MAILBOXES)
python 1b_parse_emails_graph_delta_v1.1.py --mailbox ordinace@buzalkova.cz # jedna schranka
python 1b_parse_emails_graph_delta_v1.1.py --mailbox ordinace@buzalkova.cz --folder Inbox
python 1b_parse_emails_graph_delta_v1.1.py --reset # zahodit deltaLinky a najet znova
python 1b_parse_emails_graph_delta_v1.1.py --dry-run # nic neulozit
SKIP_MAILBOXES (hardcoded):
vbuzalka@its.jnj.com — JNJ tenant, nemame Graph API pristup. Pro tuto
schranku je nutny samostatny skript (lokalni .msg).
Zavislosti:
msal, requests, pymongo, python-dateutil
Python 3.10+
==============================================================================
"""
from __future__ import annotations

import argparse
import importlib.util
import logging
import os
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

import msal
import requests
from pymongo import MongoClient, ASCENDING
if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8", errors="replace")
# ─── KONFIGURACE ──────────────────────────────────────────────────────────────
# SECURITY: credentials and connection strings should not live in source
# control. Each value below can be overridden by an environment variable of
# the same name; the literals are kept only as a backward-compatible fallback
# so existing deployments keep working unchanged.
# TODO: rotate the Graph client secret (it is exposed in the repository
# history) and then drop the hard-coded fallback.
GRAPH_TENANT_ID = os.environ.get("GRAPH_TENANT_ID", "7d269944-37a4-43a1-8140-c7517dc426e9")
GRAPH_CLIENT_ID = os.environ.get("GRAPH_CLIENT_ID", "4b222bfd-78c9-4239-a53f-43006b3ed07f")
GRAPH_CLIENT_SECRET = os.environ.get("GRAPH_CLIENT_SECRET", "Txg8Q~MjhocuopxsJyJBhPmDfMxZ2r5WpTFj1dfk")
GRAPH_URL = "https://graph.microsoft.com/v1.0"

MONGO_URI = os.environ.get("MONGO_URI", "mongodb://192.168.1.76:27017")
MONGO_DB = "emaily"
SYNC_STATE_COL = "sync_state"

PAGE_SIZE = 100  # requested $top for the delta endpoint
LOG_FILE = Path(__file__).parent / "delta_errors.log"
SCRIPT_VERSION = "1.1"
# Kolekce v `emaily` ktere NEJSOU mailboxy:
# (jnj_messages + jnj_sync_state = pomocne kolekce JNJ folder trackingu,
# bez exclude je discover_mailboxes bere jako schranky -> Graph 404 -> FAIL)
NON_MAILBOX_COLLECTIONS = {"attachments_index", "sync_state",
"jnj_messages", "jnj_sync_state"}
# Schranky, kde NEMAME Graph API pristup — pri bezneho behu se preskoci.
# Pro tyto je nutny separatni skript (napr. lokalni .msg parser).
SKIP_MAILBOXES = {
"vbuzalka@its.jnj.com", # JNJ tenant — nemame Graph credentials
}
logging.basicConfig(
filename=str(LOG_FILE),
level=logging.ERROR,
format="%(asctime)s | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
encoding="utf-8",
)
# Co tahnout z delta endpointu (stejne jako MSG_SELECT v v1.4, mimo internetMessageHeaders
# ktere delta neumi vratit pro vsechny polozky — pro nove zpravy si je dotahneme
# samostatnym fetchem).
DELTA_SELECT = (
"id,internetMessageId,subject,bodyPreview,body,"
"importance,isRead,isDraft,hasAttachments,"
"receivedDateTime,sentDateTime,createdDateTime,lastModifiedDateTime,"
"sender,from,toRecipients,ccRecipients,bccRecipients,replyTo,"
"conversationId,conversationIndex,parentFolderId,"
"categories,flag,inferenceClassification"
)
# Pro plne nacteni nove zpravy (vcetne hlavicek + priloh) pouzijeme stejny
# select+expand jako v1.4
FULL_FETCH_SELECT = (
"id,internetMessageId,subject,bodyPreview,body,"
"importance,isRead,isDraft,hasAttachments,"
"receivedDateTime,sentDateTime,createdDateTime,lastModifiedDateTime,"
"sender,from,toRecipients,ccRecipients,bccRecipients,replyTo,"
"conversationId,conversationIndex,parentFolderId,"
"categories,flag,inferenceClassification,internetMessageHeaders"
)
FULL_FETCH_EXPAND = "attachments($select=id,name,contentType,size,isInline)"
# ─── Reuse extract logiky z v1.4 ──────────────────────────────────────────────
_HERE = Path(__file__).parent
_V14_PATH = _HERE / "1_parse_emails_graph_v1.4.py"
if not _V14_PATH.exists():
print(f"CHYBA: chybi sourozenec {_V14_PATH.name} — extract logiku nelze nacist", file=sys.stderr)
sys.exit(1)
_spec = importlib.util.spec_from_file_location("v14_parse", _V14_PATH)
_v14 = importlib.util.module_from_spec(_spec)
_spec.loader.exec_module(_v14)
extract_message = _v14.extract_message
extract_sync_fields = _v14.extract_sync_fields
# GRAPH_MAILBOX modul-level v v1.4 — pro extract neni potreba, ale pro
# konzistenci nastavujeme ho v main()
# ─── Graph API ────────────────────────────────────────────────────────────────
_graph_token: Optional[str] = None
def get_token() -> str:
    """Acquire a Graph access token (client-credentials flow) and cache it.

    Builds an MSAL confidential client from the module-level GRAPH_* settings,
    requests a token for the Graph ``.default`` scope and stores it in the
    module-level ``_graph_token``.

    Returns:
        The access token string.

    Raises:
        RuntimeError: if the MSAL response contains no ``access_token`` key.
    """
    global _graph_token
    authority_url = f"https://login.microsoftonline.com/{GRAPH_TENANT_ID}"
    client_app = msal.ConfidentialClientApplication(
        GRAPH_CLIENT_ID,
        authority=authority_url,
        client_credential=GRAPH_CLIENT_SECRET,
    )
    response = client_app.acquire_token_for_client(
        scopes=["https://graph.microsoft.com/.default"]
    )
    if "access_token" in response:
        _graph_token = response["access_token"]
        return _graph_token
    raise RuntimeError(f"Graph auth failed: {response}")
class DeltaExpired(Exception):
    """Raised when a stored deltaLink has expired (HTTP 410).

    The caller has to start over with a full (fresh) delta for that folder.
    """
def graph_get(url: str, params: Optional[dict] = None, allow_410: bool = False) -> dict:
    """GET a Graph URL and return the decoded JSON body.

    Makes up to three attempts. A 401 refreshes the token and retries; a 429
    sleeps for the server-provided ``Retry-After`` and retries.

    Args:
        url: Absolute Graph URL (may be a nextLink / deltaLink).
        params: Query parameters for the request, or None.
        allow_410: When True, an HTTP 410 is reported as ``DeltaExpired``
            instead of a generic HTTP error.

    Returns:
        The parsed JSON response.

    Raises:
        DeltaExpired: on HTTP 410 when ``allow_410`` is set.
        requests.HTTPError: on any other non-success status.
        RuntimeError: when all attempts were used up by 401/429 responses.
    """
    global _graph_token
    if not _graph_token:
        get_token()
    for _attempt in range(3):
        r = requests.get(
            url,
            headers={"Authorization": f"Bearer {_graph_token}"},
            params=params,
            timeout=60,
        )
        if r.status_code == 401:
            # Token expired or was rejected: fetch a new one and try again.
            get_token()
            continue
        if r.status_code == 410 and allow_410:
            raise DeltaExpired(url)
        if r.status_code == 429:
            # Rate limit: respect Retry-After. The header may legally be an
            # HTTP-date rather than a number of seconds; fall back to the
            # default wait instead of crashing on int(), and never pass a
            # negative value to time.sleep().
            try:
                wait = max(0, int(r.headers.get("Retry-After", "5")))
            except (TypeError, ValueError):
                wait = 5
            print(f" [429] cekam {wait}s ...")
            time.sleep(wait)
            continue
        r.raise_for_status()
        return r.json()
    raise RuntimeError(f"Graph GET failed after retries: {url}")
def get_all_folders(mailbox: str, parent_id: str = None, parent_path: str = "") -> list[dict]:
    """Recursively list all mail folders of a mailbox.

    Args:
        mailbox: Mailbox address used in the ``/users/{mailbox}`` URL.
        parent_id: Folder id whose children are listed; None lists the
            top-level folders.
        parent_path: Path of the parent folder, used as prefix for children.

    Returns:
        A flat list of ``{"id": ..., "path": ...}`` dicts, each parent followed
        by its descendants.
    """
    root_url = f"{GRAPH_URL}/users/{mailbox}/mailFolders"
    if parent_id is None:
        page_url = root_url
    else:
        page_url = f"{root_url}/{parent_id}/childFolders"

    # Query parameters go only with the first request; nextLink URLs already
    # carry them.
    query = {"$top": 100, "$select": "id,displayName,childFolderCount"}
    collected = []
    while page_url:
        page = graph_get(page_url, query)
        query = None
        for entry in page.get("value", []):
            full_path = f"{parent_path}/{entry['displayName']}".lstrip("/")
            collected.append({"id": entry["id"], "path": full_path})
            if entry.get("childFolderCount", 0) > 0:
                collected.extend(get_all_folders(mailbox, entry["id"], full_path))
        page_url = page.get("@odata.nextLink")
    return collected
def fetch_full_message(mailbox: str, msg_id: str) -> Optional[dict]:
    """Download one complete message (headers and attachment metadata included).

    Used for new messages seen in a delta response.

    Returns:
        The message JSON, or None when Graph answered with an HTTP error
        (the error is written to the error log).
    """
    query = {"$select": FULL_FETCH_SELECT, "$expand": FULL_FETCH_EXPAND}
    message_url = f"{GRAPH_URL}/users/{mailbox}/messages/{msg_id}"
    try:
        message = graph_get(message_url, query)
    except requests.HTTPError as exc:
        logging.error("fetch_full_message %s: %s", msg_id, exc)
        return None
    return message
# ─── Delta iterace ────────────────────────────────────────────────────────────
def iter_folder_delta(mailbox: str, folder_id: str, delta_link: Optional[str], limit: int = 0):
    """Iterate over the delta feed of one folder.

    Yields ``(item, None)`` for every entry of the feed (a changed message or
    an ``@removed`` marker). When the feed is exhausted, one last tuple
    ``(None, final_delta_link)`` is yielded.

    Args:
        mailbox: Mailbox address.
        folder_id: Graph id of the folder.
        delta_link: Stored deltaLink to continue from, or None for a fresh
            (full) delta.
        limit: Stop after this many items; 0 means no limit. When the limit is
            hit the generator ends WITHOUT yielding a final delta link.

    Raises:
        DeltaExpired: on HTTP 410 (expired deltaLink); the caller should run
            again with ``delta_link=None``.
    """
    if delta_link:
        page_url, query = delta_link, None
    else:
        page_url = f"{GRAPH_URL}/users/{mailbox}/mailFolders/{folder_id}/messages/delta"
        query = {"$select": DELTA_SELECT, "$top": PAGE_SIZE}

    emitted = 0
    while page_url:
        page = graph_get(page_url, query, allow_410=True)
        query = None  # follow-up links already embed the query
        for entry in page.get("value", []):
            yield entry, None
            emitted += 1
            if limit and emitted >= limit:
                # Test mode (--limit): end here without a delta link, so no
                # state is stored for this run.
                return
        closing_link = page.get("@odata.deltaLink")
        if closing_link:
            # End of the feed: hand over the bookmark for the next run.
            yield None, closing_link
            return
        page_url = page.get("@odata.nextLink")
# ─── Per-folder sync ──────────────────────────────────────────────────────────
def sync_folder(col, sync_col, mailbox: str, folder: dict, dry_run: bool, limit: int) -> dict:
    """Synchronise one folder through its delta feed.

    Reads the stored deltaLink from ``sync_col`` (document id
    ``"<mailbox>|<folder_id>"``), processes every delta item with
    ``process_item`` and, when the feed ends with a deltaLink and this is not
    a dry run, stores that link together with cumulative counters.

    An expired deltaLink (HTTP 410) triggers exactly one restart as a fresh
    delta. The stale state document is removed only outside dry-run mode, so
    a dry run never modifies Mongo. A 410 on a fresh delta is counted as a
    folder error instead of restarting again.

    Args:
        col: Mongo collection of the mailbox.
        sync_col: Mongo collection with the sync state.
        mailbox: Mailbox address.
        folder: Dict with keys ``"id"`` and ``"path"``.
        dry_run: When True nothing is written to Mongo.
        limit: Max items per folder (0 = no limit).

    Returns:
        Stats dict with keys ``new``, ``sync``, ``removed``, ``errors``.
    """
    fid = folder["id"]
    fpath = folder["path"]
    state_id = f"{mailbox}|{fid}"

    state = sync_col.find_one({"_id": state_id})
    delta_link = state.get("delta_link") if state else None

    while True:
        is_first_run = delta_link is None
        label = "FRESH" if is_first_run else "DELTA"
        print(f"\n[{label}] {fpath}")

        stats = {"new": 0, "sync": 0, "removed": 0, "errors": 0}
        final_delta = None
        try:
            for item, fin in iter_folder_delta(mailbox, fid, delta_link, limit=limit):
                if fin:
                    final_delta = fin
                    break
                try:
                    process_item(col, mailbox, fpath, item, stats, dry_run)
                except Exception as e:
                    # One bad item must not abort the whole folder.
                    stats["errors"] += 1
                    logging.error("process_item %s: %s", item.get("id", "?"), e)
            break
        except DeltaExpired:
            if is_first_run:
                # 410 without a stored deltaLink: there is nothing left to
                # reset, so report an error instead of retrying forever.
                stats["errors"] += 1
                logging.error("sync_folder %s | %s: HTTP 410 on fresh delta", mailbox, fpath)
                print(f" [410] fresh delta selhala — slozka preskocena")
                break
            print(f" [410] deltaLink expiroval — restart od fresh delta")
            if not dry_run:
                sync_col.delete_one({"_id": state_id})
            delta_link = None  # next loop pass runs as a fresh delta

    print(f" new={stats['new']} sync={stats['sync']} removed={stats['removed']} err={stats['errors']}")

    # Store the sync state only when the feed ended with a deltaLink and this
    # is not a dry run.
    if final_delta and not dry_run:
        sync_col.update_one(
            {"_id": state_id},
            {
                "$set": {
                    "mailbox": mailbox,
                    "folder_id": fid,
                    "folder_path": fpath,
                    "delta_link": final_delta,
                    "last_run_at": datetime.now(timezone.utc).replace(tzinfo=None),
                },
                "$inc": {
                    "cumulative_new": stats["new"],
                    "cumulative_sync": stats["sync"],
                    "cumulative_removed": stats["removed"],
                    "run_count": 1,
                },
            },
            upsert=True,
        )
    elif not final_delta:
        # No deltaLink arrived (e.g. --limit or an error): the state stays
        # untouched, so the next run continues from the stored deltaLink or
        # starts fresh.
        if not is_first_run:
            print(f" [pozn] delta neukoncena — pristi beh pojede od ulozeneho deltaLinku")

    return stats
def process_item(col, mailbox: str, folder_path: str, item: dict, stats: dict, dry_run: bool):
    """Process one entry of a delta response and update ``stats`` in place.

    Three cases:
      1. ``@removed`` marker: flag the stored message as permanently deleted
         (the document itself is never removed).
      2. ``graph_id`` already stored: update only the sync fields.
      3. Unknown ``graph_id``: fetch the full message and store it as new.

    Entries without an ``id`` are ignored. In dry-run mode nothing is written
    to Mongo; the intended action is only printed.

    Args:
        col: Mongo collection of the mailbox.
        mailbox: Mailbox address (used for the full-message fetch).
        folder_path: Path of the folder the delta belongs to.
        item: Raw delta entry.
        stats: Counter dict with keys ``new``, ``sync``, ``removed``, ``errors``.
        dry_run: When True no write is performed.
    """
    graph_id = item.get("id")
    if not graph_id:
        return

    # 1) Deleted message (@removed)
    if "@removed" in item or item.get("@removed.reason"):
        if dry_run:
            print(f" REMOVED graph_id={graph_id[:30]}...")
        else:
            col.update_one(
                {"graph_id": graph_id},
                {"$set": {
                    "permanently_deleted": True,
                    "permanently_deleted_at": datetime.now(timezone.utc).replace(tzinfo=None),
                }},
            )
        stats["removed"] += 1
        return

    # 2) New or changed message, decided by whether graph_id is already stored
    existing = col.find_one({"graph_id": graph_id}, {"_id": 1})
    if existing:
        # Known message: update sync fields only (the delta payload has them).
        fields = extract_sync_fields(item, folder_path)
        if dry_run:
            # subject may be present but None, so .get() with a default is
            # not enough to make it sliceable.
            print(f" SYNC {(item.get('subject') or '')[:60]}")
        else:
            col.update_one({"_id": existing["_id"]}, {"$set": fields})
        stats["sync"] += 1
        return

    # New message: fetch the full version for body, attachments and headers.
    full = fetch_full_message(mailbox, graph_id)
    if full is None:
        stats["errors"] += 1
        return
    doc = extract_message(full, folder_path)
    if doc is None:
        stats["errors"] += 1
        return
    if dry_run:
        print(f" NEW {(doc.get('subject') or '')[:60]}")
    else:
        col.update_one({"_id": doc["_id"]}, {"$set": doc}, upsert=True)
    stats["new"] += 1
# ─── Indexy pro sync_state ────────────────────────────────────────────────────
def ensure_sync_state_indexes(sync_col):
    """Create the indexes used on the sync-state collection.

    One compound index on (mailbox, folder_id) and one on last_run_at.
    """
    index_specs = (
        [("mailbox", ASCENDING), ("folder_id", ASCENDING)],
        [("last_run_at", ASCENDING)],
    )
    for key_spec in index_specs:
        sync_col.create_index(key_spec)
def ensure_perm_deleted_index(col):
    """Create a sparse index on ``permanently_deleted`` in a mailbox collection."""
    index_keys = [("permanently_deleted", ASCENDING)]
    col.create_index(index_keys, sparse=True)
# ─── Main ─────────────────────────────────────────────────────────────────────
def discover_mailboxes(db) -> list[str]:
    """Return the mailboxes to sync.

    A mailbox is every collection of ``db`` that is neither listed in
    NON_MAILBOX_COLLECTIONS nor in SKIP_MAILBOXES. Names are returned in
    sorted order; skipped mailboxes are reported on stdout.
    """
    mailboxes = []
    for coll_name in sorted(db.list_collection_names()):
        if coll_name in NON_MAILBOX_COLLECTIONS:
            continue
        if coll_name not in SKIP_MAILBOXES:
            mailboxes.append(coll_name)
            continue
        print(f" [skip] {coll_name} — v SKIP_MAILBOXES (neni Graph pristup)")
    return mailboxes
def sync_mailbox(client, mailbox: str, args) -> dict:
    """Synchronise one mailbox and return its totals.

    Lists all folders of the mailbox (optionally filtered by ``args.folder``)
    and runs ``sync_folder`` on each of them.

    In dry-run mode no index is created and ``--reset`` deletes nothing; it
    only reports how many deltaLinks would be removed. The dry run itself then
    still uses the stored deltaLinks.

    Args:
        client: Connected MongoClient.
        mailbox: Mailbox address (= collection name).
        args: Parsed CLI arguments (dry_run, reset, folder, limit).

    Returns:
        Totals dict with keys ``new``, ``sync``, ``removed``, ``errors``.
    """
    _v14.GRAPH_MAILBOX = mailbox
    print(f"\n========== {mailbox} ==========")
    col = client[MONGO_DB][mailbox]
    sync_col = client[MONGO_DB][SYNC_STATE_COL]

    if not args.dry_run:
        ensure_sync_state_indexes(sync_col)
        ensure_perm_deleted_index(col)

    if args.reset:
        if args.dry_run:
            # Dry run must not touch Mongo: only report what would happen.
            n = sync_col.count_documents({"mailbox": mailbox})
            print(f" --reset (dry-run): smazalo by se {n} deltaLinku pro {mailbox}")
        else:
            n = sync_col.delete_many({"mailbox": mailbox}).deleted_count
            print(f" --reset: smazano {n} deltaLinku pro {mailbox}")

    print("Nacitam seznam slozek...")
    try:
        folders = get_all_folders(mailbox)
    except requests.HTTPError as e:
        print(f" CHYBA: nelze nacist slozky pro {mailbox}: {e}")
        logging.error("get_all_folders %s: %s", mailbox, e)
        return {"new": 0, "sync": 0, "removed": 0, "errors": 1}

    if args.folder:
        # Case-insensitive substring filter on the folder path.
        folders = [f for f in folders if args.folder.lower() in f["path"].lower()]
    print(f" Slozek ke zpracovani: {len(folders)}")

    totals = {"new": 0, "sync": 0, "removed": 0, "errors": 0}
    for folder in folders:
        s = sync_folder(col, sync_col, mailbox, folder, args.dry_run, args.limit)
        for k in totals:
            totals[k] += s[k]

    print(f" -> mailbox total: new={totals['new']} sync={totals['sync']} removed={totals['removed']} err={totals['errors']}")
    return totals
def main():
    """CLI entry point of the delta sync.

    Parses arguments, connects to MongoDB, selects the mailboxes (the one
    given by ``--mailbox`` or all discovered ones), obtains a Graph token and
    syncs each mailbox. A failure of one mailbox is logged and counted as an
    error without stopping the others. The Mongo client is closed on every
    exit path.

    Returns:
        1 when any error was counted, otherwise 0. Exits with code 2 when the
        requested mailbox is listed in SKIP_MAILBOXES.
    """
    ap = argparse.ArgumentParser(description=f"parse_emails_graph delta sync v{SCRIPT_VERSION}")
    ap.add_argument("--mailbox", default="",
                    help="E-mail schranky (= kolekce v Mongo). "
                         "Bez argumentu projede vsechny schranky z `emaily` (mimo SKIP_MAILBOXES).")
    ap.add_argument("--folder", default="", help="Filtruje slozky obsahujici tento retezec (default: vsechny)")
    ap.add_argument("--limit", type=int, default=0, help="Max polozek na slozku (test)")
    ap.add_argument("--reset", action="store_true",
                    help="Smaze deltaLinky pro vybrane schranky — pristi beh zacne od fresh delta")
    ap.add_argument("--dry-run", action="store_true", help="Nic neulozi do Mongo, jen vypise co by se stalo")
    args = ap.parse_args()

    print(f"=== Delta sync v{SCRIPT_VERSION} ===")
    if args.dry_run:
        print(" DRY-RUN — zadne zmeny v Mongo")

    print("Pripojuji se k MongoDB...")
    client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
    try:
        client.admin.command("ping")
        db = client[MONGO_DB]

        if args.mailbox:
            if args.mailbox in SKIP_MAILBOXES:
                print(f" CHYBA: {args.mailbox} je v SKIP_MAILBOXES — neni Graph pristup.")
                sys.exit(2)
            mailboxes = [args.mailbox]
        else:
            mailboxes = discover_mailboxes(db)

        print(f" Schranky ke zpracovani: {len(mailboxes)}")
        for m in mailboxes:
            print(f" {m}")

        print("Token Graph API...")
        get_token()
        print(" OK")

        t0 = time.time()
        grand = {"new": 0, "sync": 0, "removed": 0, "errors": 0}
        per_mailbox = []
        for mb in mailboxes:
            try:
                s = sync_mailbox(client, mb, args)
            except Exception as e:
                # Top-level boundary: one failing mailbox must not stop the
                # others; it is reported and counted as a single error.
                print(f" FATAL pri sync {mb}: {e}")
                logging.error("sync_mailbox %s: %s", mb, e)
                s = {"new": 0, "sync": 0, "removed": 0, "errors": 1}
            per_mailbox.append((mb, s))
            for k in grand:
                grand[k] += s[k]

        dt = time.time() - t0
        print(f"\n=== SHRNUTI ===")
        for mb, s in per_mailbox:
            print(f" {mb:40} new={s['new']:>5} sync={s['sync']:>5} removed={s['removed']:>4} err={s['errors']:>3}")
        print(f" {'TOTAL':40} new={grand['new']:>5} sync={grand['sync']:>5} removed={grand['removed']:>4} err={grand['errors']:>3}")
        print(f" trvalo: {dt:.1f} s")

        return 1 if grand["errors"] > 0 else 0
    finally:
        # Release the connection pool on every path, including sys.exit(2).
        client.close()
if __name__ == "__main__":
sys.exit(main() or 0)
+48 -11
View File
@@ -69,6 +69,13 @@ Historie verzi:
1.5 2026-06-13 Nova priloha se zaroven nahrava do SeaweedFS (Tower1) pres
sdileny seaweed_store.py; index dostane seaweed_path/url/synced_at.
Vypadek SeaweedFS pipeline neshodi (fallback = backfill skript).
1.5.1 2026-06-14 Pojistka proti emailum s prazdnym graph_id (legacy/mailstore
importy jeste neoznacene jako source=mailstore). Drive se z nich
sestavila URL .../messages//attachments -> 400 a kazda priloha
se zapocitala jako chyba -> cely pipeline report padal na FAIL
(priklad: 2026-06-13, 22 666 chyb). Nyni se takove prilohy
trvale oznaci attachment_no_graph_id (jako missing/reference),
preskoci se a NEpocitaji jako chyba.
"""
import sys
@@ -105,7 +112,7 @@ MONGO_COL_INDEX = "attachments_index"
EMAILS_BASE_DIR = Path("/mnt/Emails")
LOG_FILE = Path(__file__).parent / "parse_emails_errors.log"
SCRIPT_VERSION = "1.5"
SCRIPT_VERSION = "1.5.1"
BATCH_SIZE = 50
# Typy příloh které přeskočíme (S/MIME podpisy, certifikáty)
@@ -448,6 +455,7 @@ def process_mailbox(client, mailbox: str, args) -> dict:
"file_hash": {"$exists": False},
"attachment_missing": {"$ne": True},
"attachment_reference": {"$ne": True},
"attachment_no_graph_id": {"$ne": True},
}
}
}
@@ -457,7 +465,7 @@ def process_mailbox(client, mailbox: str, args) -> dict:
if total == 0:
print(" Neni co stahnout.")
return {"mailbox": mailbox, "ok": 0, "new": 0, "dup": 0, "skip": 0,
"miss": 0, "ref": 0, "err": 0, "elapsed": 0.0}
"miss": 0, "ref": 0, "nogid": 0, "err": 0, "elapsed": 0.0}
cursor = col_emails.find(query, {"_id": 1, "graph_id": 1, "subject": 1, "attachments": 1})
if args.limit:
@@ -469,6 +477,7 @@ def process_mailbox(client, mailbox: str, args) -> dict:
skip_count = 0
miss_count = 0
ref_count = 0
nogid_count = 0
err_count = 0
email_i = 0
batch = []
@@ -492,10 +501,36 @@ def process_mailbox(client, mailbox: str, args) -> dict:
real_atts = [a for a in att_list if not a.get("is_inline", False)
and not a.get("attachment_missing")
and not a.get("attachment_reference")]
and not a.get("attachment_reference")
and not a.get("attachment_no_graph_id")]
if not real_atts:
continue
# Email bez graph_id nelze stahnout z Graphu (legacy/mailstore import,
# jeste neoznaceny jako source=mailstore). Bez teto pojistky se sestavi
# URL .../messages//attachments -> 400 Bad Request a KAZDA priloha se
# zapocita jako chyba -> cely pipeline report spadne na FAIL.
# Oznacime prilohy attachment_no_graph_id (permanentni, jako
# missing/reference), aby se v dalsich bezich preskocily a NEpocitaly
# jako chyba.
if not graph_id:
now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
marked = list(att_list)
for i, a in enumerate(marked):
if (a.get("is_inline", False) or a.get("file_hash")
or a.get("attachment_missing") or a.get("attachment_reference")
or a.get("attachment_no_graph_id")):
continue
marked[i] = {**a, "attachment_no_graph_id": True,
"attachment_no_graph_id_at": now_utc}
nogid_count += len(real_atts)
batch.append(UpdateOne({"_id": email_id}, {"$set": {"attachments": marked}}))
if len(batch) >= BATCH_SIZE:
flush()
print(f"\n {email_i:>5}/{total} NOGID {subject} "
f"({len(real_atts)} priloh bez graph_id — oznaceno)")
continue
print(f"\n {email_i:>5}/{total} {subject}")
need_listing = any(
@@ -517,6 +552,8 @@ def process_mailbox(client, mailbox: str, args) -> dict:
continue
if att.get("attachment_missing") or att.get("attachment_reference"):
continue
if att.get("attachment_no_graph_id"):
continue
if not args.force_recheck and att.get("file_hash"):
continue
@@ -628,17 +665,17 @@ def process_mailbox(client, mailbox: str, args) -> dict:
elapsed = (datetime.now() - start).total_seconds()
print(f" {''*60}")
print(f" Průběh: emaily={email_i}/{total} nove={new_count} dup={dup_count} "
f"skip={skip_count} miss={miss_count} ref={ref_count} err={err_count}")
f"skip={skip_count} miss={miss_count} ref={ref_count} nogid={nogid_count} err={err_count}")
print(f" {''*60}")
flush()
elapsed = (datetime.now() - start).total_seconds()
print(f" -> mailbox total: emaily={ok_count} nove={new_count} dup={dup_count} "
f"skip={skip_count} miss={miss_count} ref={ref_count} err={err_count} ({elapsed:.1f} s)")
f"skip={skip_count} miss={miss_count} ref={ref_count} nogid={nogid_count} err={err_count} ({elapsed:.1f} s)")
return {"mailbox": mailbox, "ok": ok_count, "new": new_count, "dup": dup_count,
"skip": skip_count, "miss": miss_count, "ref": ref_count, "err": err_count,
"elapsed": elapsed}
"skip": skip_count, "miss": miss_count, "ref": ref_count, "nogid": nogid_count,
"err": err_count, "elapsed": elapsed}
def discover_mailboxes(db) -> list[str]:
@@ -711,24 +748,24 @@ def main():
logging.error("process_mailbox %s: %s", mb, e)
print(f" FATAL pri zpracovani {mb}: {e}")
results.append({"mailbox": mb, "ok": 0, "new": 0, "dup": 0,
"skip": 0, "miss": 0, "ref": 0, "err": 1, "elapsed": 0.0})
"skip": 0, "miss": 0, "ref": 0, "nogid": 0, "err": 1, "elapsed": 0.0})
elapsed_total = (datetime.now() - start_all).total_seconds()
files_total = col_index.count_documents({})
size_total = sum(d.get("size_bytes", 0) for d in col_index.find({}, {"size_bytes": 1}))
grand = {k: sum(r.get(k, 0) for r in results)
for k in ("ok", "new", "dup", "skip", "miss", "ref", "err")}
for k in ("ok", "new", "dup", "skip", "miss", "ref", "nogid", "err")}
print(f"\n{'='*60}")
print("=== SHRNUTI ===")
for r in results:
print(f" {r['mailbox']:40} ok={r['ok']:>5} nove={r['new']:>4} "
f"dup={r['dup']:>4} skip={r['skip']:>3} miss={r.get('miss',0):>3} "
f"ref={r.get('ref',0):>3} err={r['err']:>3}")
f"ref={r.get('ref',0):>3} nogid={r.get('nogid',0):>4} err={r['err']:>3}")
print(f" {'TOTAL':40} ok={grand['ok']:>5} nove={grand['new']:>4} "
f"dup={grand['dup']:>4} skip={grand['skip']:>3} miss={grand['miss']:>3} "
f"ref={grand['ref']:>3} err={grand['err']:>3}")
f"ref={grand['ref']:>3} nogid={grand['nogid']:>4} err={grand['err']:>3}")
print(f"Souboru v indexu: {files_total} ({size_total / 1024 / 1024:.1f} MB)")
print(f"Celkovy cas: {int(elapsed_total//3600)}h {int((elapsed_total%3600)//60)}m {int(elapsed_total%60)}s")
print(f"\nKonec: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
+53
View File
@@ -0,0 +1,53 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Orientacni srovnani: online (Graph totalItemCount) vs Mongo po slozkach."""
import sys
import msal, requests
from pymongo import MongoClient
# --- Configuration and Microsoft Graph authentication ----------------------
# Tenant and client IDs are identifiers, not credentials, so the existing
# values remain as defaults and can be overridden through the environment.
# The client secret IS a credential and must not live in the repository; it
# is read from GRAPH_CLIENT_SECRET instead.
# NOTE(review): the secret that used to be committed on this line must be
# rotated in Entra ID, since it remains readable in the git history.
import os

TENANT = os.environ.get("GRAPH_TENANT_ID", "7d269944-37a4-43a1-8140-c7517dc426e9")
CLIENT = os.environ.get("GRAPH_CLIENT_ID", "4b222bfd-78c9-4239-a53f-43006b3ed07f")
SECRET = os.environ.get("GRAPH_CLIENT_SECRET")
if not SECRET:
    sys.exit("GRAPH_CLIENT_SECRET is not set; export the app's client secret first.")
GRAPH = "https://graph.microsoft.com/v1.0"
# Mailbox to compare: first CLI argument, otherwise the default mailbox.
MBOX = sys.argv[1] if len(sys.argv) > 1 else "michaela.buzalkova@buzalka.cz"

app = msal.ConfidentialClientApplication(
    CLIENT,
    authority=f"https://login.microsoftonline.com/{TENANT}",
    client_credential=SECRET,
)
# On failure MSAL returns a dict carrying "error"/"error_description" rather
# than raising, so check before indexing to surface the real cause.
_token_result = app.acquire_token_for_client(scopes=["https://graph.microsoft.com/.default"])
if "access_token" not in _token_result:
    sys.exit(
        "Graph token request failed: "
        f"{_token_result.get('error')}: {_token_result.get('error_description')}"
    )
tok = _token_result["access_token"]
# Authorization header shared by every Graph request in this script.
H = {"Authorization": f"Bearer {tok}"}
def folders(parent=None, ppath=""):
    """Recursively list the mail folders of MBOX with their online item counts.

    parent: Graph folder id whose child folders are listed; None lists the
        mailbox's top-level folders.
    ppath: slash-joined display-name path of the parent folder, used as the
        prefix for the paths produced at this level.

    Returns a flat list of (path, totalItemCount) tuples; every folder is
    followed by the entries of its own subtree.

    Raises requests.HTTPError when Graph answers with an error status. Without
    that check an error body (which has no "value" key) would be read as
    "no folders" and the comparison would silently be wrong.
    """
    if parent is None:
        url = f"{GRAPH}/users/{MBOX}/mailFolders"
    else:
        url = f"{GRAPH}/users/{MBOX}/mailFolders/{parent}/childFolders"
    params = {"$top": 100, "$select": "id,displayName,totalItemCount,childFolderCount"}
    out = []
    while url:
        resp = requests.get(url, headers=H, params=params, timeout=60)
        resp.raise_for_status()
        d = resp.json()
        for f in d.get("value", []):
            path = f"{ppath}/{f['displayName']}".lstrip("/")
            out.append((path, f.get("totalItemCount", 0)))
            if f.get("childFolderCount", 0) > 0:
                out += folders(f["id"], path)
        # The nextLink already carries the full query string, so the explicit
        # params are sent with the first page only.
        url = d.get("@odata.nextLink")
        params = None
    return out
# --- Online (Graph) vs. Mongo per-folder comparison -------------------------
online = folders()
online_d = {p: c for p, c in online}

cli = MongoClient("mongodb://192.168.1.76:27017")
col = cli["emaily"][MBOX]

# Number of archived documents per folder_path. Documents without a
# folder_path are grouped under the key None.
mongo_d = {}
for r in col.aggregate([{"$group": {"_id": "$folder_path", "n": {"$sum": 1}}}]):
    mongo_d[r["_id"]] = r["n"]
deleted = col.count_documents({"permanently_deleted": True})

# A None key (missing folder_path) cannot be ordered against str paths, so a
# plain sorted() would raise TypeError; sort it as the empty string instead.
allpaths = sorted(set(online_d) | set(mongo_d), key=lambda p: p or "")

print(f"{'SLOZKA':40} {'ONLINE':>8} {'MONGO':>8} {'ROZDIL':>8}")
print("-"*68)
to = tm = 0
for p in allpaths:
    o = online_d.get(p)  # None -> folder exists only in Mongo
    m = mongo_d.get(p, 0)
    to += (o or 0)
    tm += m
    print(f"{(p or '(none)')[:40]:40} {('-' if o is None else o):>8} {m:>8} {('' if o is None else o-m):>8}")
print("-"*68)
print(f"{'CELKEM':40} {to:>8} {tm:>8} {to-tm:>8}")
print(f"\nMongo total docs: {col.count_documents({})} | z toho permanently_deleted: {deleted}")
print(f"Online folders: {len(online_d)} | Mongo folders: {len(mongo_d)}")
+59
View File
@@ -0,0 +1,59 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Ktere zpravy v dane slozce chybi v Mongo (online vs archiv) — charakteristika."""
import sys, collections
import msal, requests
from pymongo import MongoClient
# --- Configuration and Microsoft Graph authentication ----------------------
# Tenant and client IDs are identifiers, not credentials, so the existing
# values remain as defaults and can be overridden through the environment.
# The client secret IS a credential and must not live in the repository; it
# is read from GRAPH_CLIENT_SECRET instead.
# NOTE(review): the secret that used to be committed on this line must be
# rotated in Entra ID, since it remains readable in the git history.
import os

TENANT = os.environ.get("GRAPH_TENANT_ID", "7d269944-37a4-43a1-8140-c7517dc426e9")
CLIENT = os.environ.get("GRAPH_CLIENT_ID", "4b222bfd-78c9-4239-a53f-43006b3ed07f")
SECRET = os.environ.get("GRAPH_CLIENT_SECRET")
if not SECRET:
    sys.exit("GRAPH_CLIENT_SECRET is not set; export the app's client secret first.")
GRAPH = "https://graph.microsoft.com/v1.0"
MBOX = "michaela.buzalkova@buzalka.cz"
# Folders to inspect, matched against the Graph displayName.
TARGETS = ["Inbox", "Odloženo", "Deleted Items", "Sent Items"]

app = msal.ConfidentialClientApplication(
    CLIENT,
    authority=f"https://login.microsoftonline.com/{TENANT}",
    client_credential=SECRET,
)
# On failure MSAL returns a dict carrying "error"/"error_description" rather
# than raising, so check before indexing to surface the real cause.
_token_result = app.acquire_token_for_client(scopes=["https://graph.microsoft.com/.default"])
if "access_token" not in _token_result:
    sys.exit(
        "Graph token request failed: "
        f"{_token_result.get('error')}: {_token_result.get('error_description')}"
    )
tok = _token_result["access_token"]
# Authorization header shared by every Graph request in this script.
H = {"Authorization": f"Bearer {tok}"}
def allfolders(parent=None, ppath=""):
    """Recursively list the mail folders of MBOX.

    parent: Graph folder id whose child folders are listed; None lists the
        mailbox's top-level folders.
    ppath: slash-joined display-name path of the parent folder, used as the
        prefix for the paths produced at this level.

    Returns a flat list of (path, displayName, folder_id) tuples; every
    folder is followed by the entries of its own subtree.

    Raises requests.HTTPError when Graph answers with an error status. Without
    that check an error body (which has no "value" key) would be read as
    "no folders" and nothing would be compared.
    """
    if parent is None:
        url = f"{GRAPH}/users/{MBOX}/mailFolders"
    else:
        url = f"{GRAPH}/users/{MBOX}/mailFolders/{parent}/childFolders"
    params = {"$top": 100, "$select": "id,displayName,childFolderCount"}
    out = []
    while url:
        resp = requests.get(url, headers=H, params=params, timeout=60)
        resp.raise_for_status()
        d = resp.json()
        for f in d.get("value", []):
            path = f"{ppath}/{f['displayName']}".lstrip("/")
            out.append((path, f["displayName"], f["id"]))
            if f.get("childFolderCount", 0) > 0:
                out += allfolders(f["id"], path)
        # The nextLink already carries the full query string, so the explicit
        # params are sent with the first page only.
        url = d.get("@odata.nextLink")
        params = None
    return out
# --- Per-folder report of messages present online but absent from Mongo -----
folders = allfolders()
cli = MongoClient("mongodb://192.168.1.76:27017")
col = cli["emaily"][MBOX]


def _msg_time(msg):
    """Return the received timestamp of a Graph message, else sent, else ''."""
    return msg.get("receivedDateTime") or msg.get("sentDateTime") or ""


for path, dname, fid in folders:
    # Only folders whose display name is listed in TARGETS are inspected.
    if dname not in TARGETS:
        continue

    # Collect the online messages of this folder, keyed by internetMessageId.
    # Messages without an internetMessageId are left out of the comparison.
    url = f"{GRAPH}/users/{MBOX}/mailFolders/{fid}/messages"
    params = {"$select": "internetMessageId,receivedDateTime,sentDateTime,from,subject", "$top": 100}
    online = {}
    while url:
        resp = requests.get(url, headers=H, params=params, timeout=60)
        # Fail loudly: an error body has no "value" key and would otherwise
        # be reported as an empty online folder.
        resp.raise_for_status()
        d = resp.json()
        for m in d.get("value", []):
            mid = m.get("internetMessageId")
            if mid:
                online[mid] = m
        # The nextLink already carries the full query string.
        url = d.get("@odata.nextLink")
        params = None

    # NOTE(review): this presumes the Mongo _id is the internetMessageId —
    # confirm against the importer.
    have = {x["_id"] for x in col.find({"folder_path": path}, {"_id": 1})}
    missing = [m for mid, m in online.items() if mid not in have]
    print(f"\n=== {path} (online {len(online)}, mongo {len(have)}, chybi {len(missing)}) ===")
    if not missing:
        continue

    dts = sorted(_msg_time(m)[:10] for m in missing)
    print(f" datumove rozpeti chybejicich: {dts[0]} .. {dts[-1]}")

    by = collections.Counter(
        ((m.get('from') or {}).get('emailAddress') or {}).get('address', '?') for m in missing
    )
    print(" top odesilatele chybejicich:")
    for a, n in by.most_common(8):
        print(f" {n:>4} {a}")

    print(" ukazka 5 nejnovejsich:")
    # Sort by the same received-or-sent timestamp that is printed, so that
    # messages lacking receivedDateTime are not pushed to the end.
    for m in sorted(missing, key=_msg_time, reverse=True)[:5]:
        stamp = _msg_time(m)[:16]
        s = (m.get('subject') or '')[:55]
        print(f" {stamp} {s}")