This commit is contained in:
2026-06-08 13:08:30 +02:00
parent 70899149e4
commit 69553d1759
616 changed files with 306 additions and 17619 deletions
+16 -8
View File
@@ -1,10 +1,11 @@
# app.py | v1.9 | 2026-06-08
# app.py | v2.0 | 2026-06-08
# FastAPI server pro příjem .msg a .db souborů, upload do Dropboxu a import do Graph API.
# Endpointy: /upload (.msg → /msgs + Graph import), /upload-db (.db → /msgs/db),
# /upload-dropbox (→ Dropbox /!!!Days/Downloads Z230),
# /message-delete, /message-update (sync: smazání, přečtení, přesun složky),
# /mirror-plan (diff manifestu z JNJ vůči schránce → smaže přebytky, vrátí to_add),
# /pending-files (seznam souborů k odeslání na JNJ), /download-file/{filename}.
# /status (seznam souborů k odeslání na JNJ — jména zašifrována Fernetem),
# /item/{enc_filename} (stažení souboru — enc_filename je Fernet token).
from fastapi import FastAPI, UploadFile, File, Form, Header, HTTPException, Response
from pydantic import BaseModel
@@ -540,19 +541,26 @@ async def pending_files(authorization: str = Header(None)):
except Exception:
files = []
log.info("pending-files: %d souboru", len(files))
return {"files": files}
# Jména souborů zašifrujeme — klient vidí v URL jen neprůhledný token (bypass Zscaler)
encrypted_names = [_FERNET.encrypt(name.encode()).decode() for name in files]
return {"files": encrypted_names}
@app.get("/item/{filename:path}")
async def download_file(filename: str, authorization: str = Header(None)):
if authorization != f"Bearer {TOKEN}":
raise HTTPException(status_code=401, detail="Unauthorized")
# filename je Fernet token (zašifrované původní jméno souboru)
try:
orig_filename = _FERNET.decrypt(filename.encode()).decode()
except Exception:
raise HTTPException(status_code=400, detail="Invalid filename token")
dbx = dropbox.Dropbox(
app_key=DROPBOX_APP_KEY,
app_secret=DROPBOX_APP_SECRET,
oauth2_refresh_token=DROPBOX_REFRESH_TOKEN,
)
dropbox_path = f"{DROPBOX_UPLOAD_TO_JNJ}/{filename}"
dropbox_path = f"{DROPBOX_UPLOAD_TO_JNJ}/{orig_filename}"
try:
_, response = dbx.files_download(dropbox_path)
raw = response.content
@@ -563,15 +571,15 @@ async def download_file(filename: str, authorization: str = Header(None)):
encrypted = _FERNET.encrypt(raw)
# Přesun do Sent
sent_path = f"{DROPBOX_UPLOAD_TO_JNJ}/##Trash/{filename}"
sent_path = f"{DROPBOX_UPLOAD_TO_JNJ}/##Trash/{orig_filename}"
try:
dbx.files_move_v2(dropbox_path, sent_path, autorename=True)
log.info("download-file: %s přesunut do Sent", filename)
log.info("download-file: %s přesunut do Sent", orig_filename)
except Exception as e:
log.warning("download-file: nelze přesunout %s do Sent: %s", filename, e)
log.warning("download-file: nelze přesunout %s do Sent: %s", orig_filename, e)
return Response(
content=encrypted,
media_type="application/octet-stream",
headers={"Content-Disposition": f'attachment; filename="{filename}.enc"'},
headers={"Content-Disposition": f'attachment; filename="{orig_filename}.enc"'},
)
-170
View File
@@ -1,170 +0,0 @@
# inbox_full_sync_v1.0
**Název:** inbox_full_sync_v1.0.py
**Verze:** 1.0.4
**Datum:** 2026-06-01
**Autor:** vladimir.buzalka
---
## Účel
Jednorázový skript pro úplný přenos Inboxu z JNJ Outlooku (MAPI) do osobní schránky `vladimir.buzalka@buzalka.cz` přes Microsoft Graph API.
Spouštět ručně jako záchranná síť nebo iniciální sync. Bezpečné opakovat — duplicity se automaticky přeskočí.
---
## Co dělá
1. Připojí se k Outlooku přes MAPI (`win32com`)
2. Projde celý **Inbox** včetně všech podsložek rekurzivně
3. Pro každý email zkontroluje SQLite DB — pokud už je přenesen, přeskočí ho
4. Nový email uloží jako `.msg` do temp složky, **zašifruje** (Fernet/AES) a odešle jako `.emsg` na `msgs.buzalka.cz/upload`
5. Server (`app.py`) dešifruje, parsuje `.msg`, importuje do Graph API a vrátí `graph_id`
6. Záznam se uloží do DB (`messages`, `log`)
7. Každých 100 přenesených emailů + na konci uploaduje DB na server
**Online Archive se nepřenáší**`GetDefaultFolder(6)` vrátí pouze primární schránku.
---
## Šifrování (Zscaler bypass)
JNJ síť používá **Zscaler DLP** — blokuje upload souborů s medicínským obsahem (ECG reporty, klinická data) na externí URL.
Řešení: soubor se před odesláním zašifruje pomocí **Fernet** (AES-128 CBC + HMAC). Zscaler vidí pouze šifrovaný bináč a nerozpozná obsah.
- Šifrovací klíč se odvozuje z `TOKEN` přes SHA-256 — žádná extra konstanta, obě strany derivují klíč samostatně
- Soubor se odesílá s příponou `.emsg` místo `.msg`
- Server (app.py v1.6+) automaticky detekuje `.emsg`, dešifruje a dále zpracuje standardně
---
## Konfigurace
Konstanty jsou přímo v kódu:
| Konstanta | Hodnota |
|---|---|
| `TOKEN` | Bearer token pro msgs.buzalka.cz (slouží i jako základ šifrovacího klíče) |
| `UPLOAD_URL` | `https://msgs.buzalka.cz/upload` |
| `DB_UPLOAD_URL` | `https://msgs.buzalka.cz/upload-db` |
| `DB_PATH` | `C:\Users\vbuzalka\SQLITE\jnjemails.db` |
| `LOG_PATH` | `C:\Users\vbuzalka\SQLITE\inbox_full_sync_errors.log` |
---
## Závislosti
- Python 3.10+, Windows
- Outlook musí být spuštěn
- `pywin32`, `requests`, `cryptography`
- Server `msgs.buzalka.cz` musí běžet (app.py v1.6+)
---
## SQLite DB (`jnjemails.db`)
### Tabulka `messages`
Jeden záznam na každý přenesený email.
| Sloupec | Popis |
|---|---|
| `message_id` | Internet Message-ID (nebo `entryid:...` jako fallback) |
| `entry_id` | Outlook EntryID — pro zpětné dohledání v MAPI |
| `graph_id` | ID zprávy v Graph API — pro sync operace |
| `is_read` | Stav přečtení při přenosu (0/1) |
| `jnj_folder` | Složka v JNJ při přenosu |
| `source` | Vždy `inbox_full_sync` |
### Tabulka `runs`
Jeden záznam na každý běh skriptu.
| Sloupec | Popis |
|---|---|
| `script` | `inbox_full_sync` |
| `version` | verze skriptu |
| `started_at` / `finished_at` | časy běhu |
| `transferred` | počet nově přenesených emailů |
| `skipped` | počet přeskočených (již v DB) |
| `errors` | počet chyb |
### Tabulka `log`
Flat event log — každý console výstup i interní událost jako řádek.
| Sloupec | Popis |
|---|---|
| `run_id` | FK na `runs.id` |
| `level` | `INFO` / `ERROR` |
| `event` | typ události (viz níže) |
| `subject` | předmět emailu (pokud relevantní) |
| `folder` | složka (pokud relevantní) |
| `graph_id` | Graph ID (pokud relevantní) |
| `detail` | pro `upload_saved`: `size=XKB`; pro `upload_error`: `error=... \| size=XKB \| body=... \| sender=... \| received=... \| entry_id=... \| message_id=...` |
#### Události (`log.event`)
| Event | Popis |
|---|---|
| `run_start` | start skriptu |
| `mailbox` | název schránky |
| `folder_start` | vstup do složky (detail = počet položek) |
| `folder_done` | konec složky (detail = přeneseno/skip) |
| `upload_saved` | nový email úspěšně přenesen (detail = size=XKB) |
| `upload_exists` | email již v DB, přeskočen |
| `upload_error` | chyba při uploadu — detail obsahuje sender, received, entry_id, message_id pro dohledání v Outlooku |
| `progress` | každých 100 přenesených emailů |
| `db_upload` | úspěšný upload DB na server |
| `db_upload_error` | chyba uploadu DB |
| `run_done` | konec skriptu (detail = souhrn) |
---
## Užitečné dotazy
**Poslední běh — kompletní log:**
```sql
SELECT r.script, r.version, r.started_at,
l.level, l.event, l.subject, l.folder, l.detail, l.created_at
FROM log l JOIN runs r ON r.id = l.run_id
WHERE l.run_id = (SELECT MAX(id) FROM runs)
ORDER BY l.created_at
```
**Přehled všech běhů:**
```sql
SELECT id, script, version, started_at, finished_at,
transferred, skipped, errors
FROM runs ORDER BY started_at DESC
```
**Chyby z posledního běhu:**
```sql
SELECT l.event, l.subject, l.folder, l.detail, l.created_at
FROM log l
WHERE l.run_id = (SELECT MAX(id) FROM runs)
AND l.level = 'ERROR'
ORDER BY l.created_at
```
---
## Návaznost
- Sdílí DB s `janssenpc_email_send_new_v1.5.py` — záznamy jsou kompatibilní
- Emaily přenesené tímto skriptem mají `graph_id` a jsou od té chvíle hlídány sync průchodem v1.5
- Server endpoint: `msgs.buzalka.cz/upload` musí vracet `graph_id` (app.py v1.6+)
- nginx `client_max_body_size` nastaven na **200M** (SWAG `msgreceiver.subdomain.conf`)
---
## Historie verzí
| Verze | Datum | Změna |
|---|---|---|
| 1.0.0 | 2026-06-01 | Základní funkce: Inbox full scan, dedup přes DB, entry_id/graph_id/is_read |
| 1.0.1 | 2026-06-01 | DB upload každých 100 emailů + finální upload |
| 1.0.2 | 2026-06-01 | SQLite tabulky runs + log |
| 1.0.3 | 2026-06-01 | Kompletní konzolový výstup zrcadlen do log tabulky, skipped counter |
| 1.0.4 | 2026-06-01 | Šifrování Fernet (.emsg) pro bypass Zscaler DLP; rozšířený error detail (sender/received/entry_id/size) |
-384
View File
@@ -1,384 +0,0 @@
"""
inbox_full_sync v1.0
Název: inbox_full_sync_v1.0.py
Verze: 1.0.3
Datum: 2026-06-01
Autor: vladimir.buzalka
Popis:
Jednorázový skript pro úplný přenos Inboxu z JNJ Outlooku (MAPI) do osobní
schránky vladimir.buzalka@buzalka.cz přes Graph API.
Prochází celý Inbox včetně všech podsložek. Online Archive se nepřenáší
(GetDefaultFolder(6) vrátí pouze primární schránku).
Každý email se uloží jako .msg do temp složky, odešle na https://msgs.buzalka.cz/upload
a přes Graph API se importuje do odpovídající složky v osobní schránce.
Dedup zajišťuje SQLite DB — email který je v DB (message_id) se přeskočí.
Spouštění:
Spouštět ručně jako záchranná síť nebo iniciální sync.
Bezpečné opakovat — duplicity se přeskočí.
Závislosti:
win32com, requests, sqlite3 (stdlib)
Python 3.10+, Windows, Outlook musí být spuštěn
Konfigurace (konstanty v kódu):
TOKEN Bearer token pro msgs.buzalka.cz
UPLOAD_URL https://msgs.buzalka.cz/upload
DB_UPLOAD_URL https://msgs.buzalka.cz/upload-db
DB_PATH C:\\Users\\vbuzalka\\SQLITE\\jnjemails.db
LOG_PATH C:\\Users\\vbuzalka\\SQLITE\\inbox_full_sync_errors.log
SQLite DB (jnjemails.db):
messages — přenesené emaily (message_id, entry_id, graph_id, is_read, jnj_folder, ...)
runs — jeden záznam na běh (script, version, started_at, finished_at, counts)
log — flat event log per run (level, event, subject, folder, graph_id, detail)
Dotaz pro posledn běh:
SELECT r.script, r.version, r.started_at, l.level, l.event,
l.subject, l.folder, l.detail, l.created_at
FROM log l JOIN runs r ON r.id = l.run_id
WHERE l.run_id = (SELECT MAX(id) FROM runs)
ORDER BY l.created_at
Log události (log.event):
run_start — start skriptu
mailbox — název schránky
folder_start — vstup do složky (detail = počet položek)
folder_done — konec složky (detail = přeneseno/skip)
upload_saved — nový email přenesen
upload_exists — email již v DB, přeskočen
upload_error — chyba při uploadu (detail = chybová zpráva)
progress — každých 100 přenesených
db_upload — úspěšný upload DB na server
db_upload_error — chyba uploadu DB
run_done — konec skriptu (detail = souhrn)
Historie verzí:
1.0.0 2026-06-01 Základní funkce: Inbox full scan, dedup přes DB, entry_id/graph_id/is_read
1.0.1 2026-06-01 DB upload každých 100 emailů + finální upload
1.0.2 2026-06-01 SQLite tabulky runs + log
1.0.3 2026-06-01 Kompletní konzolový výstup zrcadlen do log tabulky, skipped counter
"""
import win32com.client
import requests
import sqlite3
import urllib3
import logging
import hashlib
import base64
from pathlib import Path
from datetime import datetime
from cryptography.fernet import Fernet
import tempfile
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
TOKEN = "13e1bb01-9fd5-44a8-8ce9-4ee27133d340"
UPLOAD_URL = "https://msgs.buzalka.cz/upload"
DB_PATH = r"C:\Users\vbuzalka\SQLITE\jnjemails.db"
LOG_PATH = r"C:\Users\vbuzalka\SQLITE\inbox_full_sync_errors.log"
PR_INTERNET_MESSAGE_ID = "http://schemas.microsoft.com/mapi/proptag/0x1035001E"
DB_UPLOAD_URL = "https://msgs.buzalka.cz/upload-db"
SCRIPT_NAME = "inbox_full_sync"
SCRIPT_VERSION = "1.0.4"
# Šifrovací klíč odvozený z TOKENu — stejný algoritmus jako na serveru
_FERNET = Fernet(base64.urlsafe_b64encode(hashlib.sha256(TOKEN.encode()).digest()))
logging.basicConfig(
filename=LOG_PATH,
level=logging.ERROR,
format="%(asctime)s | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
encoding="utf-8",
)
def init_db(conn):
conn.execute("""
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
message_id TEXT NOT NULL,
subject TEXT,
sender TEXT,
received_at TEXT,
folder TEXT,
source TEXT,
uploaded_at TEXT DEFAULT (datetime('now')),
entry_id TEXT,
graph_id TEXT,
is_read INTEGER DEFAULT 0,
jnj_folder TEXT
)
""")
conn.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_message_id ON messages(message_id)")
conn.execute("""
CREATE TABLE IF NOT EXISTS runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
script TEXT NOT NULL,
version TEXT,
started_at TEXT NOT NULL,
finished_at TEXT,
transferred INTEGER DEFAULT 0,
skipped INTEGER DEFAULT 0,
sync_updated INTEGER DEFAULT 0,
sync_deleted INTEGER DEFAULT 0,
errors INTEGER DEFAULT 0
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
run_id INTEGER REFERENCES runs(id),
level TEXT NOT NULL,
event TEXT NOT NULL,
subject TEXT,
folder TEXT,
graph_id TEXT,
detail TEXT,
created_at TEXT DEFAULT (datetime('now'))
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_log_run_id ON log(run_id)")
for col, definition in [
("entry_id", "TEXT"),
("graph_id", "TEXT"),
("is_read", "INTEGER DEFAULT 0"),
("jnj_folder", "TEXT"),
]:
try:
conn.execute(f"ALTER TABLE messages ADD COLUMN {col} {definition}")
except Exception:
pass
conn.commit()
def start_run(conn):
cur = conn.execute(
"INSERT INTO runs (script, version, started_at) VALUES (?, ?, datetime('now'))",
(SCRIPT_NAME, SCRIPT_VERSION)
)
conn.commit()
return cur.lastrowid
def finish_run(conn, run_id, transferred, skipped, errors):
conn.execute("""
UPDATE runs SET finished_at=datetime('now'), transferred=?, skipped=?, errors=?
WHERE id=?
""", (transferred, skipped, errors, run_id))
conn.commit()
def db_log(conn, run_id, level, event, subject=None, folder=None, graph_id=None, detail=None):
conn.execute("""
INSERT INTO log (run_id, level, event, subject, folder, graph_id, detail)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (run_id, level, event, subject, folder, graph_id, detail))
conn.commit()
def info(conn, run_id, event, **kwargs):
db_log(conn, run_id, "INFO", event, **kwargs)
def error(conn, run_id, event, **kwargs):
db_log(conn, run_id, "ERROR", event, **kwargs)
def is_uploaded(conn, message_id):
row = conn.execute(
"SELECT 1 FROM messages WHERE message_id = ? LIMIT 1", (message_id,)
).fetchone()
return row is not None
def save_to_db(conn, message_id, subject, sender, received_at, folder,
entry_id=None, graph_id=None, is_read=0):
conn.execute("""
INSERT OR IGNORE INTO messages
(message_id, subject, sender, received_at, folder, source,
entry_id, graph_id, is_read, jnj_folder)
VALUES (?, ?, ?, ?, ?, 'inbox_full_sync', ?, ?, ?, ?)
""", (message_id, subject, sender, received_at, folder,
entry_id, graph_id, is_read, folder))
conn.commit()
def upload_db(conn, run_id):
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"jnjemails_{timestamp}.db"
try:
with open(DB_PATH, "rb") as f:
resp = requests.post(
DB_UPLOAD_URL,
headers={"Authorization": f"Bearer {TOKEN}"},
files={"file": (filename, f, "application/octet-stream")},
timeout=60,
)
result = resp.json()
msg = f"DB upload: {result}"
print(f" {msg}")
info(conn, run_id, "db_upload", detail=msg)
except Exception as e:
msg = str(e)
print(f" DB upload CHYBA: {msg}")
error(conn, run_id, "db_upload_error", detail=msg)
def upload_msg(msg_path, filename, folder=""):
size_kb = Path(msg_path).stat().st_size // 1024
with open(msg_path, "rb") as f:
encrypted = _FERNET.encrypt(f.read())
enc_filename = Path(filename).stem + ".emsg"
resp = requests.post(
UPLOAD_URL,
headers={"Authorization": f"Bearer {TOKEN}"},
files={"file": (enc_filename, encrypted, "application/octet-stream")},
data={"folder": folder},
timeout=60,
)
if not resp.ok:
raise requests.HTTPError(
f"{resp.status_code} {resp.reason} | size={size_kb}KB | body={resp.text[:300]}",
response=resp,
)
return resp.json()
def process_folder(conn, run_id, folder, folder_path, counter, skipped_counter, error_counter):
current_path = f"{folder_path}/{folder.Name}"
items = folder.Items
items.Sort("[ReceivedTime]", False)
count = 0
skipped = 0
total = items.Count
msg = f"Složka: {current_path} ({total} položek)"
print(f"\n {msg}")
info(conn, run_id, "folder_start", folder=current_path, detail=str(total))
for item in items:
subject = getattr(item, 'Subject', '?')
try:
if not item.MessageClass.upper().startswith("IPM.NOTE"):
continue
try:
mid = item.PropertyAccessor.GetProperty(PR_INTERNET_MESSAGE_ID)
except Exception:
mid = None
if not mid:
mid = f"entryid:{item.EntryID}"
if is_uploaded(conn, mid):
skipped += 1
skipped_counter[0] += 1
continue
try:
with tempfile.TemporaryDirectory() as tmp:
safe_name = f"{item.EntryID[-20:]}.msg"
tmp_path = Path(tmp) / safe_name
item.SaveAs(str(tmp_path), 3)
size_kb = tmp_path.stat().st_size // 1024
result = upload_msg(tmp_path, safe_name, current_path)
status = result.get("status", "?")
graph_id = result.get("graph_id")
is_read = 0 if item.UnRead else 1
received = item.ReceivedTime.isoformat() if item.ReceivedTime else None
save_to_db(conn, mid, subject, item.SenderEmailAddress,
received, current_path,
entry_id=item.EntryID, graph_id=graph_id, is_read=is_read)
info(conn, run_id, f"upload_{status}",
subject=subject, folder=current_path, graph_id=graph_id,
detail=f"size={size_kb}KB")
counter[0] += 1
count += 1
if counter[0] % 100 == 0:
msg = f"celkem přeneseno: {counter[0]}"
print(f"{msg}, uploaduji DB...")
info(conn, run_id, "progress", detail=msg)
upload_db(conn, run_id)
print(f" {status.upper():6} | {subject[:70]}")
except Exception as e:
sender_str = getattr(item, 'SenderEmailAddress', '?')
received_str = getattr(item, 'ReceivedTime', None)
received_str = received_str.isoformat() if received_str else '?'
entry_id_str = getattr(item, 'EntryID', '?')
detail = (
f"error={e} | "
f"sender={sender_str} | "
f"received={received_str} | "
f"entry_id={entry_id_str} | "
f"message_id={mid}"
)
print(f" CHYBA | {subject[:50]} | sender={sender_str} | received={received_str} | {e}")
error(conn, run_id, "upload_error",
subject=subject, folder=current_path, detail=detail)
logging.error("folder=%s | %s", current_path, detail)
error_counter[0] += 1
except Exception as e:
# Neočekávaná chyba mimo upload blok (MessageClass, EntryID, apod.)
print(f" CHYBA (item) | {subject[:50]} | {e}")
logging.error("folder=%s | item_error | subject=%s | error=%s", current_path, subject, e)
error_counter[0] += 1
msg = f"složka hotova: přeneseno {count} | skip {skipped}"
print(f"{msg}")
info(conn, run_id, "folder_done", folder=current_path, detail=msg)
for subfolder in folder.Folders:
process_folder(conn, run_id, subfolder, current_path, counter, skipped_counter, error_counter)
# --- MAIN ---
print(f"=== inbox_full_sync v{SCRIPT_VERSION} ===")
print(f"Start: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
conn = sqlite3.connect(DB_PATH)
init_db(conn)
run_id = start_run(conn)
info(conn, run_id, "run_start", detail=f"script={SCRIPT_NAME} version={SCRIPT_VERSION}")
outlook = win32com.client.Dispatch("Outlook.Application")
ns = outlook.GetNamespace("MAPI")
inbox = ns.GetDefaultFolder(6) # olFolderInbox — primární schránka, bez Online Archive
mailbox_name = inbox.Parent.Name
print(f"\nSchránka: {mailbox_name}")
info(conn, run_id, "mailbox", detail=mailbox_name)
counter = [0]
skipped_counter = [0]
error_counter = [0]
process_folder(conn, run_id, inbox, f"/{mailbox_name}", counter, skipped_counter, error_counter)
finish_run(conn, run_id,
transferred=counter[0],
skipped=skipped_counter[0],
errors=error_counter[0])
summary = f"přeneseno {counter[0]} | skip {skipped_counter[0]} | chyby {error_counter[0]}"
print(f"\n=== Hotovo: {summary} ===")
info(conn, run_id, "run_done", detail=summary)
print("Uploaduji DB...")
upload_db(conn, run_id)
print(f"Konec: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"Chyby logovány do: {LOG_PATH}")
conn.close()
-248
View File
@@ -1,248 +0,0 @@
# parse_emails_tower_v1.1
## Spuštění
**První spuštění:**
```bash
docker exec -d python-runner bash -c \
"python /scripts/parse_emails_tower_v1.1.py > /scripts/parse_emails.log 2>&1"
```
**Pokračování po přerušení (přeskočí už importované):**
```bash
docker exec -d python-runner bash -c \
"python /scripts/parse_emails_tower_v1.1.py --skip-existing > /scripts/parse_emails.log 2>&1"
```
---
## Stav importu
**Sledování průběhu (live log):**
```bash
docker exec -it python-runner tail -f /scripts/parse_emails.log
```
**Počet emailů v MongoDB:**
```bash
docker exec -it python-runner python -c \
"from pymongo import MongoClient; c=MongoClient('mongodb://192.168.1.76:27017'); print(c['emaily']['vbuzalka@its.jnj.com'].count_documents({}))"
```
---
**Název:** parse_emails_tower_v1.1.py
**Verze:** 1.1
**Datum:** 2026-06-02
**Autor:** vladimir.buzalka
---
## Účel
Import všech `.msg` souborů do MongoDB. Z každého souboru extrahuje **všechny dostupné vlastnosti** — podobně jako EXIF u fotek.
- **DB:** `emaily`
- **Kolekce:** `vbuzalka@its.jnj.com`
- `_id` = Internet Message-ID (nebo `filename:<stem>` jako fallback)
- Bezpečné přerušit a opakovat — upsert podle `_id`
---
## Prostředí
Běží v Docker containeru **python-runner** na **Unraid Tower**.
| Komponenta | Umístění |
|---|---|
| Container | `python-runner` (Docker na Unraid Tower) |
| .msg soubory | `/mnt/user/JNJEMAILS``/mnt/JNJEMAILS` uvnitř containeru |
| Skripty | `/mnt/user/Scripts``/scripts` uvnitř containeru |
| MongoDB | `192.168.1.76:27017` (externí, mimo container) |
---
## Spouštění (z Unraid terminálu)
**Test na 50 emailech:**
```bash
docker exec -it python-runner python /scripts/parse_emails_tower_v1.1.py --limit 50 --no-indexes
```
**Kompletní import na pozadí (log do souboru):**
```bash
docker exec -d python-runner bash -c \
"python /scripts/parse_emails_tower_v1.1.py > /scripts/parse_emails.log 2>&1"
```
**Pokračování po přerušení:**
```bash
docker exec -d python-runner bash -c \
"python /scripts/parse_emails_tower_v1.1.py --skip-existing > /scripts/parse_emails.log 2>&1"
```
**Sledování průběhu (Ctrl+C ukončí sledování, import běží dál):**
```bash
docker exec -it python-runner tail -f /scripts/parse_emails.log
```
### Všechny parametry
| Parametr | Popis |
|---|---|
| `--skip-existing` | Načte seznam hotových souborů z MongoDB a přeskočí je. Použij pro pokračování po přerušení. |
| `--limit N` | Zpracuje jen prvních N souborů. Vhodné pro test. |
| `--no-indexes` | Nevytváří indexy na konci. Použij pokud přerušíš uprostřed — indexy vytvoř ručně až je vše hotové. |
| `--msgs-dir PATH` | Přepíše výchozí cestu k .msg souborům (výchozí: `/mnt/JNJEMAILS`). |
---
## Průběh na konzoli
Každý email na jednom řádku:
```
1/69371 OK RE: Protocol deviation CZ10022 jan.novak@its.jnj.com
2/69371 OK UCO3001: Draft FUL pro DD5-CZ10022 monitor@4gclinical.com
3/69371 ERR ? ?
```
Každých 500 emailů oddělovač s průběhem:
```
────────────────────────────────────────────────────────────────────────────────
Průběh: ok=498 err=2 0.4 msg/s ETA 47h12m
────────────────────────────────────────────────────────────────────────────────
```
Na konci souhrn:
```
====================================================
Vysledek: ok=69300 | skip=0 | err=71
Celkovy cas: 47h 23m 10s
Dokumentu v kolekci: 69300
```
---
## Zdroje dat z každého .msg
| Pole | Popis |
|---|---|
| Předmět, normalized subject | |
| Odesílatel | email, jméno, SMTP adresa |
| Příjemci To/CC/BCC | strukturovaně `[{type, email, name}]` |
| Čas doručení a odeslání | UTC |
| Tělo | plaintext + HTML (max 2 MB) |
| Přílohy | metadata: jméno, velikost, MIME typ, inline flag |
| Internet headers | X-Originating-IP, Received, DKIM, X-Mailer, ... |
| MAPI | důležitost, citlivost, příznak, konverzační vlákno, kategorie |
| In-Reply-To, References | pro rekonstrukci vlákna |
| Raw MAPI properties | `{0xXXXX: value}` |
---
## Hodnotové kódy
| Pole | Hodnota | Význam |
|---|---|---|
| `importance` | 0 | Nízká |
| | 1 | Normální |
| | 2 | Vysoká |
| `sensitivity` | 0 | Normální |
| | 1 | Osobní |
| | 2 | Soukromé |
| | 3 | Důvěrné |
| `flag_status` | 0 | Bez příznaku |
| | 1 | Označeno (follow up) |
| | 2 | Dokončeno |
---
## MongoDB indexy
Automaticky vytvořeny na konci importu (`--no-indexes` přeskočí):
| Index | Pole |
|---|---|
| Chronologický | `received_at`, `sent_at` |
| Odesílatel | `sender.email` |
| Soubor | `filename` (unique) |
| Konverzace | `conversation_topic` |
| Filtry | `has_attachments`, `categories`, `importance`, `flag_status` |
| Full-text | `subject` + `body_text` + `to` + `cc` (text index `text_search`) |
---
## Ukázkové dotazy (MongoDB shell / MCP)
**Emaily o UCO3001 s přílohou:**
```javascript
db["vbuzalka@its.jnj.com"].find({
$text: { $search: "UCO3001" },
has_attachments: true
}).sort({ received_at: -1 })
```
**Emaily od konkrétního odesílatele:**
```javascript
db["vbuzalka@its.jnj.com"].find({
"sender.email": /covance/i
}).sort({ received_at: -1 })
```
**Celé konverzační vlákno:**
```javascript
db["vbuzalka@its.jnj.com"].find({
conversation_topic: "Protocol deviation CZ10022"
}).sort({ received_at: 1 })
```
**Statistiky podle odesílatele (top 20):**
```javascript
db["vbuzalka@its.jnj.com"].aggregate([
{ $group: { _id: "$sender.email", count: { $sum: 1 } } },
{ $sort: { count: -1 } },
{ $limit: 20 }
])
```
---
## Chybový log
Soubory které selhaly jsou zalogrovány do `parse_emails_errors.log` vedle skriptu (tj. `/scripts/parse_emails_errors.log``\\tower\Scripts\parse_emails_errors.log`):
```
2026-06-02 20:14:33 | open failed [7A3F...0000.msg]: <důvod>
```
---
## Výkon
| Parametr | Hodnota |
|---|---|
| Počet souborů | ~69 000 |
| Rychlost | ~0.4 msg/s (htmlBody dekódování) |
| Odhadovaný čas | 48 hodin |
| Batch size | 200 dokumentů / bulk_write |
| Odhadovaná velikost DB | 25 GB |
---
## Závislosti (v Docker image python-runner)
```
extract-msg==0.55.0
pymongo
python-dateutil
```
Image sestaven z `Dockerfile` v `/mnt/user/Scripts/python-runner/`.
---
## Historie verzí
| Verze | Datum | Změna |
|---|---|---|
| 1.0 | 2026-06-01 | Iniciální verze |
| 1.1 | 2026-06-02 | Nasazení na Unraid Tower v Docker containeru python-runner; MSGS_DIR změněno z SMB share (`\\tower\JNJEMAILS`) na lokální mount (`/mnt/JNJEMAILS`); aktualizován popis spouštění pro `docker exec` |
-660
View File
@@ -1,660 +0,0 @@
"""
parse_emails_tower_v1.1.py
Nazev: parse_emails_tower_v1.1.py
Verze: 1.1
Datum: 2026-06-02
Autor: vladimir.buzalka
Popis:
Parsuje vsechny .msg soubory z MSGS_DIR a importuje je jako dokumenty
do MongoDB. Z kazdeho souboru extrahuje VSECHNY dostupne vlastnosti —
podobne jako EXIF u fotek:
- predmet, odesilatel, prijemci (To/CC/BCC s typy)
- cas doruceni a odeslani (UTC)
- telo plaintext + HTML (max 2 MB)
- prilohy (metadata: jmeno, velikost, MIME typ, inline flag)
- internet headers (X-Originating-IP, Received, DKIM, ...)
- MAPI vlastnosti: dulezitost, citlivost, priznak, konverzacni vlakno,
kategorie, In-Reply-To, References, ...
- vsechny raw MAPI properties jako {0xXXXX: value}
DB: emaily
Kolekce: vbuzalka@its.jnj.com
_id: Internet Message-ID (nebo "filename:<stem>" jako fallback)
Bezpecne prerusit a opakovat:
- upsert podle _id — duplicity se automaticky prepisi
- --skip-existing nacte seznam hotovych souboru z MongoDB a
preskoci je => pokracovani po preruseni bez ztraty prace
Prostredi:
Bezi v Docker containeru "python-runner" na Unraid Tower.
.msg soubory jsou dostupne jako lokalni disk (volume mount):
/mnt/user/JNJEMAILS -> /mnt/JNJEMAILS (uvnitr containeru)
MongoDB na 192.168.1.76:27017 (externi, bezi mimo container).
Spousteni (z Unraid terminalu):
# Test na 50 emailech:
docker exec -it python-runner python /scripts/parse_emails_tower_v1.1.py --limit 50 --no-indexes
# Kompletni import na pozadi (log do souboru):
docker exec -d python-runner bash -c \
"python /scripts/parse_emails_tower_v1.1.py > /scripts/parse_emails.log 2>&1"
# Pokracovani po preruseni:
docker exec -d python-runner bash -c \
"python /scripts/parse_emails_tower_v1.1.py --skip-existing > /scripts/parse_emails.log 2>&1"
# Sledovani prubehu:
docker exec -it python-runner tail -f /scripts/parse_emails.log
Vystup na konzoli:
Kazdy email na jednom radku:
<poradi>/<celkem> OK/ERR <predmet 60 znaku> <odesilatel>
Kazych 500 emailu: oddelovac s prubehem, rychlosti a ETA.
Na konci: souhrn ok/skip/err, celkovy cas, pocet dokumentu v kolekci.
Zavislosti (nainstalovane v Docker image python-runner):
extract-msg==0.55.0, pymongo, python-dateutil
Python 3.12, Linux (Docker container na Unraid Tower)
Struktura dokumentu v MongoDB:
_id Internet Message-ID (nebo filename: fallback)
filename jmeno .msg souboru (20znakovy hex + .msg)
subject predmet zpravy
normalized_subject predmet bez RE:/FW: prefixu
importance 0=nizka 1=normalni 2=vysoka
sensitivity 0=normalni 1=osobni 2=soukrome 3=duverne
flag_status 0=bez priznaku 1=oznaceno 2=dokonceno
read_receipt_requested bool
delivery_receipt_requested bool
has_attachments bool
attachment_count int
message_size_bytes velikost .msg souboru na disku
conversation_topic tema vlakna (PR_CONVERSATION_TOPIC)
conversation_index base64 PR_CONVERSATION_INDEX
in_reply_to Message-ID predchozi zpravy
internet_references [Message-ID] — cela historia vlakna
categories [str] — MAPI kategorie / stitky
read_receipt_requested bool
delivery_receipt_requested bool
received_at datetime UTC — cas doruceni
sent_at datetime UTC — cas odeslani
sender.email emailova adresa odesilatele
sender.name zobrazovane jmeno odesilatele
sender.smtp SMTP adresa (pro interni EX adresy)
to retezec To (tak jak v Outlooku)
cc retezec CC
bcc retezec BCC
display_to PR_DISPLAY_TO (zkraceny seznam)
display_cc PR_DISPLAY_CC
recipients [{type, email, name}] — to/cc/bcc s typy
body_text plain text telo
body_html HTML telo (max 2 MB, None pokud neni)
attachments [{filename, size_bytes, mime_type,
content_id, is_inline}]
headers dict internet headers (lowercase_s_podtrzitky)
mapi dict vsech raw MAPI properties {0xXXXX: value}
parsed_at datetime UTC — cas parsovani
Indexy (vytvoreny automaticky na konci):
received_at, sent_at, sender.email, filename (unique),
conversation_topic, has_attachments, categories, importance,
flag_status, text_search (subject + body_text + to + cc)
Chyby:
Soubory ktere selhaly jsou zalogiovany do parse_emails_errors.log
v adresari skriptu. Radek: timestamp | open/extract failed | duvod.
Historie verzi:
1.0 2026-06-01 Inicialni verze
1.1 2026-06-02 Nasazeni na Unraid Tower v Docker containeru python-runner;
MSGS_DIR zmeneno z SMB share na lokalni mount /mnt/JNJEMAILS;
aktualizovany popis spousteni pro docker exec
"""
import sys
import re
import logging
import argparse
import base64
from pathlib import Path
from datetime import datetime, timezone
from typing import Optional
import extract_msg
from dateutil import parser as dtparser
from pymongo import MongoClient, UpdateOne, ASCENDING, TEXT
if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8", errors="replace")
# ─── KONFIGURACE ──────────────────────────────────────────────────────────────
MSGS_DIR = Path("/mnt/JNJEMAILS")
MONGO_URI = "mongodb://192.168.1.76:27017"
MONGO_DB = "emaily"
MONGO_COL = "vbuzalka@its.jnj.com"
BATCH_SIZE = 200
LOG_FILE = Path(__file__).parent / "parse_emails_errors.log"
SCRIPT_VERSION = "1.1"
# ──────────────────────────────────────────────────────────────────────────────
logging.basicConfig(
filename=str(LOG_FILE),
level=logging.ERROR,
format="%(asctime)s | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
encoding="utf-8",
)
# ─── Pomocné funkce ───────────────────────────────────────────────────────────
def safe(obj, *attrs, default=None):
"""Bezpecne cteni atributu — vrati prvni non-None hodnotu."""
for attr in attrs:
try:
val = getattr(obj, attr, None)
if val is None:
continue
if isinstance(val, str) and not val.strip():
continue
return val
except Exception:
continue
return default
def parse_date(raw) -> Optional[datetime]:
"""Libovolny datum -> UTC datetime bez tzinfo (pro MongoDB)."""
if raw is None:
return None
if isinstance(raw, datetime):
if raw.tzinfo:
return raw.astimezone(timezone.utc).replace(tzinfo=None)
return raw
try:
dt = dtparser.parse(str(raw))
if dt.tzinfo:
return dt.astimezone(timezone.utc).replace(tzinfo=None)
return dt
except Exception:
return None
def to_bson(val):
"""Konvertuje hodnotu na BSON-serializovatelny typ."""
if isinstance(val, bytes):
return val.hex() if len(val) <= 128 else f"<bytes:{len(val)}>"
if isinstance(val, datetime):
return parse_date(val)
if isinstance(val, (str, int, float, bool, type(None))):
return val
if isinstance(val, list):
return [to_bson(v) for v in val]
try:
return int(val)
except Exception:
pass
return str(val)
# ─── Extrakce částí zprávy ────────────────────────────────────────────────────
def extract_headers(msg) -> dict:
headers = {}
try:
hdr = msg.header
if not hdr:
return {}
from email.header import decode_header as _dh
def _decode(v: str) -> str:
try:
parts = _dh(v)
out = ""
for part, enc in parts:
out += part.decode(enc or "utf-8", errors="replace") if isinstance(part, bytes) else part
return out
except Exception:
return v
for key in set(hdr.keys()):
k = key.lower().replace("-", "_")
vals = [_decode(v) for v in hdr.get_all(key, [])]
headers[k] = vals if len(vals) > 1 else (vals[0] if vals else "")
except Exception as e:
logging.error("extract_headers: %s", e)
return headers
def extract_recipients(msg) -> list:
result = []
type_map = {1: "to", 2: "cc", 3: "bcc"}
try:
for r in msg.recipients:
rtype = getattr(r, "type", 1)
try:
rtype = int(rtype)
except Exception:
try:
rtype = int(rtype.value)
except Exception:
rtype = 1
rec = {
"type": type_map.get(rtype, "to"),
"email": safe(r, "email", default=""),
"name": safe(r, "name", default=""),
}
result.append(rec)
except Exception as e:
logging.error("extract_recipients: %s", e)
return result
def extract_attachments(msg) -> list:
result = []
try:
for att in msg.attachments:
fname = safe(att, "longFilename", "shortFilename", default="")
if not fname:
continue
size = 0
try:
d = att.data
size = len(d) if d else 0
except Exception:
pass
result.append({
"filename": fname,
"size_bytes": size,
"mime_type": safe(att, "mimetype", "mimeType", default="application/octet-stream"),
"content_id": safe(att, "cid", default=None),
"is_inline": bool(safe(att, "isInline", default=False)),
})
except Exception as e:
logging.error("extract_attachments: %s", e)
return result
def extract_mapi_props(msg) -> dict:
"""Vsechny raw MAPI properties jako {0xXXXX: value}."""
result = {}
try:
props = msg.props
if not hasattr(props, "items"):
return {}
for key, prop in props.items():
try:
val = to_bson(prop.value)
prop_id = f"0x{key[:4].upper()}" if len(key) >= 4 else f"0x{key.upper()}"
result[prop_id] = val
except Exception:
pass
except Exception as e:
logging.error("extract_mapi_props: %s", e)
return result
# ─── Hlavní extrakce ─────────────────────────────────────────────────────────
def extract_message(msg_path: Path) -> Optional[dict]:
"""Parsuje jeden .msg soubor -> MongoDB dokument."""
try:
msg = extract_msg.Message(str(msg_path))
except Exception as e:
logging.error("open failed [%s]: %s", msg_path.name, e)
return None
try:
# ── Message-ID ────────────────────────────────────────────────
mid = None
for attr in ("messageId", "message_id", "internetMessageId"):
mid = safe(msg, attr)
if mid:
break
if not mid:
mid = f"filename:{msg_path.stem}"
mid = str(mid).strip()
# ── Předmět ───────────────────────────────────────────────────
try:
subject = msg.subject or ""
except Exception:
subject = ""
normalized_subject = safe(msg, "normalizedSubject", "normalized_subject", default="")
# ── Tělo ──────────────────────────────────────────────────────
try:
body_text = msg.body or ""
except Exception:
body_text = ""
body_html = None
try:
bh = msg.htmlBody
if isinstance(bh, bytes):
bh = bh.decode("utf-8", errors="replace")
if bh:
body_html = bh if len(bh) <= 2 * 1024 * 1024 else bh[:2 * 1024 * 1024]
except Exception:
pass
# ── Odesílatel ────────────────────────────────────────────────
try:
sender_email = msg.sender or ""
except Exception:
sender_email = ""
sender_name = safe(msg, "senderName", "sender_name", default="")
sender_smtp = safe(msg, "senderSmtpAddress", "sent_representing_smtp_address", default="")
# ── Příjemci ──────────────────────────────────────────────────
recipients = extract_recipients(msg)
try:
to_raw = msg.to or ""
except Exception:
to_raw = ""
try:
cc_raw = msg.cc or ""
except Exception:
cc_raw = ""
try:
bcc_raw = getattr(msg, "bcc", None) or ""
except Exception:
bcc_raw = ""
display_to = safe(msg, "displayTo", "display_to", default="")
display_cc = safe(msg, "displayCc", "display_cc", default="")
# ── Časy ──────────────────────────────────────────────────────
try:
received_at = parse_date(msg.date)
except Exception:
received_at = None
sent_at = None
for attr in ("clientSubmitTime", "client_submit_time", "sentOn"):
v = safe(msg, attr)
if v:
sent_at = parse_date(v)
break
# ── MAPI vlastnosti ───────────────────────────────────────────
importance = 1
try:
v = msg.importance
if v is not None:
importance = int(v)
except Exception:
pass
sensitivity = 0
try:
v = getattr(msg, "sensitivity", None)
if v is not None:
sensitivity = int(v)
except Exception:
pass
flag_status = 0
try:
v = safe(msg, "flagStatus", "flag_status")
if v is not None:
flag_status = int(v)
except Exception:
pass
conversation_topic = safe(msg, "conversationTopic", "conversation_topic", default="")
conversation_index = ""
try:
ci = safe(msg, "conversationIndex", "conversation_index")
if isinstance(ci, bytes):
conversation_index = base64.b64encode(ci).decode()
elif ci:
conversation_index = str(ci)
except Exception:
pass
in_reply_to = safe(msg, "inReplyTo", "in_reply_to", default="")
internet_refs = []
try:
refs = safe(msg, "internetReferences", "internet_references")
if isinstance(refs, list):
internet_refs = refs
elif isinstance(refs, str) and refs:
internet_refs = [r.strip() for r in refs.split() if r.strip()]
except Exception:
pass
categories = []
try:
cats = safe(msg, "categories")
if isinstance(cats, list):
categories = [str(c) for c in cats if c]
elif isinstance(cats, str) and cats:
categories = [c.strip() for c in re.split(r"[;,]", cats) if c.strip()]
except Exception:
pass
read_receipt = bool(safe(msg, "readReceiptRequested", "read_receipt_requested", default=False))
delivery_receipt = bool(safe(msg, "deliveryReceiptRequested", "delivery_receipt_requested", default=False))
# ── Internet headers ──────────────────────────────────────────
headers = extract_headers(msg)
if not in_reply_to:
in_reply_to = headers.get("in_reply_to", "")
if not internet_refs:
refs_str = headers.get("references", "")
if isinstance(refs_str, str) and refs_str:
internet_refs = [r.strip() for r in refs_str.split() if r.strip()]
# ── Přílohy ───────────────────────────────────────────────────
attachments = extract_attachments(msg)
# ── Raw MAPI ──────────────────────────────────────────────────
mapi_raw = extract_mapi_props(msg)
msg.close()
# ── Dokument ──────────────────────────────────────────────────
return {
"_id": mid,
"filename": msg_path.name,
"subject": subject,
"normalized_subject": normalized_subject,
"importance": importance,
"sensitivity": sensitivity,
"flag_status": flag_status,
"read_receipt_requested": read_receipt,
"delivery_receipt_requested": delivery_receipt,
"has_attachments": len(attachments) > 0,
"attachment_count": len(attachments),
"message_size_bytes": msg_path.stat().st_size,
"conversation_topic": conversation_topic,
"conversation_index": conversation_index,
"in_reply_to": in_reply_to,
"internet_references": internet_refs,
"categories": categories,
"received_at": received_at,
"sent_at": sent_at,
"sender": {
"email": sender_email,
"name": sender_name,
"smtp": sender_smtp,
},
"to": to_raw,
"cc": cc_raw,
"bcc": bcc_raw,
"display_to": display_to,
"display_cc": display_cc,
"recipients": recipients,
"body_text": body_text,
"body_html": body_html,
"attachments": attachments,
"headers": headers,
"mapi": mapi_raw,
"parsed_at": datetime.now(timezone.utc).replace(tzinfo=None),
}
except Exception as e:
logging.error("extract_message failed [%s]: %s", msg_path.name, e)
return None
# ─── MongoDB indexy ───────────────────────────────────────────────────────────
def create_indexes(col):
print(" Vytvarim indexy...")
col.create_index([("received_at", ASCENDING)])
col.create_index([("sent_at", ASCENDING)])
col.create_index([("sender.email", ASCENDING)])
col.create_index([("filename", ASCENDING)], unique=True, sparse=True)
col.create_index([("conversation_topic", ASCENDING)])
col.create_index([("has_attachments", ASCENDING)])
col.create_index([("categories", ASCENDING)])
col.create_index([("importance", ASCENDING)])
col.create_index([("flag_status", ASCENDING)])
col.create_index([
("subject", TEXT),
("body_text", TEXT),
("to", TEXT),
("cc", TEXT),
], name="text_search", default_language="none")
print(" Indexy hotovy.")
# ─── MAIN ─────────────────────────────────────────────────────────────────────
def main():
ap = argparse.ArgumentParser(description=f"parse_emails v{SCRIPT_VERSION}")
ap.add_argument("--msgs-dir", default=str(MSGS_DIR),
help="Cesta k .msg souborum")
ap.add_argument("--limit", type=int, default=0,
help="Zpracovat max N souboru (0 = vse)")
ap.add_argument("--skip-existing", action="store_true",
help="Preskocit soubory ktere jiz jsou v MongoDB (pokracovani)")
ap.add_argument("--no-indexes", action="store_true",
help="Nevytvorit indexy na konci")
args = ap.parse_args()
msgs_dir = Path(args.msgs_dir)
start = datetime.now()
print(f"=== parse_emails v{SCRIPT_VERSION} ===")
print(f"Start: {start.strftime('%Y-%m-%d %H:%M:%S')}")
print(f"Zdroj: {msgs_dir}")
print(f"MongoDB: {MONGO_URI} -> {MONGO_DB}.{MONGO_COL}")
# MongoDB
client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
try:
client.admin.command("ping")
print(" MongoDB OK")
except Exception as e:
print(f" CHYBA: MongoDB neni dostupna -- {e}")
sys.exit(1)
col = client[MONGO_DB][MONGO_COL]
# Skip existing — nacti seznam uz importovanych souboru
existing: set = set()
if args.skip_existing:
print(" Nacitam existujici zaznamy z MongoDB...")
existing = set(col.distinct("filename"))
print(f" {len(existing)} jiz importovano")
# Scan
print(f"\nSkenuji {msgs_dir} ...")
all_files = sorted(msgs_dir.glob("*.msg"))
if args.limit:
all_files = all_files[:args.limit]
to_process = [f for f in all_files if f.name not in existing]
skipped = len(all_files) - len(to_process)
total = len(to_process)
print(f" Celkem .msg: {len(all_files)}")
print(f" Preskoceno: {skipped}")
print(f" Ke zpracovani: {total}\n")
if total == 0:
print("Neni co importovat.")
client.close()
return
batch = []
ok_count = 0
err_count = 0
def flush():
if not batch:
return
try:
col.bulk_write(batch, ordered=False)
except Exception as e:
logging.error("bulk_write: %s", e)
print(f" CHYBA bulk_write: {e}")
batch.clear()
for i, msg_path in enumerate(to_process, 1):
doc = extract_message(msg_path)
if doc is None:
err_count += 1
else:
batch.append(UpdateOne({"_id": doc["_id"]}, {"$set": doc}, upsert=True))
ok_count += 1
if len(batch) >= BATCH_SIZE:
flush()
# Výpis každého emailu
status = "ERR " if doc is None else "OK "
subject_str = (doc.get("subject") or "")[:60] if doc else "?"
sender_str = (doc.get("sender", {}).get("email") or "")[:40] if doc else "?"
print(f" {i:>6}/{total} {status} {subject_str:<60} {sender_str}")
if i % 500 == 0:
elapsed = (datetime.now() - start).total_seconds()
rate = i / elapsed if elapsed > 0 else 0
eta_s = int((total - i) / rate) if rate > 0 else 0
print(f" {''*80}")
print(f" Průběh: ok={ok_count} err={err_count} "
f"{rate:.1f} msg/s ETA {eta_s//3600}h{(eta_s%3600)//60}m")
print(f" {''*80}")
flush()
elapsed_total = (datetime.now() - start).total_seconds()
print(f"\n{'='*52}")
print(f"Vysledek: ok={ok_count} | skip={skipped} | err={err_count}")
print(f"Celkovy cas: {int(elapsed_total//3600)}h {int((elapsed_total%3600)//60)}m {int(elapsed_total%60)}s")
print(f"Dokumentu v kolekci: {col.count_documents({})}")
if not args.no_indexes:
print()
create_indexes(col)
print(f"\nKonec: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
if err_count:
print(f"Chyby logovany do: {LOG_FILE}")
client.close()
if __name__ == "__main__":
main()