"""
parse_emails_tower_v1.3.py

Name:    parse_emails_tower_v1.3.py
Version: 1.3
Date:    2026-06-08
Author:  vladimir.buzalka

Description:
    Parses every .msg file in MSGS_DIR and imports it as a document into
    MongoDB. ALL available properties are extracted from each file -- much
    like EXIF data for photos:
      - subject, sender, recipients (To/CC/BCC with types)
      - delivery time and submit time (UTC)
      - plain-text body + HTML body (max 2 MB)
      - attachments (metadata: name, size, MIME type, inline flag)
      - internet headers (X-Originating-IP, Received, DKIM, ...)
      - MAPI properties: importance, sensitivity, flag, conversation thread,
        categories, In-Reply-To, References, ...
      - all raw MAPI properties as {0xXXXX: value}

    DB:         emaily
    Collection: vbuzalka@its.jnj.com
    _id:        Internet Message-ID (or "filename:<stem>" as a fallback)

Safe to interrupt and re-run:
    - upsert by _id -- duplicates are overwritten automatically
    - --skip-existing loads the list of already imported files from MongoDB
      and skips them => resume after an interruption without losing work

Environment:
    Runs in the Docker container "python-runner" on Unraid Tower.
    The .msg files are available as a local disk (volume mount):
        /mnt/user/JNJEMAILS -> /mnt/JNJEMAILS (inside the container)
    MongoDB at 192.168.1.76:27017 (external, runs outside the container).

Running (from the Unraid terminal):
    # Test on 50 emails:
    docker exec -it python-runner python /scripts/parse_emails_tower_v1.3.py --limit 50 --no-indexes

    # Full import in the background (separate log, not shared with the Graph import):
    docker exec -d python-runner bash -c \
        "python /scripts/parse_emails_tower_v1.3.py > /scripts/parse_emails_tower.log 2>&1"

    # Resume after an interruption:
    docker exec -d python-runner bash -c \
        "python /scripts/parse_emails_tower_v1.3.py --skip-existing > /scripts/parse_emails_tower.log 2>&1"

    # Follow progress:
    docker exec -it python-runner tail -f /scripts/parse_emails_tower.log

Console output:
    One line per email: <i>/<total> OK/ERR <subject> <sender>
    Every 500 emails: a separator with progress, rate and ETA.
    At the end: ok/skip/err summary, total time, document count in the collection.

Dependencies (installed in the python-runner Docker image):
    extract-msg==0.55.0, olefile, pymongo, python-dateutil
    Python 3.12, Linux (Docker container on Unraid Tower)
    (olefile is a transitive dependency of extract-msg; the raw-OLE fallback
    uses it directly)

MongoDB document structure:
    _id                         Internet Message-ID (or filename: fallback)
    filename                    name of the .msg file (20-char hex + .msg)
    subject                     message subject
    normalized_subject          subject without RE:/FW: prefixes
    importance                  0=low 1=normal 2=high
    sensitivity                 0=normal 1=personal 2=private 3=confidential
    flag_status                 0=not flagged 1=flagged 2=completed
    read_receipt_requested      bool
    delivery_receipt_requested  bool
    has_attachments             bool
    attachment_count            int
    message_size_bytes          size of the .msg file on disk
    conversation_topic          thread topic (PR_CONVERSATION_TOPIC)
    conversation_index          base64 PR_CONVERSATION_INDEX
    in_reply_to                 Message-ID of the previous message
    internet_references         [Message-ID] -- full thread history
    categories                  [str] -- MAPI categories / labels
    received_at                 datetime UTC -- delivery time
    sent_at                     datetime UTC -- submit time
    sender.email                sender e-mail address
    sender.name                 sender display name
    sender.smtp                 SMTP address (for internal EX addresses)
    to                          To string (as shown in Outlook)
    cc                          CC string
    bcc                         BCC string
    display_to                  PR_DISPLAY_TO (shortened list)
    display_cc                  PR_DISPLAY_CC
    recipients                  [{type, email, name}] -- to/cc/bcc with types
    body_text                   plain-text body
    body_html                   HTML body (max 2 MB, None if absent)
    attachments                 [{filename, size_bytes, mime_type, content_id, is_inline}]
    headers                     dict of internet headers (lowercase_with_underscores)
    mapi                        dict of all raw MAPI properties {0xXXXX: value}
    parsed_at                   datetime UTC -- time of parsing

Indexes (created automatically at the end):
    received_at, sent_at, sender.email, filename (unique), conversation_topic,
    has_attachments, categories, importance, flag_status,
    text_search (subject + body_text + to + cc)

Errors:
    Files that failed are logged to parse_emails_tower_errors.log in the
    script directory (a SEPARATE log, not shared with the Graph import).
    Line format: timestamp | open/extract failed | reason.

Version history:
    1.0  2026-06-01  Initial version
    1.1  2026-06-02  Deployed on Unraid Tower in the python-runner Docker
                     container; MSGS_DIR changed from an SMB share to the
                     local mount /mnt/JNJEMAILS; run instructions updated
                     for docker exec
    1.2  2026-06-08  FIX: to_bson converts ints outside the int64 range to
                     strings (BSON only supports 8-byte ints) -- previously the
                     whole bulk_write failed with 'MongoDB can only handle up
                     to 8-byte ints' and dropped the whole batch of 200
                     documents (the v1.1 run on 8 June stored NOTHING).
                     flush() has a per-document fallback: only the bad record
                     is dropped, not the whole batch. bool() is tested before
                     int(). Separate error log parse_emails_tower_errors.log
                     and stdout log parse_emails_tower.log (previously shared
                     with the Graph import -- a mess in the log).
    1.3  2026-06-08  RESCUE of previously failing .msg files (about 126 from
                     the 8 June run):
                     - open_message(): cascading open
                       normal -> SUPPRESS_ALL (broken attachments)
                       -> +overrideEncoding
                       Fixes both 'Attachment method missing' and
                       'not an MSG file'.
                     - raw-OLE fallback: when extract_msg returns empty/U+FFFD
                       (embedded email, codepage 1200 lies and is really
                       cp1250/gb2312), the key fields (subject/sender/body/html)
                       are read DIRECTLY from the OLE stream with cascading
                       decoding (utf-8 strict -> CPID -> cp1250 ...). Encoding
                       headers are not trusted (they often contradict each other).
                     - new fields: parse_mode (normal/suppress_all/override:ENC),
                       parse_degraded (bool).
"""

import sys
import re
import logging
import argparse
import base64
import struct
from email.header import decode_header
from pathlib import Path
from datetime import datetime, timezone
from typing import Optional

import extract_msg
from extract_msg.enums import ErrorBehavior
import olefile
from dateutil import parser as dtparser
from pymongo import MongoClient, UpdateOne, ASCENDING, TEXT

if hasattr(sys.stdout, "reconfigure"):
    sys.stdout.reconfigure(encoding="utf-8", errors="replace")

# ─── CONFIGURATION ────────────────────────────────────────────────────────────
MSGS_DIR = Path("/mnt/JNJEMAILS")
MONGO_URI = "mongodb://192.168.1.76:27017"
MONGO_DB = "emaily"
MONGO_COL = "vbuzalka@its.jnj.com"
BATCH_SIZE = 200
LOG_FILE = Path(__file__).parent / "parse_emails_tower_errors.log"
# Kept in sync with the file name and the version history in the docstring.
SCRIPT_VERSION = "1.3"
# ──────────────────────────────────────────────────────────────────────────────

logging.basicConfig(
    filename=str(LOG_FILE),
    level=logging.ERROR,
    format="%(asctime)s | %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    encoding="utf-8",
)


# ─── Helper functions ─────────────────────────────────────────────────────────

def safe(obj, *attrs, default=None):
    """Return the first usable attribute value of *obj* among *attrs*.

    A value is skipped when it is None, when it is a whitespace-only string,
    or when reading the attribute raises. If no attribute yields a value,
    *default* is returned.
    """
    for attr in attrs:
        try:
            val = getattr(obj, attr, None)
            if val is None:
                continue
            if isinstance(val, str) and not val.strip():
                continue
            return val
        except Exception:
            continue
    return default


def parse_date(raw) -> Optional[datetime]:
    """Convert an arbitrary date value to a naive UTC datetime (for MongoDB).

    Aware datetimes are converted to UTC and stripped of tzinfo; naive ones
    are returned unchanged. Anything else is parsed from its string form with
    dateutil. Returns None for None input or when parsing fails.
    """
    if raw is None:
        return None
    if isinstance(raw, datetime):
        if raw.tzinfo:
            return raw.astimezone(timezone.utc).replace(tzinfo=None)
        return raw
    try:
        dt = dtparser.parse(str(raw))
        if dt.tzinfo:
            return dt.astimezone(timezone.utc).replace(tzinfo=None)
        return dt
    except Exception:
        return None


_INT64_MIN, _INT64_MAX = -(2 ** 63), 2 ** 63 - 1


def to_bson(val):
    """Convert a value to a BSON-serializable type.

    BSON only supports signed int64, while Python integers are unbounded.
    Large MAPI values (PR_CHANGE_KEY, FILETIME, 64-bit handles) outside the
    int64 range are therefore converted to strings -- otherwise the whole
    bulk_write fails with 'MongoDB can only handle up to 8-byte ints'.

    Byte strings up to 128 bytes are stored as hex; longer ones are replaced
    by a "<bytes:N>" placeholder carrying only their length.
    """
    # bool must be tested BEFORE int (isinstance(True, int) is True)
    if isinstance(val, bool):
        return val
    if isinstance(val, bytes):
        # NOTE(review): the placeholder text was lost from the source (it read
        # f""), which stored an empty string; the wording here is reconstructed.
        return val.hex() if len(val) <= 128 else f"<bytes:{len(val)}>"
    if isinstance(val, datetime):
        return parse_date(val)
    if isinstance(val, int):
        return val if _INT64_MIN <= val <= _INT64_MAX else str(val)
    if isinstance(val, (str, float, type(None))):
        return val
    if isinstance(val, list):
        return [to_bson(v) for v in val]
    # Anything int-like (e.g. enum members) is stored as a number when it fits.
    try:
        iv = int(val)
        return iv if _INT64_MIN <= iv <= _INT64_MAX else str(iv)
    except Exception:
        pass
    return str(val)


# ─── Extraction of message parts ──────────────────────────────────────────────

def _decode_header_value(v: str) -> str:
    """Decode an RFC 2047 encoded header value; return it unchanged on error."""
    try:
        return "".join(
            part.decode(enc or "utf-8", errors="replace")
            if isinstance(part, bytes) else part
            for part, enc in decode_header(v)
        )
    except Exception:
        return v


def extract_headers(msg) -> dict:
    """Return the internet headers of *msg* as a dict.

    Keys are lowercased with '-' replaced by '_'. A header that occurs once
    maps to a string, a repeated header maps to a list of strings. Failures
    are logged and whatever was collected so far is returned.
    """
    headers = {}
    try:
        hdr = msg.header
        if not hdr:
            return {}
        for key in set(hdr.keys()):
            k = key.lower().replace("-", "_")
            vals = [_decode_header_value(v) for v in hdr.get_all(key, [])]
            headers[k] = vals if len(vals) > 1 else (vals[0] if vals else "")
    except Exception as e:
        logging.error("extract_headers: %s", e)
    return headers


def extract_recipients(msg) -> list:
    """Return the recipients of *msg* as a list of {type, email, name} dicts.

    The recipient type is mapped 1 -> "to", 2 -> "cc", 3 -> "bcc"; anything
    unknown or unreadable is treated as "to".
    """
    result = []
    type_map = {1: "to", 2: "cc", 3: "bcc"}
    try:
        for r in msg.recipients:
            rtype = getattr(r, "type", 1)
            try:
                rtype = int(rtype)
            except Exception:
                # Not directly int-convertible: try its .value attribute.
                try:
                    rtype = int(rtype.value)
                except Exception:
                    rtype = 1
            rec = {
                "type": type_map.get(rtype, "to"),
                "email": safe(r, "email", default=""),
                "name": safe(r, "name", default=""),
            }
            result.append(rec)
    except Exception as e:
        logging.error("extract_recipients: %s", e)
    return result


def extract_attachments(msg) -> list:
    """Return attachment metadata of *msg* (the attachment data is not stored).

    Attachments without a file name are skipped. The size is the length of
    the attachment data, or 0 when the data cannot be read.
    """
    result = []
    try:
        for att in msg.attachments:
            fname = safe(att, "longFilename", "shortFilename", default="")
            if not fname:
                continue
            size = 0
            try:
                d = att.data
                size = len(d) if d else 0
            except Exception:
                pass
            result.append({
                "filename": fname,
                "size_bytes": size,
                "mime_type": safe(att, "mimetype", "mimeType",
                                  default="application/octet-stream"),
                "content_id": safe(att, "cid", default=None),
                "is_inline": bool(safe(att, "isInline", default=False)),
            })
    except Exception as e:
        logging.error("extract_attachments: %s", e)
    return result


def extract_mapi_props(msg) -> dict:
    """Return all raw MAPI properties of *msg* as {0xXXXX: value}.

    The key is built from the first four characters of the property name;
    values are passed through to_bson(). Properties whose value cannot be
    read or converted are skipped.
    """
    result = {}
    try:
        props = msg.props
        if not hasattr(props, "items"):
            return {}
        for key, prop in props.items():
            try:
                val = to_bson(prop.value)
                prop_id = f"0x{key[:4].upper()}" if len(key) >= 4 else f"0x{key.upper()}"
                result[prop_id] = val
            except Exception:
                pass
    except Exception as e:
        logging.error("extract_mapi_props: %s", e)
    return result


# ─── Tolerant opening and raw-OLE fallback ───────────────────────────────────
#
# Some .msg files defeat extract_msg: (a) a broken attachment without
# PR_ATTACH_METHOD, (b) the body declares codepage 1200 (UTF-16) but the bytes
# are cp1250/gb2312, (c) an embedded email ("not an MSG file") -- extract_msg
# returns empty fields. The data is in the file, though. We open tolerantly
# and read the degraded text fields DIRECTLY from the OLE stream with
# cascading decoding (encoding headers are not trusted).
# Windows codepage -> Python codec (PR_INTERNET_CPID / PR_MESSAGE_CODEPAGE)
_CPID_TO_CODEC = {
    1250: "cp1250", 1251: "cp1251", 1252: "cp1252", 1253: "cp1253",
    1254: "cp1254", 1255: "cp1255", 1256: "cp1256", 1257: "cp1257",
    1258: "cp1258", 874: "cp874", 932: "shift_jis", 936: "gb2312",
    949: "euc_kr", 950: "big5", 65001: "utf-8", 28591: "iso-8859-1",
    28592: "iso-8859-2", 20127: "ascii",
}

# Upper bound (in characters) for the stored HTML body.
_MAX_HTML_CHARS = 2 * 1024 * 1024


def _read_u32_prop(ole, propid):
    """Read a 32-bit MAPI property value from the top-level property stream.

    Scans __properties_version1.0 in 16-byte entries after a 32-byte header.
    Each entry starts with a little-endian 32-bit tag whose upper 16 bits are
    the property id; the value is read from bytes 8..11 of the entry.
    Returns None when the stream cannot be read or the property is absent.

    NOTE(review): the two struct.unpack calls were garbled in the source and
    are reconstructed from the MS-OXMSG property-stream layout -- confirm.
    """
    try:
        data = ole.openstream("__properties_version1.0").read()
    except Exception:
        return None
    body = data[32:]  # 32-byte header of the top-level property stream
    for off in range(0, len(body) - 16 + 1, 16):
        tag = struct.unpack_from("<I", body, off)[0]
        if ((tag >> 16) & 0xFFFF) == propid:
            return struct.unpack_from("<I", body, off + 8)[0]
    return None


def _detect_cpid(ole) -> Optional[str]:
    """Return a codec from PR_INTERNET_CPID / PR_MESSAGE_CODEPAGE (a hint, not dogma)."""
    for pid in (0x3FDE, 0x3FFD):  # INTERNET_CPID, MESSAGE_CODEPAGE
        codec = _CPID_TO_CODEC.get(_read_u32_prop(ole, pid))
        # utf-8/ascii are poor hints for an 8-bit stream (they often lie)
        if codec and codec not in ("utf-8", "ascii"):
            return codec
    return None


def _cascade_decode(raw: bytes, is_unicode: bool, cpid_codec: Optional[str]) -> str:
    """Decode the bytes of a MAPI string.

    Encoding headers are not trusted: candidates are tried strictly in
    priority order and the first one that decodes without error wins.
    Unicode (PT_UNICODE) streams are always decoded as utf-16-le.
    """
    if not raw:
        return ""
    if is_unicode:  # PT_UNICODE = utf-16-le
        try:
            return raw.decode("utf-16-le")
        except Exception:
            return raw.decode("utf-16-le", errors="replace")
    order = ["utf-8"]  # strict utf-8 is a strong discriminator
    if cpid_codec:
        order.append(cpid_codec)
    order += ["cp1250", "cp1252", "gb2312", "big5"]
    for enc in order:
        try:
            return raw.decode(enc, errors="strict")
        except Exception:
            continue
    return raw.decode("latin-1", errors="replace")  # never fails


def _raw_mapi_strings(msg_path: Path) -> dict:
    """Read the key text MAPI fields DIRECTLY from OLE (bypassing extract_msg).

    Used only when extract_msg returned a degraded field. Only top-level
    streams are considered; when a property exists in several types, unicode
    is preferred over ANSI over binary. Missing fields stay "".
    """
    out = {"subject": "", "normalized_subject": "", "sender_name": "",
           "sender_email": "", "sender_smtp": "", "body_text": "", "body_html": ""}
    try:
        ole = olefile.OleFileIO(str(msg_path))
    except Exception:
        return out
    try:
        cpid = _detect_cpid(ole)
        wanted = {  # MAPI tag -> key in out
            "0037": "subject", "0E1D": "normalized_subject",
            "0C1A": "sender_name", "5D01": "sender_smtp", "0C1F": "sender_email",
            "1000": "body_text", "1013": "body_html",
        }
        type_priority = {"001F": 3, "001E": 2, "0102": 1}
        prefix = "__substg1.0_"
        found = {}  # key -> (type_priority, value)
        for entry in ole.listdir():
            if len(entry) != 1:  # top-level only (no embedded messages)
                continue
            name = entry[0]
            if not name.startswith(prefix):
                continue
            tag = name[len(prefix):len(prefix) + 4].upper()
            key = wanted.get(tag)
            if not key:
                continue
            typ = name[-4:].upper()
            prio = type_priority.get(typ, 0)
            if prio == 0:
                continue
            prev = found.get(key)
            if prev and prev[0] >= prio:  # prefer unicode > ansi > binary
                continue
            try:
                raw = ole.openstream(entry).read()
                val = _cascade_decode(raw, typ == "001F", cpid)
            except Exception:
                continue
            found[key] = (prio, val)
        for key, (_, val) in found.items():
            out[key] = val
    finally:
        ole.close()
    return out


def _degraded(s) -> bool:
    """A field is degraded when it is empty or contains U+FFFD (replacement char)."""
    return (not s) or ("\ufffd" in s)


def _cap_html(html: str) -> str:
    """Truncate an HTML body to at most _MAX_HTML_CHARS characters."""
    return html if len(html) <= _MAX_HTML_CHARS else html[:_MAX_HTML_CHARS]


def open_message(msg_path: Path):
    """Open a .msg file in a cascade -> (msg, mode) or (None, None).

    Modes, tried in this order:
        normal        the regular path
        suppress_all  tolerant of broken attachments
        override:ENC  tolerant + encoding forced from the codepage property
                      (then cp1250 and cp1252 as last resorts)
    """
    try:
        return extract_msg.Message(str(msg_path)), "normal"
    except Exception:
        pass
    try:
        return extract_msg.Message(
            str(msg_path), errorBehavior=ErrorBehavior.SUPPRESS_ALL), "suppress_all"
    except Exception:
        pass
    encs = []
    try:
        ole = olefile.OleFileIO(str(msg_path))
        try:
            c = _detect_cpid(ole)
        finally:
            ole.close()  # release the probe handle even if detection fails
        if c:
            encs.append(c)
    except Exception:
        pass
    for e in encs + ["cp1250", "cp1252"]:
        try:
            return extract_msg.Message(
                str(msg_path), errorBehavior=ErrorBehavior.SUPPRESS_ALL,
                overrideEncoding=e), f"override:{e}"
        except Exception:
            continue
    return None, None


# ─── Main extraction ─────────────────────────────────────────────────────────

def extract_message(msg_path: Path) -> Optional[dict]:
    """Parse one .msg file into a MongoDB document.

    Returns None (and logs the reason) when the file cannot be opened or the
    extraction fails. The message is closed on every path, including errors.
    """
    msg, parse_mode = open_message(msg_path)
    if msg is None:
        logging.error("open failed [%s]: vsechny pokusy o otevreni selhaly", msg_path.name)
        return None

    closed = False
    try:
        # ── Message-ID ────────────────────────────────────────────────
        mid = None
        for attr in ("messageId", "message_id", "internetMessageId"):
            mid = safe(msg, attr)
            if mid:
                break
        if not mid:
            mid = f"filename:{msg_path.stem}"
        mid = str(mid).strip()

        # ── Subject ───────────────────────────────────────────────────
        try:
            subject = msg.subject or ""
        except Exception:
            subject = ""
        normalized_subject = safe(msg, "normalizedSubject", "normalized_subject", default="")

        # ── Body ──────────────────────────────────────────────────────
        try:
            body_text = msg.body or ""
        except Exception:
            body_text = ""

        body_html = None
        try:
            bh = msg.htmlBody
            if isinstance(bh, bytes):
                bh = bh.decode("utf-8", errors="replace")
            if bh:
                body_html = _cap_html(bh)
        except Exception:
            pass

        # ── Sender ────────────────────────────────────────────────────
        try:
            sender_email = msg.sender or ""
        except Exception:
            sender_email = ""
        sender_name = safe(msg, "senderName", "sender_name", default="")
        sender_smtp = safe(msg, "senderSmtpAddress", "sent_representing_smtp_address", default="")

        # ── Recipients ────────────────────────────────────────────────
        recipients = extract_recipients(msg)
        try:
            to_raw = msg.to or ""
        except Exception:
            to_raw = ""
        try:
            cc_raw = msg.cc or ""
        except Exception:
            cc_raw = ""
        try:
            bcc_raw = getattr(msg, "bcc", None) or ""
        except Exception:
            bcc_raw = ""
        display_to = safe(msg, "displayTo", "display_to", default="")
        display_cc = safe(msg, "displayCc", "display_cc", default="")

        # ── Times ─────────────────────────────────────────────────────
        try:
            received_at = parse_date(msg.date)
        except Exception:
            received_at = None

        sent_at = None
        for attr in ("clientSubmitTime", "client_submit_time", "sentOn"):
            v = safe(msg, attr)
            if v:
                sent_at = parse_date(v)
                break

        # ── MAPI properties ───────────────────────────────────────────
        importance = 1
        try:
            v = msg.importance
            if v is not None:
                importance = int(v)
        except Exception:
            pass

        sensitivity = 0
        try:
            v = getattr(msg, "sensitivity", None)
            if v is not None:
                sensitivity = int(v)
        except Exception:
            pass

        flag_status = 0
        try:
            v = safe(msg, "flagStatus", "flag_status")
            if v is not None:
                flag_status = int(v)
        except Exception:
            pass

        conversation_topic = safe(msg, "conversationTopic", "conversation_topic", default="")

        conversation_index = ""
        try:
            ci = safe(msg, "conversationIndex", "conversation_index")
            if isinstance(ci, bytes):
                conversation_index = base64.b64encode(ci).decode()
            elif ci:
                conversation_index = str(ci)
        except Exception:
            pass

        in_reply_to = safe(msg, "inReplyTo", "in_reply_to", default="")

        internet_refs = []
        try:
            refs = safe(msg, "internetReferences", "internet_references")
            if isinstance(refs, list):
                internet_refs = refs
            elif isinstance(refs, str) and refs:
                internet_refs = [r.strip() for r in refs.split() if r.strip()]
        except Exception:
            pass

        categories = []
        try:
            cats = safe(msg, "categories")
            if isinstance(cats, list):
                categories = [str(c) for c in cats if c]
            elif isinstance(cats, str) and cats:
                categories = [c.strip() for c in re.split(r"[;,]", cats) if c.strip()]
        except Exception:
            pass

        read_receipt = bool(safe(msg, "readReceiptRequested", "read_receipt_requested", default=False))
        delivery_receipt = bool(safe(msg, "deliveryReceiptRequested", "delivery_receipt_requested", default=False))

        # ── Internet headers ──────────────────────────────────────────
        headers = extract_headers(msg)
        # The headers fill in threading info the MAPI attributes did not give.
        if not in_reply_to:
            in_reply_to = headers.get("in_reply_to", "")
        if not internet_refs:
            refs_str = headers.get("references", "")
            if isinstance(refs_str, str) and refs_str:
                internet_refs = [r.strip() for r in refs_str.split() if r.strip()]

        # ── Attachments ───────────────────────────────────────────────
        attachments = extract_attachments(msg)

        # ── Raw MAPI ──────────────────────────────────────────────────
        mapi_raw = extract_mapi_props(msg)

        # Everything needed from extract_msg has been read; release the file
        # before the raw-OLE fallback opens it again.
        msg.close()
        closed = True

        # ── Raw-OLE fallback for degraded text fields ─────────────────
        # When extract_msg returned empty/U+FFFD or had to guess the encoding
        # (override/suppress), the key fields are read directly from the OLE
        # stream with cascading decoding -- more reliable than one forced
        # encoding.
        parse_degraded = parse_mode != "normal"
        # in a non-normal mode the encoding was guessed -> trust the raw cascade more
        forced = parse_mode != "normal"
        if (forced or _degraded(subject) or _degraded(body_text)
                or _degraded(sender_email) or (body_html and "\ufffd" in body_html)):
            raw = _raw_mapi_strings(msg_path)
            if raw["subject"] and (forced or _degraded(subject)):
                subject = raw["subject"]
            if raw["normalized_subject"] and (forced or _degraded(normalized_subject)):
                normalized_subject = raw["normalized_subject"]
            if raw["body_text"] and (forced or _degraded(body_text)):
                body_text = raw["body_text"]
            if raw["body_html"] and (forced or not body_html or "\ufffd" in body_html):
                body_html = _cap_html(raw["body_html"])
            if (raw["sender_smtp"] or raw["sender_email"]) and (forced or _degraded(sender_email)):
                sender_email = raw["sender_smtp"] or raw["sender_email"]
            if raw["sender_name"] and (forced or _degraded(sender_name)):
                sender_name = raw["sender_name"]
            if raw["sender_smtp"] and not sender_smtp:
                sender_smtp = raw["sender_smtp"]

        # ── Document ──────────────────────────────────────────────────
        return {
            "_id": mid,
            "filename": msg_path.name,
            "subject": subject,
            "normalized_subject": normalized_subject,
            "importance": importance,
            "sensitivity": sensitivity,
            "flag_status": flag_status,
            "read_receipt_requested": read_receipt,
            "delivery_receipt_requested": delivery_receipt,
            "has_attachments": len(attachments) > 0,
            "attachment_count": len(attachments),
            "message_size_bytes": msg_path.stat().st_size,
            "conversation_topic": conversation_topic,
            "conversation_index": conversation_index,
            "in_reply_to": in_reply_to,
            "internet_references": internet_refs,
            "categories": categories,
            "received_at": received_at,
            "sent_at": sent_at,
            "sender": {
                "email": sender_email,
                "name": sender_name,
                "smtp": sender_smtp,
            },
            "to": to_raw,
            "cc": cc_raw,
            "bcc": bcc_raw,
            "display_to": display_to,
            "display_cc": display_cc,
            "recipients": recipients,
            "body_text": body_text,
            "body_html": body_html,
            "attachments": attachments,
            "headers": headers,
            "mapi": mapi_raw,
            "parse_mode": parse_mode,          # normal / suppress_all / override:ENC
            "parse_degraded": parse_degraded,  # True = a non-normal open mode was needed
            "parsed_at": datetime.now(timezone.utc).replace(tzinfo=None),
        }

    except Exception as e:
        logging.error("extract_message failed [%s]: %s", msg_path.name, e)
        return None
    finally:
        # Do not leak the open file when extraction failed half-way.
        if not closed:
            try:
                msg.close()
            except Exception:
                pass


# ─── MongoDB indexes ──────────────────────────────────────────────────────────

def create_indexes(col):
    """Create the query indexes and the text-search index on collection *col*."""
    print(" Vytvarim indexy...")
    col.create_index([("received_at", ASCENDING)])
    col.create_index([("sent_at", ASCENDING)])
    col.create_index([("sender.email", ASCENDING)])
    col.create_index([("filename", ASCENDING)], unique=True, sparse=True)
    col.create_index([("conversation_topic", ASCENDING)])
    col.create_index([("has_attachments", ASCENDING)])
    col.create_index([("categories", ASCENDING)])
    col.create_index([("importance", ASCENDING)])
    col.create_index([("flag_status", ASCENDING)])
    col.create_index([
        ("subject", TEXT),
        ("body_text", TEXT),
        ("to", TEXT),
        ("cc", TEXT),
    ], name="text_search", default_language="none")
    print(" Indexy hotovy.")


# ─── MAIN ─────────────────────────────────────────────────────────────────────

def main():
    """Command-line entry point: scan the directory, parse, upsert, report."""
    ap = argparse.ArgumentParser(description=f"parse_emails v{SCRIPT_VERSION}")
    ap.add_argument("--msgs-dir", default=str(MSGS_DIR), help="Cesta k .msg souborum")
    ap.add_argument("--limit", type=int, default=0, help="Zpracovat max N souboru (0 = vse)")
    ap.add_argument("--skip-existing", action="store_true",
                    help="Preskocit soubory ktere jiz jsou v MongoDB (pokracovani)")
    ap.add_argument("--no-indexes", action="store_true", help="Nevytvorit indexy na konci")
    args = ap.parse_args()

    msgs_dir = Path(args.msgs_dir)
    start = datetime.now()
    print(f"=== parse_emails v{SCRIPT_VERSION} ===")
    print(f"Start: {start.strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"Zdroj: {msgs_dir}")
    print(f"MongoDB: {MONGO_URI} -> {MONGO_DB}.{MONGO_COL}")

    # MongoDB
    client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
    try:
        client.admin.command("ping")
        print(" MongoDB OK")
    except Exception as e:
        print(f" CHYBA: MongoDB neni dostupna -- {e}")
        sys.exit(1)
    col = client[MONGO_DB][MONGO_COL]

    # Skip existing -- load the names of files that were already imported
    existing: set = set()
    if args.skip_existing:
        print(" Nacitam existujici zaznamy z MongoDB...")
        existing = set(col.distinct("filename"))
        print(f" {len(existing)} jiz importovano")

    # Scan
    print(f"\nSkenuji {msgs_dir} ...")
    all_files = sorted(msgs_dir.glob("*.msg"))
    if args.limit:
        all_files = all_files[:args.limit]
    to_process = [f for f in all_files if f.name not in existing]
    skipped = len(all_files) - len(to_process)
    total = len(to_process)
    print(f" Celkem .msg: {len(all_files)}")
    print(f" Preskoceno: {skipped}")
    print(f" Ke zpracovani: {total}\n")

    if total == 0:
        print("Neni co importovat.")
        client.close()
        return

    batch = []
    ok_count = 0
    err_count = 0

    def flush():
        """Write the pending batch; fall back to per-document writes on failure."""
        nonlocal ok_count, err_count
        if not batch:
            return
        try:
            col.bulk_write(batch, ordered=False)
        except Exception as e:
            # The whole batch failed (typically one bad document). Write it
            # document by document so the error drops only the record that is
            # really bad, not all BATCH_SIZE of them.
            logging.error("bulk_write spadl (%s) -- prepinam na per-dokument", e)
            print(f" CHYBA bulk_write: {e} -- zkousim per-dokument")
            for op in batch:
                try:
                    col.bulk_write([op], ordered=False)
                except Exception as e2:
                    try:
                        bad_id = getattr(op, "_filter", {}).get("_id", "?")
                    except Exception:
                        bad_id = "?"
                    logging.error("per-dokument selhal [_id=%s]: %s", bad_id, e2)
                    print(f" ZAHOZEN _id={bad_id}: {e2}")
                    # It was counted as ok when queued; move it to the errors.
                    ok_count -= 1
                    err_count += 1
        batch.clear()

    for i, msg_path in enumerate(to_process, 1):
        doc = extract_message(msg_path)
        if doc is None:
            err_count += 1
        else:
            batch.append(UpdateOne({"_id": doc["_id"]}, {"$set": doc}, upsert=True))
            ok_count += 1
        if len(batch) >= BATCH_SIZE:
            flush()

        # One output line per email
        status = "ERR " if doc is None else "OK "
        subject_str = (doc.get("subject") or "")[:60] if doc else "?"
        sender_str = (doc.get("sender", {}).get("email") or "")[:40] if doc else "?"
        print(f" {i:>6}/{total} {status} {subject_str:<60} {sender_str}")

        if i % 500 == 0:
            elapsed = (datetime.now() - start).total_seconds()
            rate = i / elapsed if elapsed > 0 else 0
            eta_s = int((total - i) / rate) if rate > 0 else 0
            print(f" {'─'*80}")
            print(f" Průběh: ok={ok_count} err={err_count} "
                  f"{rate:.1f} msg/s ETA {eta_s//3600}h{(eta_s%3600)//60}m")
            print(f" {'─'*80}")

    flush()

    elapsed_total = (datetime.now() - start).total_seconds()
    print(f"\n{'='*52}")
    print(f"Vysledek: ok={ok_count} | skip={skipped} | err={err_count}")
    print(f"Celkovy cas: {int(elapsed_total//3600)}h {int((elapsed_total%3600)//60)}m {int(elapsed_total%60)}s")
    print(f"Dokumentu v kolekci: {col.count_documents({})}")

    if not args.no_indexes:
        print()
        create_indexes(col)

    print(f"\nKonec: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    if err_count:
        print(f"Chyby logovany do: {LOG_FILE}")
    client.close()


if __name__ == "__main__":
    main()