""" jnj_tower_ingest v1.5 Nazev: jnj_tower_ingest_v1.5.py Verze: 1.5.0 Datum: 2026-06-17 Autor: vladimir.buzalka ZMENA 1.5: PARSE cte navic MAPI PrimarySendAccount (0x0E28) + SentRepresenting email (0x0065) PRIMO z OLE (extract_msg je u stringovych props nevraci) a uklada je do pole send_account. Kdyz send_account obsahuje "buzalka.cz", nastavi send_failed=True + send_error="SendAs buzalka.cz (PrimarySendAccount)". Toto je SPOLEHLIVY priznak neodeslane (SendAs-denied) zpravy — na rozdil od detekce v tele (1.4), ktera u Sent kopie casto chybi. Rychly dotaz na neodeslane: { send_failed: true }. (Historicke doplneno backfill_send_failed.) ZMENA 1.4: (a) PARSE detekuje NEODESLANY e-mail: kdyz telo obsahuje stopy chyby odeslani (SendAsDenied / "could not be sent" / "TransportSend operation has failed" / MapiExceptionSendAsDenied), dokument dostane send_failed=true + send_error (vc. kodu ec=). Dotaz na neodeslane: {send_failed: true}. (Telo s chybou doteche az re-uploadem z jnj_mailbox_sync v1.3 + overwrite na app.py v2.4.) (b) Nova FAZE RECONCILE (--reconcile): smaze PROVIZORNI duplikat. Sent polozka bez Message-ID (_id zacina filename:/entryid:) je jen prechodny snimek; kdyz k ni existuje "dvojce" s REALNYM Message-ID (stejni 'to' prijemci + stejny normalized_subject + received_at do 24h), je provizorni kopie redundantni -> smaze se. Neodeslane (bez dvojcete) ZUSTANOU. Bezi jen s --reconcile; --dry-run = jen plan (nic nemaze). Konzervativni match na stabilnim obsahu, ne na EntryID. ZMENA 1.3: PARSE faze pri extrakci priloh zaroven nahraje binarku do SeaweedFS (Tower1) pres sdileny seaweed_store.py; kazda priloha dostane sha256 + seaweed_path + seaweed_url. Dokument s prilohami dostane doc-level seaweed_synced_at. Vypadek SeaweedFS parse neshodi (jen warning). Backfill jiz naparsovanych: seaweed_attachments_backfill_jnj.py. Popis: Sjednoceny Tower-side ingest JNJ e-mailu. Spojuje tri drive oddelene casti do jednoho behu (vse bezi v kontejneru python-runner u Monga): FAZE 1 — PARSE (drive parse_emails_tower_v1.3.py): .msg soubory z /mnt/JNJEMAILS -> dokument v Mongo emaily."vbuzalka@its.jnj.com" (bohata extrakce: telo, prilohy, hlavicky, MAPI props, ...). _id = Internet Message-ID. INKREMENTALNE: parsuje jen soubory novejsi nez mtime watermark (jnj_sync_state/_id="parse_state"). Prvni beh = seed dle filename v Mongu. --full reparsuje vse. FAZE 2 — SYNC (drive sync_jnj_state_v1.0.py): nejnovejsi /mnt/JNJEMAILS/db/jnjemails_*.db (SQLite, JEN CTENI ro) -> zrcadlo do Mongo kolekce 'jnj_messages' (upsert) -> doplneni cesty/stavu do emaily."vbuzalka@its.jnj.com": jnj_folder = COALESCE(jnj_folder, folder) jnj_is_read, jnj_not_in_mailbox, jnj_left_mailbox_at, jnj_folder_synced_at (match _id==message_id, fallback filename; BEZ upsertu — nezakladame stuby). Inkrementalne pres watermark updated_at (jnj_sync_state/_id= "watermark") + zkratka last_db (stejna DB -> hned no-op). FAZE 3 — ENRICH (drive jnj_emails_to_fulltext_v1.0.py): doindexuje JNJ schranku do PG fulltextu zavolanim SDILENEHO skriptu 5_enrich_fulltext_emails_vX.Y.py --mailbox "vbuzalka@its.jnj.com" (stejny extractor jako Graph pipeline -> konzistentni schema). Verze enrich se auto-detekuje (nejnovejsi /scripts/5_enrich_fulltext_emails_v*.py). Spousti se JEN kdyz parse pridal nove dokumenty (jinak preskok — JNJ stejne enrichuje pipeline v 6:00/18:00). --no-enrich vypne, --enrich-always vynuti. PORADI: parse -> sync -> enrich. Cerstve naparsovane maily dostanou cestu (sync) i fulltext (enrich) hned ve stejnem behu (drive: pokud sync/enrich predbehl parse, novy mail nemel co zpracovat). Tri nezavisle udalosti (nova .msg / nova .db / nove doc pro PG) -> skript udela jen to, co ma praci; jinak levny no-op (vhodne pro cron kazdych 5 minut). Spojovaci klic vsude = Internet Message-ID = Mongo _id. Prostredi: Docker container "python-runner" na Unraid Tower. /mnt/user/JNJEMAILS -> /mnt/JNJEMAILS (.msg v rootu, .db v db/) MongoDB 192.168.1.76:27017 (externi). Argumenty: --dry-run nic nezapise, jen spocita a vypise plan vsech fazi --full parse: reparsuj vse; sync: ignoruj watermark --limit N max N souboru (parse) / radku (sync) — test --reindex vynut vytvoreni indexu na konci parse faze --force sync: ignoruj zkratku last_db (zpracuj i hotovou DB) --parse-only spust jen fazi PARSE --sync-only spust jen fazi SYNC --enrich-only spust jen fazi ENRICH (vynuti enrich i bez novych dat) --no-enrich preskoc fazi ENRICH --enrich-always spust enrich i kdyz parse nepridal nove dokumenty Spousteni (v kontejneru python-runner): # Test: docker exec -it python-runner python3 /scripts/jnj_tower_ingest_v1.1.py --dry-run # Ostry inkrementalni beh (cron): docker exec python-runner python3 /scripts/jnj_tower_ingest_v1.1.py # Plny reparse + reindex: docker exec -it python-runner python3 /scripts/jnj_tower_ingest_v1.1.py --full --reindex Zavislosti (v image python-runner): extract-msg==0.55.0, olefile, pymongo, python-dateutil, sqlite3 (stdlib). Enrich faze deleguje na 5_enrich_fulltext_emails (psycopg, bs4 v image). Python 3.10+. Historie verzi: 1.0.0 2026-06-10 Sjednoceni parse_emails_tower_v1.3 + sync_jnj_state_v1.0 do jedineho skriptu. Parse zinkrementalnen pres mtime watermark (drive scan celeho adresare kazdy beh). Indexy jen pri full/seed/--reindex. Poradi parse->sync. 1.1.0 2026-06-10 + FAZE 3 ENRICH: deleguje na sdileny 5_enrich_fulltext_emails --mailbox (auto-detekce verze), jen kdyz parse pridal nove dokumenty. Nahrazuje jnj_emails_to_fulltext_v1.0.py (ten -> Trash). Flagy --enrich-only/--no-enrich/--enrich-always. 1.2.0 2026-06-10 SYNC NULL-safe: stary inbox_full_sync zapisuje radky s updated_at=NULL; watermark filtr "updated_at > wm" je tise zahazoval (NULL > x = false) -> maily mely telo ale nikdy nedostaly jnj_folder. Nyni se beru i radky s updated_at IS NULL, ktere jeste nejsou v jnj_messages (zpracuji se prave jednou). Nic uz se tise nezahodi. 1.3.0 2026-06-13 PARSE: prilohy do SeaweedFS (sha256 + seaweed_path/url). 1.4.0 2026-06-16 (a) PARSE detekuje neodeslany e-mail -> send_failed + send_error (SendAsDenied marker v tele). (b) Nova faze RECONCILE (--reconcile): smaze provizorni no-ID Sent kopie, ke kterym existuje dvojce s realnym Message-ID (match to+subjekt+cas, ne EntryID); neodeslane ponecha. """ import sys import os import re import glob import hashlib import logging import argparse import base64 import struct import sqlite3 import subprocess from pathlib import Path from datetime import datetime, timezone from typing import Optional import extract_msg from extract_msg.enums import ErrorBehavior import olefile from dateutil import parser as dtparser from pymongo import MongoClient, UpdateOne, ASCENDING, TEXT sys.path.insert(0, str(Path(__file__).resolve().parent)) import seaweed_store as sw if hasattr(sys.stdout, "reconfigure"): sys.stdout.reconfigure(encoding="utf-8", errors="replace") # ─── KONFIGURACE ────────────────────────────────────────────────────────────── MSGS_DIR = Path("/mnt/JNJEMAILS") DB_DIR = "/mnt/JNJEMAILS/db" MONGO_URI = "mongodb://192.168.1.76:27017" MONGO_DB = "emaily" EMAILS_COL = "vbuzalka@its.jnj.com" MIRROR_COL = "jnj_messages" STATE_COL = "jnj_sync_state" BATCH_SIZE = 200 LOG_FILE = Path(__file__).parent / "jnj_tower_ingest_errors.log" ENRICH_GLOB = "/scripts/5_enrich_fulltext_emails_v*.py" # sdileny PG enrich SCRIPT_VERSION = "1.5.0" # Stopy chyby odeslani v tele (.msg neodeslaneho e-mailu) — viz hustak/SendAsDenied SEND_FAIL_MARKERS = ( "MapiExceptionSendAsDenied", "SendAsDeniedException", "could not be sent", "TransportSend operation has failed", "Transport-Send failed", ) # Sloupce zrcadlene ze SQLite messages -> jnj_messages ROW_COLS = ["message_id", "subject", "sender", "received_at", "folder", "jnj_folder", "is_read", "not_in_mailbox_anymore", "left_mailbox_at", "entry_id", "graph_id", "updated_at", "source"] # ────────────────────────────────────────────────────────────────────────────── logging.basicConfig( filename=str(LOG_FILE), level=logging.ERROR, format="%(asctime)s | %(message)s", datefmt="%Y-%m-%d %H:%M:%S", encoding="utf-8", ) # ══════════════════════════════════════════════════════════════════════════════ # FAZE 1 — PARSE (.msg -> Mongo emaily) [drive parse_emails_tower_v1.3.py] # ══════════════════════════════════════════════════════════════════════════════ def safe(obj, *attrs, default=None): """Bezpecne cteni atributu — vrati prvni non-None hodnotu.""" for attr in attrs: try: val = getattr(obj, attr, None) if val is None: continue if isinstance(val, str) and not val.strip(): continue return val except Exception: continue return default def parse_date(raw) -> Optional[datetime]: """Libovolny datum -> UTC datetime bez tzinfo (pro MongoDB).""" if raw is None: return None if isinstance(raw, datetime): if raw.tzinfo: return raw.astimezone(timezone.utc).replace(tzinfo=None) return raw try: dt = dtparser.parse(str(raw)) if dt.tzinfo: return dt.astimezone(timezone.utc).replace(tzinfo=None) return dt except Exception: return None _INT64_MIN, _INT64_MAX = -(2 ** 63), 2 ** 63 - 1 def to_bson(val): """Konvertuje hodnotu na BSON-serializovatelny typ. Pozor: BSON umi jen signed int64. Python ma neomezene integery, takze velke MAPI hodnoty (PR_CHANGE_KEY, FILETIME, 64-bit handle) mimo rozsah int64 prevadime na string — jinak cely bulk_write spadne na 'MongoDB can only handle up to 8-byte ints'. """ # bool musi byt PRED int (isinstance(True, int) == True) if isinstance(val, bool): return val if isinstance(val, bytes): return val.hex() if len(val) <= 128 else f"" if isinstance(val, datetime): return parse_date(val) if isinstance(val, int): return val if _INT64_MIN <= val <= _INT64_MAX else str(val) if isinstance(val, (str, float, type(None))): return val if isinstance(val, list): return [to_bson(v) for v in val] try: iv = int(val) return iv if _INT64_MIN <= iv <= _INT64_MAX else str(iv) except Exception: pass return str(val) def extract_headers(msg) -> dict: headers = {} try: hdr = msg.header if not hdr: return {} from email.header import decode_header as _dh def _decode(v: str) -> str: try: parts = _dh(v) out = "" for part, enc in parts: out += part.decode(enc or "utf-8", errors="replace") if isinstance(part, bytes) else part return out except Exception: return v for key in set(hdr.keys()): k = key.lower().replace("-", "_") vals = [_decode(v) for v in hdr.get_all(key, [])] headers[k] = vals if len(vals) > 1 else (vals[0] if vals else "") except Exception as e: logging.error("extract_headers: %s", e) return headers def extract_recipients(msg) -> list: result = [] type_map = {1: "to", 2: "cc", 3: "bcc"} try: for r in msg.recipients: rtype = getattr(r, "type", 1) try: rtype = int(rtype) except Exception: try: rtype = int(rtype.value) except Exception: rtype = 1 rec = { "type": type_map.get(rtype, "to"), "email": safe(r, "email", default=""), "name": safe(r, "name", default=""), } result.append(rec) except Exception as e: logging.error("extract_recipients: %s", e) return result def extract_attachments(msg) -> list: result = [] try: for att in msg.attachments: fname = safe(att, "longFilename", "shortFilename", default="") if not fname: continue size = 0 raw = None try: d = att.data if isinstance(d, (bytes, bytearray)): raw = bytes(d) size = len(raw) elif d: size = len(d) # embedded message apod. — bez bajtu except Exception: pass mime = safe(att, "mimetype", "mimeType", default="application/octet-stream") entry = { "filename": fname, "size_bytes": size, "mime_type": mime, "content_id": safe(att, "cid", default=None), "is_inline": bool(safe(att, "isInline", default=False)), } # SeaweedFS upload (dedup dle obsahu, sdilene s Graph/mailstore vetvi). # Vypadek SeaweedFS NESMI shodit parse — pole se proste nedoplni a # dozene je seaweed_attachments_backfill_jnj.py. if raw: try: h = hashlib.sha256(raw).hexdigest() path, url, _ = sw.store(h, raw, mime) entry["sha256"] = h entry["seaweed_path"] = path entry["seaweed_url"] = url except Exception as e: logging.warning("SeaweedFS upload selhal (%s): %s", fname, e) result.append(entry) except Exception as e: logging.error("extract_attachments: %s", e) return result def extract_mapi_props(msg) -> dict: """Vsechny raw MAPI properties jako {0xXXXX: value}.""" result = {} try: props = msg.props if not hasattr(props, "items"): return {} for key, prop in props.items(): try: val = to_bson(prop.value) prop_id = f"0x{key[:4].upper()}" if len(key) >= 4 else f"0x{key.upper()}" result[prop_id] = val except Exception: pass except Exception as e: logging.error("extract_mapi_props: %s", e) return result # ─── Tolerantni otevirani a raw-OLE fallback ───────────────────────────────── _CPID_TO_CODEC = { 1250: "cp1250", 1251: "cp1251", 1252: "cp1252", 1253: "cp1253", 1254: "cp1254", 1255: "cp1255", 1256: "cp1256", 1257: "cp1257", 1258: "cp1258", 874: "cp874", 932: "shift_jis", 936: "gb2312", 949: "euc_kr", 950: "big5", 65001: "utf-8", 28591: "iso-8859-1", 28592: "iso-8859-2", 20127: "ascii", } def _read_u32_prop(ole, propid): """Precte 32-bit hodnotu MAPI property z top-level __properties_version1.0.""" try: data = ole.openstream("__properties_version1.0").read() except Exception: return None body = data[32:] # 32-bajtova hlavicka top-level property streamu for i in range(0, len(body) - 16 + 1, 16): rec = body[i:i + 16] tag = struct.unpack("> 16) & 0xFFFF) == propid: return struct.unpack(" Optional[str]: """Codec dle PR_INTERNET_CPID / PR_MESSAGE_CODEPAGE (jako napoveda, ne dogma).""" for pid in (0x3FDE, 0x3FFD): # INTERNET_CPID, MESSAGE_CODEPAGE codec = _CPID_TO_CODEC.get(_read_u32_prop(ole, pid)) # utf-8/ascii nejsou dobry hint pro 8-bit stream (casto lzou) if codec and codec not in ("utf-8", "ascii"): return codec return None def _cascade_decode(raw: bytes, is_unicode: bool, cpid_codec: Optional[str]) -> str: """Dekoduje bajty MAPI stringu. Hlavickam se neveri — zkousime striktne v poradi priorit a vezmeme prvni, co projde bez chyby.""" if not raw: return "" if is_unicode: # PT_UNICODE = utf-16-le try: return raw.decode("utf-16-le") except Exception: return raw.decode("utf-16-le", errors="replace") order = ["utf-8"] # utf-8 strict = silny rozlisovac if cpid_codec: order.append(cpid_codec) order += ["cp1250", "cp1252", "gb2312", "big5"] for enc in order: try: return raw.decode(enc, errors="strict") except Exception: continue return raw.decode("latin-1", errors="replace") # nikdy nespadne def _raw_mapi_strings(msg_path: Path) -> dict: """Cte klicova textova MAPI pole PRIMO z OLE (mimo extract_msg). Pouzije se jen kdyz extract_msg vrati degradovane pole.""" out = {"subject": "", "normalized_subject": "", "sender_name": "", "sender_email": "", "sender_smtp": "", "body_text": "", "body_html": ""} try: ole = olefile.OleFileIO(str(msg_path)) except Exception: return out try: cpid = _detect_cpid(ole) wanted = { # MAPI tag -> klic v out "0037": "subject", "0E1D": "normalized_subject", "0C1A": "sender_name", "5D01": "sender_smtp", "0C1F": "sender_email", "1000": "body_text", "1013": "body_html", } prefix = "__substg1.0_" found = {} # key -> (priorita_typu, hodnota) for entry in ole.listdir(): if len(entry) != 1: # jen top-level (ne vnorene zpravy) continue name = entry[0] if not name.startswith(prefix): continue tag = name[len(prefix):len(prefix) + 4].upper() key = wanted.get(tag) if not key: continue typ = name[-4:].upper() prio = {"001F": 3, "001E": 2, "0102": 1}.get(typ, 0) if prio == 0: continue prev = found.get(key) if prev and prev[0] >= prio: # preferuj unicode > ansi > binarni continue try: raw = ole.openstream(entry).read() val = _cascade_decode(raw, typ == "001F", cpid) except Exception: continue found[key] = (prio, val) for key, (_, val) in found.items(): out[key] = val finally: ole.close() return out def _degraded(s) -> bool: """Pole je degradovane: prazdne nebo obsahuje U+FFFD (nahradni znak).""" return (not s) or ("�" in s) def open_message(msg_path: Path): """Kaskadove otevreni .msg -> (msg, mode) nebo (None, None).""" try: return extract_msg.Message(str(msg_path)), "normal" except Exception: pass try: return extract_msg.Message( str(msg_path), errorBehavior=ErrorBehavior.SUPPRESS_ALL), "suppress_all" except Exception: pass encs = [] try: ole = olefile.OleFileIO(str(msg_path)) c = _detect_cpid(ole) ole.close() if c: encs.append(c) except Exception: pass for e in encs + ["cp1250", "cp1252"]: try: return extract_msg.Message( str(msg_path), errorBehavior=ErrorBehavior.SUPPRESS_ALL, overrideEncoding=e), f"override:{e}" except Exception: continue return None, None def detect_send_failure(*texts): """Vrati (send_failed, send_error) — hleda stopy chyby odeslani v tele. Stopy se objevi v neodeslanem .msg (napr. SendAsDenied) az kdyz Outlook chybu dopsal a re-upload (jnj_mailbox_sync v1.3) ji prinesl na Tower.""" blob = "\n".join(t for t in texts if isinstance(t, str)) if not blob: return False, None if not any(m in blob for m in SEND_FAIL_MARKERS): return False, None err = "send failed" m = re.search(r"ec=(\d+)", blob) if m: err = f"SendAsDenied (ec={m.group(1)})" m2 = re.search(r"Error is \[([0-9xA-Fa-f\-]+)\]", blob) if m2: err += f" {m2.group(1)}" return True, err def read_send_account(msg_path: Path) -> str: """Precte PrimarySendAccount (0x0E28) + SentRepresenting email (0x0065) PRIMO z OLE — extract_msg tyto stringove props nevraci. Spojene do jednoho retezce; pro detekci SendAs (buzalka.cz na uctu its.jnj.com = odmitnuto).""" try: ole = olefile.OleFileIO(str(msg_path)) except Exception: return "" out = [] try: for tag4 in ("0E28", "0065"): for t in (tag4 + "001F", tag4 + "001E"): name = "__substg1.0_" + t if ole.exists(name): try: raw = ole.openstream(name).read() s = (raw.decode("utf-16-le") if t.endswith("001F") else raw.decode("cp1250", errors="replace")) except Exception: s = raw.decode("latin-1", errors="replace") s = s.strip() if s: out.append(s) break finally: ole.close() return " ".join(out) def extract_message(msg_path: Path) -> Optional[dict]: """Parsuje jeden .msg soubor -> MongoDB dokument.""" msg, parse_mode = open_message(msg_path) if msg is None: logging.error("open failed [%s]: vsechny pokusy o otevreni selhaly", msg_path.name) return None try: # ── Message-ID ──────────────────────────────────────────────── mid = None for attr in ("messageId", "message_id", "internetMessageId"): mid = safe(msg, attr) if mid: break if not mid: mid = f"filename:{msg_path.stem}" mid = str(mid).strip() # ── Predmet ─────────────────────────────────────────────────── try: subject = msg.subject or "" except Exception: subject = "" normalized_subject = safe(msg, "normalizedSubject", "normalized_subject", default="") # ── Telo ────────────────────────────────────────────────────── try: body_text = msg.body or "" except Exception: body_text = "" body_html = None try: bh = msg.htmlBody if isinstance(bh, bytes): bh = bh.decode("utf-8", errors="replace") if bh: body_html = bh if len(bh) <= 2 * 1024 * 1024 else bh[:2 * 1024 * 1024] except Exception: pass # ── Odesilatel ──────────────────────────────────────────────── try: sender_email = msg.sender or "" except Exception: sender_email = "" sender_name = safe(msg, "senderName", "sender_name", default="") sender_smtp = safe(msg, "senderSmtpAddress", "sent_representing_smtp_address", default="") # ── Prijemci ────────────────────────────────────────────────── recipients = extract_recipients(msg) try: to_raw = msg.to or "" except Exception: to_raw = "" try: cc_raw = msg.cc or "" except Exception: cc_raw = "" try: bcc_raw = getattr(msg, "bcc", None) or "" except Exception: bcc_raw = "" display_to = safe(msg, "displayTo", "display_to", default="") display_cc = safe(msg, "displayCc", "display_cc", default="") # ── Casy ────────────────────────────────────────────────────── try: received_at = parse_date(msg.date) except Exception: received_at = None sent_at = None for attr in ("clientSubmitTime", "client_submit_time", "sentOn"): v = safe(msg, attr) if v: sent_at = parse_date(v) break # ── MAPI vlastnosti ─────────────────────────────────────────── importance = 1 try: v = msg.importance if v is not None: importance = int(v) except Exception: pass sensitivity = 0 try: v = getattr(msg, "sensitivity", None) if v is not None: sensitivity = int(v) except Exception: pass flag_status = 0 try: v = safe(msg, "flagStatus", "flag_status") if v is not None: flag_status = int(v) except Exception: pass conversation_topic = safe(msg, "conversationTopic", "conversation_topic", default="") conversation_index = "" try: ci = safe(msg, "conversationIndex", "conversation_index") if isinstance(ci, bytes): conversation_index = base64.b64encode(ci).decode() elif ci: conversation_index = str(ci) except Exception: pass in_reply_to = safe(msg, "inReplyTo", "in_reply_to", default="") internet_refs = [] try: refs = safe(msg, "internetReferences", "internet_references") if isinstance(refs, list): internet_refs = refs elif isinstance(refs, str) and refs: internet_refs = [r.strip() for r in refs.split() if r.strip()] except Exception: pass categories = [] try: cats = safe(msg, "categories") if isinstance(cats, list): categories = [str(c) for c in cats if c] elif isinstance(cats, str) and cats: categories = [c.strip() for c in re.split(r"[;,]", cats) if c.strip()] except Exception: pass read_receipt = bool(safe(msg, "readReceiptRequested", "read_receipt_requested", default=False)) delivery_receipt = bool(safe(msg, "deliveryReceiptRequested", "delivery_receipt_requested", default=False)) # ── Internet headers ────────────────────────────────────────── headers = extract_headers(msg) if not in_reply_to: in_reply_to = headers.get("in_reply_to", "") if not internet_refs: refs_str = headers.get("references", "") if isinstance(refs_str, str) and refs_str: internet_refs = [r.strip() for r in refs_str.split() if r.strip()] # ── Prilohy ─────────────────────────────────────────────────── attachments = extract_attachments(msg) # ── Raw MAPI ────────────────────────────────────────────────── mapi_raw = extract_mapi_props(msg) msg.close() # ── Raw-OLE fallback pro degradovana textova pole ───────────── parse_degraded = parse_mode != "normal" forced = parse_mode != "normal" if (forced or _degraded(subject) or _degraded(body_text) or _degraded(sender_email) or (body_html and "�" in body_html)): raw = _raw_mapi_strings(msg_path) if raw["subject"] and (forced or _degraded(subject)): subject = raw["subject"] if raw["normalized_subject"] and (forced or _degraded(normalized_subject)): normalized_subject = raw["normalized_subject"] if raw["body_text"] and (forced or _degraded(body_text)): body_text = raw["body_text"] if raw["body_html"] and (forced or not body_html or "�" in body_html): bh = raw["body_html"] body_html = bh if len(bh) <= 2 * 1024 * 1024 else bh[:2 * 1024 * 1024] if (raw["sender_smtp"] or raw["sender_email"]) and (forced or _degraded(sender_email)): sender_email = raw["sender_smtp"] or raw["sender_email"] if raw["sender_name"] and (forced or _degraded(sender_name)): sender_name = raw["sender_name"] if raw["sender_smtp"] and not sender_smtp: sender_smtp = raw["sender_smtp"] # ── Detekce neodeslaneho e-mailu (v1.4 telo + v1.5 send-account) ── send_failed, send_error = detect_send_failure(body_text, body_html) send_account = read_send_account(msg_path) # v1.5 if send_account and "buzalka.cz" in send_account.lower(): send_failed = True if not send_error: send_error = "SendAs buzalka.cz (PrimarySendAccount)" # ── Dokument ────────────────────────────────────────────────── return { "_id": mid, "filename": msg_path.name, "subject": subject, "normalized_subject": normalized_subject, "importance": importance, "sensitivity": sensitivity, "flag_status": flag_status, "read_receipt_requested": read_receipt, "delivery_receipt_requested": delivery_receipt, "has_attachments": len(attachments) > 0, "attachment_count": len(attachments), "message_size_bytes": msg_path.stat().st_size, "conversation_topic": conversation_topic, "conversation_index": conversation_index, "in_reply_to": in_reply_to, "internet_references": internet_refs, "categories": categories, "received_at": received_at, "sent_at": sent_at, "sender": { "email": sender_email, "name": sender_name, "smtp": sender_smtp, }, "to": to_raw, "cc": cc_raw, "bcc": bcc_raw, "display_to": display_to, "display_cc": display_cc, "recipients": recipients, "body_text": body_text, "body_html": body_html, "attachments": attachments, "headers": headers, "mapi": mapi_raw, "parse_mode": parse_mode, "parse_degraded": parse_degraded, "send_failed": send_failed, "send_error": send_error, "send_account": send_account, "parsed_at": datetime.now(timezone.utc).replace(tzinfo=None), # priznak ze prilohy (pokud nejake) jsou v SeaweedFS — pro backfill "seaweed_synced_at": (datetime.now(timezone.utc).replace(tzinfo=None) if any(a.get("seaweed_path") for a in attachments) else None), } except Exception as e: logging.error("extract_message failed [%s]: %s", msg_path.name, e) return None def create_indexes(col): print(" Vytvarim indexy...") col.create_index([("received_at", ASCENDING)]) col.create_index([("sent_at", ASCENDING)]) col.create_index([("sender.email", ASCENDING)]) col.create_index([("filename", ASCENDING)], unique=True, sparse=True) col.create_index([("conversation_topic", ASCENDING)]) col.create_index([("has_attachments", ASCENDING)]) col.create_index([("categories", ASCENDING)]) col.create_index([("importance", ASCENDING)]) col.create_index([("flag_status", ASCENDING)]) col.create_index([ ("subject", TEXT), ("body_text", TEXT), ("to", TEXT), ("cc", TEXT), ], name="text_search", default_language="none") print(" Indexy hotovy.") def run_parse(col, state_col, args, now) -> dict: """FAZE 1: inkrementalni parse .msg -> emaily. Vraci statistiku.""" stats = {"mode": None, "total_files": 0, "candidates": 0, "ok": 0, "err": 0} print("\n=== FAZE 1: PARSE (.msg -> emaily) ===") all_files = sorted(MSGS_DIR.glob("*.msg")) stats["total_files"] = len(all_files) if not all_files: print(" Zadne .msg ve zdroji -> preskakuji.") return stats max_mtime = max(f.stat().st_mtime for f in all_files) ps = state_col.find_one({"_id": "parse_state"}) or {} last_mtime = ps.get("last_parse_mtime") if args.full: candidates = all_files mode = "full" elif last_mtime is None: print(" Prvni beh (zadny mtime watermark) -> seed dle filename v Mongu...") existing = set(col.distinct("filename")) candidates = [f for f in all_files if f.name not in existing] mode = "seed" print(f" V Mongu jiz {len(existing)} filename; nove k naparsovani: {len(candidates)}") else: candidates = [f for f in all_files if f.stat().st_mtime > last_mtime] mode = "incremental" if args.limit: candidates = candidates[:args.limit] stats["mode"] = mode stats["candidates"] = len(candidates) wm_str = datetime.fromtimestamp(last_mtime).strftime("%Y-%m-%d %H:%M:%S") if last_mtime else "(zadny)" print(f" Rezim: {mode} | .msg celkem {len(all_files)} | watermark {wm_str} | ke zpracovani {len(candidates)}") if not candidates: print(" Nic noveho k parsovani.") # I tak posun watermark na nejnovejsi soubor (krome --full a dry-run) if not args.dry_run and mode != "full": state_col.update_one({"_id": "parse_state"}, {"$set": {"last_parse_mtime": max_mtime, "last_parse_at": now}}, upsert=True) return stats if args.dry_run: print(f" DRY-RUN: naparsoval bych {len(candidates)} souboru (Mongo se nemeni). Ukazka:") for f in candidates[:10]: mt = datetime.fromtimestamp(f.stat().st_mtime).strftime("%Y-%m-%d %H:%M:%S") print(f" + {f.name} (mtime {mt})") if len(candidates) > 10: print(f" ... a dalsich {len(candidates) - 10}") return stats batch = [] verbose = len(candidates) <= 30 def flush(): if not batch: return try: col.bulk_write(batch, ordered=False) except Exception as e: logging.error("bulk_write spadl (%s) -- prepinam na per-dokument", e) print(f" CHYBA bulk_write: {e} -- zkousim per-dokument") for op in batch: try: col.bulk_write([op], ordered=False) except Exception as e2: try: bad_id = getattr(op, "_filter", {}).get("_id", "?") except Exception: bad_id = "?" logging.error("per-dokument selhal [_id=%s]: %s", bad_id, e2) print(f" ZAHOZEN _id={bad_id}: {e2}") stats["ok"] -= 1 stats["err"] += 1 batch.clear() for i, msg_path in enumerate(candidates, 1): doc = extract_message(msg_path) if doc is None: stats["err"] += 1 else: batch.append(UpdateOne({"_id": doc["_id"]}, {"$set": doc}, upsert=True)) stats["ok"] += 1 if len(batch) >= BATCH_SIZE: flush() if verbose: status = "ERR " if doc is None else "OK " subj = (doc.get("subject") or "")[:60] if doc else "?" print(f" {i:>5}/{len(candidates)} {status} {subj}") elif i % 500 == 0: print(f" prubeh {i}/{len(candidates)} ok={stats['ok']} err={stats['err']}") flush() # Indexy jen pri full/seed/--reindex (v inkrementalnim behu uz existuji) if mode in ("full", "seed") or args.reindex: create_indexes(col) # Posun watermark na nejnovejsi soubor state_col.update_one({"_id": "parse_state"}, {"$set": {"last_parse_mtime": max_mtime, "last_parse_at": now, "last_parsed_count": stats["ok"], "last_parse_mode": mode}}, upsert=True) print(f" PARSE hotovo: ok={stats['ok']} err={stats['err']} " f"watermark={datetime.fromtimestamp(max_mtime):%Y-%m-%d %H:%M:%S}") return stats # ══════════════════════════════════════════════════════════════════════════════ # FAZE 2 — SYNC (SQLite -> Mongo jnj_messages + emaily cesta) # [drive sync_jnj_state_v1.0.py] # ══════════════════════════════════════════════════════════════════════════════ def norm_mid(s: str) -> str: return (s or "").strip().strip("<>").strip() def coalesce_path(jnjf, fld) -> str: return jnjf if (jnjf and jnjf.strip()) else (fld or "") def newest_db(): cands = glob.glob(os.path.join(DB_DIR, "jnjemails_*.db")) or glob.glob(os.path.join(DB_DIR, "*.db")) return max(cands, key=os.path.getmtime) if cands else None def run_sync(db, args, now) -> dict: """FAZE 2: SQLite -> jnj_messages (zrcadlo) + emaily (cesta/stav).""" stats = {"total": 0, "matched": 0, "skipped": False} print("\n=== FAZE 2: SYNC (SQLite -> jnj_messages + emaily cesta) ===") emails = db[EMAILS_COL] state_col = db[STATE_COL] db_path = newest_db() if not db_path: print(f" Zadna .db v {DB_DIR} -> preskakuji.") stats["skipped"] = True return stats db_name = os.path.basename(db_path) print(f" SQLite: {db_name}") st = state_col.find_one({"_id": "watermark"}) or {} # ── Zkratka: tuto DB uz jsme zpracovali? (jen inkrementalni rezim) ───── if not args.full and not args.force and st.get("last_db") == db_name: print(f" DB {db_name} uz byla zpracovana (last_db) -> nic na praci.") stats["skipped"] = True return stats wm = None if args.full else st.get("last_updated_at") print(f" Watermark: {wm or '(zadny -> vse)'}") # ── SQLite (read-only) ──────────────────────────────────────────────── con = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True) con.row_factory = sqlite3.Row available = {row[1] for row in con.execute("PRAGMA table_info(messages)")} sel_cols = [c for c in ROW_COLS if c in available] missing = [c for c in ROW_COLS if c not in available] if missing: print(f" (DB nema sloupce: {', '.join(missing)} -> default None/0)") has_updated = "updated_at" in available # ── NULL-safe vyber radku ───────────────────────────────────────────── # Stary inbox_full_sync zapisuje radky s updated_at=NULL; cisty watermark # filtr "updated_at > wm" je v SQL TISE zahazuje (NULL > x = false). # Bereme proto i radky s updated_at IS NULL, ktere jeste NEJSOU v zrcadle # jnj_messages (aby se zpracovaly prave jednou). --full bere vse. mirrored_ids = set() if not args.full: mirrored_ids = {d["_id"] for d in db[MIRROR_COL].find({}, {"_id": 1})} q = f"SELECT {', '.join(sel_cols)} FROM messages" params = () if not args.full and wm and has_updated: q += " WHERE updated_at > ? OR updated_at IS NULL" params = (wm,) elif not args.full and wm and not has_updated: print(" (DB nema updated_at -> watermark ignorovan, beru vse)") wm = None raw_rows = con.execute(q, params).fetchall() con.close() rows = [] skipped_null = 0 for row in raw_rows: d = dict(row) if (not args.full) and d.get("updated_at") is None and d.get("message_id") in mirrored_ids: skipped_null += 1 # NULL radek uz zrcadleny -> hotovo, nepocitame znovu continue rows.append(d) if skipped_null: print(f" (NULL-safe: preskoceno {skipped_null} NULL-updated_at radku uz v jnj_messages)") if args.limit: rows = rows[:args.limit] total = len(rows) stats["total"] = total print(f" Radku ke zpracovani: {total}") if total == 0: print(" Neni co synchronizovat (zadne nove radky).") if not args.dry_run: state_col.update_one({"_id": "watermark"}, {"$set": {"last_db": db_name, "synced_at": now}}, upsert=True) return stats # ── Indexy z Monga ──────────────────────────────────────────────────── print(" Nacitam _id + filename + jnj_folder z Mongo...") ids_exact = set() ids_norm = {} fnames = {} has_path = set() for d in emails.find({}, {"_id": 1, "filename": 1, "jnj_folder": 1}): _id = d["_id"] ids_exact.add(_id) ids_norm.setdefault(norm_mid(_id), _id) fn = d.get("filename") if fn: fnames[fn] = _id if d.get("jnj_folder"): has_path.add(_id) print(f" Mongo dokumentu v {EMAILS_COL}: {len(ids_exact)} (z toho s jnj_folder: {len(has_path)})") # ── Plan ────────────────────────────────────────────────────────────── m_exact = m_norm = m_fname = unmatched = 0 examples = [] mirror_ops = [] emaily_ops = [] max_wm = wm or "" for r in rows: mid = r.get("message_id") uv = r.get("updated_at") if uv and uv > max_wm: max_wm = uv # Krok A — zrcadlo (vzdy) doc = {k: r.get(k) for k in ROW_COLS} doc["mirrored_at"] = now mirror_ops.append(UpdateOne({"_id": mid}, {"$set": doc}, upsert=True)) # Krok B — match do emaily target = None if mid in ids_exact: target = mid; m_exact += 1 elif norm_mid(mid) in ids_norm: target = ids_norm[norm_mid(mid)]; m_norm += 1 else: eid = r.get("entry_id") fn = (eid[-20:] + ".msg") if eid else None if fn and fn in fnames: target = fnames[fn]; m_fname += 1 else: unmatched += 1 if len(examples) < 6: examples.append(mid) if target is not None: setdoc = { "jnj_folder": coalesce_path(r.get("jnj_folder"), r.get("folder")), "jnj_is_read": bool(r.get("is_read")), "jnj_not_in_mailbox": bool(r.get("not_in_mailbox_anymore")), "jnj_left_mailbox_at": r.get("left_mailbox_at"), "jnj_folder_synced_at": now, } emaily_ops.append(UpdateOne({"_id": target}, {"$set": setdoc})) matched = m_exact + m_norm + m_fname stats["matched"] = matched print(" --- PLAN ---") print(f" Zrcadlo -> {MIRROR_COL}: {len(mirror_ops)} upsert") print(f" Emaily match exact (_id): {m_exact}") print(f" Emaily match norm (<>): {m_norm}") print(f" Emaily match filename: {m_fname}") print(f" Emaily match CELKEM: {matched}/{total} ({100.0*matched/total:.1f}%)") print(f" NEnamatchovano: {unmatched}") if examples: print(" Priklady nenamatchovanych message_id:") for e in examples: print(f" {str(e)[:72]}") # ── Zapis ───────────────────────────────────────────────────────────── if args.dry_run: print(" DRY-RUN: Mongo se NEMENI.") return stats print(" Zapisuji...") if mirror_ops: db[MIRROR_COL].bulk_write(mirror_ops, ordered=False) if emaily_ops: emails.bulk_write(emaily_ops, ordered=False) state_col.update_one( {"_id": "watermark"}, {"$set": {"last_updated_at": max_wm, "synced_at": now, "last_db": db_name, "last_total": total, "last_matched": matched}}, upsert=True, ) print(f" SYNC hotovo: zrcadlo={len(mirror_ops)} emaily={len(emaily_ops)} watermark={max_wm}") return stats # ══════════════════════════════════════════════════════════════════════════════ # FAZE 3 — ENRICH (Mongo -> PG fulltext, deleguje na sdileny 5_enrich) # [drive jnj_emails_to_fulltext_v1.0.py] # ══════════════════════════════════════════════════════════════════════════════ def newest_enrich(): """Najde nejnovejsi /scripts/5_enrich_fulltext_emails_v*.py podle verze vX.Y.""" cands = glob.glob(ENRICH_GLOB) if not cands: return None def ver(p): m = re.search(r"_v(\d+)\.(\d+)", os.path.basename(p)) return (int(m.group(1)), int(m.group(2))) if m else (0, 0) return max(cands, key=ver) def run_enrich(args, new_docs, force) -> dict: """FAZE 3: doindexuje JNJ schranku do PG fulltextu pres sdileny enrich. Spousti se jen kdyz parse pridal nove dokumenty (nebo force/enrich-only).""" stats = {"ran": False, "rc": None, "skipped_reason": None} print("\n=== FAZE 3: ENRICH (PG fulltext) ===") if args.no_enrich: stats["skipped_reason"] = "--no-enrich" print(" Preskoceno [--no-enrich].") return stats if args.dry_run: enrich = newest_enrich() stats["skipped_reason"] = "dry-run" print(f" DRY-RUN: zavolal bych {enrich or '(enrich nenalezen!)'} --mailbox {EMAILS_COL}" f" (nove doc z parse: {new_docs}, force={force})") return stats if not force and new_docs <= 0: stats["skipped_reason"] = "zadne nove doc" print(" Zadne nove maily z parse -> enrich preskocen " "(JNJ stejne enrichuje pipeline v 6:00/18:00; --enrich-always vynuti).") return stats enrich = newest_enrich() if not enrich: stats["skipped_reason"] = "enrich skript nenalezen" print(f" CHYBA: zadny enrich skript ({ENRICH_GLOB}) -> preskakuji.") return stats cmd = [sys.executable, enrich, "--mailbox", EMAILS_COL] print(f" Spoustim: {' '.join(cmd)}") sys.stdout.flush() r = subprocess.run(cmd) stats["ran"] = True stats["rc"] = r.returncode print(f" ENRICH hotovo: exit code {r.returncode}") return stats # ══════════════════════════════════════════════════════════════════════════════ # FAZE RECONCILE — smaz provizorni duplikat (no-ID Sent kopie s ID-dvojcetem) # ══════════════════════════════════════════════════════════════════════════════ _EMAIL_RE = re.compile(r"[A-Za-z0-9._%+\-]+@[A-Za-z0-9.\-]+\.[A-Za-z]{2,}") def _to_emails(s): return frozenset(e.lower() for e in _EMAIL_RE.findall(s or "")) def _subj_key(d): return (d.get("normalized_subject") or d.get("subject") or "").strip().lower() def _is_provisional_id(_id): return isinstance(_id, str) and (_id.startswith("filename:") or _id.startswith("entryid:")) def run_reconcile(db, args, now): """Smaze provizorni no-ID Sent kopie, ke kterym existuje dvojce s realnym Message-ID (stejni 'to' prijemci + stejny subjekt + received_at do 24h). Neodeslane (bez dvojcete) ponecha. --dry-run = jen plan, nic nemaze. Match je na STABILNIM obsahu (emailove adresy + normalized_subject + cas), NE na EntryID — provizorni a finalni kopie maji ruzny EntryID.""" stats = {"provisional": 0, "deletable": 0, "deleted": 0, "kept": 0} print("\n=== FAZE RECONCILE (smaz provizorni duplikaty Sent bez Message-ID) ===") emails = db[EMAILS_COL] # 1) index dvojcat: realne-ID Sent dokumenty -> klic (to_emails, subj) -> [received_at] twins = {} for d in emails.find( {"jnj_folder": {"$regex": "Sent Items"}}, {"_id": 1, "to": 1, "normalized_subject": 1, "subject": 1, "received_at": 1}): if _is_provisional_id(d.get("_id")): continue # jako dvojce berem jen dokumenty s realnym Message-ID key = (_to_emails(d.get("to")), _subj_key(d)) if not key[0] or not key[1]: continue twins.setdefault(key, []).append(d.get("received_at")) # 2) projdi provizorni a najdi dvojce v casovem okne 24h WINDOW = 24 * 3600 to_delete = [] examples_keep = [] for p in emails.find( {"jnj_folder": {"$regex": "Sent Items"}, "_id": {"$regex": "^(filename:|entryid:)"}}, {"_id": 1, "to": 1, "normalized_subject": 1, "subject": 1, "received_at": 1, "send_failed": 1}): stats["provisional"] += 1 key = (_to_emails(p.get("to")), _subj_key(p)) pr = p.get("received_at") matched = False if key[0] and key[1] and key in twins and pr is not None: for tr in twins[key]: if tr is None: continue try: if abs((tr - pr).total_seconds()) <= WINDOW: matched = True break except Exception: continue if matched: stats["deletable"] += 1 to_delete.append((p["_id"], p.get("to"))) else: stats["kept"] += 1 if p.get("send_failed") and len(examples_keep) < 8: examples_keep.append(p.get("to")) print(f" Provizornich (Sent bez Message-ID): {stats['provisional']}") print(f" S nalezenym ID-dvojcetem (smazat): {stats['deletable']}") print(f" Bez dvojcete (ponechat): {stats['kept']}") if examples_keep: print(" Priklady ponechanych s priznakem NEODESLANO:") for to in examples_keep: print(f" NEODESLANO | {to}") if not to_delete: print(" Nic ke smazani.") return stats if args.dry_run: print(" DRY-RUN: NIC se nemaze. Ukazka kandidatu na smazani:") for _id, to in to_delete[:15]: print(f" - {_id} ({to})") if len(to_delete) > 15: print(f" ... a dalsich {len(to_delete) - 15}") return stats ids = [x[0] for x in to_delete] res = emails.delete_many({"_id": {"$in": ids}}) stats["deleted"] = res.deleted_count print(f" SMAZANO provizornich duplikatu: {stats['deleted']}") return stats # ══════════════════════════════════════════════════════════════════════════════ # MAIN # ══════════════════════════════════════════════════════════════════════════════ def main(): ap = argparse.ArgumentParser(description=f"jnj_tower_ingest v{SCRIPT_VERSION}") ap.add_argument("--dry-run", action="store_true", help="nic nezapise, jen plan") ap.add_argument("--full", action="store_true", help="parse: reparsuj vse; sync: ignoruj watermark") ap.add_argument("--limit", type=int, default=0, help="max N souboru/radku (test)") ap.add_argument("--reindex", action="store_true", help="vynut indexy po parse") ap.add_argument("--force", action="store_true", help="sync: ignoruj last_db zkratku") ap.add_argument("--parse-only", action="store_true", help="jen faze PARSE") ap.add_argument("--sync-only", action="store_true", help="jen faze SYNC") ap.add_argument("--enrich-only", action="store_true", help="jen faze ENRICH") ap.add_argument("--no-enrich", action="store_true", help="preskoc fazi ENRICH") ap.add_argument("--enrich-always", action="store_true", help="spust enrich i bez novych dokumentu z parse") ap.add_argument("--reconcile", action="store_true", help="spust fazi RECONCILE (smaz provizorni Sent duplikaty; " "s --dry-run jen plan)") args = ap.parse_args() now = datetime.now(timezone.utc).replace(tzinfo=None) print(f"=== jnj_tower_ingest v{SCRIPT_VERSION} {'[DRY-RUN]' if args.dry_run else ''} ===") print(f"Start: {datetime.now():%Y-%m-%d %H:%M:%S}") print(f"MongoDB: {MONGO_URI} -> {MONGO_DB}") client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000) try: client.admin.command("ping") print(" MongoDB OK") except Exception as e: print(f"CHYBA: MongoDB nedostupna -- {e}") sys.exit(1) db = client[MONGO_DB] col = db[EMAILS_COL] state_col = db[STATE_COL] p_stats = s_stats = e_stats = r_stats = None if not args.sync_only and not args.enrich_only: p_stats = run_parse(col, state_col, args, now) if not args.parse_only and not args.enrich_only: s_stats = run_sync(db, args, now) # RECONCILE bezi jen na vyzadani (--reconcile); potrebuje jnj_folder ze sync. if args.reconcile and not args.parse_only and not args.enrich_only: r_stats = run_reconcile(db, args, now) if not args.parse_only and not args.sync_only: new_docs = p_stats["ok"] if p_stats else 0 force = args.enrich_only or args.enrich_always or args.full e_stats = run_enrich(args, new_docs, force) # ── Souhrn ──────────────────────────────────────────────────────────── print("\n=== SOUHRN ===") if p_stats is not None: print(f" PARSE: rezim={p_stats['mode']} kandidatu={p_stats['candidates']} " f"ok={p_stats['ok']} err={p_stats['err']}") if s_stats is not None: if s_stats.get("skipped"): print(" SYNC: preskoceno (zadna nova DB / uz zpracovana)") else: print(f" SYNC: radku={s_stats['total']} match={s_stats['matched']}") if r_stats is not None: akce = "plan" if args.dry_run else f"smazano={r_stats['deleted']}" print(f" RECON: provizornich={r_stats['provisional']} " f"smazatelnych={r_stats['deletable']} {akce}") if e_stats is not None: if e_stats.get("ran"): print(f" ENRICH: spusten, exit code {e_stats['rc']}") else: print(f" ENRICH: preskoceno ({e_stats.get('skipped_reason')})") print(f"Konec: {datetime.now():%Y-%m-%d %H:%M:%S}") client.close() if __name__ == "__main__": main()