5545f05eee
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
661 lines
25 KiB
Python
661 lines
25 KiB
Python
"""
|
|
parse_emails_tower_v1.1.py
|
|
Nazev: parse_emails_tower_v1.1.py
|
|
Verze: 1.1
|
|
Datum: 2026-06-02
|
|
Autor: vladimir.buzalka
|
|
|
|
Popis:
|
|
Parsuje vsechny .msg soubory z MSGS_DIR a importuje je jako dokumenty
|
|
do MongoDB. Z kazdeho souboru extrahuje VSECHNY dostupne vlastnosti —
|
|
podobne jako EXIF u fotek:
|
|
|
|
- predmet, odesilatel, prijemci (To/CC/BCC s typy)
|
|
- cas doruceni a odeslani (UTC)
|
|
- telo plaintext + HTML (HTML orezano na max 2 M znaku)
|
|
- prilohy (metadata: jmeno, velikost, MIME typ, inline flag)
|
|
- internet headers (X-Originating-IP, Received, DKIM, ...)
|
|
- MAPI vlastnosti: dulezitost, citlivost, priznak, konverzacni vlakno,
|
|
kategorie, In-Reply-To, References, ...
|
|
- vsechny raw MAPI properties jako {0xXXXX: value}
|
|
|
|
DB: emaily
|
|
Kolekce: vbuzalka@its.jnj.com
|
|
_id: Internet Message-ID (nebo "filename:<stem>" jako fallback)
|
|
|
|
Bezpecne prerusit a opakovat:
|
|
- upsert podle _id — duplicity se automaticky prepisi
|
|
- --skip-existing nacte seznam hotovych souboru z MongoDB a
|
|
preskoci je => pokracovani po preruseni bez ztraty prace
|
|
|
|
Prostredi:
|
|
Bezi v Docker containeru "python-runner" na Unraid Tower.
|
|
.msg soubory jsou dostupne jako lokalni disk (volume mount):
|
|
/mnt/user/JNJEMAILS -> /mnt/JNJEMAILS (uvnitr containeru)
|
|
MongoDB na 192.168.1.76:27017 (externi, bezi mimo container).
|
|
|
|
Spousteni (z Unraid terminalu):
|
|
# Test na 50 emailech:
|
|
docker exec -it python-runner python /scripts/parse_emails_tower_v1.1.py --limit 50 --no-indexes
|
|
|
|
# Kompletni import na pozadi (log do souboru):
|
|
docker exec -d python-runner bash -c \
|
|
"python /scripts/parse_emails_tower_v1.1.py > /scripts/parse_emails.log 2>&1"
|
|
|
|
# Pokracovani po preruseni:
|
|
docker exec -d python-runner bash -c \
|
|
"python /scripts/parse_emails_tower_v1.1.py --skip-existing > /scripts/parse_emails.log 2>&1"
|
|
|
|
# Sledovani prubehu:
|
|
docker exec -it python-runner tail -f /scripts/parse_emails.log
|
|
|
|
Vystup na konzoli:
|
|
Kazdy email na jednom radku:
|
|
<poradi>/<celkem> OK/ERR <predmet 60 znaku> <odesilatel>
|
|
Kazdych 500 emailu: oddelovac s prubehem, rychlosti a ETA.
|
|
Na konci: souhrn ok/skip/err, celkovy cas, pocet dokumentu v kolekci.
|
|
|
|
Zavislosti (nainstalovane v Docker image python-runner):
|
|
extract-msg==0.55.0, pymongo, python-dateutil
|
|
Python 3.12, Linux (Docker container na Unraid Tower)
|
|
|
|
Struktura dokumentu v MongoDB:
|
|
_id Internet Message-ID (nebo filename: fallback)
|
|
filename jmeno .msg souboru (20znakovy hex + .msg)
|
|
subject predmet zpravy
|
|
normalized_subject predmet bez RE:/FW: prefixu
|
|
importance 0=nizka 1=normalni 2=vysoka
|
|
sensitivity 0=normalni 1=osobni 2=soukrome 3=duverne
|
|
flag_status 0=bez priznaku 1=dokonceno 2=oznaceno
|
|
read_receipt_requested bool
|
|
delivery_receipt_requested bool
|
|
has_attachments bool
|
|
attachment_count int
|
|
message_size_bytes velikost .msg souboru na disku
|
|
conversation_topic tema vlakna (PR_CONVERSATION_TOPIC)
|
|
conversation_index base64 PR_CONVERSATION_INDEX
|
|
in_reply_to Message-ID predchozi zpravy
|
|
internet_references [Message-ID] — cela historie vlakna
|
|
categories [str] — MAPI kategorie / stitky
|
|
read_receipt_requested bool
|
|
delivery_receipt_requested bool
|
|
received_at datetime UTC — cas doruceni
|
|
sent_at datetime UTC — cas odeslani
|
|
sender.email emailova adresa odesilatele
|
|
sender.name zobrazovane jmeno odesilatele
|
|
sender.smtp SMTP adresa (pro interni EX adresy)
|
|
to retezec To (tak jak v Outlooku)
|
|
cc retezec CC
|
|
bcc retezec BCC
|
|
display_to PR_DISPLAY_TO (zkraceny seznam)
|
|
display_cc PR_DISPLAY_CC
|
|
recipients [{type, email, name}] — to/cc/bcc s typy
|
|
body_text plain text telo
|
|
body_html HTML telo (max 2 MB, None pokud neni)
|
|
attachments [{filename, size_bytes, mime_type,
|
|
content_id, is_inline}]
|
|
headers dict internet headers (lowercase_s_podtrzitky)
|
|
mapi dict vsech raw MAPI properties {0xXXXX: value}
|
|
parsed_at datetime UTC — cas parsovani
|
|
|
|
Indexy (vytvoreny automaticky na konci):
|
|
received_at, sent_at, sender.email, filename (unique),
|
|
conversation_topic, has_attachments, categories, importance,
|
|
flag_status, text_search (subject + body_text + to + cc)
|
|
|
|
Chyby:
|
|
Soubory, ktere selhaly, jsou zalogovany do parse_emails_errors.log
|
|
v adresari skriptu. Format radku: timestamp | open failed / extract_message failed [soubor]: duvod.
|
|
|
|
Historie verzi:
|
|
1.0 2026-06-01 Inicialni verze
|
|
1.1 2026-06-02 Nasazeni na Unraid Tower v Docker containeru python-runner;
|
|
MSGS_DIR zmeneno z SMB share na lokalni mount /mnt/JNJEMAILS;
|
|
aktualizovany popis spousteni pro docker exec
|
|
"""
|
|
|
|
import sys
|
|
import re
|
|
import logging
|
|
import argparse
|
|
import base64
|
|
from pathlib import Path
|
|
from datetime import datetime, timezone
|
|
from typing import Optional
|
|
|
|
import extract_msg
|
|
from dateutil import parser as dtparser
|
|
from pymongo import MongoClient, UpdateOne, ASCENDING, TEXT
|
|
|
|
# Console output: force UTF-8 (with replacement for unencodable characters)
# and line buffering. Without line_buffering, stdout redirected to a file
# (`> /scripts/parse_emails.log`) is block-buffered, so `tail -f` on the log
# shows progress only in large delayed chunks.
if hasattr(sys.stdout, "reconfigure"):
    sys.stdout.reconfigure(encoding="utf-8", errors="replace", line_buffering=True)

# ─── CONFIGURATION ────────────────────────────────────────────────────────────
MSGS_DIR = Path("/mnt/JNJEMAILS")  # default source dir (container volume mount)
MONGO_URI = "mongodb://192.168.1.76:27017"
MONGO_DB = "emaily"
MONGO_COL = "vbuzalka@its.jnj.com"
BATCH_SIZE = 200  # number of upserts sent per bulk_write call
LOG_FILE = Path(__file__).parent / "parse_emails_errors.log"
SCRIPT_VERSION = "1.1"
# ──────────────────────────────────────────────────────────────────────────────

# Only ERROR-level records are written, to LOG_FILE next to the script;
# regular progress output goes to stdout via print().
logging.basicConfig(
    filename=str(LOG_FILE),
    level=logging.ERROR,
    format="%(asctime)s | %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    encoding="utf-8",
)
|
|
|
|
|
|
# ─── Pomocné funkce ───────────────────────────────────────────────────────────
|
|
|
|
def safe(obj, *attrs, default=None):
    """Return the first usable value among the named attributes of *obj*.

    Attributes are tried in the given order. A value is skipped when it is
    None, when it is a string containing only whitespace, or when reading
    the attribute raises (any exception, not only AttributeError). If no
    attribute yields a usable value, *default* is returned.
    """
    for name in attrs:
        try:
            candidate = getattr(obj, name, None)
        except Exception:
            # e.g. a property whose getter fails -- treat as "not available"
            continue
        if candidate is None:
            continue
        if isinstance(candidate, str) and not candidate.strip():
            continue
        return candidate
    return default
|
|
|
|
|
|
def parse_date(raw) -> Optional[datetime]:
    """Convert a date value into a naive datetime expressed in UTC.

    - None -> None
    - datetime: timezone-aware values are converted to UTC and their tzinfo
      is dropped; naive values are returned unchanged (NOTE(review): assumed
      to already be UTC -- not verified here)
    - anything else: ``str(raw)`` is parsed with dateutil and normalised the
      same way; any failure (parsing or conversion) yields None.
    """

    def _as_naive_utc(moment: datetime) -> datetime:
        if not moment.tzinfo:
            return moment
        return moment.astimezone(timezone.utc).replace(tzinfo=None)

    if raw is None:
        return None
    if isinstance(raw, datetime):
        return _as_naive_utc(raw)
    try:
        return _as_naive_utc(dtparser.parse(str(raw)))
    except Exception:
        return None
|
|
|
|
|
|
# BSON can store integers of at most signed 64 bits; pymongo raises while
# encoding anything larger, which fails the whole bulk_write batch.
_BSON_INT64_MIN = -(2 ** 63)
_BSON_INT64_MAX = 2 ** 63 - 1


def to_bson(val):
    """Convert a raw (MAPI) property value into a BSON-serialisable value.

    - ``bytes``: hex string when at most 128 bytes long, else ``"<bytes:N>"``
    - ``datetime``: naive UTC datetime via ``parse_date``
    - ``str`` / ``float`` / ``bool`` / ``None``: returned unchanged
    - ``int``: unchanged if it fits in signed 64 bits, else its decimal string
    - ``list``: converted element by element (recursively)
    - anything else: ``int(val)`` when that succeeds and fits in 64 bits,
      otherwise ``str(val)``
    """
    if isinstance(val, bytes):
        return val.hex() if len(val) <= 128 else f"<bytes:{len(val)}>"
    if isinstance(val, datetime):
        return parse_date(val)
    if isinstance(val, int) and not isinstance(val, bool):
        if _BSON_INT64_MIN <= val <= _BSON_INT64_MAX:
            return val
        return str(int(val))
    if isinstance(val, (str, float, bool, type(None))):
        return val
    if isinstance(val, list):
        return [to_bson(v) for v in val]
    try:
        as_int = int(val)
    except Exception:
        return str(val)
    # int() can produce huge values (e.g. int(uuid.UUID) is 128-bit); such
    # objects are stored as their string form instead.
    if _BSON_INT64_MIN <= as_int <= _BSON_INT64_MAX:
        return as_int
    return str(val)
|
|
|
|
|
|
# ─── Extrakce částí zprávy ────────────────────────────────────────────────────
|
|
|
|
def extract_headers(msg) -> dict:
    """Return the internet headers of *msg* as a plain, BSON-friendly dict.

    Header names are lower-cased with ``-`` replaced by ``_`` (e.g.
    ``X-Originating-IP`` -> ``x_originating_ip``). A header that occurs once
    maps to a string; a repeated header (such as ``Received``) maps to a
    list of strings in original order. RFC 2047 encoded words are decoded.
    Every value is a ``str``.

    Returns ``{}`` when ``msg.header`` is empty/None. Any unexpected error is
    logged and whatever was collected so far is returned.
    """
    headers = {}
    try:
        hdr = msg.header
        if not hdr:
            return {}
        from email.header import decode_header as _dh

        def _decode_part(part, enc) -> str:
            if isinstance(part, str):
                return part
            try:
                return part.decode(enc or "utf-8", errors="replace")
            except LookupError:
                # Charset label Python does not know (e.g. "unknown-8bit" for
                # raw 8-bit bytes) -- fall back to UTF-8 with replacement.
                return part.decode("utf-8", errors="replace")

        def _decode(v) -> str:
            try:
                return "".join(_decode_part(part, enc) for part, enc in _dh(v))
            except Exception:
                # Never return the raw value: it can be an email.header.Header
                # object (or contain lone surrogates), which BSON cannot encode.
                return str(v).encode("utf-8", errors="replace").decode("utf-8")

        # Header names are case-insensitive and get_all() matches any case,
        # so visit every name once (first spelling seen, original order).
        names = {}
        for key in hdr.keys():
            names.setdefault(key.lower(), key)

        for key in names.values():
            k = key.lower().replace("-", "_")
            vals = [_decode(v) for v in hdr.get_all(key, [])]
            headers[k] = vals if len(vals) > 1 else (vals[0] if vals else "")
    except Exception as e:
        logging.error("extract_headers: %s", e)
    return headers
|
|
|
|
|
|
def extract_recipients(msg) -> list:
    """Return one ``{"type", "email", "name"}`` dict per recipient of *msg*.

    ``type`` is "to", "cc" or "bcc" from the recipient's numeric type
    (1/2/3). Unknown or unreadable types become "to". ``email`` and
    ``name`` default to "". On an unexpected error the problem is logged
    and the recipients collected so far are returned.
    """
    kinds = {1: "to", 2: "cc", 3: "bcc"}

    def _kind_code(recipient) -> int:
        raw = getattr(recipient, "type", 1)
        try:
            return int(raw)
        except Exception:
            pass
        # Enum-like objects that are not ints themselves expose .value
        try:
            return int(raw.value)
        except Exception:
            return 1

    recipients = []
    try:
        for recipient in msg.recipients:
            recipients.append({
                "type": kinds.get(_kind_code(recipient), "to"),
                "email": safe(recipient, "email", default=""),
                "name": safe(recipient, "name", default=""),
            })
    except Exception as e:
        logging.error("extract_recipients: %s", e)
    return recipients
|
|
|
|
|
|
def extract_attachments(msg) -> list:
    """Return metadata dicts for the named attachments of *msg*.

    Attachments with neither a long nor a short filename are not recorded.
    Each dict holds ``filename``, ``size_bytes`` (length of the attachment
    data, 0 if it cannot be read), ``mime_type`` (default
    "application/octet-stream"), ``content_id`` and ``is_inline``.
    On an unexpected error the problem is logged and the list built so far
    is returned.
    """

    def _payload_size(att) -> int:
        try:
            data = att.data
            return len(data) if data else 0
        except Exception:
            return 0

    found = []
    try:
        for att in msg.attachments:
            name = safe(att, "longFilename", "shortFilename", default="")
            if not name:
                continue
            size = _payload_size(att)
            found.append({
                "filename": name,
                "size_bytes": size,
                "mime_type": safe(att, "mimetype", "mimeType", default="application/octet-stream"),
                "content_id": safe(att, "cid", default=None),
                "is_inline": bool(safe(att, "isInline", default=False)),
            })
    except Exception as e:
        logging.error("extract_attachments: %s", e)
    return found
|
|
|
|
|
|
def extract_mapi_props(msg) -> dict:
    """Return the raw MAPI properties of *msg* as ``{"0xXXXX": value}``.

    The key is ``0x`` plus the first four characters of the property key,
    upper-cased (the whole key if it is shorter). Values go through
    ``to_bson``. A property whose value cannot be read or converted is
    skipped. If ``msg.props`` has no ``items()``, ``{}`` is returned.
    Unexpected errors are logged and the partial result is returned.
    """
    collected = {}
    try:
        props = msg.props
        if not hasattr(props, "items"):
            return {}
        for key, prop in props.items():
            try:
                value = to_bson(prop.value)
                head = key[:4] if len(key) >= 4 else key
                collected[f"0x{head.upper()}"] = value
            except Exception:
                pass  # unreadable/unconvertible property -- skip it
    except Exception as e:
        logging.error("extract_mapi_props: %s", e)
    return collected
|
|
|
|
|
|
# ─── Hlavní extrakce ─────────────────────────────────────────────────────────
|
|
|
|
def _attempt(getter, default):
    """Return ``getter()``, or *default* if the call raises.

    Message properties are read best-effort: an accessor that may fail on a
    malformed file must not abort the whole message.
    """
    try:
        return getter()
    except Exception:
        return default


def _int_or(getter, default: int) -> int:
    """Return ``int(getter())``; *default* if the value is None or any step raises."""
    try:
        value = getter()
        if value is not None:
            return int(value)
    except Exception:
        pass
    return default


def _build_document(msg, msg_path: Path) -> dict:
    """Read every supported field of an opened extract_msg message into a dict."""
    # ── Message-ID (fallback "filename:<stem>") ───────────────────
    mid = None
    for attr in ("messageId", "message_id", "internetMessageId"):
        mid = safe(msg, attr)
        if mid:
            break
    if not mid:
        mid = f"filename:{msg_path.stem}"
    mid = str(mid).strip()

    # ── Subject ───────────────────────────────────────────────────
    subject = _attempt(lambda: msg.subject or "", "")
    normalized_subject = safe(msg, "normalizedSubject", "normalized_subject", default="")

    # ── Body ──────────────────────────────────────────────────────
    body_text = _attempt(lambda: msg.body or "", "")

    html_max_chars = 2 * 1024 * 1024  # cap counts characters, not bytes
    body_html = None
    try:
        bh = msg.htmlBody
        if isinstance(bh, bytes):
            # NOTE(review): assumes UTF-8; HTML stored in another charset
            # will decode with replacement characters -- confirm.
            bh = bh.decode("utf-8", errors="replace")
        if bh:
            body_html = bh[:html_max_chars]
    except Exception:
        pass

    # ── Sender ────────────────────────────────────────────────────
    sender_email = _attempt(lambda: msg.sender or "", "")
    sender_name = safe(msg, "senderName", "sender_name", default="")
    sender_smtp = safe(msg, "senderSmtpAddress", "sent_representing_smtp_address", default="")

    # ── Recipients ────────────────────────────────────────────────
    recipients = extract_recipients(msg)
    to_raw = _attempt(lambda: msg.to or "", "")
    cc_raw = _attempt(lambda: msg.cc or "", "")
    bcc_raw = _attempt(lambda: getattr(msg, "bcc", None) or "", "")
    display_to = safe(msg, "displayTo", "display_to", default="")
    display_cc = safe(msg, "displayCc", "display_cc", default="")

    # ── Times ─────────────────────────────────────────────────────
    received_at = _attempt(lambda: parse_date(msg.date), None)

    sent_at = None
    for attr in ("clientSubmitTime", "client_submit_time", "sentOn"):
        v = safe(msg, attr)
        if v:
            sent_at = parse_date(v)
            break

    # ── MAPI flags (defaults: normal importance/sensitivity, no flag) ─
    importance = _int_or(lambda: msg.importance, 1)
    sensitivity = _int_or(lambda: getattr(msg, "sensitivity", None), 0)
    flag_status = _int_or(lambda: safe(msg, "flagStatus", "flag_status"), 0)

    conversation_topic = safe(msg, "conversationTopic", "conversation_topic", default="")

    conversation_index = ""
    try:
        ci = safe(msg, "conversationIndex", "conversation_index")
        if isinstance(ci, bytes):
            conversation_index = base64.b64encode(ci).decode()
        elif ci:
            conversation_index = str(ci)
    except Exception:
        pass

    in_reply_to = safe(msg, "inReplyTo", "in_reply_to", default="")

    internet_refs = []
    try:
        refs = safe(msg, "internetReferences", "internet_references")
        if isinstance(refs, list):
            internet_refs = refs
        elif isinstance(refs, str) and refs:
            internet_refs = [r.strip() for r in refs.split() if r.strip()]
    except Exception:
        pass

    categories = []
    try:
        cats = safe(msg, "categories")
        if isinstance(cats, list):
            categories = [str(c) for c in cats if c]
        elif isinstance(cats, str) and cats:
            categories = [c.strip() for c in re.split(r"[;,]", cats) if c.strip()]
    except Exception:
        pass

    read_receipt = bool(safe(msg, "readReceiptRequested", "read_receipt_requested", default=False))
    delivery_receipt = bool(safe(msg, "deliveryReceiptRequested", "delivery_receipt_requested", default=False))

    # ── Internet headers (also a fallback source for threading info) ─
    headers = extract_headers(msg)
    if not in_reply_to:
        in_reply_to = headers.get("in_reply_to", "")
    if not internet_refs:
        refs_str = headers.get("references", "")
        if isinstance(refs_str, str) and refs_str:
            internet_refs = [r.strip() for r in refs_str.split() if r.strip()]

    # ── Attachments and raw MAPI properties ───────────────────────
    attachments = extract_attachments(msg)
    mapi_raw = extract_mapi_props(msg)

    # ── Document ──────────────────────────────────────────────────
    return {
        "_id": mid,
        "filename": msg_path.name,

        "subject": subject,
        "normalized_subject": normalized_subject,
        "importance": importance,
        "sensitivity": sensitivity,
        "flag_status": flag_status,
        "read_receipt_requested": read_receipt,
        "delivery_receipt_requested": delivery_receipt,
        "has_attachments": len(attachments) > 0,
        "attachment_count": len(attachments),
        "message_size_bytes": msg_path.stat().st_size,

        "conversation_topic": conversation_topic,
        "conversation_index": conversation_index,
        "in_reply_to": in_reply_to,
        "internet_references": internet_refs,
        "categories": categories,

        "received_at": received_at,
        "sent_at": sent_at,

        "sender": {
            "email": sender_email,
            "name": sender_name,
            "smtp": sender_smtp,
        },
        "to": to_raw,
        "cc": cc_raw,
        "bcc": bcc_raw,
        "display_to": display_to,
        "display_cc": display_cc,
        "recipients": recipients,

        "body_text": body_text,
        "body_html": body_html,

        "attachments": attachments,
        "headers": headers,
        "mapi": mapi_raw,

        "parsed_at": datetime.now(timezone.utc).replace(tzinfo=None),
    }


def extract_message(msg_path: Path) -> Optional[dict]:
    """Parse one ``.msg`` file into a MongoDB document.

    Every field is read best-effort: a missing or unreadable property falls
    back to a neutral default instead of failing the whole message.

    Args:
        msg_path: path to the ``.msg`` file.

    Returns:
        The document (``_id`` = Internet Message-ID, or ``"filename:<stem>"``
        when none is available), or ``None`` when the file cannot be opened
        or extraction fails. The reason is written to the error log.
    """
    try:
        msg = extract_msg.Message(str(msg_path))
    except Exception as e:
        logging.error("open failed [%s]: %s", msg_path.name, e)
        return None

    try:
        return _build_document(msg, msg_path)
    except Exception as e:
        logging.error("extract_message failed [%s]: %s", msg_path.name, e)
        return None
    finally:
        # Always release the file, also when extraction failed (previously
        # it was closed only on success, leaking one handle per bad file).
        # A failing close() no longer discards an already built document.
        try:
            msg.close()
        except Exception as e:
            logging.error("close failed [%s]: %s", msg_path.name, e)
|
|
|
|
|
|
# ─── MongoDB indexy ───────────────────────────────────────────────────────────
|
|
|
|
def create_indexes(col):
    """Create the query indexes on collection *col*.

    Creates single-field ascending indexes on the common filter/sort fields
    and a unique, sparse index on ``filename``. It also creates one combined
    text index named ``text_search`` over subject, plain-text body, To and
    CC, with ``default_language="none"`` (no language-specific stemming).
    """
    print(" Vytvarim indexy...")
    leading_fields = ("received_at", "sent_at", "sender.email")
    trailing_fields = ("conversation_topic", "has_attachments", "categories",
                       "importance", "flag_status")

    for field in leading_fields:
        col.create_index([(field, ASCENDING)])
    col.create_index([("filename", ASCENDING)], unique=True, sparse=True)
    for field in trailing_fields:
        col.create_index([(field, ASCENDING)])

    text_keys = [(field, TEXT) for field in ("subject", "body_text", "to", "cc")]
    col.create_index(text_keys, name="text_search", default_language="none")
    print(" Indexy hotovy.")
|
|
|
|
|
|
# ─── MAIN ─────────────────────────────────────────────────────────────────────
|
|
|
|
def main():
    """Command-line entry point: import all .msg files into MongoDB.

    1. Parse the CLI options and check the source directory and the MongoDB
       connection.
    2. Optionally load the names of already imported files
       (``--skip-existing``).
    3. Extract every remaining file and upsert the documents in batches of
       ``BATCH_SIZE``, printing one line per message and a progress summary
       every 500 messages.
    4. Unless ``--no-indexes`` is given, create the indexes.
    """
    # pymongo is already a dependency of this script; only its error type is new.
    from pymongo.errors import ConnectionFailure

    ap = argparse.ArgumentParser(description=f"parse_emails v{SCRIPT_VERSION}")
    ap.add_argument("--msgs-dir", default=str(MSGS_DIR),
                    help="Cesta k .msg souborum")
    ap.add_argument("--limit", type=int, default=0,
                    help="Zpracovat max N souboru (0 = vse)")
    ap.add_argument("--skip-existing", action="store_true",
                    help="Preskocit soubory ktere jiz jsou v MongoDB (pokracovani)")
    ap.add_argument("--no-indexes", action="store_true",
                    help="Nevytvorit indexy na konci")
    args = ap.parse_args()

    msgs_dir = Path(args.msgs_dir)
    start = datetime.now()

    print(f"=== parse_emails v{SCRIPT_VERSION} ===")
    print(f"Start: {start.strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"Zdroj: {msgs_dir}")
    print(f"MongoDB: {MONGO_URI} -> {MONGO_DB}.{MONGO_COL}")

    # A missing volume mount would otherwise look like "nothing to import".
    if not msgs_dir.is_dir():
        print(f" CHYBA: adresar {msgs_dir} neexistuje nebo neni pripojeny")
        sys.exit(1)

    # MongoDB
    client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
    try:
        client.admin.command("ping")
        print(" MongoDB OK")
    except Exception as e:
        print(f" CHYBA: MongoDB neni dostupna -- {e}")
        client.close()
        sys.exit(1)

    try:
        col = client[MONGO_DB][MONGO_COL]

        # Skip existing: load the names of files that are already imported.
        # Streamed through a cursor because distinct() returns one BSON
        # document and fails once the name list exceeds 16 MB.
        existing: set = set()
        if args.skip_existing:
            print(" Nacitam existujici zaznamy z MongoDB...")
            cursor = col.find({"filename": {"$exists": True}}, {"_id": 0, "filename": 1})
            existing = {d["filename"] for d in cursor}
            print(f" {len(existing)} jiz importovano")

        # Scan
        print(f"\nSkenuji {msgs_dir} ...")
        all_files = sorted(msgs_dir.glob("*.msg"))
        to_process = [f for f in all_files if f.name not in existing]
        skipped = len(all_files) - len(to_process)
        if args.limit:
            # The limit applies to files still to be processed, so that
            # "--skip-existing --limit N" continues with the next N files.
            to_process = to_process[:args.limit]
        total = len(to_process)

        print(f" Celkem .msg: {len(all_files)}")
        print(f" Preskoceno: {skipped}")
        print(f" Ke zpracovani: {total}\n")

        if total == 0:
            print("Neni co importovat.")
            return

        batch = []  # (filename, UpdateOne) pairs waiting for bulk_write
        ok_count = 0
        err_count = 0

        def flush():
            """Upsert the pending batch and empty it.

            If the server is unreachable, the whole batch is counted as
            failed. The files are not in the DB, so --skip-existing redoes
            them. On any other failure, the operations are retried one at a
            time; upserts with $set are idempotent, so re-sending written
            documents is harmless. Only documents that still fail are logged
            and moved from ok_count to err_count, so one bad document (e.g.
            over the BSON size limit) cannot drop the rest of its batch.
            """
            nonlocal ok_count, err_count
            if not batch:
                return
            try:
                col.bulk_write([op for _, op in batch], ordered=False)
            except ConnectionFailure as e:
                logging.error("bulk_write: %s", e)
                print(f" CHYBA bulk_write: {e}")
                ok_count -= len(batch)
                err_count += len(batch)
            except Exception as e:
                logging.error("bulk_write: %s", e)
                print(f" CHYBA bulk_write: {e}")
                for name, op in batch:
                    try:
                        col.bulk_write([op], ordered=False)
                    except Exception as single_err:
                        logging.error("write failed [%s]: %s", name, single_err)
                        ok_count -= 1
                        err_count += 1
            batch.clear()

        loop_start = datetime.now()  # rate/ETA exclude startup and scan time
        for i, msg_path in enumerate(to_process, 1):
            doc = extract_message(msg_path)

            if doc is None:
                err_count += 1
            else:
                batch.append((msg_path.name,
                              UpdateOne({"_id": doc["_id"]}, {"$set": doc}, upsert=True)))
                ok_count += 1

            if len(batch) >= BATCH_SIZE:
                flush()

            # One console line per email (whitespace in the subject is
            # collapsed so embedded line breaks cannot split the line).
            status = "ERR " if doc is None else "OK "
            subject_str = " ".join((doc.get("subject") or "").split())[:60] if doc else "?"
            sender_str = (doc.get("sender", {}).get("email") or "")[:40] if doc else "?"
            print(f" {i:>6}/{total} {status} {subject_str:<60} {sender_str}")

            if i % 500 == 0:
                elapsed = (datetime.now() - loop_start).total_seconds()
                rate = i / elapsed if elapsed > 0 else 0
                eta_s = int((total - i) / rate) if rate > 0 else 0
                print(f" {'─'*80}")
                print(f" Průběh: ok={ok_count} err={err_count} "
                      f"{rate:.1f} msg/s ETA {eta_s//3600}h{(eta_s%3600)//60}m")
                print(f" {'─'*80}")

        flush()

        elapsed_total = (datetime.now() - start).total_seconds()
        print(f"\n{'='*52}")
        print(f"Vysledek: ok={ok_count} | skip={skipped} | err={err_count}")
        print(f"Celkovy cas: {int(elapsed_total//3600)}h {int((elapsed_total%3600)//60)}m {int(elapsed_total%60)}s")
        print(f"Dokumentu v kolekci: {col.count_documents({})}")

        if not args.no_indexes:
            print()
            create_indexes(col)

        print(f"\nKonec: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        if err_count:
            print(f"Chyby logovany do: {LOG_FILE}")
    finally:
        client.close()


if __name__ == "__main__":
    main()
|