This commit is contained in:
2026-06-10 11:59:19 +02:00
parent a41f97b86b
commit 7b2f69ad85
275 changed files with 16726 additions and 0 deletions
@@ -0,0 +1,83 @@
# jnj_tower_ingest v1.0.0
**Soubor:** `jnj_tower_ingest_v1.0.py`
**Datum:** 2026-06-10
**Autor:** vladimir.buzalka
**Běží:** Docker kontejner `python-runner` na Unraid Tower (192.168.1.76), u MongoDB.
## Co to je
Sjednocený **Tower-side ingest** JNJ e-mailů — spojuje dvě dříve oddělené poloviny
do jednoho běhu:
| Fáze | Dříve samostatně | Co dělá |
|---|---|---|
| **1. PARSE** | `parse_emails_tower_v1.3.py` | `.msg` z `/mnt/JNJEMAILS` → bohatý dokument v Mongo `emaily."vbuzalka@its.jnj.com"` (tělo, přílohy, hlavičky, MAPI props). `_id` = Internet Message-ID. |
| **2. SYNC** | `sync_jnj_state_v1.0.py` | nejnovější `/mnt/JNJEMAILS/db/jnjemails_*.db` (SQLite, **jen čtení** `mode=ro`) → zrcadlo do `jnj_messages` + doplnění `jnj_folder`/stavu do `emaily`. |
**Pořadí: parse BĚŽÍ PŘED sync.** Tím čerstvě naparsované maily dostanou cestu hned ve
stejném běhu (dřív: když sync předběhl parse, nový mail neměl co matchnout — sync
nezakládá stuby). Spojovací klíč všude = **Internet Message-ID = Mongo `_id`**.
## Inkrementálnost (vhodné pro cron každých 5 min)
- **PARSE** — parsuje jen `.msg` s `mtime` novějším než watermark
(`jnj_sync_state` / `_id="parse_state"``last_parse_mtime`).
- **První běh = seed:** watermark chybí → kandidáti = soubory, jejichž `filename`
ještě není v Mongu (jednorázový `distinct("filename")`); poté se watermark
nastaví na nejnovější soubor.
- **Další běhy = incremental:** jen `mtime > watermark`. Žádný sken Monga.
- `--full` reparsuje vše (upsert, idempotentní).
- **Indexy** se vytvářejí jen při `full`/`seed`/`--reindex` (v incremental už existují).
- **SYNC** — watermark `updated_at` (`jnj_sync_state` / `_id="watermark"`) + zkratka
`last_db` (stejná SQLite jako minule → okamžitý no-op, nesahá na Mongo data).
Dvě nezávislé události (nová `.msg` / nová `.db`) → skript udělá jen tu fázi, co má
práci; jinak levný no-op.
## Argumenty
| Argument | Význam |
|---|---|
| `--dry-run` | nic nezapíše, jen plán obou fází |
| `--full` | parse: reparsuj vše; sync: ignoruj watermark |
| `--limit N` | max N souborů (parse) / řádků (sync) — test |
| `--reindex` | vynutí indexy po parse fázi |
| `--force` | sync: ignoruj zkratku `last_db` |
| `--parse-only` | jen fáze PARSE |
| `--sync-only` | jen fáze SYNC |
## Spouštění
```bash
# Test:
docker exec -it python-runner python3 /scripts/jnj_tower_ingest_v1.0.py --dry-run
# Ostrý inkrementální běh (volá ho cron):
docker exec python-runner python3 /scripts/jnj_tower_ingest_v1.0.py
# Plný reparse + reindex:
docker exec -it python-runner python3 /scripts/jnj_tower_ingest_v1.0.py --full --reindex
```
## Plánování (HOTOVO)
Unraid User Scripts úloha `jnj_state_sync` (cron `*/5 * * * *`) — wrapper s `flock`
volá `docker exec python-runner python3 /scripts/jnj_tower_ingest_v1.0.py`.
Loguje jen reálnou práci/chyby do `/mnt/user/Scripts/logs/jnj_tower_ingest.log`
(grep `Zapisuji|PARSE hotovo|SYNC hotovo|CHYBA|Traceback`). Cron řádek/rozvrh se při
přepnutí ze `sync_jnj_state` neměnil — jen obsah wrapperu.
## Revert
Staré skripty `parse_emails_tower_v1.3.py` a `sync_jnj_state_v1.0.py` zůstávají v
`/scripts/` jako pojistka. Návrat = přepsat wrapper zpět na `sync_jnj_state_v1.0.py`.
## Závislosti
`extract-msg==0.55.0`, `olefile`, `pymongo`, `python-dateutil`, `sqlite3` (stdlib).
Python 3.10+.
## Historie verzí
- **1.0.0** 2026-06-10 — sjednocení `parse_emails_tower_v1.3` + `sync_jnj_state_v1.0`;
parse zinkrementálněn přes mtime watermark; indexy jen při full/seed/`--reindex`;
pořadí parse→sync.
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,514 @@
"""
==============================================================================
Skript: 1b_parse_emails_graph_delta_v1.0.py
Verze: 1.0
Datum: 2026-06-04
Autor: vladimir.buzalka
Popis:
Inkrementalni sync emailu pres Microsoft Graph DELTA QUERY.
Sourozenec `1_parse_emails_graph_v1.4.py` — kazdy resi jiny use case:
1_parse_emails_graph_v1.4.py = prvni plny import schranky
1b_parse_emails_graph_delta_v1.0.py = pravidelny sync (zmeny od minula)
Delta query je server-side change tracking — Graph si pamatuje "zalozku"
(deltaLink) a vraci jen to, co se od ni zmenilo:
- nove zpravy
- zmeny existujicich (isRead, flag, presun do jine slozky, kategorie)
- SMAZANE zpravy (@removed) — definitivne smazane, nikoli v kosi
Pro mail v "Deleted Items" delta nic specialniho nedela — je to porad
normalni zprava, jen s folder_path="Deleted Items". @removed prijde az
kdyz uzivatel vysype kos / Shift+Del.
State:
Kolekce `emaily.sync_state`, _id = "<mailbox>|<folder_id>".
{
mailbox, folder_id, folder_path,
delta_link, # plny URL s $deltatoken na pristi beh
last_run_at,
cumulative_new, cumulative_sync, cumulative_removed
}
Permanentne smazane zpravy:
Skript je NEMAZE z Mongo. Pouze nastavi:
permanently_deleted: True
permanently_deleted_at: <UTC datetime detekce>
Dohledani: col.find({"permanently_deleted": True})
Reuse:
Funkce extract_message / extract_sync_fields se nactou primo z modulu
1_parse_emails_graph_v1.4.py (importlib, file-based), aby se logika
extrahce nikdy nerozesla.
Spousteni:
python 1b_parse_emails_graph_delta_v1.0.py # VSECHNY schranky (mimo SKIP_MAILBOXES)
python 1b_parse_emails_graph_delta_v1.0.py --mailbox ordinace@buzalkova.cz # jedna schranka
python 1b_parse_emails_graph_delta_v1.0.py --mailbox ordinace@buzalkova.cz --folder Inbox
python 1b_parse_emails_graph_delta_v1.0.py --reset # zahodit deltaLinky a najet znova
python 1b_parse_emails_graph_delta_v1.0.py --dry-run # nic neulozit
SKIP_MAILBOXES (hardcoded):
vbuzalka@its.jnj.com — JNJ tenant, nemame Graph API pristup. Pro tuto
schranku je nutny samostatny skript (lokalni .msg).
Zavislosti:
msal, requests, pymongo, python-dateutil
Python 3.10+
==============================================================================
"""
from __future__ import annotations
import argparse
import importlib.util
import logging
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
import msal
import requests
from pymongo import MongoClient, ASCENDING
if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8", errors="replace")
# ─── KONFIGURACE ──────────────────────────────────────────────────────────────
GRAPH_TENANT_ID = "7d269944-37a4-43a1-8140-c7517dc426e9"
GRAPH_CLIENT_ID = "4b222bfd-78c9-4239-a53f-43006b3ed07f"
GRAPH_CLIENT_SECRET = "Txg8Q~MjhocuopxsJyJBhPmDfMxZ2r5WpTFj1dfk"
GRAPH_URL = "https://graph.microsoft.com/v1.0"
MONGO_URI = "mongodb://192.168.1.76:27017"
MONGO_DB = "emaily"
SYNC_STATE_COL = "sync_state"
PAGE_SIZE = 100 # delta endpoint typicky vraci max 100/stranka
LOG_FILE = Path(__file__).parent / "delta_errors.log"
SCRIPT_VERSION = "1.0"
# Kolekce v `emaily` ktere NEJSOU mailboxy:
NON_MAILBOX_COLLECTIONS = {"attachments_index", "sync_state"}
# Schranky, kde NEMAME Graph API pristup — pri bezneho behu se preskoci.
# Pro tyto je nutny separatni skript (napr. lokalni .msg parser).
SKIP_MAILBOXES = {
"vbuzalka@its.jnj.com", # JNJ tenant — nemame Graph credentials
}
logging.basicConfig(
filename=str(LOG_FILE),
level=logging.ERROR,
format="%(asctime)s | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
encoding="utf-8",
)
# Co tahnout z delta endpointu (stejne jako MSG_SELECT v v1.4, mimo internetMessageHeaders
# ktere delta neumi vratit pro vsechny polozky — pro nove zpravy si je dotahneme
# samostatnym fetchem).
DELTA_SELECT = (
"id,internetMessageId,subject,bodyPreview,body,"
"importance,isRead,isDraft,hasAttachments,"
"receivedDateTime,sentDateTime,createdDateTime,lastModifiedDateTime,"
"sender,from,toRecipients,ccRecipients,bccRecipients,replyTo,"
"conversationId,conversationIndex,parentFolderId,"
"categories,flag,inferenceClassification"
)
# Pro plne nacteni nove zpravy (vcetne hlavicek + priloh) pouzijeme stejny
# select+expand jako v1.4
FULL_FETCH_SELECT = (
"id,internetMessageId,subject,bodyPreview,body,"
"importance,isRead,isDraft,hasAttachments,"
"receivedDateTime,sentDateTime,createdDateTime,lastModifiedDateTime,"
"sender,from,toRecipients,ccRecipients,bccRecipients,replyTo,"
"conversationId,conversationIndex,parentFolderId,"
"categories,flag,inferenceClassification,internetMessageHeaders"
)
FULL_FETCH_EXPAND = "attachments($select=id,name,contentType,size,isInline)"
# ─── Reuse extract logiky z v1.4 ──────────────────────────────────────────────
_HERE = Path(__file__).parent
_V14_PATH = _HERE / "1_parse_emails_graph_v1.4.py"
if not _V14_PATH.exists():
print(f"CHYBA: chybi sourozenec {_V14_PATH.name} — extract logiku nelze nacist", file=sys.stderr)
sys.exit(1)
_spec = importlib.util.spec_from_file_location("v14_parse", _V14_PATH)
_v14 = importlib.util.module_from_spec(_spec)
_spec.loader.exec_module(_v14)
extract_message = _v14.extract_message
extract_sync_fields = _v14.extract_sync_fields
# GRAPH_MAILBOX modul-level v v1.4 — pro extract neni potreba, ale pro
# konzistenci nastavujeme ho v main()
# ─── Graph API ────────────────────────────────────────────────────────────────
_graph_token: Optional[str] = None
def get_token() -> str:
global _graph_token
app = msal.ConfidentialClientApplication(
GRAPH_CLIENT_ID,
authority=f"https://login.microsoftonline.com/{GRAPH_TENANT_ID}",
client_credential=GRAPH_CLIENT_SECRET,
)
result = app.acquire_token_for_client(scopes=["https://graph.microsoft.com/.default"])
if "access_token" not in result:
raise RuntimeError(f"Graph auth failed: {result}")
_graph_token = result["access_token"]
return _graph_token
class DeltaExpired(Exception):
"""deltaLink expiroval (HTTP 410) — je nutne zacit od plne delta znovu."""
def graph_get(url: str, params: dict = None, allow_410: bool = False) -> dict:
"""GET na Graph s retry pri 401. Pri 410 a allow_410=True vyhodi DeltaExpired."""
global _graph_token
if not _graph_token:
get_token()
for attempt in range(3):
r = requests.get(
url,
headers={"Authorization": f"Bearer {_graph_token}"},
params=params,
timeout=60,
)
if r.status_code == 401:
get_token()
continue
if r.status_code == 410 and allow_410:
raise DeltaExpired(url)
if r.status_code == 429:
# rate limit — respect Retry-After
wait = int(r.headers.get("Retry-After", "5"))
print(f" [429] cekam {wait}s ...")
time.sleep(wait)
continue
r.raise_for_status()
return r.json()
raise RuntimeError(f"Graph GET failed after retries: {url}")
def get_all_folders(mailbox: str, parent_id: str = None, parent_path: str = "") -> list[dict]:
if parent_id is None:
url = f"{GRAPH_URL}/users/{mailbox}/mailFolders"
else:
url = f"{GRAPH_URL}/users/{mailbox}/mailFolders/{parent_id}/childFolders"
folders = []
params = {"$top": 100, "$select": "id,displayName,childFolderCount"}
while url:
data = graph_get(url, params)
for f in data.get("value", []):
path = f"{parent_path}/{f['displayName']}".lstrip("/")
folders.append({"id": f["id"], "path": path})
if f.get("childFolderCount", 0) > 0:
folders.extend(get_all_folders(mailbox, f["id"], path))
url = data.get("@odata.nextLink")
params = None
return folders
def fetch_full_message(mailbox: str, msg_id: str) -> Optional[dict]:
"""Stahne celou zpravu vcetne hlavicek a priloh — pro nove zpravy zachycene v delte."""
url = f"{GRAPH_URL}/users/{mailbox}/messages/{msg_id}"
params = {"$select": FULL_FETCH_SELECT, "$expand": FULL_FETCH_EXPAND}
try:
return graph_get(url, params)
except requests.HTTPError as e:
logging.error("fetch_full_message %s: %s", msg_id, e)
return None
# ─── Delta iterace ────────────────────────────────────────────────────────────
def iter_folder_delta(mailbox: str, folder_id: str, delta_link: Optional[str], limit: int = 0):
"""
Generator: vraci (item, final_delta_link).
item je dict s polozkou (bud zmena nebo {'@removed': ...}).
Posledni vyhozeny tuple ma final_delta_link != None (zbytek None).
Pri HTTP 410 (expirovany deltaLink) vyhodi DeltaExpired — caller ma
pustit znova s delta_link=None (= fresh full delta).
"""
if delta_link:
url = delta_link
params = None
else:
url = f"{GRAPH_URL}/users/{mailbox}/mailFolders/{folder_id}/messages/delta"
params = {"$select": DELTA_SELECT, "$top": PAGE_SIZE}
n = 0
while url:
data = graph_get(url, params, allow_410=True)
params = None
for item in data.get("value", []):
yield item, None
n += 1
if limit and n >= limit:
# ulozime aspon stavajici nextLink jako "delta" — neni to ciste,
# ale pri --limit jde o test, takze pristi beh proste pocnize znovu
return
next_link = data.get("@odata.nextLink")
final_link = data.get("@odata.deltaLink")
if final_link:
# konec — predame final delta
yield None, final_link
return
url = next_link
# ─── Per-folder sync ──────────────────────────────────────────────────────────
def sync_folder(col, sync_col, mailbox: str, folder: dict, dry_run: bool, limit: int) -> dict:
"""Vrati statistiky."""
fid = folder["id"]
fpath = folder["path"]
state_id = f"{mailbox}|{fid}"
state = sync_col.find_one({"_id": state_id})
delta_link = state.get("delta_link") if state else None
is_first_run = delta_link is None
label = "FRESH" if is_first_run else "DELTA"
print(f"\n[{label}] {fpath}")
stats = {"new": 0, "sync": 0, "removed": 0, "errors": 0}
final_delta = None
try:
gen = iter_folder_delta(mailbox, fid, delta_link, limit=limit)
for item, fin in gen:
if fin:
final_delta = fin
break
try:
process_item(col, mailbox, fpath, item, stats, dry_run)
except Exception as e:
stats["errors"] += 1
logging.error("process_item %s: %s", item.get("id", "?"), e)
except DeltaExpired:
print(f" [410] deltaLink expiroval — restart od fresh delta")
# rekurzivni restart s vymazanym statem
sync_col.delete_one({"_id": state_id})
return sync_folder(col, sync_col, mailbox, folder, dry_run, limit)
print(f" new={stats['new']} sync={stats['sync']} removed={stats['removed']} err={stats['errors']}")
# Ulozit sync_state pokud mame final_delta a neni dry run
if final_delta and not dry_run:
sync_col.update_one(
{"_id": state_id},
{
"$set": {
"mailbox": mailbox,
"folder_id": fid,
"folder_path": fpath,
"delta_link": final_delta,
"last_run_at": datetime.now(timezone.utc).replace(tzinfo=None),
},
"$inc": {
"cumulative_new": stats["new"],
"cumulative_sync": stats["sync"],
"cumulative_removed": stats["removed"],
"run_count": 1,
},
},
upsert=True,
)
elif not final_delta:
# neprisel deltaLink (napr. limit nebo chyba) — nemenime state, pristi beh
# bude pokracovat normalne podle stareho deltaLinku nebo zacne od fresh
if not is_first_run:
print(f" [pozn] delta neukoncena — pristi beh pojede od ulozeneho deltaLinku")
return stats
def process_item(col, mailbox: str, folder_path: str, item: dict, stats: dict, dry_run: bool):
"""Zpracuje jednu polozku z delta odpovedi."""
# 1) Smazana zprava (@removed)
if "@removed" in item or item.get("@removed.reason"):
graph_id = item.get("id")
if not graph_id:
return
if dry_run:
print(f" REMOVED graph_id={graph_id[:30]}...")
else:
col.update_one(
{"graph_id": graph_id},
{"$set": {
"permanently_deleted": True,
"permanently_deleted_at": datetime.now(timezone.utc).replace(tzinfo=None),
}},
)
stats["removed"] += 1
return
# 2) Nova nebo zmenena zprava — rozhodneme podle existence graph_id v Mongo
graph_id = item.get("id")
if not graph_id:
return
existing = col.find_one({"graph_id": graph_id}, {"_id": 1})
if existing:
# Existujici zprava — update jen sync poli (delta payload je obsahuje)
fields = extract_sync_fields(item, folder_path)
if dry_run:
print(f" SYNC {item.get('subject','')[:60]}")
else:
col.update_one({"_id": existing["_id"]}, {"$set": fields})
stats["sync"] += 1
else:
# Nova zprava — pro telo+attachments+headers fetchneme plnou verzi
full = fetch_full_message(mailbox, graph_id)
if full is None:
stats["errors"] += 1
return
doc = extract_message(full, folder_path)
if doc is None:
stats["errors"] += 1
return
if dry_run:
print(f" NEW {doc.get('subject','')[:60]}")
else:
col.update_one({"_id": doc["_id"]}, {"$set": doc}, upsert=True)
stats["new"] += 1
# ─── Indexy pro sync_state ────────────────────────────────────────────────────
def ensure_sync_state_indexes(sync_col):
sync_col.create_index([("mailbox", ASCENDING), ("folder_id", ASCENDING)])
sync_col.create_index([("last_run_at", ASCENDING)])
def ensure_perm_deleted_index(col):
col.create_index([("permanently_deleted", ASCENDING)], sparse=True)
# ─── Main ─────────────────────────────────────────────────────────────────────
def discover_mailboxes(db) -> list[str]:
"""Vrati seznam mailboxu = vsechny kolekce v `emaily` mimo NON_MAILBOX_COLLECTIONS
a SKIP_MAILBOXES."""
out = []
for name in sorted(db.list_collection_names()):
if name in NON_MAILBOX_COLLECTIONS:
continue
if name in SKIP_MAILBOXES:
print(f" [skip] {name} — v SKIP_MAILBOXES (neni Graph pristup)")
continue
out.append(name)
return out
def sync_mailbox(client, mailbox: str, args) -> dict:
"""Sync jedne schranky. Vraci totals dict."""
_v14.GRAPH_MAILBOX = mailbox
print(f"\n========== {mailbox} ==========")
col = client[MONGO_DB][mailbox]
sync_col = client[MONGO_DB][SYNC_STATE_COL]
if not args.dry_run:
ensure_sync_state_indexes(sync_col)
ensure_perm_deleted_index(col)
if args.reset:
n = sync_col.delete_many({"mailbox": mailbox}).deleted_count
print(f" --reset: smazano {n} deltaLinku pro {mailbox}")
print("Nacitam seznam slozek...")
try:
folders = get_all_folders(mailbox)
except requests.HTTPError as e:
print(f" CHYBA: nelze nacist slozky pro {mailbox}: {e}")
logging.error("get_all_folders %s: %s", mailbox, e)
return {"new": 0, "sync": 0, "removed": 0, "errors": 1}
if args.folder:
folders = [f for f in folders if args.folder.lower() in f["path"].lower()]
print(f" Slozek ke zpracovani: {len(folders)}")
totals = {"new": 0, "sync": 0, "removed": 0, "errors": 0}
for folder in folders:
s = sync_folder(col, sync_col, mailbox, folder, args.dry_run, args.limit)
for k in totals:
totals[k] += s[k]
print(f" -> mailbox total: new={totals['new']} sync={totals['sync']} removed={totals['removed']} err={totals['errors']}")
return totals
def main():
ap = argparse.ArgumentParser(description=f"parse_emails_graph delta sync v{SCRIPT_VERSION}")
ap.add_argument("--mailbox", default="",
help="E-mail schranky (= kolekce v Mongo). "
"Bez argumentu projede vsechny schranky z `emaily` (mimo SKIP_MAILBOXES).")
ap.add_argument("--folder", default="", help="Filtruje slozky obsahujici tento retezec (default: vsechny)")
ap.add_argument("--limit", type=int, default=0, help="Max polozek na slozku (test)")
ap.add_argument("--reset", action="store_true",
help="Smaze deltaLinky pro vybrane schranky — pristi beh zacne od fresh delta")
ap.add_argument("--dry-run", action="store_true", help="Nic neulozi do Mongo, jen vypise co by se stalo")
args = ap.parse_args()
print(f"=== Delta sync v{SCRIPT_VERSION} ===")
if args.dry_run:
print(" DRY-RUN — zadne zmeny v Mongo")
print("Pripojuji se k MongoDB...")
client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
client.admin.command("ping")
db = client[MONGO_DB]
if args.mailbox:
if args.mailbox in SKIP_MAILBOXES:
print(f" CHYBA: {args.mailbox} je v SKIP_MAILBOXES — neni Graph pristup.")
sys.exit(2)
mailboxes = [args.mailbox]
else:
mailboxes = discover_mailboxes(db)
print(f" Schranky ke zpracovani: {len(mailboxes)}")
for m in mailboxes:
print(f" {m}")
print("Token Graph API...")
get_token()
print(" OK")
t0 = time.time()
grand = {"new": 0, "sync": 0, "removed": 0, "errors": 0}
per_mailbox = []
for mb in mailboxes:
try:
s = sync_mailbox(client, mb, args)
except Exception as e:
print(f" FATAL pri sync {mb}: {e}")
logging.error("sync_mailbox %s: %s", mb, e)
s = {"new": 0, "sync": 0, "removed": 0, "errors": 1}
per_mailbox.append((mb, s))
for k in grand:
grand[k] += s[k]
dt = time.time() - t0
print(f"\n=== SHRNUTI ===")
for mb, s in per_mailbox:
print(f" {mb:40} new={s['new']:>5} sync={s['sync']:>5} removed={s['removed']:>4} err={s['errors']:>3}")
print(f" {'TOTAL':40} new={grand['new']:>5} sync={grand['sync']:>5} removed={grand['removed']:>4} err={grand['errors']:>3}")
print(f" trvalo: {dt:.1f} s")
return 1 if grand["errors"] > 0 else 0
if __name__ == "__main__":
sys.exit(main() or 0)
@@ -0,0 +1,523 @@
"""
==============================================================================
Skript: 1b_parse_emails_graph_delta_v1.1.py
Verze: 1.1
Datum: 2026-06-10
Autor: vladimir.buzalka
Zmeny v1.1 (2026-06-10):
- Bugfix: NON_MAILBOX_COLLECTIONS rozsireno o "jnj_messages" a
"jnj_sync_state" (pomocne kolekce JNJ folder trackingu). Predtim je
discover_mailboxes bral jako schranky -> Graph 404 na
/users/jnj_messages/mailFolders -> cely krok 1b FAIL(1) pri kazdem behu.
Popis:
Inkrementalni sync emailu pres Microsoft Graph DELTA QUERY.
Sourozenec `1_parse_emails_graph_v1.4.py` — kazdy resi jiny use case:
1_parse_emails_graph_v1.4.py = prvni plny import schranky
1b_parse_emails_graph_delta_v1.1.py = pravidelny sync (zmeny od minula)
Delta query je server-side change tracking — Graph si pamatuje "zalozku"
(deltaLink) a vraci jen to, co se od ni zmenilo:
- nove zpravy
- zmeny existujicich (isRead, flag, presun do jine slozky, kategorie)
- SMAZANE zpravy (@removed) — definitivne smazane, nikoli v kosi
Pro mail v "Deleted Items" delta nic specialniho nedela — je to porad
normalni zprava, jen s folder_path="Deleted Items". @removed prijde az
kdyz uzivatel vysype kos / Shift+Del.
State:
Kolekce `emaily.sync_state`, _id = "<mailbox>|<folder_id>".
{
mailbox, folder_id, folder_path,
delta_link, # plny URL s $deltatoken na pristi beh
last_run_at,
cumulative_new, cumulative_sync, cumulative_removed
}
Permanentne smazane zpravy:
Skript je NEMAZE z Mongo. Pouze nastavi:
permanently_deleted: True
permanently_deleted_at: <UTC datetime detekce>
Dohledani: col.find({"permanently_deleted": True})
Reuse:
Funkce extract_message / extract_sync_fields se nactou primo z modulu
1_parse_emails_graph_v1.4.py (importlib, file-based), aby se logika
extrahce nikdy nerozesla.
Spousteni:
python 1b_parse_emails_graph_delta_v1.1.py # VSECHNY schranky (mimo SKIP_MAILBOXES)
python 1b_parse_emails_graph_delta_v1.1.py --mailbox ordinace@buzalkova.cz # jedna schranka
python 1b_parse_emails_graph_delta_v1.1.py --mailbox ordinace@buzalkova.cz --folder Inbox
python 1b_parse_emails_graph_delta_v1.1.py --reset # zahodit deltaLinky a najet znova
python 1b_parse_emails_graph_delta_v1.1.py --dry-run # nic neulozit
SKIP_MAILBOXES (hardcoded):
vbuzalka@its.jnj.com — JNJ tenant, nemame Graph API pristup. Pro tuto
schranku je nutny samostatny skript (lokalni .msg).
Zavislosti:
msal, requests, pymongo, python-dateutil
Python 3.10+
==============================================================================
"""
from __future__ import annotations
import argparse
import importlib.util
import logging
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
import msal
import requests
from pymongo import MongoClient, ASCENDING
if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8", errors="replace")
# ─── KONFIGURACE ──────────────────────────────────────────────────────────────
GRAPH_TENANT_ID = "7d269944-37a4-43a1-8140-c7517dc426e9"
GRAPH_CLIENT_ID = "4b222bfd-78c9-4239-a53f-43006b3ed07f"
GRAPH_CLIENT_SECRET = "Txg8Q~MjhocuopxsJyJBhPmDfMxZ2r5WpTFj1dfk"
GRAPH_URL = "https://graph.microsoft.com/v1.0"
MONGO_URI = "mongodb://192.168.1.76:27017"
MONGO_DB = "emaily"
SYNC_STATE_COL = "sync_state"
PAGE_SIZE = 100 # delta endpoint typicky vraci max 100/stranka
LOG_FILE = Path(__file__).parent / "delta_errors.log"
SCRIPT_VERSION = "1.1"
# Kolekce v `emaily` ktere NEJSOU mailboxy:
# (jnj_messages + jnj_sync_state = pomocne kolekce JNJ folder trackingu,
# bez exclude je discover_mailboxes bere jako schranky -> Graph 404 -> FAIL)
NON_MAILBOX_COLLECTIONS = {"attachments_index", "sync_state",
"jnj_messages", "jnj_sync_state"}
# Schranky, kde NEMAME Graph API pristup — pri bezneho behu se preskoci.
# Pro tyto je nutny separatni skript (napr. lokalni .msg parser).
SKIP_MAILBOXES = {
"vbuzalka@its.jnj.com", # JNJ tenant — nemame Graph credentials
}
logging.basicConfig(
filename=str(LOG_FILE),
level=logging.ERROR,
format="%(asctime)s | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
encoding="utf-8",
)
# Co tahnout z delta endpointu (stejne jako MSG_SELECT v v1.4, mimo internetMessageHeaders
# ktere delta neumi vratit pro vsechny polozky — pro nove zpravy si je dotahneme
# samostatnym fetchem).
DELTA_SELECT = (
"id,internetMessageId,subject,bodyPreview,body,"
"importance,isRead,isDraft,hasAttachments,"
"receivedDateTime,sentDateTime,createdDateTime,lastModifiedDateTime,"
"sender,from,toRecipients,ccRecipients,bccRecipients,replyTo,"
"conversationId,conversationIndex,parentFolderId,"
"categories,flag,inferenceClassification"
)
# Pro plne nacteni nove zpravy (vcetne hlavicek + priloh) pouzijeme stejny
# select+expand jako v1.4
FULL_FETCH_SELECT = (
"id,internetMessageId,subject,bodyPreview,body,"
"importance,isRead,isDraft,hasAttachments,"
"receivedDateTime,sentDateTime,createdDateTime,lastModifiedDateTime,"
"sender,from,toRecipients,ccRecipients,bccRecipients,replyTo,"
"conversationId,conversationIndex,parentFolderId,"
"categories,flag,inferenceClassification,internetMessageHeaders"
)
FULL_FETCH_EXPAND = "attachments($select=id,name,contentType,size,isInline)"
# ─── Reuse extract logiky z v1.4 ──────────────────────────────────────────────
_HERE = Path(__file__).parent
_V14_PATH = _HERE / "1_parse_emails_graph_v1.4.py"
if not _V14_PATH.exists():
print(f"CHYBA: chybi sourozenec {_V14_PATH.name} — extract logiku nelze nacist", file=sys.stderr)
sys.exit(1)
_spec = importlib.util.spec_from_file_location("v14_parse", _V14_PATH)
_v14 = importlib.util.module_from_spec(_spec)
_spec.loader.exec_module(_v14)
extract_message = _v14.extract_message
extract_sync_fields = _v14.extract_sync_fields
# GRAPH_MAILBOX modul-level v v1.4 — pro extract neni potreba, ale pro
# konzistenci nastavujeme ho v main()
# ─── Graph API ────────────────────────────────────────────────────────────────
_graph_token: Optional[str] = None
def get_token() -> str:
global _graph_token
app = msal.ConfidentialClientApplication(
GRAPH_CLIENT_ID,
authority=f"https://login.microsoftonline.com/{GRAPH_TENANT_ID}",
client_credential=GRAPH_CLIENT_SECRET,
)
result = app.acquire_token_for_client(scopes=["https://graph.microsoft.com/.default"])
if "access_token" not in result:
raise RuntimeError(f"Graph auth failed: {result}")
_graph_token = result["access_token"]
return _graph_token
class DeltaExpired(Exception):
"""deltaLink expiroval (HTTP 410) — je nutne zacit od plne delta znovu."""
def graph_get(url: str, params: dict = None, allow_410: bool = False) -> dict:
"""GET na Graph s retry pri 401. Pri 410 a allow_410=True vyhodi DeltaExpired."""
global _graph_token
if not _graph_token:
get_token()
for attempt in range(3):
r = requests.get(
url,
headers={"Authorization": f"Bearer {_graph_token}"},
params=params,
timeout=60,
)
if r.status_code == 401:
get_token()
continue
if r.status_code == 410 and allow_410:
raise DeltaExpired(url)
if r.status_code == 429:
# rate limit — respect Retry-After
wait = int(r.headers.get("Retry-After", "5"))
print(f" [429] cekam {wait}s ...")
time.sleep(wait)
continue
r.raise_for_status()
return r.json()
raise RuntimeError(f"Graph GET failed after retries: {url}")
def get_all_folders(mailbox: str, parent_id: str = None, parent_path: str = "") -> list[dict]:
if parent_id is None:
url = f"{GRAPH_URL}/users/{mailbox}/mailFolders"
else:
url = f"{GRAPH_URL}/users/{mailbox}/mailFolders/{parent_id}/childFolders"
folders = []
params = {"$top": 100, "$select": "id,displayName,childFolderCount"}
while url:
data = graph_get(url, params)
for f in data.get("value", []):
path = f"{parent_path}/{f['displayName']}".lstrip("/")
folders.append({"id": f["id"], "path": path})
if f.get("childFolderCount", 0) > 0:
folders.extend(get_all_folders(mailbox, f["id"], path))
url = data.get("@odata.nextLink")
params = None
return folders
def fetch_full_message(mailbox: str, msg_id: str) -> Optional[dict]:
"""Stahne celou zpravu vcetne hlavicek a priloh — pro nove zpravy zachycene v delte."""
url = f"{GRAPH_URL}/users/{mailbox}/messages/{msg_id}"
params = {"$select": FULL_FETCH_SELECT, "$expand": FULL_FETCH_EXPAND}
try:
return graph_get(url, params)
except requests.HTTPError as e:
logging.error("fetch_full_message %s: %s", msg_id, e)
return None
# ─── Delta iterace ────────────────────────────────────────────────────────────
def iter_folder_delta(mailbox: str, folder_id: str, delta_link: Optional[str], limit: int = 0):
"""
Generator: vraci (item, final_delta_link).
item je dict s polozkou (bud zmena nebo {'@removed': ...}).
Posledni vyhozeny tuple ma final_delta_link != None (zbytek None).
Pri HTTP 410 (expirovany deltaLink) vyhodi DeltaExpired — caller ma
pustit znova s delta_link=None (= fresh full delta).
"""
if delta_link:
url = delta_link
params = None
else:
url = f"{GRAPH_URL}/users/{mailbox}/mailFolders/{folder_id}/messages/delta"
params = {"$select": DELTA_SELECT, "$top": PAGE_SIZE}
n = 0
while url:
data = graph_get(url, params, allow_410=True)
params = None
for item in data.get("value", []):
yield item, None
n += 1
if limit and n >= limit:
# ulozime aspon stavajici nextLink jako "delta" — neni to ciste,
# ale pri --limit jde o test, takze pristi beh proste pocnize znovu
return
next_link = data.get("@odata.nextLink")
final_link = data.get("@odata.deltaLink")
if final_link:
# konec — predame final delta
yield None, final_link
return
url = next_link
# ─── Per-folder sync ──────────────────────────────────────────────────────────
def sync_folder(col, sync_col, mailbox: str, folder: dict, dry_run: bool, limit: int) -> dict:
"""Vrati statistiky."""
fid = folder["id"]
fpath = folder["path"]
state_id = f"{mailbox}|{fid}"
state = sync_col.find_one({"_id": state_id})
delta_link = state.get("delta_link") if state else None
is_first_run = delta_link is None
label = "FRESH" if is_first_run else "DELTA"
print(f"\n[{label}] {fpath}")
stats = {"new": 0, "sync": 0, "removed": 0, "errors": 0}
final_delta = None
try:
gen = iter_folder_delta(mailbox, fid, delta_link, limit=limit)
for item, fin in gen:
if fin:
final_delta = fin
break
try:
process_item(col, mailbox, fpath, item, stats, dry_run)
except Exception as e:
stats["errors"] += 1
logging.error("process_item %s: %s", item.get("id", "?"), e)
except DeltaExpired:
print(f" [410] deltaLink expiroval — restart od fresh delta")
# rekurzivni restart s vymazanym statem
sync_col.delete_one({"_id": state_id})
return sync_folder(col, sync_col, mailbox, folder, dry_run, limit)
print(f" new={stats['new']} sync={stats['sync']} removed={stats['removed']} err={stats['errors']}")
# Ulozit sync_state pokud mame final_delta a neni dry run
if final_delta and not dry_run:
sync_col.update_one(
{"_id": state_id},
{
"$set": {
"mailbox": mailbox,
"folder_id": fid,
"folder_path": fpath,
"delta_link": final_delta,
"last_run_at": datetime.now(timezone.utc).replace(tzinfo=None),
},
"$inc": {
"cumulative_new": stats["new"],
"cumulative_sync": stats["sync"],
"cumulative_removed": stats["removed"],
"run_count": 1,
},
},
upsert=True,
)
elif not final_delta:
# neprisel deltaLink (napr. limit nebo chyba) — nemenime state, pristi beh
# bude pokracovat normalne podle stareho deltaLinku nebo zacne od fresh
if not is_first_run:
print(f" [pozn] delta neukoncena — pristi beh pojede od ulozeneho deltaLinku")
return stats
def process_item(col, mailbox: str, folder_path: str, item: dict, stats: dict, dry_run: bool):
"""Zpracuje jednu polozku z delta odpovedi."""
# 1) Smazana zprava (@removed)
if "@removed" in item or item.get("@removed.reason"):
graph_id = item.get("id")
if not graph_id:
return
if dry_run:
print(f" REMOVED graph_id={graph_id[:30]}...")
else:
col.update_one(
{"graph_id": graph_id},
{"$set": {
"permanently_deleted": True,
"permanently_deleted_at": datetime.now(timezone.utc).replace(tzinfo=None),
}},
)
stats["removed"] += 1
return
# 2) Nova nebo zmenena zprava — rozhodneme podle existence graph_id v Mongo
graph_id = item.get("id")
if not graph_id:
return
existing = col.find_one({"graph_id": graph_id}, {"_id": 1})
if existing:
# Existujici zprava — update jen sync poli (delta payload je obsahuje)
fields = extract_sync_fields(item, folder_path)
if dry_run:
print(f" SYNC {item.get('subject','')[:60]}")
else:
col.update_one({"_id": existing["_id"]}, {"$set": fields})
stats["sync"] += 1
else:
# Nova zprava — pro telo+attachments+headers fetchneme plnou verzi
full = fetch_full_message(mailbox, graph_id)
if full is None:
stats["errors"] += 1
return
doc = extract_message(full, folder_path)
if doc is None:
stats["errors"] += 1
return
if dry_run:
print(f" NEW {doc.get('subject','')[:60]}")
else:
col.update_one({"_id": doc["_id"]}, {"$set": doc}, upsert=True)
stats["new"] += 1
# ─── Indexy pro sync_state ────────────────────────────────────────────────────
def ensure_sync_state_indexes(sync_col):
sync_col.create_index([("mailbox", ASCENDING), ("folder_id", ASCENDING)])
sync_col.create_index([("last_run_at", ASCENDING)])
def ensure_perm_deleted_index(col):
col.create_index([("permanently_deleted", ASCENDING)], sparse=True)
# ─── Main ─────────────────────────────────────────────────────────────────────
def discover_mailboxes(db) -> list[str]:
"""Vrati seznam mailboxu = vsechny kolekce v `emaily` mimo NON_MAILBOX_COLLECTIONS
a SKIP_MAILBOXES."""
out = []
for name in sorted(db.list_collection_names()):
if name in NON_MAILBOX_COLLECTIONS:
continue
if name in SKIP_MAILBOXES:
print(f" [skip] {name} — v SKIP_MAILBOXES (neni Graph pristup)")
continue
out.append(name)
return out
def sync_mailbox(client, mailbox: str, args) -> dict:
"""Sync jedne schranky. Vraci totals dict."""
_v14.GRAPH_MAILBOX = mailbox
print(f"\n========== {mailbox} ==========")
col = client[MONGO_DB][mailbox]
sync_col = client[MONGO_DB][SYNC_STATE_COL]
if not args.dry_run:
ensure_sync_state_indexes(sync_col)
ensure_perm_deleted_index(col)
if args.reset:
n = sync_col.delete_many({"mailbox": mailbox}).deleted_count
print(f" --reset: smazano {n} deltaLinku pro {mailbox}")
print("Nacitam seznam slozek...")
try:
folders = get_all_folders(mailbox)
except requests.HTTPError as e:
print(f" CHYBA: nelze nacist slozky pro {mailbox}: {e}")
logging.error("get_all_folders %s: %s", mailbox, e)
return {"new": 0, "sync": 0, "removed": 0, "errors": 1}
if args.folder:
folders = [f for f in folders if args.folder.lower() in f["path"].lower()]
print(f" Slozek ke zpracovani: {len(folders)}")
totals = {"new": 0, "sync": 0, "removed": 0, "errors": 0}
for folder in folders:
s = sync_folder(col, sync_col, mailbox, folder, args.dry_run, args.limit)
for k in totals:
totals[k] += s[k]
print(f" -> mailbox total: new={totals['new']} sync={totals['sync']} removed={totals['removed']} err={totals['errors']}")
return totals
def main():
ap = argparse.ArgumentParser(description=f"parse_emails_graph delta sync v{SCRIPT_VERSION}")
ap.add_argument("--mailbox", default="",
help="E-mail schranky (= kolekce v Mongo). "
"Bez argumentu projede vsechny schranky z `emaily` (mimo SKIP_MAILBOXES).")
ap.add_argument("--folder", default="", help="Filtruje slozky obsahujici tento retezec (default: vsechny)")
ap.add_argument("--limit", type=int, default=0, help="Max polozek na slozku (test)")
ap.add_argument("--reset", action="store_true",
help="Smaze deltaLinky pro vybrane schranky — pristi beh zacne od fresh delta")
ap.add_argument("--dry-run", action="store_true", help="Nic neulozi do Mongo, jen vypise co by se stalo")
args = ap.parse_args()
print(f"=== Delta sync v{SCRIPT_VERSION} ===")
if args.dry_run:
print(" DRY-RUN — zadne zmeny v Mongo")
print("Pripojuji se k MongoDB...")
client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
client.admin.command("ping")
db = client[MONGO_DB]
if args.mailbox:
if args.mailbox in SKIP_MAILBOXES:
print(f" CHYBA: {args.mailbox} je v SKIP_MAILBOXES — neni Graph pristup.")
sys.exit(2)
mailboxes = [args.mailbox]
else:
mailboxes = discover_mailboxes(db)
print(f" Schranky ke zpracovani: {len(mailboxes)}")
for m in mailboxes:
print(f" {m}")
print("Token Graph API...")
get_token()
print(" OK")
t0 = time.time()
grand = {"new": 0, "sync": 0, "removed": 0, "errors": 0}
per_mailbox = []
for mb in mailboxes:
try:
s = sync_mailbox(client, mb, args)
except Exception as e:
print(f" FATAL pri sync {mb}: {e}")
logging.error("sync_mailbox %s: %s", mb, e)
s = {"new": 0, "sync": 0, "removed": 0, "errors": 1}
per_mailbox.append((mb, s))
for k in grand:
grand[k] += s[k]
dt = time.time() - t0
print(f"\n=== SHRNUTI ===")
for mb, s in per_mailbox:
print(f" {mb:40} new={s['new']:>5} sync={s['sync']:>5} removed={s['removed']:>4} err={s['errors']:>3}")
print(f" {'TOTAL':40} new={grand['new']:>5} sync={grand['sync']:>5} removed={grand['removed']:>4} err={grand['errors']:>3}")
print(f" trvalo: {dt:.1f} s")
return 1 if grand["errors"] > 0 else 0
if __name__ == "__main__":
sys.exit(main() or 0)
@@ -0,0 +1,579 @@
"""
==============================================================================
Skript: enrich_fulltext_emails_v1.3.py
Verze: 1.3
Datum: 2026-06-04
Autor: vladimir.buzalka
Popis:
Vytahne plny text z emailu ulozenych v MongoDB (db: emaily) a ulozi ho do
PostgreSQL (db: MongoEmaily, tabulka: emails) s GIN tsvector indexem.
Emaily se NESTAHUJI znovu - tela uz jsou v Mongo z parse_emails_graph_v1.4
(a refetch_text_bodies_v1.0 pro stare plain-text emaily).
Tento skript jen vybere prvni dostupne telo a posle text do PG na fulltext.
Zmeny v1.3.1 (2026-06-09):
- Bugfix: _clean_for_pg nahrazuje osamocene surrogate (\\ud800-\\udfff) za U+FFFD.
Drive jeden mail se surrogaty (napr. JNJ .msg) shodil celou davku a krok 5
skoncil FAIL. EXTRACTOR_VERSION zustava 1.2 (neni zmena fallback logiky).
Zmeny v1.3 vs v1.2:
- Bugfix: NON_MAILBOX_COLLECTIONS = {"attachments_index", "sync_state"}
(sync_state pribyla v delta syncu, predtim ji v1.2 brala jako mailbox).
- --index-reset: pred zpracovanim schranky vymaze vsechny jeji emaily z PG
(force re-extract; pouzij kdyz povysis EXTRACTOR_VERSION nebo chces ciste).
- Vylepseny header per-mailbox: ukaze pocet v Mongu, v PG a k zpracovani.
Zmeny v1.2 vs v1.1:
- S/MIME emaily: pokud unwrap_smime_v1.0 ulozil smime_body_text/smime_body_html,
pouzije se PREFEROVANE pred bezvyznamnym wrapper telem.
- body_source: nova hodnota "smime".
- EXTRACTOR_VERSION=1.2 -> vsechny existujici emaily v PG se preparsuji.
Zmeny v1.1 vs v1.0:
- Fallback poradi rozsireno o body_text.
- body_source umi novou hodnotu "text" (plne plain-text telo, max 2 MB).
Zdroj:
MongoDB 192.168.1.76 db=emaily kolekce=<mailbox>
(krome NON_MAILBOX_COLLECTIONS)
Cil:
PostgreSQL 192.168.1.76 db=MongoEmaily tabulka=emails
tsvector config 'soubory' (sdileny - simple + unaccent)
Inkrementalita:
Pokud (mailbox, message_id) jiz existuje a extractor_version je aktualni
a modified_at v Mongo neni novejsi -> skip. Pri zmene verze extractoru
se vse preparsuje. --index-reset to obejde a smaze PG pred behom.
Spusteni:
python enrich_fulltext_emails_v1.3.py # vsechny schranky
python enrich_fulltext_emails_v1.3.py --mailbox ordinace@buzalkova.cz
python enrich_fulltext_emails_v1.3.py --limit 500 # test
python enrich_fulltext_emails_v1.3.py --mailbox X --index-reset # smaze PG schranky a re-extrahuje vsechno
python enrich_fulltext_emails_v1.3.py --index-reset # smaze CELY index a postavi znovu (POMALE!)
==============================================================================
"""
from __future__ import annotations
import argparse
import re
import sys
import time
import traceback
from datetime import datetime, timezone
from typing import Optional
import psycopg
from bs4 import BeautifulSoup
from pymongo import MongoClient
# --- konfigurace ------------------------------------------------------------
MONGO_URI = "mongodb://192.168.1.76:27017"
MONGO_DB = "emaily"
PG_DSN = ("host=192.168.1.76 port=5432 dbname=MongoEmaily "
"user=vladimir.buzalka password=Vlado7309208104++")
EXTRACTOR_VERSION = "1.2" # NEMENIT pokud nemenis fallback logiku!
MAX_TEXT_BYTES = 5 * 1024 * 1024 # plain text max 5 MB
# Kolekce v `emaily` ktere NEJSOU mailboxy (nezpracovavame)
NON_MAILBOX_COLLECTIONS = {"attachments_index", "sync_state"}
BATCH_SIZE = 100
# --- SCHEMA -----------------------------------------------------------------
SCHEMA_SQL = """
CREATE EXTENSION IF NOT EXISTS unaccent;
CREATE EXTENSION IF NOT EXISTS pg_trgm;
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_ts_config WHERE cfgname = 'soubory') THEN
CREATE TEXT SEARCH CONFIGURATION soubory ( COPY = simple );
ALTER TEXT SEARCH CONFIGURATION soubory
ALTER MAPPING FOR hword, hword_part, word
WITH unaccent, simple;
END IF;
END$$;
CREATE TABLE IF NOT EXISTS emails (
id BIGSERIAL PRIMARY KEY,
mailbox TEXT NOT NULL,
message_id TEXT NOT NULL,
graph_id TEXT,
conversation_id TEXT,
folder_path TEXT,
subject TEXT,
sender_email TEXT,
sender_name TEXT,
to_addrs TEXT,
cc_addrs TEXT,
bcc_addrs TEXT,
sent_at TIMESTAMPTZ,
received_at TIMESTAMPTZ,
modified_at TIMESTAMPTZ,
is_read BOOLEAN,
is_draft BOOLEAN,
has_attachments BOOLEAN,
attachment_count INT,
attachments_summary TEXT,
body TEXT,
body_length INT,
body_source TEXT, -- 'html' | 'preview' | 'empty'
tsv tsvector GENERATED ALWAYS AS (
to_tsvector('soubory'::regconfig,
left(
coalesce(subject, '') || ' ' ||
coalesce(sender_email, '') || ' ' ||
coalesce(sender_name, '') || ' ' ||
coalesce(to_addrs, '') || ' ' ||
coalesce(cc_addrs, '') || ' ' ||
coalesce(attachments_summary, '') || ' ' ||
coalesce(body, ''),
800000)
)
) STORED,
extracted_at TIMESTAMPTZ DEFAULT now(),
extractor_version TEXT,
ok BOOLEAN,
error TEXT,
UNIQUE (mailbox, message_id)
);
CREATE INDEX IF NOT EXISTS emails_tsv_gin ON emails USING gin(tsv);
CREATE INDEX IF NOT EXISTS emails_subject_trgm ON emails USING gin(subject gin_trgm_ops);
CREATE INDEX IF NOT EXISTS emails_sender_email_idx ON emails(sender_email);
CREATE INDEX IF NOT EXISTS emails_mailbox_idx ON emails(mailbox);
CREATE INDEX IF NOT EXISTS emails_received_idx ON emails(received_at DESC);
CREATE INDEX IF NOT EXISTS emails_conv_idx ON emails(conversation_id);
"""
# --- HELPERY ----------------------------------------------------------------
_CTRL_RX = re.compile(r"[\x00-\x08\x0b\x0c\x0e-\x1f]")
_WS_RX = re.compile(r"[ \t]+")
_NL_RX = re.compile(r"\n{3,}")
# Osamocene surrogate (\ud800-\udfff) jsou neplatne v UTF-8 -> psycopg pri zapisu
# vyhodi UnicodeEncodeError ("surrogates not allowed") a shodi celou davku.
# Vznikaji ze spatne dekodovanych tel (napr. nektere JNJ .msg). Nahradime je U+FFFD.
_SURROGATE_RX = re.compile(r"[\ud800-\udfff]")
def _clean_for_pg(s: str) -> str:
if not s:
return ""
s = _CTRL_RX.sub("", s)
if _SURROGATE_RX.search(s):
s = _SURROGATE_RX.sub("", s)
return s
def _truncate(s: str) -> str:
s = _clean_for_pg(s or "")
if not s:
return ""
b = s.encode("utf-8", errors="replace")
if len(b) <= MAX_TEXT_BYTES:
return s
return b[:MAX_TEXT_BYTES].decode("utf-8", errors="ignore")
def html_to_text(html: str) -> str:
if not html:
return ""
try:
soup = BeautifulSoup(html, "lxml")
except Exception:
soup = BeautifulSoup(html, "html.parser")
for tag in soup(["script", "style", "head"]):
tag.decompose()
text = soup.get_text(separator="\n")
lines = [_WS_RX.sub(" ", ln).strip() for ln in text.split("\n")]
text = "\n".join(ln for ln in lines if ln)
text = _NL_RX.sub("\n\n", text)
return text
def fmt_recipients(recipients: list, kind: str) -> str:
if not recipients:
return ""
out = []
for r in recipients:
if not isinstance(r, dict):
continue
if r.get("type") != kind:
continue
name = (r.get("name") or "").strip()
email = (r.get("email") or "").strip()
if name and email:
out.append(f"{name} <{email}>")
elif email:
out.append(email)
elif name:
out.append(name)
return "; ".join(out)
def fmt_attachments(attachments: list) -> str:
if not attachments:
return ""
out = []
for a in attachments[:20]:
if not isinstance(a, dict):
continue
name = a.get("name") or a.get("filename") or ""
if name:
out.append(name)
return " | ".join(out)
def _short(s, n=60):
if not s:
return ""
s = str(s).replace("\n", " ").strip()
return s if len(s) <= n else s[:n] + "..."
def _now() -> datetime:
return datetime.now(tz=timezone.utc)
def _aware_utc(dt: Optional[datetime]) -> Optional[datetime]:
"""Sjednoceni: PG TIMESTAMPTZ -> tz-aware UTC; Mongo datetime -> naive (UTC).
Vrati tz-aware UTC datetime nebo None."""
if dt is None:
return None
if dt.tzinfo is None:
return dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc)
# --- HLAVNI SMYCKA ----------------------------------------------------------
def process_mailbox(pg: psycopg.Connection, mongo_coll, mailbox: str,
limit: Optional[int] = None,
index_reset: bool = False) -> dict:
# --index-reset: smaz vse pro tuto schranku v PG
if index_reset:
with pg.cursor() as cur:
cur.execute("DELETE FROM emails WHERE mailbox = %s", (mailbox,))
deleted = cur.rowcount
pg.commit()
print(f"[{mailbox}] --index-reset: smazano {deleted} radku v PG")
# existujici zaznamy v PG (rychly inkrementalni lookup)
# tuple = (extractor_version, ok, body_source)
with pg.cursor() as cur:
cur.execute(
"SELECT message_id, extractor_version, ok, body_source "
"FROM emails WHERE mailbox = %s",
(mailbox,),
)
existing = {row[0]: (row[1], row[2], row[3]) for row in cur.fetchall()}
mongo_total = mongo_coll.estimated_document_count()
pg_total = len(existing)
pg_uptodate = sum(1 for v in existing.values()
if v[0] == EXTRACTOR_VERSION and v[1])
to_process_estimate = mongo_total - pg_uptodate
print(f"\n========== {mailbox} ==========")
print(f" v Mongu: {mongo_total}")
print(f" v PG: {pg_total} (z toho ext_v={EXTRACTOR_VERSION} & ok=true: {pg_uptodate})")
print(f" k zpracovani: ~{to_process_estimate}{' (limit=' + str(limit) + ')' if limit else ''}")
if to_process_estimate <= 0 and not index_reset and not limit:
print(" Nic noveho ke zpracovani.")
return {"mailbox": mailbox, "processed": 0, "ok": 0, "errors": 0,
"skipped": pg_uptodate, "empty_body": 0}
proj = {
"_id": 1, "graph_id": 1, "conversation_id": 1, "folder_path": 1,
"subject": 1, "sender": 1, "recipients": 1,
"sent_at": 1, "received_at": 1, "modified_at": 1,
"is_read": 1, "is_draft": 1,
"has_attachments": 1, "attachment_count": 1, "attachments": 1,
"body_html": 1, "body_text": 1, "body_preview": 1,
"smime_unwrapped": 1, "smime_body_text": 1, "smime_body_html": 1,
"smime_subject": 1, "smime_inner_attachments": 1,
}
cursor = mongo_coll.find({}, proj, no_cursor_timeout=True)
if limit:
cursor = cursor.limit(limit)
processed = ok = errors = skipped = empty_body = 0
queue: list[dict] = []
n = 0
try:
for doc in cursor:
n += 1
msg_id = doc.get("_id") or ""
prev = existing.get(msg_id) # (extractor_version, ok, body_source)
mongo_mtime = doc.get("modified_at")
# Skip kdyz PG ma stejnou EV a ok=true.
# Vyjimka: smime_unwrapped v Mongu, ale PG body_source != 'smime'
# -> unwrap_smime pridal rozbaleny text az po enrichu -> re-enrich.
if prev and prev[0] == EXTRACTOR_VERSION and prev[1]:
needs_smime_reindex = (
bool(doc.get("smime_unwrapped"))
and prev[2] != "smime"
)
if not needs_smime_reindex:
skipped += 1
continue
sender = doc.get("sender") or {}
recipients = doc.get("recipients") or []
attachments = doc.get("attachments") or []
inner = doc.get("smime_inner_attachments") or []
if inner:
attachments = list(attachments) + [
{"filename": (a.get("filename") or "") + " [smime]"}
for a in inner if a.get("filename")
]
row = {
"mailbox": mailbox,
"message_id": msg_id,
"graph_id": doc.get("graph_id"),
"conversation_id": doc.get("conversation_id"),
"folder_path": doc.get("folder_path"),
"subject": doc.get("subject") or "",
"sender_email": sender.get("email"),
"sender_name": sender.get("name"),
"to_addrs": fmt_recipients(recipients, "to"),
"cc_addrs": fmt_recipients(recipients, "cc"),
"bcc_addrs": fmt_recipients(recipients, "bcc"),
# Vsechny timestampy z Monga jsou naive ale interpretovany jako UTC.
# Tagneme je tz-aware aby PG TIMESTAMPTZ ulozil spravnou UTC hodnotu
# a nepocital posun podle session timezone.
"sent_at": _aware_utc(doc.get("sent_at")),
"received_at": _aware_utc(doc.get("received_at")),
"modified_at": _aware_utc(mongo_mtime),
"is_read": doc.get("is_read"),
"is_draft": doc.get("is_draft"),
"has_attachments": doc.get("has_attachments"),
"attachment_count": doc.get("attachment_count"),
"attachments_summary": fmt_attachments(attachments),
"body": None,
"body_length": 0,
"body_source": "empty",
"extracted_at": _now(),
"extractor_version": EXTRACTOR_VERSION,
"ok": False,
"error": None,
}
status = "OK "; detail = ""
try:
text = ""
if doc.get("smime_unwrapped"):
s_text = doc.get("smime_body_text") or ""
s_html = doc.get("smime_body_html") or ""
s_html_text = html_to_text(s_html) if s_html else ""
combined = "\n\n".join(p for p in (s_text, s_html_text) if p)
s_subject = doc.get("smime_subject") or ""
if s_subject:
combined = f"Subject: {s_subject}\n\n{combined}"
if combined:
text = combined
row["body_source"] = "smime"
if not text:
html = doc.get("body_html") or ""
h_text = html_to_text(html) if html else ""
if h_text:
text = h_text
row["body_source"] = "html"
if not text:
plain = doc.get("body_text") or ""
if plain:
text = plain
row["body_source"] = "text"
if not text:
preview = doc.get("body_preview") or ""
if preview:
text = preview
row["body_source"] = "preview"
if not text:
row["body_source"] = "empty"
empty_body += 1
body = _truncate(text)
row["body"] = body if body else None
row["body_length"] = len(body)
row["ok"] = True
ok += 1
detail = f"{len(body)} znaku {_short(body, 60)!r}"
except Exception as e:
row["error"] = f"{type(e).__name__}: {e}"[:500]
status = "ERR"; detail = row["error"][:80]; errors += 1
queue.append(row)
processed += 1
if processed % 200 == 0 or processed == 1:
subj = _short(row["subject"], 50)
print(f" [{n:>6}|p={processed:>5}] {status} {row['body_source']:<7} "
f"{row['body_length']:>7}ch | {subj}", flush=True)
if len(queue) >= BATCH_SIZE:
_flush(pg, queue); queue.clear()
finally:
cursor.close()
if queue:
_flush(pg, queue)
return {"mailbox": mailbox, "processed": processed, "ok": ok,
"errors": errors, "skipped": skipped, "empty_body": empty_body}
UPSERT_SQL = """
INSERT INTO emails
(mailbox, message_id, graph_id, conversation_id, folder_path,
subject, sender_email, sender_name, to_addrs, cc_addrs, bcc_addrs,
sent_at, received_at, modified_at, is_read, is_draft,
has_attachments, attachment_count, attachments_summary,
body, body_length, body_source,
extracted_at, extractor_version, ok, error)
VALUES
(%(mailbox)s, %(message_id)s, %(graph_id)s, %(conversation_id)s, %(folder_path)s,
%(subject)s, %(sender_email)s, %(sender_name)s, %(to_addrs)s, %(cc_addrs)s, %(bcc_addrs)s,
%(sent_at)s, %(received_at)s, %(modified_at)s, %(is_read)s, %(is_draft)s,
%(has_attachments)s, %(attachment_count)s, %(attachments_summary)s,
%(body)s, %(body_length)s, %(body_source)s,
%(extracted_at)s, %(extractor_version)s, %(ok)s, %(error)s)
ON CONFLICT (mailbox, message_id) DO UPDATE SET
graph_id = EXCLUDED.graph_id,
conversation_id = EXCLUDED.conversation_id,
folder_path = EXCLUDED.folder_path,
subject = EXCLUDED.subject,
sender_email = EXCLUDED.sender_email,
sender_name = EXCLUDED.sender_name,
to_addrs = EXCLUDED.to_addrs,
cc_addrs = EXCLUDED.cc_addrs,
bcc_addrs = EXCLUDED.bcc_addrs,
sent_at = EXCLUDED.sent_at,
received_at = EXCLUDED.received_at,
modified_at = EXCLUDED.modified_at,
is_read = EXCLUDED.is_read,
is_draft = EXCLUDED.is_draft,
has_attachments = EXCLUDED.has_attachments,
attachment_count = EXCLUDED.attachment_count,
attachments_summary = EXCLUDED.attachments_summary,
body = EXCLUDED.body,
body_length = EXCLUDED.body_length,
body_source = EXCLUDED.body_source,
extracted_at = EXCLUDED.extracted_at,
extractor_version = EXCLUDED.extractor_version,
ok = EXCLUDED.ok,
error = EXCLUDED.error
"""
def _flush(pg: psycopg.Connection, rows: list[dict]) -> None:
for r in rows:
for k in ("subject", "sender_email", "sender_name", "to_addrs", "cc_addrs",
"bcc_addrs", "attachments_summary", "body", "error", "folder_path"):
if r.get(k):
r[k] = _clean_for_pg(r[k])
with pg.cursor() as cur:
cur.executemany(UPSERT_SQL, rows)
pg.commit()
def discover_mailboxes(db) -> list[str]:
out = []
for name in sorted(db.list_collection_names()):
if name in NON_MAILBOX_COLLECTIONS:
continue
out.append(name)
return out
def main() -> int:
ap = argparse.ArgumentParser(description="enrich_fulltext_emails v1.3")
ap.add_argument("--mailbox", default="",
help="Jedna konkretni schranka. Bez argumentu projede vsechny.")
ap.add_argument("--limit", type=int,
help="Limit emailu na schranku (test)")
ap.add_argument("--index-reset", action="store_true",
help="Pred zpracovanim schranky vymaze vsechny jeji emaily z PG "
"(force re-extract). Bez --mailbox SMAZE CELY index.")
args = ap.parse_args()
t0 = time.time()
print(f"=== enrich_fulltext_emails v1.3 ===")
print(f"Start: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("\nPripojuji se k PostgreSQL...")
pg = psycopg.connect(PG_DSN, connect_timeout=10)
with pg.cursor() as cur:
cur.execute(SCHEMA_SQL)
pg.commit()
print(" Schema OK.")
print("Pripojuji se k MongoDB...")
mongo = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
mongo.admin.command("ping")
db = mongo[MONGO_DB]
print(" MongoDB OK.")
if args.mailbox:
mailboxes = [args.mailbox]
else:
mailboxes = discover_mailboxes(db)
print(f"\nSchranky ke zpracovani ({len(mailboxes)}):")
for mb in mailboxes:
print(f" - {mb}")
if args.index_reset and not args.mailbox:
print(f"\n!!! --index-reset bez --mailbox => SMAZE CELY INDEX ({len(mailboxes)} schranek) !!!")
results = []
for mb in mailboxes:
try:
results.append(process_mailbox(pg, db[mb], mb,
limit=args.limit,
index_reset=args.index_reset))
except Exception as e:
traceback.print_exc()
print(f" FATAL pri zpracovani {mb}: {e}")
results.append({"mailbox": mb, "processed": 0, "ok": 0,
"errors": 1, "skipped": 0, "empty_body": 0})
pg.close()
print("\n" + "="*60)
print("=== SHRNUTI ===")
grand = {"processed": 0, "ok": 0, "errors": 0, "skipped": 0, "empty_body": 0}
for r in results:
print(f" {r['mailbox']:40} processed={r['processed']:>5} ok={r['ok']:>5} "
f"errors={r['errors']:>3} skipped={r['skipped']:>6} empty={r['empty_body']:>4}")
for k in grand:
grand[k] += r.get(k, 0)
print(f" {'TOTAL':40} processed={grand['processed']:>5} ok={grand['ok']:>5} "
f"errors={grand['errors']:>3} skipped={grand['skipped']:>6} empty={grand['empty_body']:>4}")
print(f"\nCelkem trvalo: {time.time() - t0:.1f} s")
print(f"Konec: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
# exit code: 0 jen kdyz vsechny schranky probehly bez chyby
return 1 if grand["errors"] > 0 else 0
if __name__ == "__main__":
try:
raise SystemExit(main())
except KeyboardInterrupt:
print("\nPreruseno uzivatelem")
except Exception:
traceback.print_exc()
sys.exit(1)
@@ -0,0 +1,587 @@
"""
==============================================================================
Skript: enrich_fulltext_emails_v1.4.py
Verze: 1.4
Datum: 2026-06-10
Autor: vladimir.buzalka
Zmeny v1.4 (2026-06-10):
- Bugfix: NON_MAILBOX_COLLECTIONS rozsireno o "jnj_messages" a
"jnj_sync_state" (pomocne kolekce JNJ folder trackingu). Predtim je
discover_mailboxes bral jako schranky (jiny schema dokumentu) ->
errors=1 -> cely krok 5 FAIL(1) pri kazdem behu pipeline.
Popis:
Vytahne plny text z emailu ulozenych v MongoDB (db: emaily) a ulozi ho do
PostgreSQL (db: MongoEmaily, tabulka: emails) s GIN tsvector indexem.
Emaily se NESTAHUJI znovu - tela uz jsou v Mongo z parse_emails_graph_v1.4
(a refetch_text_bodies_v1.0 pro stare plain-text emaily).
Tento skript jen vybere prvni dostupne telo a posle text do PG na fulltext.
Zmeny v1.3.1 (2026-06-09):
- Bugfix: _clean_for_pg nahrazuje osamocene surrogate (\\ud800-\\udfff) za U+FFFD.
Drive jeden mail se surrogaty (napr. JNJ .msg) shodil celou davku a krok 5
skoncil FAIL. EXTRACTOR_VERSION zustava 1.2 (neni zmena fallback logiky).
Zmeny v1.3 vs v1.2:
- Bugfix: NON_MAILBOX_COLLECTIONS = {"attachments_index", "sync_state"}
(sync_state pribyla v delta syncu, predtim ji v1.2 brala jako mailbox).
- --index-reset: pred zpracovanim schranky vymaze vsechny jeji emaily z PG
(force re-extract; pouzij kdyz povysis EXTRACTOR_VERSION nebo chces ciste).
- Vylepseny header per-mailbox: ukaze pocet v Mongu, v PG a k zpracovani.
Zmeny v1.2 vs v1.1:
- S/MIME emaily: pokud unwrap_smime_v1.0 ulozil smime_body_text/smime_body_html,
pouzije se PREFEROVANE pred bezvyznamnym wrapper telem.
- body_source: nova hodnota "smime".
- EXTRACTOR_VERSION=1.2 -> vsechny existujici emaily v PG se preparsuji.
Zmeny v1.1 vs v1.0:
- Fallback poradi rozsireno o body_text.
- body_source umi novou hodnotu "text" (plne plain-text telo, max 2 MB).
Zdroj:
MongoDB 192.168.1.76 db=emaily kolekce=<mailbox>
(krome NON_MAILBOX_COLLECTIONS)
Cil:
PostgreSQL 192.168.1.76 db=MongoEmaily tabulka=emails
tsvector config 'soubory' (sdileny - simple + unaccent)
Inkrementalita:
Pokud (mailbox, message_id) jiz existuje a extractor_version je aktualni
a modified_at v Mongo neni novejsi -> skip. Pri zmene verze extractoru
se vse preparsuje. --index-reset to obejde a smaze PG pred behom.
Spusteni:
python enrich_fulltext_emails_v1.4.py # vsechny schranky
python enrich_fulltext_emails_v1.4.py --mailbox ordinace@buzalkova.cz
python enrich_fulltext_emails_v1.4.py --limit 500 # test
python enrich_fulltext_emails_v1.4.py --mailbox X --index-reset # smaze PG schranky a re-extrahuje vsechno
python enrich_fulltext_emails_v1.4.py --index-reset # smaze CELY index a postavi znovu (POMALE!)
==============================================================================
"""
from __future__ import annotations
import argparse
import re
import sys
import time
import traceback
from datetime import datetime, timezone
from typing import Optional
import psycopg
from bs4 import BeautifulSoup
from pymongo import MongoClient
# --- konfigurace ------------------------------------------------------------
MONGO_URI = "mongodb://192.168.1.76:27017"
MONGO_DB = "emaily"
PG_DSN = ("host=192.168.1.76 port=5432 dbname=MongoEmaily "
"user=vladimir.buzalka password=Vlado7309208104++")
EXTRACTOR_VERSION = "1.2" # NEMENIT pokud nemenis fallback logiku!
MAX_TEXT_BYTES = 5 * 1024 * 1024 # plain text max 5 MB
# Kolekce v `emaily` ktere NEJSOU mailboxy (nezpracovavame)
# (jnj_messages + jnj_sync_state = pomocne kolekce JNJ folder trackingu)
NON_MAILBOX_COLLECTIONS = {"attachments_index", "sync_state",
"jnj_messages", "jnj_sync_state"}
BATCH_SIZE = 100
# --- SCHEMA -----------------------------------------------------------------
SCHEMA_SQL = """
CREATE EXTENSION IF NOT EXISTS unaccent;
CREATE EXTENSION IF NOT EXISTS pg_trgm;
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_ts_config WHERE cfgname = 'soubory') THEN
CREATE TEXT SEARCH CONFIGURATION soubory ( COPY = simple );
ALTER TEXT SEARCH CONFIGURATION soubory
ALTER MAPPING FOR hword, hword_part, word
WITH unaccent, simple;
END IF;
END$$;
CREATE TABLE IF NOT EXISTS emails (
id BIGSERIAL PRIMARY KEY,
mailbox TEXT NOT NULL,
message_id TEXT NOT NULL,
graph_id TEXT,
conversation_id TEXT,
folder_path TEXT,
subject TEXT,
sender_email TEXT,
sender_name TEXT,
to_addrs TEXT,
cc_addrs TEXT,
bcc_addrs TEXT,
sent_at TIMESTAMPTZ,
received_at TIMESTAMPTZ,
modified_at TIMESTAMPTZ,
is_read BOOLEAN,
is_draft BOOLEAN,
has_attachments BOOLEAN,
attachment_count INT,
attachments_summary TEXT,
body TEXT,
body_length INT,
body_source TEXT, -- 'html' | 'preview' | 'empty'
tsv tsvector GENERATED ALWAYS AS (
to_tsvector('soubory'::regconfig,
left(
coalesce(subject, '') || ' ' ||
coalesce(sender_email, '') || ' ' ||
coalesce(sender_name, '') || ' ' ||
coalesce(to_addrs, '') || ' ' ||
coalesce(cc_addrs, '') || ' ' ||
coalesce(attachments_summary, '') || ' ' ||
coalesce(body, ''),
800000)
)
) STORED,
extracted_at TIMESTAMPTZ DEFAULT now(),
extractor_version TEXT,
ok BOOLEAN,
error TEXT,
UNIQUE (mailbox, message_id)
);
CREATE INDEX IF NOT EXISTS emails_tsv_gin ON emails USING gin(tsv);
CREATE INDEX IF NOT EXISTS emails_subject_trgm ON emails USING gin(subject gin_trgm_ops);
CREATE INDEX IF NOT EXISTS emails_sender_email_idx ON emails(sender_email);
CREATE INDEX IF NOT EXISTS emails_mailbox_idx ON emails(mailbox);
CREATE INDEX IF NOT EXISTS emails_received_idx ON emails(received_at DESC);
CREATE INDEX IF NOT EXISTS emails_conv_idx ON emails(conversation_id);
"""
# --- HELPERY ----------------------------------------------------------------
_CTRL_RX = re.compile(r"[\x00-\x08\x0b\x0c\x0e-\x1f]")
_WS_RX = re.compile(r"[ \t]+")
_NL_RX = re.compile(r"\n{3,}")
# Osamocene surrogate (\ud800-\udfff) jsou neplatne v UTF-8 -> psycopg pri zapisu
# vyhodi UnicodeEncodeError ("surrogates not allowed") a shodi celou davku.
# Vznikaji ze spatne dekodovanych tel (napr. nektere JNJ .msg). Nahradime je U+FFFD.
_SURROGATE_RX = re.compile(r"[\ud800-\udfff]")
def _clean_for_pg(s: str) -> str:
if not s:
return ""
s = _CTRL_RX.sub("", s)
if _SURROGATE_RX.search(s):
s = _SURROGATE_RX.sub("", s)
return s
def _truncate(s: str) -> str:
s = _clean_for_pg(s or "")
if not s:
return ""
b = s.encode("utf-8", errors="replace")
if len(b) <= MAX_TEXT_BYTES:
return s
return b[:MAX_TEXT_BYTES].decode("utf-8", errors="ignore")
def html_to_text(html: str) -> str:
if not html:
return ""
try:
soup = BeautifulSoup(html, "lxml")
except Exception:
soup = BeautifulSoup(html, "html.parser")
for tag in soup(["script", "style", "head"]):
tag.decompose()
text = soup.get_text(separator="\n")
lines = [_WS_RX.sub(" ", ln).strip() for ln in text.split("\n")]
text = "\n".join(ln for ln in lines if ln)
text = _NL_RX.sub("\n\n", text)
return text
def fmt_recipients(recipients: list, kind: str) -> str:
if not recipients:
return ""
out = []
for r in recipients:
if not isinstance(r, dict):
continue
if r.get("type") != kind:
continue
name = (r.get("name") or "").strip()
email = (r.get("email") or "").strip()
if name and email:
out.append(f"{name} <{email}>")
elif email:
out.append(email)
elif name:
out.append(name)
return "; ".join(out)
def fmt_attachments(attachments: list) -> str:
if not attachments:
return ""
out = []
for a in attachments[:20]:
if not isinstance(a, dict):
continue
name = a.get("name") or a.get("filename") or ""
if name:
out.append(name)
return " | ".join(out)
def _short(s, n=60):
if not s:
return ""
s = str(s).replace("\n", " ").strip()
return s if len(s) <= n else s[:n] + "..."
def _now() -> datetime:
return datetime.now(tz=timezone.utc)
def _aware_utc(dt: Optional[datetime]) -> Optional[datetime]:
"""Sjednoceni: PG TIMESTAMPTZ -> tz-aware UTC; Mongo datetime -> naive (UTC).
Vrati tz-aware UTC datetime nebo None."""
if dt is None:
return None
if dt.tzinfo is None:
return dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc)
# --- HLAVNI SMYCKA ----------------------------------------------------------
def process_mailbox(pg: psycopg.Connection, mongo_coll, mailbox: str,
limit: Optional[int] = None,
index_reset: bool = False) -> dict:
# --index-reset: smaz vse pro tuto schranku v PG
if index_reset:
with pg.cursor() as cur:
cur.execute("DELETE FROM emails WHERE mailbox = %s", (mailbox,))
deleted = cur.rowcount
pg.commit()
print(f"[{mailbox}] --index-reset: smazano {deleted} radku v PG")
# existujici zaznamy v PG (rychly inkrementalni lookup)
# tuple = (extractor_version, ok, body_source)
with pg.cursor() as cur:
cur.execute(
"SELECT message_id, extractor_version, ok, body_source "
"FROM emails WHERE mailbox = %s",
(mailbox,),
)
existing = {row[0]: (row[1], row[2], row[3]) for row in cur.fetchall()}
mongo_total = mongo_coll.estimated_document_count()
pg_total = len(existing)
pg_uptodate = sum(1 for v in existing.values()
if v[0] == EXTRACTOR_VERSION and v[1])
to_process_estimate = mongo_total - pg_uptodate
print(f"\n========== {mailbox} ==========")
print(f" v Mongu: {mongo_total}")
print(f" v PG: {pg_total} (z toho ext_v={EXTRACTOR_VERSION} & ok=true: {pg_uptodate})")
print(f" k zpracovani: ~{to_process_estimate}{' (limit=' + str(limit) + ')' if limit else ''}")
if to_process_estimate <= 0 and not index_reset and not limit:
print(" Nic noveho ke zpracovani.")
return {"mailbox": mailbox, "processed": 0, "ok": 0, "errors": 0,
"skipped": pg_uptodate, "empty_body": 0}
proj = {
"_id": 1, "graph_id": 1, "conversation_id": 1, "folder_path": 1,
"subject": 1, "sender": 1, "recipients": 1,
"sent_at": 1, "received_at": 1, "modified_at": 1,
"is_read": 1, "is_draft": 1,
"has_attachments": 1, "attachment_count": 1, "attachments": 1,
"body_html": 1, "body_text": 1, "body_preview": 1,
"smime_unwrapped": 1, "smime_body_text": 1, "smime_body_html": 1,
"smime_subject": 1, "smime_inner_attachments": 1,
}
cursor = mongo_coll.find({}, proj, no_cursor_timeout=True)
if limit:
cursor = cursor.limit(limit)
processed = ok = errors = skipped = empty_body = 0
queue: list[dict] = []
n = 0
try:
for doc in cursor:
n += 1
msg_id = doc.get("_id") or ""
prev = existing.get(msg_id) # (extractor_version, ok, body_source)
mongo_mtime = doc.get("modified_at")
# Skip kdyz PG ma stejnou EV a ok=true.
# Vyjimka: smime_unwrapped v Mongu, ale PG body_source != 'smime'
# -> unwrap_smime pridal rozbaleny text az po enrichu -> re-enrich.
if prev and prev[0] == EXTRACTOR_VERSION and prev[1]:
needs_smime_reindex = (
bool(doc.get("smime_unwrapped"))
and prev[2] != "smime"
)
if not needs_smime_reindex:
skipped += 1
continue
sender = doc.get("sender") or {}
recipients = doc.get("recipients") or []
attachments = doc.get("attachments") or []
inner = doc.get("smime_inner_attachments") or []
if inner:
attachments = list(attachments) + [
{"filename": (a.get("filename") or "") + " [smime]"}
for a in inner if a.get("filename")
]
row = {
"mailbox": mailbox,
"message_id": msg_id,
"graph_id": doc.get("graph_id"),
"conversation_id": doc.get("conversation_id"),
"folder_path": doc.get("folder_path"),
"subject": doc.get("subject") or "",
"sender_email": sender.get("email"),
"sender_name": sender.get("name"),
"to_addrs": fmt_recipients(recipients, "to"),
"cc_addrs": fmt_recipients(recipients, "cc"),
"bcc_addrs": fmt_recipients(recipients, "bcc"),
# Vsechny timestampy z Monga jsou naive ale interpretovany jako UTC.
# Tagneme je tz-aware aby PG TIMESTAMPTZ ulozil spravnou UTC hodnotu
# a nepocital posun podle session timezone.
"sent_at": _aware_utc(doc.get("sent_at")),
"received_at": _aware_utc(doc.get("received_at")),
"modified_at": _aware_utc(mongo_mtime),
"is_read": doc.get("is_read"),
"is_draft": doc.get("is_draft"),
"has_attachments": doc.get("has_attachments"),
"attachment_count": doc.get("attachment_count"),
"attachments_summary": fmt_attachments(attachments),
"body": None,
"body_length": 0,
"body_source": "empty",
"extracted_at": _now(),
"extractor_version": EXTRACTOR_VERSION,
"ok": False,
"error": None,
}
status = "OK "; detail = ""
try:
text = ""
if doc.get("smime_unwrapped"):
s_text = doc.get("smime_body_text") or ""
s_html = doc.get("smime_body_html") or ""
s_html_text = html_to_text(s_html) if s_html else ""
combined = "\n\n".join(p for p in (s_text, s_html_text) if p)
s_subject = doc.get("smime_subject") or ""
if s_subject:
combined = f"Subject: {s_subject}\n\n{combined}"
if combined:
text = combined
row["body_source"] = "smime"
if not text:
html = doc.get("body_html") or ""
h_text = html_to_text(html) if html else ""
if h_text:
text = h_text
row["body_source"] = "html"
if not text:
plain = doc.get("body_text") or ""
if plain:
text = plain
row["body_source"] = "text"
if not text:
preview = doc.get("body_preview") or ""
if preview:
text = preview
row["body_source"] = "preview"
if not text:
row["body_source"] = "empty"
empty_body += 1
body = _truncate(text)
row["body"] = body if body else None
row["body_length"] = len(body)
row["ok"] = True
ok += 1
detail = f"{len(body)} znaku {_short(body, 60)!r}"
except Exception as e:
row["error"] = f"{type(e).__name__}: {e}"[:500]
status = "ERR"; detail = row["error"][:80]; errors += 1
queue.append(row)
processed += 1
if processed % 200 == 0 or processed == 1:
subj = _short(row["subject"], 50)
print(f" [{n:>6}|p={processed:>5}] {status} {row['body_source']:<7} "
f"{row['body_length']:>7}ch | {subj}", flush=True)
if len(queue) >= BATCH_SIZE:
_flush(pg, queue); queue.clear()
finally:
cursor.close()
if queue:
_flush(pg, queue)
return {"mailbox": mailbox, "processed": processed, "ok": ok,
"errors": errors, "skipped": skipped, "empty_body": empty_body}
UPSERT_SQL = """
INSERT INTO emails
(mailbox, message_id, graph_id, conversation_id, folder_path,
subject, sender_email, sender_name, to_addrs, cc_addrs, bcc_addrs,
sent_at, received_at, modified_at, is_read, is_draft,
has_attachments, attachment_count, attachments_summary,
body, body_length, body_source,
extracted_at, extractor_version, ok, error)
VALUES
(%(mailbox)s, %(message_id)s, %(graph_id)s, %(conversation_id)s, %(folder_path)s,
%(subject)s, %(sender_email)s, %(sender_name)s, %(to_addrs)s, %(cc_addrs)s, %(bcc_addrs)s,
%(sent_at)s, %(received_at)s, %(modified_at)s, %(is_read)s, %(is_draft)s,
%(has_attachments)s, %(attachment_count)s, %(attachments_summary)s,
%(body)s, %(body_length)s, %(body_source)s,
%(extracted_at)s, %(extractor_version)s, %(ok)s, %(error)s)
ON CONFLICT (mailbox, message_id) DO UPDATE SET
graph_id = EXCLUDED.graph_id,
conversation_id = EXCLUDED.conversation_id,
folder_path = EXCLUDED.folder_path,
subject = EXCLUDED.subject,
sender_email = EXCLUDED.sender_email,
sender_name = EXCLUDED.sender_name,
to_addrs = EXCLUDED.to_addrs,
cc_addrs = EXCLUDED.cc_addrs,
bcc_addrs = EXCLUDED.bcc_addrs,
sent_at = EXCLUDED.sent_at,
received_at = EXCLUDED.received_at,
modified_at = EXCLUDED.modified_at,
is_read = EXCLUDED.is_read,
is_draft = EXCLUDED.is_draft,
has_attachments = EXCLUDED.has_attachments,
attachment_count = EXCLUDED.attachment_count,
attachments_summary = EXCLUDED.attachments_summary,
body = EXCLUDED.body,
body_length = EXCLUDED.body_length,
body_source = EXCLUDED.body_source,
extracted_at = EXCLUDED.extracted_at,
extractor_version = EXCLUDED.extractor_version,
ok = EXCLUDED.ok,
error = EXCLUDED.error
"""
def _flush(pg: psycopg.Connection, rows: list[dict]) -> None:
for r in rows:
for k in ("subject", "sender_email", "sender_name", "to_addrs", "cc_addrs",
"bcc_addrs", "attachments_summary", "body", "error", "folder_path"):
if r.get(k):
r[k] = _clean_for_pg(r[k])
with pg.cursor() as cur:
cur.executemany(UPSERT_SQL, rows)
pg.commit()
def discover_mailboxes(db) -> list[str]:
out = []
for name in sorted(db.list_collection_names()):
if name in NON_MAILBOX_COLLECTIONS:
continue
out.append(name)
return out
def main() -> int:
ap = argparse.ArgumentParser(description="enrich_fulltext_emails v1.4")
ap.add_argument("--mailbox", default="",
help="Jedna konkretni schranka. Bez argumentu projede vsechny.")
ap.add_argument("--limit", type=int,
help="Limit emailu na schranku (test)")
ap.add_argument("--index-reset", action="store_true",
help="Pred zpracovanim schranky vymaze vsechny jeji emaily z PG "
"(force re-extract). Bez --mailbox SMAZE CELY index.")
args = ap.parse_args()
t0 = time.time()
print(f"=== enrich_fulltext_emails v1.4 ===")
print(f"Start: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("\nPripojuji se k PostgreSQL...")
pg = psycopg.connect(PG_DSN, connect_timeout=10)
with pg.cursor() as cur:
cur.execute(SCHEMA_SQL)
pg.commit()
print(" Schema OK.")
print("Pripojuji se k MongoDB...")
mongo = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
mongo.admin.command("ping")
db = mongo[MONGO_DB]
print(" MongoDB OK.")
if args.mailbox:
mailboxes = [args.mailbox]
else:
mailboxes = discover_mailboxes(db)
print(f"\nSchranky ke zpracovani ({len(mailboxes)}):")
for mb in mailboxes:
print(f" - {mb}")
if args.index_reset and not args.mailbox:
print(f"\n!!! --index-reset bez --mailbox => SMAZE CELY INDEX ({len(mailboxes)} schranek) !!!")
results = []
for mb in mailboxes:
try:
results.append(process_mailbox(pg, db[mb], mb,
limit=args.limit,
index_reset=args.index_reset))
except Exception as e:
traceback.print_exc()
print(f" FATAL pri zpracovani {mb}: {e}")
results.append({"mailbox": mb, "processed": 0, "ok": 0,
"errors": 1, "skipped": 0, "empty_body": 0})
pg.close()
print("\n" + "="*60)
print("=== SHRNUTI ===")
grand = {"processed": 0, "ok": 0, "errors": 0, "skipped": 0, "empty_body": 0}
for r in results:
print(f" {r['mailbox']:40} processed={r['processed']:>5} ok={r['ok']:>5} "
f"errors={r['errors']:>3} skipped={r['skipped']:>6} empty={r['empty_body']:>4}")
for k in grand:
grand[k] += r.get(k, 0)
print(f" {'TOTAL':40} processed={grand['processed']:>5} ok={grand['ok']:>5} "
f"errors={grand['errors']:>3} skipped={grand['skipped']:>6} empty={grand['empty_body']:>4}")
print(f"\nCelkem trvalo: {time.time() - t0:.1f} s")
print(f"Konec: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
# exit code: 0 jen kdyz vsechny schranky probehly bez chyby
return 1 if grand["errors"] > 0 else 0
if __name__ == "__main__":
try:
raise SystemExit(main())
except KeyboardInterrupt:
print("\nPreruseno uzivatelem")
except Exception:
traceback.print_exc()
sys.exit(1)
@@ -0,0 +1,289 @@
# parse_emails_tower_v1.3
## Spuštění
**První spuštění:**
```bash
docker exec -d python-runner bash -c \
"python /scripts/parse_emails_tower_v1.3.py > /scripts/parse_emails_tower.log 2>&1"
```
**Pokračování po přerušení (přeskočí už importované):**
```bash
docker exec -d python-runner bash -c \
"python /scripts/parse_emails_tower_v1.3.py --skip-existing > /scripts/parse_emails_tower.log 2>&1"
```
---
## Stav importu
**Sledování průběhu (live log):**
```bash
docker exec -it python-runner tail -f /scripts/parse_emails_tower.log
```
**Počet emailů v MongoDB:**
```bash
docker exec -it python-runner python -c \
"from pymongo import MongoClient; c=MongoClient('mongodb://192.168.1.76:27017'); print(c['emaily']['vbuzalka@its.jnj.com'].count_documents({}))"
```
---
**Název:** parse_emails_tower_v1.3.py
**Verze:** 1.3
**Datum:** 2026-06-08
**Autor:** vladimir.buzalka
---
## Účel
Import všech `.msg` souborů do MongoDB. Z každého souboru extrahuje **všechny dostupné vlastnosti** — podobně jako EXIF u fotek.
- **DB:** `emaily`
- **Kolekce:** `vbuzalka@its.jnj.com`
- `_id` = Internet Message-ID (nebo `filename:<stem>` jako fallback)
- Bezpečné přerušit a opakovat — upsert podle `_id`
---
## Prostředí
Běží v Docker containeru **python-runner** na **Unraid Tower**.
| Komponenta | Umístění |
|---|---|
| Container | `python-runner` (Docker na Unraid Tower) |
| .msg soubory | `/mnt/user/JNJEMAILS``/mnt/JNJEMAILS` uvnitř containeru |
| Skripty | `/mnt/user/Scripts``/scripts` uvnitř containeru |
| MongoDB | `192.168.1.76:27017` (externí, mimo container) |
---
## Spouštění (z Unraid terminálu)
**Test na 50 emailech:**
```bash
docker exec -it python-runner python /scripts/parse_emails_tower_v1.3.py --limit 50 --no-indexes
```
**Kompletní import na pozadí (log do souboru):**
```bash
docker exec -d python-runner bash -c \
"python /scripts/parse_emails_tower_v1.3.py > /scripts/parse_emails_tower.log 2>&1"
```
**Pokračování po přerušení:**
```bash
docker exec -d python-runner bash -c \
"python /scripts/parse_emails_tower_v1.3.py --skip-existing > /scripts/parse_emails_tower.log 2>&1"
```
**Sledování průběhu (Ctrl+C ukončí sledování, import běží dál):**
```bash
docker exec -it python-runner tail -f /scripts/parse_emails_tower.log
```
### Všechny parametry
| Parametr | Popis |
|---|---|
| `--skip-existing` | Načte seznam hotových souborů z MongoDB a přeskočí je. Použij pro pokračování po přerušení. |
| `--limit N` | Zpracuje jen prvních N souborů. Vhodné pro test. |
| `--no-indexes` | Nevytváří indexy na konci. Použij pokud přerušíš uprostřed — indexy vytvoř ručně až je vše hotové. |
| `--msgs-dir PATH` | Přepíše výchozí cestu k .msg souborům (výchozí: `/mnt/JNJEMAILS`). |
---
## Průběh na konzoli
Každý email na jednom řádku:
```
1/69371 OK RE: Protocol deviation CZ10022 jan.novak@its.jnj.com
2/69371 OK UCO3001: Draft FUL pro DD5-CZ10022 monitor@4gclinical.com
3/69371 ERR ? ?
```
Každých 500 emailů oddělovač s průběhem:
```
────────────────────────────────────────────────────────────────────────────────
Průběh: ok=498 err=2 0.4 msg/s ETA 47h12m
────────────────────────────────────────────────────────────────────────────────
```
Na konci souhrn:
```
====================================================
Vysledek: ok=69300 | skip=0 | err=71
Celkovy cas: 47h 23m 10s
Dokumentu v kolekci: 69300
```
---
## Zdroje dat z každého .msg
| Pole | Popis |
|---|---|
| Předmět, normalized subject | |
| Odesílatel | email, jméno, SMTP adresa |
| Příjemci To/CC/BCC | strukturovaně `[{type, email, name}]` |
| Čas doručení a odeslání | UTC |
| Tělo | plaintext + HTML (max 2 MB) |
| Přílohy | metadata: jméno, velikost, MIME typ, inline flag |
| Internet headers | X-Originating-IP, Received, DKIM, X-Mailer, ... |
| MAPI | důležitost, citlivost, příznak, konverzační vlákno, kategorie |
| In-Reply-To, References | pro rekonstrukci vlákna |
| Raw MAPI properties | `{0xXXXX: value}` |
---
## Hodnotové kódy
| Pole | Hodnota | Význam |
|---|---|---|
| `importance` | 0 | Nízká |
| | 1 | Normální |
| | 2 | Vysoká |
| `sensitivity` | 0 | Normální |
| | 1 | Osobní |
| | 2 | Soukromé |
| | 3 | Důvěrné |
| `flag_status` | 0 | Bez příznaku |
| | 1 | Označeno (follow up) |
| | 2 | Dokončeno |
---
## MongoDB indexy
Automaticky vytvořeny na konci importu (`--no-indexes` přeskočí):
| Index | Pole |
|---|---|
| Chronologický | `received_at`, `sent_at` |
| Odesílatel | `sender.email` |
| Soubor | `filename` (unique) |
| Konverzace | `conversation_topic` |
| Filtry | `has_attachments`, `categories`, `importance`, `flag_status` |
| Full-text | `subject` + `body_text` + `to` + `cc` (text index `text_search`) |
---
## Ukázkové dotazy (MongoDB shell / MCP)
**Emaily o UCO3001 s přílohou:**
```javascript
db["vbuzalka@its.jnj.com"].find({
$text: { $search: "UCO3001" },
has_attachments: true
}).sort({ received_at: -1 })
```
**Emaily od konkrétního odesílatele:**
```javascript
db["vbuzalka@its.jnj.com"].find({
"sender.email": /covance/i
}).sort({ received_at: -1 })
```
**Celé konverzační vlákno:**
```javascript
db["vbuzalka@its.jnj.com"].find({
conversation_topic: "Protocol deviation CZ10022"
}).sort({ received_at: 1 })
```
**Statistiky podle odesílatele (top 20):**
```javascript
db["vbuzalka@its.jnj.com"].aggregate([
{ $group: { _id: "$sender.email", count: { $sum: 1 } } },
{ $sort: { count: -1 } },
{ $limit: 20 }
])
```
---
## Chybový log
Soubory které selhaly jsou zalogovány do **samostatného** `parse_emails_tower_errors.log` vedle skriptu (tj. `/scripts/parse_emails_tower_errors.log``\\tower\Scripts\parse_emails_tower_errors.log`). Tento log je oddělený od Graph importu, aby v něm nebyl bordel:
```
2026-06-08 12:40:33 | open failed [7A3F...0000.msg]: <důvod>
2026-06-08 12:41:02 | per-dokument selhal [_id=<...>]: <důvod>
```
Stdout (průběh) jde do `parse_emails_tower.log` — rovněž samostatný.
---
## Záchrana problémových .msg (v1.3)
Některé `.msg` defaultní `extract_msg` neumí otevřít a celý soubor zahodí, **i když email je naprosto v pořádku** (jde otevřít v Outlooku). Tři příčiny a jejich řešení:
| Příčina | Příklad | Řešení |
|---|---|---|
| Vadná příloha bez `PR_ATTACH_METHOD` | „Attachment method missing" | `errorBehavior=SUPPRESS_ALL` — vadnou přílohu přeskočí, zbytek (tělo, ostatní přílohy) načte |
| Tělo deklaruje codepage 1200 (UTF-16), ale bajty jsou cp1250/gb2312 | české `` místo diakritiky | raw-OLE čtení + kaskádové dekódování |
| Vnořený email (Outlook item) | „not an MSG file", `extract_msg` vrátí prázdno | raw-OLE čtení klíčových MAPI streamů |
**Jak to funguje:**
1. `open_message()` — kaskádové otevření: `normal``SUPPRESS_ALL``+overrideEncoding` (dle codepage property).
2. **raw-OLE fallback** — když extract_msg vrátí prázdno/`` nebo musel hádat kódování, klíčová pole (subject, sender, body, html) se dočtou **přímo z OLE streamů** (`__substg1.0_0037`/`0C1A`/`5D01`/`1000`/`1013`) s kaskádovým dekódováním:
```
utf-8 (strict) → kódování dle CPID → cp1250 → cp1252 → gb2312 → latin-1
```
Hlavičkám o kódování se **nevěří** (často si protiřečí); bere se první kódování, které projde striktně bez chyby. `utf-8 strict` je silný rozlišovač.
**Nová pole v dokumentu:**
| Pole | Význam |
|---|---|
| `parse_mode` | `normal` / `suppress_all` / `override:<enc>` — jak byl soubor otevřen |
| `parse_degraded` | `true` = byl potřeba fallback (vadná příloha nebo hádané kódování) |
**Ověřeno:** všech 126 dříve selhaných souborů z běhu 8.6. se obnoví čistě (74× `suppress_all`, 52× `override:cp1250`), 0 prázdných, 0 s ``.
Dohledání degradovaných:
```javascript
db["vbuzalka@its.jnj.com"].find({ parse_degraded: true })
```
---
## Výkon
| Parametr | Hodnota |
|---|---|
| Počet souborů | ~69 000 |
| Rychlost | ~0.4 msg/s (htmlBody dekódování) |
| Odhadovaný čas | 48 hodin |
| Batch size | 200 dokumentů / bulk_write |
| Odhadovaná velikost DB | 25 GB |
---
## Závislosti (v Docker image python-runner)
```
extract-msg==0.55.0
olefile
pymongo
python-dateutil
```
Image sestaven z `Dockerfile` v `/mnt/user/Scripts/python-runner/`.
---
## Historie verzí
| Verze | Datum | Změna |
|---|---|---|
| 1.0 | 2026-06-01 | Iniciální verze |
| 1.1 | 2026-06-02 | Nasazení na Unraid Tower v Docker containeru python-runner; MSGS_DIR změněno z SMB share (`\\tower\JNJEMAILS`) na lokální mount (`/mnt/JNJEMAILS`); aktualizován popis spouštění pro `docker exec` |
| 1.2 | 2026-06-08 | **Oprava `to_bson`:** int mimo rozsah int64 (BSON umí jen 8-byte ints) se převede na string — dřív celý `bulk_write` spadl na `MongoDB can only handle up to 8-byte ints` a zahodil celou dávku 200 dokumentů (běh v1.1 z 8.6. neuložil **nic**). `flush()` má fallback per-dokument (vadný záznam zahodí sám, ne celou dávku). `bool()` testován před `int()`. Samostatné logy `parse_emails_tower.log` + `parse_emails_tower_errors.log`. |
| 1.3 | 2026-06-08 | **Záchrana dříve selhaných .msg** (cca 126 z běhu 8.6.): `open_message()` kaskádové otevření (`normal`→`SUPPRESS_ALL`→`+overrideEncoding`) řeší vadné přílohy i „not an MSG file"; **raw-OLE fallback** dočítá subject/sender/body/html přímo z OLE streamů s kaskádovým dekódováním (utf-8 strict→CPID→cp1250…), když extract_msg vrátí prázdno/``. Nová pole `parse_mode`, `parse_degraded`. Nová závislost `olefile`. Ověřeno: 126/126 obnoveno čistě. |
@@ -0,0 +1,896 @@
"""
parse_emails_tower_v1.3.py
Nazev: parse_emails_tower_v1.3.py
Verze: 1.3
Datum: 2026-06-08
Autor: vladimir.buzalka
Popis:
Parsuje vsechny .msg soubory z MSGS_DIR a importuje je jako dokumenty
do MongoDB. Z kazdeho souboru extrahuje VSECHNY dostupne vlastnosti —
podobne jako EXIF u fotek:
- predmet, odesilatel, prijemci (To/CC/BCC s typy)
- cas doruceni a odeslani (UTC)
- telo plaintext + HTML (max 2 MB)
- prilohy (metadata: jmeno, velikost, MIME typ, inline flag)
- internet headers (X-Originating-IP, Received, DKIM, ...)
- MAPI vlastnosti: dulezitost, citlivost, priznak, konverzacni vlakno,
kategorie, In-Reply-To, References, ...
- vsechny raw MAPI properties jako {0xXXXX: value}
DB: emaily
Kolekce: vbuzalka@its.jnj.com
_id: Internet Message-ID (nebo "filename:<stem>" jako fallback)
Bezpecne prerusit a opakovat:
- upsert podle _id — duplicity se automaticky prepisi
- --skip-existing nacte seznam hotovych souboru z MongoDB a
preskoci je => pokracovani po preruseni bez ztraty prace
Prostredi:
Bezi v Docker containeru "python-runner" na Unraid Tower.
.msg soubory jsou dostupne jako lokalni disk (volume mount):
/mnt/user/JNJEMAILS -> /mnt/JNJEMAILS (uvnitr containeru)
MongoDB na 192.168.1.76:27017 (externi, bezi mimo container).
Spousteni (z Unraid terminalu):
# Test na 50 emailech:
docker exec -it python-runner python /scripts/parse_emails_tower_v1.3.py --limit 50 --no-indexes
# Kompletni import na pozadi (samostatny log, ne sdileny s Graph importem):
docker exec -d python-runner bash -c \
"python /scripts/parse_emails_tower_v1.3.py > /scripts/parse_emails_tower.log 2>&1"
# Pokracovani po preruseni:
docker exec -d python-runner bash -c \
"python /scripts/parse_emails_tower_v1.3.py --skip-existing > /scripts/parse_emails_tower.log 2>&1"
# Sledovani prubehu:
docker exec -it python-runner tail -f /scripts/parse_emails_tower.log
Vystup na konzoli:
Kazdy email na jednom radku:
<poradi>/<celkem> OK/ERR <predmet 60 znaku> <odesilatel>
Kazych 500 emailu: oddelovac s prubehem, rychlosti a ETA.
Na konci: souhrn ok/skip/err, celkovy cas, pocet dokumentu v kolekci.
Zavislosti (nainstalovane v Docker image python-runner):
extract-msg==0.55.0, olefile, pymongo, python-dateutil
Python 3.12, Linux (Docker container na Unraid Tower)
(olefile je tranzitivni zavislost extract-msg, raw-OLE fallback ji pouziva primo)
Struktura dokumentu v MongoDB:
_id Internet Message-ID (nebo filename: fallback)
filename jmeno .msg souboru (20znakovy hex + .msg)
subject predmet zpravy
normalized_subject predmet bez RE:/FW: prefixu
importance 0=nizka 1=normalni 2=vysoka
sensitivity 0=normalni 1=osobni 2=soukrome 3=duverne
flag_status 0=bez priznaku 1=oznaceno 2=dokonceno
read_receipt_requested bool
delivery_receipt_requested bool
has_attachments bool
attachment_count int
message_size_bytes velikost .msg souboru na disku
conversation_topic tema vlakna (PR_CONVERSATION_TOPIC)
conversation_index base64 PR_CONVERSATION_INDEX
in_reply_to Message-ID predchozi zpravy
internet_references [Message-ID] — cela historia vlakna
categories [str] — MAPI kategorie / stitky
read_receipt_requested bool
delivery_receipt_requested bool
received_at datetime UTC — cas doruceni
sent_at datetime UTC — cas odeslani
sender.email emailova adresa odesilatele
sender.name zobrazovane jmeno odesilatele
sender.smtp SMTP adresa (pro interni EX adresy)
to retezec To (tak jak v Outlooku)
cc retezec CC
bcc retezec BCC
display_to PR_DISPLAY_TO (zkraceny seznam)
display_cc PR_DISPLAY_CC
recipients [{type, email, name}] — to/cc/bcc s typy
body_text plain text telo
body_html HTML telo (max 2 MB, None pokud neni)
attachments [{filename, size_bytes, mime_type,
content_id, is_inline}]
headers dict internet headers (lowercase_s_podtrzitky)
mapi dict vsech raw MAPI properties {0xXXXX: value}
parsed_at datetime UTC — cas parsovani
Indexy (vytvoreny automaticky na konci):
received_at, sent_at, sender.email, filename (unique),
conversation_topic, has_attachments, categories, importance,
flag_status, text_search (subject + body_text + to + cc)
Chyby:
Soubory ktere selhaly jsou zalogovany do parse_emails_tower_errors.log
v adresari skriptu (SAMOSTATNY log, oddeleny od Graph importu).
Radek: timestamp | open/extract failed | duvod.
Historie verzi:
1.0 2026-06-01 Inicialni verze
1.1 2026-06-02 Nasazeni na Unraid Tower v Docker containeru python-runner;
MSGS_DIR zmeneno z SMB share na lokalni mount /mnt/JNJEMAILS;
aktualizovany popis spousteni pro docker exec
1.2 2026-06-08 OPRAVA: to_bson prevadi int mimo rozsah int64 na string
(BSON umi jen 8-byte ints) — drive cely bulk_write spadl na
'MongoDB can only handle up to 8-byte ints' a zahodil celou
davku 200 dokumentu (v1.1 beh 8.6. neulozil NIC).
flush() ma fallback per-dokument: vadny zaznam zahodi sam,
ne celou davku. bool() testovan pred int().
Samostatny error log parse_emails_tower_errors.log a
stdout log parse_emails_tower.log (drive sdilene s Graph
importem — bordel v logu).
1.3 2026-06-08 ZACHRANA drive selhavajicich .msg (cca 126 z behu 8.6.):
- open_message(): kaskadove otevreni
normal -> SUPPRESS_ALL (vadne prilohy) -> +overrideEncoding
Resi 'Attachment method missing' i 'not an MSG file'.
- raw-OLE fallback: kdyz extract_msg vrati prazdno/ (vnoreny
email, codepage 1200 lze byt cp1250/gb2312), klicova pole
(subject/sender/body/html) se doctou PRIMO z OLE streamu
s kaskadovym dekodovanim (utf-8 strict -> CPID -> cp1250 ...).
Hlavickam o kodovani se neveri (casto si protireci).
- nova pole: parse_mode (normal/suppress_all/override:ENC),
parse_degraded (bool).
"""
import sys
import re
import logging
import argparse
import base64
import struct
from pathlib import Path
from datetime import datetime, timezone
from typing import Optional
import extract_msg
from extract_msg.enums import ErrorBehavior
import olefile
from dateutil import parser as dtparser
from pymongo import MongoClient, UpdateOne, ASCENDING, TEXT
if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8", errors="replace")
# ─── KONFIGURACE ──────────────────────────────────────────────────────────────
MSGS_DIR = Path("/mnt/JNJEMAILS")
MONGO_URI = "mongodb://192.168.1.76:27017"
MONGO_DB = "emaily"
MONGO_COL = "vbuzalka@its.jnj.com"
BATCH_SIZE = 200
LOG_FILE = Path(__file__).parent / "parse_emails_tower_errors.log"
SCRIPT_VERSION = "1.2"
# ──────────────────────────────────────────────────────────────────────────────
logging.basicConfig(
filename=str(LOG_FILE),
level=logging.ERROR,
format="%(asctime)s | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
encoding="utf-8",
)
# ─── Pomocné funkce ───────────────────────────────────────────────────────────
def safe(obj, *attrs, default=None):
"""Bezpecne cteni atributu — vrati prvni non-None hodnotu."""
for attr in attrs:
try:
val = getattr(obj, attr, None)
if val is None:
continue
if isinstance(val, str) and not val.strip():
continue
return val
except Exception:
continue
return default
def parse_date(raw) -> Optional[datetime]:
"""Libovolny datum -> UTC datetime bez tzinfo (pro MongoDB)."""
if raw is None:
return None
if isinstance(raw, datetime):
if raw.tzinfo:
return raw.astimezone(timezone.utc).replace(tzinfo=None)
return raw
try:
dt = dtparser.parse(str(raw))
if dt.tzinfo:
return dt.astimezone(timezone.utc).replace(tzinfo=None)
return dt
except Exception:
return None
_INT64_MIN, _INT64_MAX = -(2 ** 63), 2 ** 63 - 1
def to_bson(val):
"""Konvertuje hodnotu na BSON-serializovatelny typ.
Pozor: BSON umi jen signed int64. Python ma neomezene integery, takze
velke MAPI hodnoty (PR_CHANGE_KEY, FILETIME, 64-bit handle) mimo rozsah
int64 prevadime na string — jinak cely bulk_write spadne na
'MongoDB can only handle up to 8-byte ints'.
"""
# bool musi byt PRED int (isinstance(True, int) == True)
if isinstance(val, bool):
return val
if isinstance(val, bytes):
return val.hex() if len(val) <= 128 else f"<bytes:{len(val)}>"
if isinstance(val, datetime):
return parse_date(val)
if isinstance(val, int):
return val if _INT64_MIN <= val <= _INT64_MAX else str(val)
if isinstance(val, (str, float, type(None))):
return val
if isinstance(val, list):
return [to_bson(v) for v in val]
try:
iv = int(val)
return iv if _INT64_MIN <= iv <= _INT64_MAX else str(iv)
except Exception:
pass
return str(val)
# ─── Extrakce částí zprávy ────────────────────────────────────────────────────
def extract_headers(msg) -> dict:
headers = {}
try:
hdr = msg.header
if not hdr:
return {}
from email.header import decode_header as _dh
def _decode(v: str) -> str:
try:
parts = _dh(v)
out = ""
for part, enc in parts:
out += part.decode(enc or "utf-8", errors="replace") if isinstance(part, bytes) else part
return out
except Exception:
return v
for key in set(hdr.keys()):
k = key.lower().replace("-", "_")
vals = [_decode(v) for v in hdr.get_all(key, [])]
headers[k] = vals if len(vals) > 1 else (vals[0] if vals else "")
except Exception as e:
logging.error("extract_headers: %s", e)
return headers
def extract_recipients(msg) -> list:
result = []
type_map = {1: "to", 2: "cc", 3: "bcc"}
try:
for r in msg.recipients:
rtype = getattr(r, "type", 1)
try:
rtype = int(rtype)
except Exception:
try:
rtype = int(rtype.value)
except Exception:
rtype = 1
rec = {
"type": type_map.get(rtype, "to"),
"email": safe(r, "email", default=""),
"name": safe(r, "name", default=""),
}
result.append(rec)
except Exception as e:
logging.error("extract_recipients: %s", e)
return result
def extract_attachments(msg) -> list:
result = []
try:
for att in msg.attachments:
fname = safe(att, "longFilename", "shortFilename", default="")
if not fname:
continue
size = 0
try:
d = att.data
size = len(d) if d else 0
except Exception:
pass
result.append({
"filename": fname,
"size_bytes": size,
"mime_type": safe(att, "mimetype", "mimeType", default="application/octet-stream"),
"content_id": safe(att, "cid", default=None),
"is_inline": bool(safe(att, "isInline", default=False)),
})
except Exception as e:
logging.error("extract_attachments: %s", e)
return result
def extract_mapi_props(msg) -> dict:
"""Vsechny raw MAPI properties jako {0xXXXX: value}."""
result = {}
try:
props = msg.props
if not hasattr(props, "items"):
return {}
for key, prop in props.items():
try:
val = to_bson(prop.value)
prop_id = f"0x{key[:4].upper()}" if len(key) >= 4 else f"0x{key.upper()}"
result[prop_id] = val
except Exception:
pass
except Exception as e:
logging.error("extract_mapi_props: %s", e)
return result
# ─── Tolerantní otevírání a raw-OLE fallback ─────────────────────────────────
#
# Nektere .msg extract_msg neumi: (a) vadna priloha bez PR_ATTACH_METHOD,
# (b) telo deklaruje codepage 1200 (UTF-16) ale bajty jsou cp1250/gb2312,
# (c) vnoreny email ("not an MSG file") — extract_msg vrati prazdne pole.
# Data v souboru ale jsou. Otevreme tolerantne a degradovana textova pole
# docteme PRIMO z OLE streamu s kaskadovym dekodovanim (hlavickam se neveri).
# Windows codepage -> python codec (PR_INTERNET_CPID / PR_MESSAGE_CODEPAGE)
_CPID_TO_CODEC = {
1250: "cp1250", 1251: "cp1251", 1252: "cp1252", 1253: "cp1253",
1254: "cp1254", 1255: "cp1255", 1256: "cp1256", 1257: "cp1257",
1258: "cp1258", 874: "cp874", 932: "shift_jis", 936: "gb2312",
949: "euc_kr", 950: "big5", 65001: "utf-8", 28591: "iso-8859-1",
28592: "iso-8859-2", 20127: "ascii",
}
def _read_u32_prop(ole, propid):
"""Precte 32-bit hodnotu MAPI property z top-level __properties_version1.0."""
try:
data = ole.openstream("__properties_version1.0").read()
except Exception:
return None
body = data[32:] # 32-bajtova hlavicka top-level property streamu
for i in range(0, len(body) - 16 + 1, 16):
rec = body[i:i + 16]
tag = struct.unpack("<I", rec[0:4])[0]
if ((tag >> 16) & 0xFFFF) == propid:
return struct.unpack("<I", rec[8:12])[0]
return None
def _detect_cpid(ole) -> Optional[str]:
"""Codec dle PR_INTERNET_CPID / PR_MESSAGE_CODEPAGE (jako napoveda, ne dogma)."""
for pid in (0x3FDE, 0x3FFD): # INTERNET_CPID, MESSAGE_CODEPAGE
codec = _CPID_TO_CODEC.get(_read_u32_prop(ole, pid))
# utf-8/ascii nejsou dobry hint pro 8-bit stream (casto lzou)
if codec and codec not in ("utf-8", "ascii"):
return codec
return None
def _cascade_decode(raw: bytes, is_unicode: bool, cpid_codec: Optional[str]) -> str:
"""Dekoduje bajty MAPI stringu. Hlavickam se neveri — zkousime striktne
v poradi priorit a vezmeme prvni, co projde bez chyby."""
if not raw:
return ""
if is_unicode: # PT_UNICODE = utf-16-le
try:
return raw.decode("utf-16-le")
except Exception:
return raw.decode("utf-16-le", errors="replace")
order = ["utf-8"] # utf-8 strict = silny rozlisovac
if cpid_codec:
order.append(cpid_codec)
order += ["cp1250", "cp1252", "gb2312", "big5"]
for enc in order:
try:
return raw.decode(enc, errors="strict")
except Exception:
continue
return raw.decode("latin-1", errors="replace") # nikdy nespadne
def _raw_mapi_strings(msg_path: Path) -> dict:
"""Cte klicova textova MAPI pole PRIMO z OLE (mimo extract_msg).
Pouzije se jen kdyz extract_msg vrati degradovane pole."""
out = {"subject": "", "normalized_subject": "", "sender_name": "",
"sender_email": "", "sender_smtp": "", "body_text": "", "body_html": ""}
try:
ole = olefile.OleFileIO(str(msg_path))
except Exception:
return out
try:
cpid = _detect_cpid(ole)
wanted = { # MAPI tag -> klic v out
"0037": "subject", "0E1D": "normalized_subject",
"0C1A": "sender_name", "5D01": "sender_smtp",
"0C1F": "sender_email", "1000": "body_text", "1013": "body_html",
}
prefix = "__substg1.0_"
found = {} # key -> (priorita_typu, hodnota)
for entry in ole.listdir():
if len(entry) != 1: # jen top-level (ne vnorene zpravy)
continue
name = entry[0]
if not name.startswith(prefix):
continue
tag = name[len(prefix):len(prefix) + 4].upper()
key = wanted.get(tag)
if not key:
continue
typ = name[-4:].upper()
prio = {"001F": 3, "001E": 2, "0102": 1}.get(typ, 0)
if prio == 0:
continue
prev = found.get(key)
if prev and prev[0] >= prio: # preferuj unicode > ansi > binarni
continue
try:
raw = ole.openstream(entry).read()
val = _cascade_decode(raw, typ == "001F", cpid)
except Exception:
continue
found[key] = (prio, val)
for key, (_, val) in found.items():
out[key] = val
finally:
ole.close()
return out
def _degraded(s) -> bool:
"""Pole je degradovane: prazdne nebo obsahuje U+FFFD (nahradni znak)."""
return (not s) or ("" in s)
def open_message(msg_path: Path):
"""Kaskadove otevreni .msg -> (msg, mode) nebo (None, None).
normal bezna cesta
suppress_all tolerantni k vadnym prilohum
override:ENC tolerantni + vnuceny encoding dle codepage property
"""
try:
return extract_msg.Message(str(msg_path)), "normal"
except Exception:
pass
try:
return extract_msg.Message(
str(msg_path), errorBehavior=ErrorBehavior.SUPPRESS_ALL), "suppress_all"
except Exception:
pass
encs = []
try:
ole = olefile.OleFileIO(str(msg_path))
c = _detect_cpid(ole)
ole.close()
if c:
encs.append(c)
except Exception:
pass
for e in encs + ["cp1250", "cp1252"]:
try:
return extract_msg.Message(
str(msg_path), errorBehavior=ErrorBehavior.SUPPRESS_ALL,
overrideEncoding=e), f"override:{e}"
except Exception:
continue
return None, None
# ─── Hlavní extrakce ─────────────────────────────────────────────────────────
def extract_message(msg_path: Path) -> Optional[dict]:
"""Parsuje jeden .msg soubor -> MongoDB dokument."""
msg, parse_mode = open_message(msg_path)
if msg is None:
logging.error("open failed [%s]: vsechny pokusy o otevreni selhaly", msg_path.name)
return None
try:
# ── Message-ID ────────────────────────────────────────────────
mid = None
for attr in ("messageId", "message_id", "internetMessageId"):
mid = safe(msg, attr)
if mid:
break
if not mid:
mid = f"filename:{msg_path.stem}"
mid = str(mid).strip()
# ── Předmět ───────────────────────────────────────────────────
try:
subject = msg.subject or ""
except Exception:
subject = ""
normalized_subject = safe(msg, "normalizedSubject", "normalized_subject", default="")
# ── Tělo ──────────────────────────────────────────────────────
try:
body_text = msg.body or ""
except Exception:
body_text = ""
body_html = None
try:
bh = msg.htmlBody
if isinstance(bh, bytes):
bh = bh.decode("utf-8", errors="replace")
if bh:
body_html = bh if len(bh) <= 2 * 1024 * 1024 else bh[:2 * 1024 * 1024]
except Exception:
pass
# ── Odesílatel ────────────────────────────────────────────────
try:
sender_email = msg.sender or ""
except Exception:
sender_email = ""
sender_name = safe(msg, "senderName", "sender_name", default="")
sender_smtp = safe(msg, "senderSmtpAddress", "sent_representing_smtp_address", default="")
# ── Příjemci ──────────────────────────────────────────────────
recipients = extract_recipients(msg)
try:
to_raw = msg.to or ""
except Exception:
to_raw = ""
try:
cc_raw = msg.cc or ""
except Exception:
cc_raw = ""
try:
bcc_raw = getattr(msg, "bcc", None) or ""
except Exception:
bcc_raw = ""
display_to = safe(msg, "displayTo", "display_to", default="")
display_cc = safe(msg, "displayCc", "display_cc", default="")
# ── Časy ──────────────────────────────────────────────────────
try:
received_at = parse_date(msg.date)
except Exception:
received_at = None
sent_at = None
for attr in ("clientSubmitTime", "client_submit_time", "sentOn"):
v = safe(msg, attr)
if v:
sent_at = parse_date(v)
break
# ── MAPI vlastnosti ───────────────────────────────────────────
importance = 1
try:
v = msg.importance
if v is not None:
importance = int(v)
except Exception:
pass
sensitivity = 0
try:
v = getattr(msg, "sensitivity", None)
if v is not None:
sensitivity = int(v)
except Exception:
pass
flag_status = 0
try:
v = safe(msg, "flagStatus", "flag_status")
if v is not None:
flag_status = int(v)
except Exception:
pass
conversation_topic = safe(msg, "conversationTopic", "conversation_topic", default="")
conversation_index = ""
try:
ci = safe(msg, "conversationIndex", "conversation_index")
if isinstance(ci, bytes):
conversation_index = base64.b64encode(ci).decode()
elif ci:
conversation_index = str(ci)
except Exception:
pass
in_reply_to = safe(msg, "inReplyTo", "in_reply_to", default="")
internet_refs = []
try:
refs = safe(msg, "internetReferences", "internet_references")
if isinstance(refs, list):
internet_refs = refs
elif isinstance(refs, str) and refs:
internet_refs = [r.strip() for r in refs.split() if r.strip()]
except Exception:
pass
categories = []
try:
cats = safe(msg, "categories")
if isinstance(cats, list):
categories = [str(c) for c in cats if c]
elif isinstance(cats, str) and cats:
categories = [c.strip() for c in re.split(r"[;,]", cats) if c.strip()]
except Exception:
pass
read_receipt = bool(safe(msg, "readReceiptRequested", "read_receipt_requested", default=False))
delivery_receipt = bool(safe(msg, "deliveryReceiptRequested", "delivery_receipt_requested", default=False))
# ── Internet headers ──────────────────────────────────────────
headers = extract_headers(msg)
if not in_reply_to:
in_reply_to = headers.get("in_reply_to", "")
if not internet_refs:
refs_str = headers.get("references", "")
if isinstance(refs_str, str) and refs_str:
internet_refs = [r.strip() for r in refs_str.split() if r.strip()]
# ── Přílohy ───────────────────────────────────────────────────
attachments = extract_attachments(msg)
# ── Raw MAPI ──────────────────────────────────────────────────
mapi_raw = extract_mapi_props(msg)
msg.close()
# ── Raw-OLE fallback pro degradovana textova pole ─────────────
# Kdyz extract_msg vratil prazdno/ nebo musel hadat encoding
# (override/suppress), docteme klicova pole primo z OLE streamu
# kaskadovym dekodovanim — spolehlivejsi nez jeden vnuceny encoding.
parse_degraded = parse_mode != "normal"
# v non-normal modu byl encoding hadany -> raw kaskade se veri vic
forced = parse_mode != "normal"
if (forced or _degraded(subject) or _degraded(body_text)
or _degraded(sender_email) or (body_html and "" in body_html)):
raw = _raw_mapi_strings(msg_path)
if raw["subject"] and (forced or _degraded(subject)):
subject = raw["subject"]
if raw["normalized_subject"] and (forced or _degraded(normalized_subject)):
normalized_subject = raw["normalized_subject"]
if raw["body_text"] and (forced or _degraded(body_text)):
body_text = raw["body_text"]
if raw["body_html"] and (forced or not body_html or "" in body_html):
bh = raw["body_html"]
body_html = bh if len(bh) <= 2 * 1024 * 1024 else bh[:2 * 1024 * 1024]
if (raw["sender_smtp"] or raw["sender_email"]) and (forced or _degraded(sender_email)):
sender_email = raw["sender_smtp"] or raw["sender_email"]
if raw["sender_name"] and (forced or _degraded(sender_name)):
sender_name = raw["sender_name"]
if raw["sender_smtp"] and not sender_smtp:
sender_smtp = raw["sender_smtp"]
# ── Dokument ──────────────────────────────────────────────────
return {
"_id": mid,
"filename": msg_path.name,
"subject": subject,
"normalized_subject": normalized_subject,
"importance": importance,
"sensitivity": sensitivity,
"flag_status": flag_status,
"read_receipt_requested": read_receipt,
"delivery_receipt_requested": delivery_receipt,
"has_attachments": len(attachments) > 0,
"attachment_count": len(attachments),
"message_size_bytes": msg_path.stat().st_size,
"conversation_topic": conversation_topic,
"conversation_index": conversation_index,
"in_reply_to": in_reply_to,
"internet_references": internet_refs,
"categories": categories,
"received_at": received_at,
"sent_at": sent_at,
"sender": {
"email": sender_email,
"name": sender_name,
"smtp": sender_smtp,
},
"to": to_raw,
"cc": cc_raw,
"bcc": bcc_raw,
"display_to": display_to,
"display_cc": display_cc,
"recipients": recipients,
"body_text": body_text,
"body_html": body_html,
"attachments": attachments,
"headers": headers,
"mapi": mapi_raw,
"parse_mode": parse_mode, # normal / suppress_all / override:ENC
"parse_degraded": parse_degraded, # True = pouzit fallback (vadna priloha/encoding)
"parsed_at": datetime.now(timezone.utc).replace(tzinfo=None),
}
except Exception as e:
logging.error("extract_message failed [%s]: %s", msg_path.name, e)
return None
# ─── MongoDB indexy ───────────────────────────────────────────────────────────
def create_indexes(col):
print(" Vytvarim indexy...")
col.create_index([("received_at", ASCENDING)])
col.create_index([("sent_at", ASCENDING)])
col.create_index([("sender.email", ASCENDING)])
col.create_index([("filename", ASCENDING)], unique=True, sparse=True)
col.create_index([("conversation_topic", ASCENDING)])
col.create_index([("has_attachments", ASCENDING)])
col.create_index([("categories", ASCENDING)])
col.create_index([("importance", ASCENDING)])
col.create_index([("flag_status", ASCENDING)])
col.create_index([
("subject", TEXT),
("body_text", TEXT),
("to", TEXT),
("cc", TEXT),
], name="text_search", default_language="none")
print(" Indexy hotovy.")
# ─── MAIN ─────────────────────────────────────────────────────────────────────
def main():
ap = argparse.ArgumentParser(description=f"parse_emails v{SCRIPT_VERSION}")
ap.add_argument("--msgs-dir", default=str(MSGS_DIR),
help="Cesta k .msg souborum")
ap.add_argument("--limit", type=int, default=0,
help="Zpracovat max N souboru (0 = vse)")
ap.add_argument("--skip-existing", action="store_true",
help="Preskocit soubory ktere jiz jsou v MongoDB (pokracovani)")
ap.add_argument("--no-indexes", action="store_true",
help="Nevytvorit indexy na konci")
args = ap.parse_args()
msgs_dir = Path(args.msgs_dir)
start = datetime.now()
print(f"=== parse_emails v{SCRIPT_VERSION} ===")
print(f"Start: {start.strftime('%Y-%m-%d %H:%M:%S')}")
print(f"Zdroj: {msgs_dir}")
print(f"MongoDB: {MONGO_URI} -> {MONGO_DB}.{MONGO_COL}")
# MongoDB
client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
try:
client.admin.command("ping")
print(" MongoDB OK")
except Exception as e:
print(f" CHYBA: MongoDB neni dostupna -- {e}")
sys.exit(1)
col = client[MONGO_DB][MONGO_COL]
# Skip existing — nacti seznam uz importovanych souboru
existing: set = set()
if args.skip_existing:
print(" Nacitam existujici zaznamy z MongoDB...")
existing = set(col.distinct("filename"))
print(f" {len(existing)} jiz importovano")
# Scan
print(f"\nSkenuji {msgs_dir} ...")
all_files = sorted(msgs_dir.glob("*.msg"))
if args.limit:
all_files = all_files[:args.limit]
to_process = [f for f in all_files if f.name not in existing]
skipped = len(all_files) - len(to_process)
total = len(to_process)
print(f" Celkem .msg: {len(all_files)}")
print(f" Preskoceno: {skipped}")
print(f" Ke zpracovani: {total}\n")
if total == 0:
print("Neni co importovat.")
client.close()
return
batch = []
ok_count = 0
err_count = 0
def flush():
nonlocal ok_count, err_count
if not batch:
return
try:
col.bulk_write(batch, ordered=False)
except Exception as e:
# Cely batch spadl (typicky jeden vadny dokument). Zkusime
# ho zapsat dokument po dokumentu, aby chyba zahodila jen
# skutecne vadny zaznam, ne celych BATCH_SIZE.
logging.error("bulk_write spadl (%s) -- prepinam na per-dokument", e)
print(f" CHYBA bulk_write: {e} -- zkousim per-dokument")
for op in batch:
try:
col.bulk_write([op], ordered=False)
except Exception as e2:
try:
bad_id = getattr(op, "_filter", {}).get("_id", "?")
except Exception:
bad_id = "?"
logging.error("per-dokument selhal [_id=%s]: %s", bad_id, e2)
print(f" ZAHOZEN _id={bad_id}: {e2}")
ok_count -= 1
err_count += 1
batch.clear()
for i, msg_path in enumerate(to_process, 1):
doc = extract_message(msg_path)
if doc is None:
err_count += 1
else:
batch.append(UpdateOne({"_id": doc["_id"]}, {"$set": doc}, upsert=True))
ok_count += 1
if len(batch) >= BATCH_SIZE:
flush()
# Výpis každého emailu
status = "ERR " if doc is None else "OK "
subject_str = (doc.get("subject") or "")[:60] if doc else "?"
sender_str = (doc.get("sender", {}).get("email") or "")[:40] if doc else "?"
print(f" {i:>6}/{total} {status} {subject_str:<60} {sender_str}")
if i % 500 == 0:
elapsed = (datetime.now() - start).total_seconds()
rate = i / elapsed if elapsed > 0 else 0
eta_s = int((total - i) / rate) if rate > 0 else 0
print(f" {''*80}")
print(f" Průběh: ok={ok_count} err={err_count} "
f"{rate:.1f} msg/s ETA {eta_s//3600}h{(eta_s%3600)//60}m")
print(f" {''*80}")
flush()
elapsed_total = (datetime.now() - start).total_seconds()
print(f"\n{'='*52}")
print(f"Vysledek: ok={ok_count} | skip={skipped} | err={err_count}")
print(f"Celkovy cas: {int(elapsed_total//3600)}h {int((elapsed_total%3600)//60)}m {int(elapsed_total%60)}s")
print(f"Dokumentu v kolekci: {col.count_documents({})}")
if not args.no_indexes:
print()
create_indexes(col)
print(f"\nKonec: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
if err_count:
print(f"Chyby logovany do: {LOG_FILE}")
client.close()
if __name__ == "__main__":
main()
+80
View File
@@ -0,0 +1,80 @@
# jnj_tower_ingest v1.1.0
**Soubor:** `jnj_tower_ingest_v1.1.py`
**Datum:** 2026-06-10
**Autor:** vladimir.buzalka
**Běží:** Docker kontejner `python-runner` na Unraid Tower (192.168.1.76), u MongoDB.
## Co to je
Sjednocený **Tower-side ingest** JNJ e-mailů — tři dříve oddělené části v jednom běhu:
| Fáze | Dříve samostatně | Co dělá |
|---|---|---|
| **1. PARSE** | `parse_emails_tower_v1.3.py` | `.msg` z `/mnt/JNJEMAILS` → dokument v Mongo `emaily."vbuzalka@its.jnj.com"` (tělo, přílohy, hlavičky, MAPI). Inkrementálně přes **mtime watermark** (`jnj_sync_state`/`_id="parse_state"`). |
| **2. SYNC** | `sync_jnj_state_v1.0.py` | nejnovější SQLite (read-only) → zrcadlo `jnj_messages` + doplnění `jnj_folder`/stavu do `emaily`. Watermark `updated_at` + zkratka `last_db`. |
| **3. ENRICH** | `jnj_emails_to_fulltext_v1.0.py` | doindexuje JNJ schránku do **PG fulltextu** zavoláním **sdíleného** `5_enrich_fulltext_emails_vX.Y.py --mailbox vbuzalka@its.jnj.com` (stejný extractor jako Graph pipeline → konzistentní schéma). |
**Pořadí: parse → sync → enrich.** Čerstvě naparsovaný mail dostane v jednom běhu tělo
(parse) + cestu (sync) + fulltext (enrich). Klíč všude = Internet Message-ID = Mongo `_id`.
## Inkrementálnost (cron každých 5 min)
- **PARSE** — jen `.msg` s `mtime > parse_state.last_parse_mtime`. 1. běh = seed dle
filename v Mongu, pak čistě mtime. `--full` reparsuje vše. Indexy jen při full/seed/`--reindex`.
- **SYNC** — watermark `updated_at` + zkratka `last_db` (stejná SQLite → no-op).
- **ENRICH** — spustí se **jen když parse přidal nové dokumenty** (jinak přeskočí — JNJ
stejně enrichuje hlavní Graph pipeline v 6:00/18:00). Verze enrich se **auto-detekuje**
(nejnovější `/scripts/5_enrich_fulltext_emails_v*.py`). `--no-enrich` vypne,
`--enrich-always` vynutí.
Tři nezávislé události (nová `.msg` / nová `.db` / nové doc pro PG) → skript udělá jen to,
co má práci; jinak levný no-op.
## Vztah ke Graph pipeline
Hlavní `0_run_pipeline` (Graph API) zpracovává schránky buzalka.cz a **JNJ přeskakuje**
(`SKIP_MAILBOXES`, žádné API). JNJ řeší tenhle skript přes `.msg`. Obě cesty ústí do téhož
Monga `emaily` a přes **sdílený `5_enrich`** do téhož PG `MongoEmaily.emails`. Servisní
kolekce `jnj_messages` + `jnj_sync_state` jsou v enrich `NON_MAILBOX_COLLECTIONS`
(nejsou schránky → nejdou do PG).
## Argumenty
| Argument | Význam |
|---|---|
| `--dry-run` | nic nezapíše, jen plán všech fází |
| `--full` | parse: reparsuj vše; sync: ignoruj watermark; enrich: vynuť |
| `--limit N` | max N souborů (parse) / řádků (sync) |
| `--reindex` | vynutí indexy po parse |
| `--force` | sync: ignoruj `last_db` |
| `--parse-only` / `--sync-only` / `--enrich-only` | jen daná fáze |
| `--no-enrich` | přeskoč enrich |
| `--enrich-always` | spusť enrich i bez nových dokumentů |
## Spouštění
```bash
docker exec -it python-runner python3 /scripts/jnj_tower_ingest_v1.1.py --dry-run
docker exec python-runner python3 /scripts/jnj_tower_ingest_v1.1.py # cron
docker exec -it python-runner python3 /scripts/jnj_tower_ingest_v1.1.py --enrich-only
```
## Plánování (HOTOVO)
Unraid User Scripts úloha `jnj_state_sync` (cron `*/5 * * * *`) — wrapper s `flock` volá
`docker exec python-runner python3 /scripts/jnj_tower_ingest_v1.1.py`. Loguje jen reálnou
práci/chyby do `/mnt/user/Scripts/logs/jnj_tower_ingest.log`
(grep `Zapisuji|PARSE hotovo|SYNC hotovo|ENRICH hotovo|CHYBA|Traceback`).
## Revert
`jnj_tower_ingest_v1.0.py` (bez enrich) + `parse_emails_tower_v1.3.py` +
`sync_jnj_state_v1.0.py` zůstávají v `/scripts/` jako pojistka. Návrat = přepsat wrapper
zpět. `jnj_emails_to_fulltext` přesunut do Trash (nahrazen fází 3).
## Historie verzí
- **1.0.0** 2026-06-10 — sjednocení parse + sync (mtime watermark, pořadí parse→sync).
- **1.1.0** 2026-06-10 — + fáze ENRICH (sdílený `5_enrich --mailbox`, auto-detekce verze,
jen při nových dokumentech). Nahrazuje `jnj_emails_to_fulltext_v1.0`.
File diff suppressed because it is too large Load Diff