"""parse_emails_tower_v1.1.py

Name:    parse_emails_tower_v1.1.py
Version: 1.1
Date:    2026-06-02
Author:  vladimir.buzalka

Description:
    Parses every .msg file in MSGS_DIR and imports it as a document into
    MongoDB. ALL available properties are extracted from each file — much
    like EXIF data for photos:
      - subject, sender, recipients (To/CC/BCC with types)
      - delivery and submit time (UTC)
      - plain-text body + HTML body (max 2 MB)
      - attachments (metadata: name, size, MIME type, inline flag)
      - internet headers (X-Originating-IP, Received, DKIM, ...)
      - MAPI properties: importance, sensitivity, flag, conversation
        thread, categories, In-Reply-To, References, ...
      - all raw MAPI properties as {0xXXXX: value}

    DB:         emaily
    Collection: vbuzalka@its.jnj.com
    _id:        Internet Message-ID (or "filename:<stem>" as a fallback)

Safe to interrupt and re-run:
    - upsert by _id — duplicates are overwritten automatically
    - --skip-existing loads the list of already imported files from
      MongoDB and skips them => resume after an interruption without
      losing work

Environment:
    Runs in the Docker container "python-runner" on Unraid Tower.
    The .msg files are available as a local disk (volume mount):
        /mnt/user/JNJEMAILS -> /mnt/JNJEMAILS (inside the container)
    MongoDB at 192.168.1.76:27017 (external, runs outside the container).

Usage (from the Unraid terminal):
    # Test on 50 e-mails:
    docker exec -it python-runner python /scripts/parse_emails_tower_v1.1.py --limit 50 --no-indexes

    # Full import in the background (log to a file):
    docker exec -d python-runner bash -c "python /scripts/parse_emails_tower_v1.1.py > /scripts/parse_emails.log 2>&1"

    # Resume after an interruption:
    docker exec -d python-runner bash -c "python /scripts/parse_emails_tower_v1.1.py --skip-existing > /scripts/parse_emails.log 2>&1"

    # Watch progress:
    docker exec -it python-runner tail -f /scripts/parse_emails.log

Console output:
    One line per e-mail: <i>/<total> OK/ERR <subject> <sender>
    Every 500 e-mails: a separator with progress, rate and ETA.
    At the end: ok/skip/err summary, total time, number of documents in
    the collection.

Dependencies (installed in the python-runner Docker image):
    extract-msg==0.55.0, pymongo, python-dateutil
    Python 3.12, Linux (Docker container on Unraid Tower)

MongoDB document structure:
    _id                         Internet Message-ID (or filename: fallback)
    filename                    name of the .msg file (20-char hex + .msg)
    subject                     message subject
    normalized_subject          subject without RE:/FW: prefixes
    importance                  0=low 1=normal 2=high
    sensitivity                 0=normal 1=personal 2=private 3=confidential
    flag_status                 0=not flagged 1=flagged 2=completed
    read_receipt_requested      bool
    delivery_receipt_requested  bool
    has_attachments             bool
    attachment_count            int
    message_size_bytes          size of the .msg file on disk
    conversation_topic          thread topic (PR_CONVERSATION_TOPIC)
    conversation_index          base64 PR_CONVERSATION_INDEX
    in_reply_to                 Message-ID of the previous message
    internet_references         [Message-ID] — full thread history
    categories                  [str] — MAPI categories / labels
    received_at                 datetime UTC — delivery time
    sent_at                     datetime UTC — submit time
    sender.email                sender e-mail address
    sender.name                 sender display name
    sender.smtp                 SMTP address (for internal EX addresses)
    to                          To string (as shown in Outlook)
    cc                          CC string
    bcc                         BCC string
    display_to                  PR_DISPLAY_TO (shortened list)
    display_cc                  PR_DISPLAY_CC
    recipients                  [{type, email, name}] — to/cc/bcc with types
    body_text                   plain-text body
    body_html                   HTML body (max 2 MB, None if absent)
    attachments                 [{filename, size_bytes, mime_type,
                                  content_id, is_inline}]
    headers                     dict of internet headers
                                (lowercase_with_underscores)
    mapi                        dict of all raw MAPI properties
                                {0xXXXX: value}
    parsed_at                   datetime UTC — time of parsing

Indexes (created automatically at the end):
    received_at, sent_at, sender.email, filename (unique),
    conversation_topic, has_attachments, categories, importance,
    flag_status, text_search (subject + body_text + to + cc)

Errors:
    Files that failed are logged to parse_emails_errors.log in the
    script directory. Line format:
        <timestamp> | <open|extract_message> failed [<filename>]: <reason>

Version history:
    1.0 2026-06-01  Initial version
    1.1 2026-06-02  Deployed on Unraid Tower in the python-runner Docker
                    container; MSGS_DIR changed from an SMB share to the
                    local mount /mnt/JNJEMAILS; usage updated for
                    docker exec
"""

import sys
import re
import logging
import argparse
import base64
from email.header import decode_header
from pathlib import Path
from datetime import datetime, timezone
from typing import Optional

import extract_msg
from dateutil import parser as dtparser
from pymongo import MongoClient, UpdateOne, ASCENDING, TEXT

if hasattr(sys.stdout, "reconfigure"):
    sys.stdout.reconfigure(encoding="utf-8", errors="replace")

# ─── CONFIGURATION ────────────────────────────────────────────────────────────
MSGS_DIR = Path("/mnt/JNJEMAILS")
MONGO_URI = "mongodb://192.168.1.76:27017"
MONGO_DB = "emaily"
MONGO_COL = "vbuzalka@its.jnj.com"
BATCH_SIZE = 200
LOG_FILE = Path(__file__).parent / "parse_emails_errors.log"
SCRIPT_VERSION = "1.1"

MAX_HTML_CHARS = 2 * 1024 * 1024   # HTML bodies longer than this are truncated
MAX_INLINE_BYTES = 128             # longer binary MAPI values are not stored
PROGRESS_EVERY = 500               # print a progress separator every N e-mails
# ──────────────────────────────────────────────────────────────────────────────

logging.basicConfig(
    filename=str(LOG_FILE),
    level=logging.ERROR,
    format="%(asctime)s | %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    encoding="utf-8",
)

# BSON integers are at most 64-bit signed; anything larger cannot be encoded.
_INT64_MIN = -(2 ** 63)
_INT64_MAX = 2 ** 63 - 1


# ─── Helper functions ─────────────────────────────────────────────────────────

def safe(obj, *attrs, default=None):
    """Return the first usable attribute value of *obj*.

    Attributes are tried in order. A value is skipped when reading it
    raises, when it is None, or when it is a blank string. If no attribute
    yields a value, *default* is returned.
    """
    for attr in attrs:
        try:
            val = getattr(obj, attr, None)
        except Exception:
            # Property getters on parsed messages may raise on corrupt data.
            continue
        if val is None:
            continue
        if isinstance(val, str) and not val.strip():
            continue
        return val
    return default


def _attr_or(obj, name, fallback=""):
    """Return ``obj.<name>``, or *fallback* when it is falsy, missing or raises."""
    try:
        return getattr(obj, name) or fallback
    except Exception:
        return fallback


def _first_truthy(obj, *attrs):
    """Return the first truthy value found via safe() over *attrs*, else None."""
    for attr in attrs:
        val = safe(obj, attr)
        if val:
            return val
    return None


def _int_attr(obj, attrs, default):
    """Read the first available attribute in *attrs* as int, else *default*."""
    try:
        val = safe(obj, *attrs)
        return default if val is None else int(val)
    except Exception:
        return default


def parse_date(raw) -> Optional[datetime]:
    """Convert an arbitrary date value to a naive UTC datetime (for MongoDB).

    Aware datetimes are converted to UTC and stripped of tzinfo; naive
    datetimes are returned unchanged. Any other value is parsed from its
    string form with dateutil. Returns None for None or unparseable input.
    """
    if raw is None:
        return None
    try:
        dt = raw if isinstance(raw, datetime) else dtparser.parse(str(raw))
        if dt.tzinfo:
            return dt.astimezone(timezone.utc).replace(tzinfo=None)
        return dt
    except Exception:
        return None


def _int64_or_str(number):
    """Return *number* if it fits a BSON int64, otherwise its decimal string."""
    if _INT64_MIN <= number <= _INT64_MAX:
        return number
    return str(number)


def to_bson(val):
    """Convert *val* to a BSON-serializable value.

    - bytes up to MAX_INLINE_BYTES become a hex string; longer values are
      replaced by a "<bytes:N>" marker recording only their size
    - datetimes are normalized through parse_date()
    - str/float/bool/None pass through; ints pass through when they fit
      into 64 bits, otherwise they are stored as strings
    - lists and tuples are converted element-wise into a list
    - anything else is stored as int(val) if possible, else str(val)
    """
    if isinstance(val, bytes):
        if len(val) <= MAX_INLINE_BYTES:
            return val.hex()
        return f"<bytes:{len(val)}>"
    if isinstance(val, datetime):
        return parse_date(val)
    if val is None or isinstance(val, (bool, str, float)):
        return val
    if isinstance(val, int):
        return _int64_or_str(val)
    if isinstance(val, (list, tuple)):
        return [to_bson(item) for item in val]
    try:
        return _int64_or_str(int(val))
    except Exception:
        return str(val)


# ─── Extraction of message parts ──────────────────────────────────────────────

def _decode_header_value(value) -> str:
    """Decode an RFC 2047 encoded header value; fall back to its plain text."""
    try:
        return "".join(
            part.decode(enc or "utf-8", errors="replace")
            if isinstance(part, bytes) else part
            for part, enc in decode_header(value)
        )
    except Exception:
        return str(value)


def extract_headers(msg) -> dict:
    """Return the internet headers of *msg* as a dict.

    Keys are lowercased with "-" replaced by "_". A header that occurs
    once maps to a string, a repeated header maps to a list of strings.
    On failure the error is logged and whatever was collected so far is
    returned.
    """
    headers = {}
    try:
        hdr = msg.header
        if not hdr:
            return {}
        # dict.fromkeys de-duplicates repeated header names while keeping
        # their original order.
        for key in dict.fromkeys(hdr.keys()):
            name = key.lower().replace("-", "_")
            values = [_decode_header_value(v) for v in hdr.get_all(key, [])]
            if len(values) > 1:
                headers[name] = values
            else:
                headers[name] = values[0] if values else ""
    except Exception as e:
        logging.error("extract_headers: %s", e)
    return headers


def extract_recipients(msg) -> list:
    """Return the recipients of *msg* as [{type, email, name}].

    The type is "to", "cc" or "bcc"; an unknown or unreadable recipient
    type falls back to "to". On failure the error is logged and the
    recipients collected so far are returned.
    """
    result = []
    type_map = {1: "to", 2: "cc", 3: "bcc"}
    try:
        for r in msg.recipients:
            rtype = getattr(r, "type", 1)
            try:
                rtype = int(rtype)
            except Exception:
                # The type may be an enum-like object exposing .value.
                try:
                    rtype = int(rtype.value)
                except Exception:
                    rtype = 1
            result.append({
                "type": type_map.get(rtype, "to"),
                "email": safe(r, "email", default=""),
                "name": safe(r, "name", default=""),
            })
    except Exception as e:
        logging.error("extract_recipients: %s", e)
    return result


def extract_attachments(msg) -> list:
    """Return attachment metadata of *msg*.

    Each entry has filename, size_bytes, mime_type, content_id and
    is_inline. Attachments without a filename are skipped; the size is 0
    when the data cannot be read or measured.
    """
    result = []
    try:
        for att in msg.attachments:
            fname = safe(att, "longFilename", "shortFilename", default="")
            if not fname:
                continue
            size = 0
            try:
                data = att.data
                size = len(data) if data else 0
            except Exception:
                pass
            result.append({
                "filename": fname,
                "size_bytes": size,
                "mime_type": safe(att, "mimetype", "mimeType",
                                  default="application/octet-stream"),
                "content_id": safe(att, "cid", default=None),
                "is_inline": bool(safe(att, "isInline", default=False)),
            })
    except Exception as e:
        logging.error("extract_attachments: %s", e)
    return result


def extract_mapi_props(msg) -> dict:
    """Return all raw MAPI properties of *msg* as {0xXXXX: value}.

    The key is built from the first four characters of the property name
    (upper-cased); values are converted with to_bson(). Properties that
    cannot be read or converted are skipped.
    """
    result = {}
    try:
        props = msg.props
        if not hasattr(props, "items"):
            return {}
        for key, prop in props.items():
            try:
                val = to_bson(prop.value)
                short = key[:4] if len(key) >= 4 else key
                result[f"0x{short.upper()}"] = val
            except Exception:
                pass
    except Exception as e:
        logging.error("extract_mapi_props: %s", e)
    return result


# ─── Main extraction ──────────────────────────────────────────────────────────

def _build_document(msg, msg_path: Path) -> dict:
    """Collect every field of the MongoDB document from an open message."""
    # ── Message-ID ────────────────────────────────────────────────────
    mid = _first_truthy(msg, "messageId", "message_id", "internetMessageId")
    if not mid:
        mid = f"filename:{msg_path.stem}"
    mid = str(mid).strip()

    # ── Subject ───────────────────────────────────────────────────────
    subject = _attr_or(msg, "subject")
    normalized_subject = safe(msg, "normalizedSubject", "normalized_subject",
                              default="")

    # ── Body ──────────────────────────────────────────────────────────
    body_text = _attr_or(msg, "body")
    body_html = None
    try:
        html = msg.htmlBody
        if isinstance(html, bytes):
            html = html.decode("utf-8", errors="replace")
        if html:
            body_html = html[:MAX_HTML_CHARS]
    except Exception:
        pass

    # ── Sender ────────────────────────────────────────────────────────
    sender_email = _attr_or(msg, "sender")
    sender_name = safe(msg, "senderName", "sender_name", default="")
    sender_smtp = safe(msg, "senderSmtpAddress",
                       "sent_representing_smtp_address", default="")

    # ── Recipients ────────────────────────────────────────────────────
    recipients = extract_recipients(msg)
    to_raw = _attr_or(msg, "to")
    cc_raw = _attr_or(msg, "cc")
    bcc_raw = _attr_or(msg, "bcc")
    display_to = safe(msg, "displayTo", "display_to", default="")
    display_cc = safe(msg, "displayCc", "display_cc", default="")

    # ── Times ─────────────────────────────────────────────────────────
    try:
        received_at = parse_date(msg.date)
    except Exception:
        received_at = None
    sent_at = parse_date(
        _first_truthy(msg, "clientSubmitTime", "client_submit_time", "sentOn")
    )

    # ── MAPI properties ───────────────────────────────────────────────
    importance = _int_attr(msg, ("importance",), 1)
    sensitivity = _int_attr(msg, ("sensitivity",), 0)
    flag_status = _int_attr(msg, ("flagStatus", "flag_status"), 0)

    conversation_topic = safe(msg, "conversationTopic", "conversation_topic",
                              default="")
    conversation_index = ""
    try:
        ci = safe(msg, "conversationIndex", "conversation_index")
        if isinstance(ci, bytes):
            conversation_index = base64.b64encode(ci).decode()
        elif ci:
            conversation_index = str(ci)
    except Exception:
        pass

    in_reply_to = safe(msg, "inReplyTo", "in_reply_to", default="")

    internet_refs = []
    try:
        refs = safe(msg, "internetReferences", "internet_references")
        if isinstance(refs, list):
            internet_refs = refs
        elif isinstance(refs, str) and refs:
            internet_refs = refs.split()
    except Exception:
        pass

    categories = []
    try:
        cats = safe(msg, "categories")
        if isinstance(cats, list):
            categories = [str(c) for c in cats if c]
        elif isinstance(cats, str) and cats:
            categories = [c.strip() for c in re.split(r"[;,]", cats)
                          if c.strip()]
    except Exception:
        pass

    read_receipt = bool(safe(msg, "readReceiptRequested",
                             "read_receipt_requested", default=False))
    delivery_receipt = bool(safe(msg, "deliveryReceiptRequested",
                                 "delivery_receipt_requested", default=False))

    # ── Internet headers ──────────────────────────────────────────────
    # Headers also serve as a fallback source for threading information.
    headers = extract_headers(msg)
    if not in_reply_to:
        in_reply_to = headers.get("in_reply_to", "")
    if not internet_refs:
        refs_str = headers.get("references", "")
        if isinstance(refs_str, str) and refs_str:
            internet_refs = refs_str.split()

    # ── Attachments and raw MAPI ──────────────────────────────────────
    attachments = extract_attachments(msg)
    mapi_raw = extract_mapi_props(msg)

    # ── Document ──────────────────────────────────────────────────────
    return {
        "_id": mid,
        "filename": msg_path.name,
        "subject": subject,
        "normalized_subject": normalized_subject,
        "importance": importance,
        "sensitivity": sensitivity,
        "flag_status": flag_status,
        "read_receipt_requested": read_receipt,
        "delivery_receipt_requested": delivery_receipt,
        "has_attachments": len(attachments) > 0,
        "attachment_count": len(attachments),
        "message_size_bytes": msg_path.stat().st_size,
        "conversation_topic": conversation_topic,
        "conversation_index": conversation_index,
        "in_reply_to": in_reply_to,
        "internet_references": internet_refs,
        "categories": categories,
        "received_at": received_at,
        "sent_at": sent_at,
        "sender": {
            "email": sender_email,
            "name": sender_name,
            "smtp": sender_smtp,
        },
        "to": to_raw,
        "cc": cc_raw,
        "bcc": bcc_raw,
        "display_to": display_to,
        "display_cc": display_cc,
        "recipients": recipients,
        "body_text": body_text,
        "body_html": body_html,
        "attachments": attachments,
        "headers": headers,
        "mapi": mapi_raw,
        "parsed_at": datetime.now(timezone.utc).replace(tzinfo=None),
    }


def extract_message(msg_path: Path) -> Optional[dict]:
    """Parse one .msg file into a MongoDB document.

    Returns the document dict, or None when the file cannot be opened or
    extraction fails; the reason is written to the error log. The message
    is always closed, including when extraction raises, so a failing file
    does not leak an open handle during a long import.
    """
    try:
        msg = extract_msg.Message(str(msg_path))
    except Exception as e:
        logging.error("open failed [%s]: %s", msg_path.name, e)
        return None

    try:
        return _build_document(msg, msg_path)
    except Exception as e:
        logging.error("extract_message failed [%s]: %s", msg_path.name, e)
        return None
    finally:
        try:
            msg.close()
        except Exception as e:
            logging.error("close failed [%s]: %s", msg_path.name, e)


# ─── MongoDB indexes ──────────────────────────────────────────────────────────

def create_indexes(col):
    """Create the query indexes and the text-search index on *col*."""
    print(" Vytvarim indexy...")
    for field in ("received_at", "sent_at", "sender.email"):
        col.create_index([(field, ASCENDING)])
    col.create_index([("filename", ASCENDING)], unique=True, sparse=True)
    for field in ("conversation_topic", "has_attachments", "categories",
                  "importance", "flag_status"):
        col.create_index([(field, ASCENDING)])
    col.create_index(
        [
            ("subject", TEXT),
            ("body_text", TEXT),
            ("to", TEXT),
            ("cc", TEXT),
        ],
        name="text_search",
        default_language="none",
    )
    print(" Indexy hotovy.")


# ─── MAIN ─────────────────────────────────────────────────────────────────────

def _load_imported_filenames(col) -> set:
    """Return the set of filenames already stored in *col*.

    A projected cursor is used rather than distinct(), whose result has to
    fit into a single 16 MB BSON document.
    """
    cursor = col.find({"filename": {"$exists": True}},
                      {"_id": 0, "filename": 1})
    return {doc["filename"] for doc in cursor}


def _failed_write_count(exc, batch_len: int) -> int:
    """Return how many operations of a failed bulk_write did not succeed.

    When the exception carries per-operation "writeErrors" (unordered bulk
    write), only those are counted; otherwise the whole batch is assumed
    to have failed.
    """
    details = getattr(exc, "details", None)
    if isinstance(details, dict) and "writeErrors" in details:
        return len(details["writeErrors"])
    return batch_len


def _run_import(client, args, msgs_dir: Path, start: datetime):
    """Run the import using an already constructed MongoDB client."""
    try:
        client.admin.command("ping")
        print(" MongoDB OK")
    except Exception as e:
        print(f" CHYBA: MongoDB neni dostupna -- {e}")
        sys.exit(1)

    col = client[MONGO_DB][MONGO_COL]

    # Skip existing — load the list of already imported files.
    existing: set = set()
    if args.skip_existing:
        print(" Nacitam existujici zaznamy z MongoDB...")
        existing = _load_imported_filenames(col)
        print(f" {len(existing)} jiz importovano")

    # Scan
    print(f"\nSkenuji {msgs_dir} ...")
    if not msgs_dir.is_dir():
        # A missing mount must not look like an empty, successful import.
        print(f" CHYBA: adresar {msgs_dir} neexistuje")
        sys.exit(1)
    all_files = sorted(msgs_dir.glob("*.msg"))
    if args.limit > 0:
        all_files = all_files[:args.limit]

    to_process = [f for f in all_files if f.name not in existing]
    skipped = len(all_files) - len(to_process)
    total = len(to_process)

    print(f" Celkem .msg: {len(all_files)}")
    print(f" Preskoceno: {skipped}")
    print(f" Ke zpracovani: {total}\n")

    if total == 0:
        print("Neni co importovat.")
        return

    batch = []
    ok_count = 0
    err_count = 0

    def flush():
        """Write the pending batch; move failed writes from ok to err."""
        nonlocal ok_count, err_count
        if not batch:
            return
        try:
            col.bulk_write(batch, ordered=False)
        except Exception as e:
            logging.error("bulk_write: %s", e)
            print(f" CHYBA bulk_write: {e}")
            failed = _failed_write_count(e, len(batch))
            ok_count -= failed
            err_count += failed
        batch.clear()

    for i, msg_path in enumerate(to_process, 1):
        doc = extract_message(msg_path)
        if doc is None:
            err_count += 1
        else:
            batch.append(UpdateOne({"_id": doc["_id"]}, {"$set": doc},
                                   upsert=True))
            ok_count += 1
            if len(batch) >= BATCH_SIZE:
                flush()

        # One line per e-mail.
        status = "ERR " if doc is None else "OK "
        subject_str = (doc.get("subject") or "")[:60] if doc else "?"
        sender_str = (doc.get("sender", {}).get("email") or "")[:40] if doc else "?"
        print(f" {i:>6}/{total} {status:<4} {subject_str:<60} {sender_str}")

        if i % PROGRESS_EVERY == 0:
            elapsed = (datetime.now() - start).total_seconds()
            rate = i / elapsed if elapsed > 0 else 0
            eta_s = int((total - i) / rate) if rate > 0 else 0
            print(f" {'─'*80}")
            print(f" Průběh: ok={ok_count} err={err_count} "
                  f"{rate:.1f} msg/s ETA {eta_s//3600}h{(eta_s%3600)//60}m")
            print(f" {'─'*80}")

    flush()

    elapsed_total = (datetime.now() - start).total_seconds()
    doc_total = col.count_documents({})
    print(f"\n{'='*52}")
    print(f"Vysledek: ok={ok_count} | skip={skipped} | err={err_count}")
    print(f"Celkovy cas: {int(elapsed_total//3600)}h "
          f"{int((elapsed_total%3600)//60)}m {int(elapsed_total%60)}s")
    print(f"Dokumentu v kolekci: {doc_total}")

    if not args.no_indexes:
        print()
        create_indexes(col)

    print(f"\nKonec: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    if err_count:
        print(f"Chyby logovany do: {LOG_FILE}")


def main():
    """Command-line entry point: parse arguments and run the import.

    Exits with status 1 when MongoDB is unreachable or the source
    directory does not exist. The MongoDB client is closed on every exit
    path.
    """
    ap = argparse.ArgumentParser(description=f"parse_emails v{SCRIPT_VERSION}")
    ap.add_argument("--msgs-dir", default=str(MSGS_DIR),
                    help="Cesta k .msg souborum")
    ap.add_argument("--limit", type=int, default=0,
                    help="Zpracovat max N souboru (0 = vse)")
    ap.add_argument("--skip-existing", action="store_true",
                    help="Preskocit soubory ktere jiz jsou v MongoDB (pokracovani)")
    ap.add_argument("--no-indexes", action="store_true",
                    help="Nevytvorit indexy na konci")
    args = ap.parse_args()

    msgs_dir = Path(args.msgs_dir)
    start = datetime.now()

    print(f"=== parse_emails v{SCRIPT_VERSION} ===")
    print(f"Start: {start.strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"Zdroj: {msgs_dir}")
    print(f"MongoDB: {MONGO_URI} -> {MONGO_DB}.{MONGO_COL}")

    client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
    try:
        _run_import(client, args, msgs_dir, start)
    finally:
        client.close()


if __name__ == "__main__":
    main()