"""
jnj_tower_ingest v1.1
Name:    jnj_tower_ingest_v1.1.py
Version: 1.1.0
Date:    2026-06-10
Author:  vladimir.buzalka
Description:
  Unified Tower-side ingest of JNJ e-mail. Merges three previously separate
  parts into a single run (everything runs in the python-runner container
  next to Mongo):
  PHASE 1: PARSE (formerly parse_emails_tower_v1.3.py):
    .msg files from /mnt/JNJEMAILS -> document in Mongo
    emaily."vbuzalka@its.jnj.com" (rich extraction: body, attachments,
    headers, MAPI props, ...). _id = Internet Message-ID.
    INCREMENTAL: parses only files newer than the mtime watermark
    (jnj_sync_state/_id="parse_state"). First run = seed by filename
    in Mongo. --full reparses everything.
  PHASE 2: SYNC (formerly sync_jnj_state_v1.0.py):
    newest /mnt/JNJEMAILS/db/jnjemails_*.db (SQLite, READ-ONLY, ro)
    -> mirrored into the Mongo collection 'jnj_messages' (upsert)
    -> path/state filled into emaily."vbuzalka@its.jnj.com":
       jnj_folder = COALESCE(jnj_folder, folder)
       jnj_is_read, jnj_not_in_mailbox, jnj_left_mailbox_at,
       jnj_folder_synced_at (match _id==message_id, fallback
       filename; NO upsert, so no stub documents are created).
    Incremental via the updated_at watermark (jnj_sync_state/_id=
    "watermark") + last_db shortcut (same DB -> immediate no-op).
  PHASE 3: ENRICH (formerly jnj_emails_to_fulltext_v1.0.py):
    indexes the JNJ mailbox into the PG full-text store by calling the
    SHARED script 5_enrich_fulltext_emails_vX.Y.py --mailbox
    "vbuzalka@its.jnj.com" (same extractor as the Graph pipeline ->
    consistent schema). The enrich version is auto-detected (newest
    /scripts/5_enrich_fulltext_emails_v*.py). Runs ONLY when parse
    added new documents (otherwise skipped; the pipeline enriches JNJ
    at 6:00/18:00 anyway). --no-enrich disables it, --enrich-always
    forces it.
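A minimal sketch of the version auto-detection described for the enrich
phase, assuming script names carry a _vX.Y suffix. The helper name
pick_newest is hypothetical and not part of this script; it compares
versions numerically, so v1.10 ranks above v1.9.

```python
import re

def pick_newest(paths):
    # Hypothetical helper: pick the path with the highest (major, minor) version.
    def version(path):
        m = re.search(r"_v([0-9]+)[.]([0-9]+)", path)
        # Names without a _vX.Y suffix sort lowest.
        return (int(m.group(1)), int(m.group(2))) if m else (0, 0)
    return max(paths, key=version) if paths else None

newest = pick_newest(["enrich_v1.9.py", "enrich_v1.10.py", "enrich_v1.2.py"])
```

A plain string sort would rank v1.9 above v1.10, which is why the sketch
compares integer tuples.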
  ORDER: parse -> sync -> enrich. Freshly parsed mails get their path (sync)
  and full text (enrich) within the same run (previously, if sync/enrich ran
  ahead of parse, a new mail had nothing to process). Three independent
  events (new .msg / new .db / new docs for PG) -> the script does only the
  work that is pending; otherwise it is a cheap no-op (suitable for cron
  every 5 minutes).
  The join key everywhere = Internet Message-ID = Mongo _id.
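A minimal sketch of the mtime watermark idea behind the cheap no-op runs,
under simplifying assumptions: files are (name, mtime) pairs and the
watermark is a plain float. select_candidates and next_watermark are
hypothetical names, not functions of this script; the real first run seeds
from filenames already stored in Mongo rather than taking every file.

```python
def select_candidates(files, last_mtime):
    # files: list of (name, mtime) pairs; last_mtime: stored watermark or None.
    if last_mtime is None:
        # Simplification: no watermark yet, so everything is a candidate.
        return list(files)
    # Only files strictly newer than the watermark need parsing.
    return [f for f in files if f[1] > last_mtime]

def next_watermark(files, last_mtime):
    # The new watermark is the newest mtime seen; unchanged if there are no files.
    mtimes = [mtime for _, mtime in files]
    return max(mtimes) if mtimes else last_mtime

files = [("a.msg", 10.0), ("b.msg", 20.0)]
todo = select_candidates(files, 10.0)
```

When nothing is newer than the watermark the candidate list is empty, so a
run every few minutes costs little more than a directory listing.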
Environment:
  Docker container "python-runner" on Unraid Tower.
  /mnt/user/JNJEMAILS -> /mnt/JNJEMAILS (.msg in the root, .db in db/)
  MongoDB 192.168.1.76:27017 (external).
Arguments:
  --dry-run        writes nothing, only counts and prints the plan of all phases
  --full           parse: reparse everything; sync: ignore the watermark
  --limit N        max N files (parse) / rows (sync), for testing
  --reindex        force index creation at the end of the parse phase
  --force          sync: ignore the last_db shortcut (process an already handled DB)
  --parse-only     run only the PARSE phase
  --sync-only      run only the SYNC phase
  --enrich-only    run only the ENRICH phase (forces enrich even without new data)
  --no-enrich      skip the ENRICH phase
  --enrich-always  run enrich even when parse added no new documents
Running (inside the python-runner container):
  # Test:
  docker exec -it python-runner python3 /scripts/jnj_tower_ingest_v1.1.py --dry-run
  # Live incremental run (cron):
  docker exec python-runner python3 /scripts/jnj_tower_ingest_v1.1.py
  # Full reparse + reindex:
  docker exec -it python-runner python3 /scripts/jnj_tower_ingest_v1.1.py --full --reindex
Dependencies (in the python-runner image):
  extract-msg==0.55.0, olefile, pymongo, python-dateutil, sqlite3 (stdlib).
  The enrich phase delegates to 5_enrich_fulltext_emails (psycopg, bs4 in the image).
  Python 3.10+.
Version history:
  1.0.0 2026-06-10 Merged parse_emails_tower_v1.3 + sync_jnj_state_v1.0
                   into a single script. Parse made incremental via the
                   mtime watermark (previously a scan of the whole
                   directory on every run). Indexes only on
                   full/seed/--reindex. Order parse->sync.
  1.1.0 2026-06-10 + PHASE 3 ENRICH: delegates to the shared
                   5_enrich_fulltext_emails --mailbox (version auto-detected),
                   only when parse added new documents. Replaces
                   jnj_emails_to_fulltext_v1.0.py (moved to Trash).
                   Flags --enrich-only/--no-enrich/--enrich-always.
"""
import sys
import os
import re
import glob
import logging
import argparse
import base64
import struct
import sqlite3
import subprocess
from pathlib import Path
from datetime import datetime, timezone
from typing import Optional
import extract_msg
from extract_msg.enums import ErrorBehavior
import olefile
from dateutil import parser as dtparser
from pymongo import MongoClient, UpdateOne, ASCENDING, TEXT
if hasattr(sys.stdout, "reconfigure"):
    sys.stdout.reconfigure(encoding="utf-8", errors="replace")
# ─── CONFIGURATION ────────────────────────────────────────────────────────────
MSGS_DIR = Path("/mnt/JNJEMAILS")
DB_DIR = "/mnt/JNJEMAILS/db"
MONGO_URI = "mongodb://192.168.1.76:27017"
MONGO_DB = "emaily"
EMAILS_COL = "vbuzalka@its.jnj.com"
MIRROR_COL = "jnj_messages"
STATE_COL = "jnj_sync_state"
BATCH_SIZE = 200
LOG_FILE = Path(__file__).parent / "jnj_tower_ingest_errors.log"
ENRICH_GLOB = "/scripts/5_enrich_fulltext_emails_v*.py"  # shared PG enrich
SCRIPT_VERSION = "1.1.0"
# Columns mirrored from SQLite messages -> jnj_messages
ROW_COLS = ["message_id", "subject", "sender", "received_at", "folder",
            "jnj_folder", "is_read", "not_in_mailbox_anymore", "left_mailbox_at",
            "entry_id", "graph_id", "updated_at", "source"]
# ──────────────────────────────────────────────────────────────────────────────
logging.basicConfig(
    filename=str(LOG_FILE),
    level=logging.ERROR,
    format="%(asctime)s | %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    encoding="utf-8",
)
# ══════════════════════════════════════════════════════════════════════════════
# PHASE 1: PARSE (.msg -> Mongo emaily)  [formerly parse_emails_tower_v1.3.py]
# ══════════════════════════════════════════════════════════════════════════════
def safe(obj, *attrs, default=None):
    """Safe attribute read: returns the first non-None, non-blank value."""
    for attr in attrs:
        try:
            val = getattr(obj, attr, None)
            if val is None:
                continue
            if isinstance(val, str) and not val.strip():
                continue
            return val
        except Exception:
            continue
    return default
def parse_date(raw) -> Optional[datetime]:
    """Any date value -> UTC datetime without tzinfo (for MongoDB)."""
    if raw is None:
        return None
    if isinstance(raw, datetime):
        if raw.tzinfo:
            return raw.astimezone(timezone.utc).replace(tzinfo=None)
        return raw
    try:
        dt = dtparser.parse(str(raw))
        if dt.tzinfo:
            return dt.astimezone(timezone.utc).replace(tzinfo=None)
        return dt
    except Exception:
        return None
_INT64_MIN, _INT64_MAX = -(2 ** 63), 2 ** 63 - 1


def to_bson(val):
    """Converts a value to a BSON-serializable type.

    Note: BSON only supports signed int64. Python integers are unbounded, so
    large MAPI values (PR_CHANGE_KEY, FILETIME, 64-bit handles) outside the
    int64 range are converted to strings; otherwise the whole bulk_write
    fails with 'MongoDB can only handle up to 8-byte ints'.
    """
    # bool must come BEFORE int (isinstance(True, int) == True)
    if isinstance(val, bool):
        return val
    if isinstance(val, bytes):
        return val.hex() if len(val) <= 128 else f"<bytes:{len(val)}>"
    if isinstance(val, datetime):
        return parse_date(val)
    if isinstance(val, int):
        return val if _INT64_MIN <= val <= _INT64_MAX else str(val)
    if isinstance(val, (str, float, type(None))):
        return val
    if isinstance(val, list):
        return [to_bson(v) for v in val]
    try:
        iv = int(val)
        return iv if _INT64_MIN <= iv <= _INT64_MAX else str(iv)
    except Exception:
        pass
    return str(val)
def extract_headers(msg) -> dict:
    headers = {}
    try:
        hdr = msg.header
        if not hdr:
            return {}
        from email.header import decode_header as _dh

        def _decode(v: str) -> str:
            try:
                parts = _dh(v)
                out = ""
                for part, enc in parts:
                    out += part.decode(enc or "utf-8", errors="replace") if isinstance(part, bytes) else part
                return out
            except Exception:
                return v

        for key in set(hdr.keys()):
            k = key.lower().replace("-", "_")
            vals = [_decode(v) for v in hdr.get_all(key, [])]
            headers[k] = vals if len(vals) > 1 else (vals[0] if vals else "")
    except Exception as e:
        logging.error("extract_headers: %s", e)
    return headers
def extract_recipients(msg) -> list:
    result = []
    type_map = {1: "to", 2: "cc", 3: "bcc"}
    try:
        for r in msg.recipients:
            rtype = getattr(r, "type", 1)
            try:
                rtype = int(rtype)
            except Exception:
                try:
                    rtype = int(rtype.value)
                except Exception:
                    rtype = 1
            rec = {
                "type": type_map.get(rtype, "to"),
                "email": safe(r, "email", default=""),
                "name": safe(r, "name", default=""),
            }
            result.append(rec)
    except Exception as e:
        logging.error("extract_recipients: %s", e)
    return result
def extract_attachments(msg) -> list:
    result = []
    try:
        for att in msg.attachments:
            fname = safe(att, "longFilename", "shortFilename", default="")
            if not fname:
                continue
            size = 0
            try:
                d = att.data
                size = len(d) if d else 0
            except Exception:
                pass
            result.append({
                "filename": fname,
                "size_bytes": size,
                "mime_type": safe(att, "mimetype", "mimeType", default="application/octet-stream"),
                "content_id": safe(att, "cid", default=None),
                "is_inline": bool(safe(att, "isInline", default=False)),
            })
    except Exception as e:
        logging.error("extract_attachments: %s", e)
    return result
def extract_mapi_props(msg) -> dict:
    """All raw MAPI properties as {0xXXXX: value}."""
    result = {}
    try:
        props = msg.props
        if not hasattr(props, "items"):
            return {}
        for key, prop in props.items():
            try:
                val = to_bson(prop.value)
                prop_id = f"0x{key[:4].upper()}" if len(key) >= 4 else f"0x{key.upper()}"
                result[prop_id] = val
            except Exception:
                pass
    except Exception as e:
        logging.error("extract_mapi_props: %s", e)
    return result
# ─── Tolerant opening and raw-OLE fallback ───────────────────────────────────
_CPID_TO_CODEC = {
    1250: "cp1250", 1251: "cp1251", 1252: "cp1252", 1253: "cp1253",
    1254: "cp1254", 1255: "cp1255", 1256: "cp1256", 1257: "cp1257",
    1258: "cp1258", 874: "cp874", 932: "shift_jis", 936: "gb2312",
    949: "euc_kr", 950: "big5", 65001: "utf-8", 28591: "iso-8859-1",
    28592: "iso-8859-2", 20127: "ascii",
}
def _read_u32_prop(ole, propid):
    """Reads a 32-bit MAPI property value from the top-level __properties_version1.0."""
    try:
        data = ole.openstream("__properties_version1.0").read()
    except Exception:
        return None
    body = data[32:]  # 32-byte header of the top-level property stream
    # Each property record is 16 bytes: tag (4), flags (4), value (8).
    for i in range(0, len(body) - 16 + 1, 16):
        rec = body[i:i + 16]
        tag = struct.unpack("<I", rec[0:4])[0]
        if ((tag >> 16) & 0xFFFF) == propid:
            return struct.unpack("<I", rec[8:12])[0]
    return None
def _detect_cpid(ole) -> Optional[str]:
    """Codec from PR_INTERNET_CPID / PR_MESSAGE_CODEPAGE (a hint, not dogma)."""
    for pid in (0x3FDE, 0x3FFD):  # INTERNET_CPID, MESSAGE_CODEPAGE
        codec = _CPID_TO_CODEC.get(_read_u32_prop(ole, pid))
        # utf-8/ascii are poor hints for an 8-bit stream (they are often wrong)
        if codec and codec not in ("utf-8", "ascii"):
            return codec
    return None
def _cascade_decode(raw: bytes, is_unicode: bool, cpid_codec: Optional[str]) -> str:
    """Decodes the bytes of a MAPI string. Headers are not trusted: codecs are
    tried strictly in priority order and the first one that decodes without
    error wins."""
    if not raw:
        return ""
    if is_unicode:  # PT_UNICODE = utf-16-le
        try:
            return raw.decode("utf-16-le")
        except Exception:
            return raw.decode("utf-16-le", errors="replace")
    order = ["utf-8"]  # strict utf-8 is a strong discriminator
    if cpid_codec:
        order.append(cpid_codec)
    order += ["cp1250", "cp1252", "gb2312", "big5"]
    for enc in order:
        try:
            return raw.decode(enc, errors="strict")
        except Exception:
            continue
    return raw.decode("latin-1", errors="replace")  # never fails
def _raw_mapi_strings(msg_path: Path) -> dict:
    """Reads the key textual MAPI fields DIRECTLY from OLE (bypassing extract_msg).
    Used only when extract_msg returns a degraded field."""
    out = {"subject": "", "normalized_subject": "", "sender_name": "",
           "sender_email": "", "sender_smtp": "", "body_text": "", "body_html": ""}
    try:
        ole = olefile.OleFileIO(str(msg_path))
    except Exception:
        return out
    try:
        cpid = _detect_cpid(ole)
        wanted = {  # MAPI tag -> key in out
            "0037": "subject", "0E1D": "normalized_subject",
            "0C1A": "sender_name", "5D01": "sender_smtp",
            "0C1F": "sender_email", "1000": "body_text", "1013": "body_html",
        }
        prefix = "__substg1.0_"
        found = {}  # key -> (type_priority, value)
        for entry in ole.listdir():
            if len(entry) != 1:  # top-level only (no nested messages)
                continue
            name = entry[0]
            if not name.startswith(prefix):
                continue
            tag = name[len(prefix):len(prefix) + 4].upper()
            key = wanted.get(tag)
            if not key:
                continue
            typ = name[-4:].upper()
            prio = {"001F": 3, "001E": 2, "0102": 1}.get(typ, 0)
            if prio == 0:
                continue
            prev = found.get(key)
            if prev and prev[0] >= prio:  # prefer unicode > ansi > binary
                continue
            try:
                raw = ole.openstream(entry).read()
                val = _cascade_decode(raw, typ == "001F", cpid)
            except Exception:
                continue
            found[key] = (prio, val)
        for key, (_, val) in found.items():
            out[key] = val
    finally:
        ole.close()
    return out
def _degraded(s) -> bool:
    """A field is degraded when it is empty or contains U+FFFD (the replacement character)."""
    return (not s) or ("\ufffd" in s)
def open_message(msg_path: Path):
    """Cascading open of a .msg file -> (msg, mode) or (None, None)."""
    try:
        return extract_msg.Message(str(msg_path)), "normal"
    except Exception:
        pass
    try:
        return extract_msg.Message(
            str(msg_path), errorBehavior=ErrorBehavior.SUPPRESS_ALL), "suppress_all"
    except Exception:
        pass
    encs = []
    try:
        ole = olefile.OleFileIO(str(msg_path))
        c = _detect_cpid(ole)
        ole.close()
        if c:
            encs.append(c)
    except Exception:
        pass
    for e in encs + ["cp1250", "cp1252"]:
        try:
            return extract_msg.Message(
                str(msg_path), errorBehavior=ErrorBehavior.SUPPRESS_ALL,
                overrideEncoding=e), f"override:{e}"
        except Exception:
            continue
    return None, None
def extract_message(msg_path: Path) -> Optional[dict]:
    """Parses a single .msg file -> MongoDB document."""
    msg, parse_mode = open_message(msg_path)
    if msg is None:
        logging.error("open failed [%s]: all attempts to open the file failed", msg_path.name)
        return None
    try:
        # ── Message-ID ────────────────────────────────────────────────
        mid = None
        for attr in ("messageId", "message_id", "internetMessageId"):
            mid = safe(msg, attr)
            if mid:
                break
        if not mid:
            mid = f"filename:{msg_path.stem}"
        mid = str(mid).strip()
        # ── Subject ───────────────────────────────────────────────────
        try:
            subject = msg.subject or ""
        except Exception:
            subject = ""
        normalized_subject = safe(msg, "normalizedSubject", "normalized_subject", default="")
        # ── Body ──────────────────────────────────────────────────────
        try:
            body_text = msg.body or ""
        except Exception:
            body_text = ""
        body_html = None
        try:
            bh = msg.htmlBody
            if isinstance(bh, bytes):
                bh = bh.decode("utf-8", errors="replace")
            if bh:
                # Cap the stored HTML at 2 MiB
                body_html = bh if len(bh) <= 2 * 1024 * 1024 else bh[:2 * 1024 * 1024]
        except Exception:
            pass
        # ── Sender ────────────────────────────────────────────────────
        try:
            sender_email = msg.sender or ""
        except Exception:
            sender_email = ""
        sender_name = safe(msg, "senderName", "sender_name", default="")
        sender_smtp = safe(msg, "senderSmtpAddress", "sent_representing_smtp_address", default="")
        # ── Recipients ────────────────────────────────────────────────
        recipients = extract_recipients(msg)
        try:
            to_raw = msg.to or ""
        except Exception:
            to_raw = ""
        try:
            cc_raw = msg.cc or ""
        except Exception:
            cc_raw = ""
        try:
            bcc_raw = getattr(msg, "bcc", None) or ""
        except Exception:
            bcc_raw = ""
        display_to = safe(msg, "displayTo", "display_to", default="")
        display_cc = safe(msg, "displayCc", "display_cc", default="")
        # ── Timestamps ────────────────────────────────────────────────
        try:
            received_at = parse_date(msg.date)
        except Exception:
            received_at = None
        sent_at = None
        for attr in ("clientSubmitTime", "client_submit_time", "sentOn"):
            v = safe(msg, attr)
            if v:
                sent_at = parse_date(v)
                break
        # ── MAPI properties ───────────────────────────────────────────
        importance = 1
        try:
            v = msg.importance
            if v is not None:
                importance = int(v)
        except Exception:
            pass
        sensitivity = 0
        try:
            v = getattr(msg, "sensitivity", None)
            if v is not None:
                sensitivity = int(v)
        except Exception:
            pass
        flag_status = 0
        try:
            v = safe(msg, "flagStatus", "flag_status")
            if v is not None:
                flag_status = int(v)
        except Exception:
            pass
        conversation_topic = safe(msg, "conversationTopic", "conversation_topic", default="")
        conversation_index = ""
        try:
            ci = safe(msg, "conversationIndex", "conversation_index")
            if isinstance(ci, bytes):
                conversation_index = base64.b64encode(ci).decode()
            elif ci:
                conversation_index = str(ci)
        except Exception:
            pass
        in_reply_to = safe(msg, "inReplyTo", "in_reply_to", default="")
        internet_refs = []
        try:
            refs = safe(msg, "internetReferences", "internet_references")
            if isinstance(refs, list):
                internet_refs = refs
            elif isinstance(refs, str) and refs:
                internet_refs = [r.strip() for r in refs.split() if r.strip()]
        except Exception:
            pass
        categories = []
        try:
            cats = safe(msg, "categories")
            if isinstance(cats, list):
                categories = [str(c) for c in cats if c]
            elif isinstance(cats, str) and cats:
                categories = [c.strip() for c in re.split(r"[;,]", cats) if c.strip()]
        except Exception:
            pass
        read_receipt = bool(safe(msg, "readReceiptRequested", "read_receipt_requested", default=False))
        delivery_receipt = bool(safe(msg, "deliveryReceiptRequested", "delivery_receipt_requested", default=False))
        # ── Internet headers ──────────────────────────────────────────
        headers = extract_headers(msg)
        if not in_reply_to:
            in_reply_to = headers.get("in_reply_to", "")
        if not internet_refs:
            refs_str = headers.get("references", "")
            if isinstance(refs_str, str) and refs_str:
                internet_refs = [r.strip() for r in refs_str.split() if r.strip()]
        # ── Attachments ───────────────────────────────────────────────
        attachments = extract_attachments(msg)
        # ── Raw MAPI ──────────────────────────────────────────────────
        mapi_raw = extract_mapi_props(msg)
        msg.close()
        # ── Raw-OLE fallback for degraded text fields ─────────────────
        parse_degraded = parse_mode != "normal"
        forced = parse_mode != "normal"
        if (forced or _degraded(subject) or _degraded(body_text)
                or _degraded(sender_email) or (body_html and "\ufffd" in body_html)):
            raw = _raw_mapi_strings(msg_path)
            if raw["subject"] and (forced or _degraded(subject)):
                subject = raw["subject"]
            if raw["normalized_subject"] and (forced or _degraded(normalized_subject)):
                normalized_subject = raw["normalized_subject"]
            if raw["body_text"] and (forced or _degraded(body_text)):
                body_text = raw["body_text"]
            if raw["body_html"] and (forced or not body_html or "\ufffd" in body_html):
                bh = raw["body_html"]
                body_html = bh if len(bh) <= 2 * 1024 * 1024 else bh[:2 * 1024 * 1024]
            if (raw["sender_smtp"] or raw["sender_email"]) and (forced or _degraded(sender_email)):
                sender_email = raw["sender_smtp"] or raw["sender_email"]
            if raw["sender_name"] and (forced or _degraded(sender_name)):
                sender_name = raw["sender_name"]
            if raw["sender_smtp"] and not sender_smtp:
                sender_smtp = raw["sender_smtp"]
        # ── Document ──────────────────────────────────────────────────
        return {
            "_id": mid,
            "filename": msg_path.name,
            "subject": subject,
            "normalized_subject": normalized_subject,
            "importance": importance,
            "sensitivity": sensitivity,
            "flag_status": flag_status,
            "read_receipt_requested": read_receipt,
            "delivery_receipt_requested": delivery_receipt,
            "has_attachments": len(attachments) > 0,
            "attachment_count": len(attachments),
            "message_size_bytes": msg_path.stat().st_size,
            "conversation_topic": conversation_topic,
            "conversation_index": conversation_index,
            "in_reply_to": in_reply_to,
            "internet_references": internet_refs,
            "categories": categories,
            "received_at": received_at,
            "sent_at": sent_at,
            "sender": {
                "email": sender_email,
                "name": sender_name,
                "smtp": sender_smtp,
            },
            "to": to_raw,
            "cc": cc_raw,
            "bcc": bcc_raw,
            "display_to": display_to,
            "display_cc": display_cc,
            "recipients": recipients,
            "body_text": body_text,
            "body_html": body_html,
            "attachments": attachments,
            "headers": headers,
            "mapi": mapi_raw,
            "parse_mode": parse_mode,
            "parse_degraded": parse_degraded,
            "parsed_at": datetime.now(timezone.utc).replace(tzinfo=None),
        }
    except Exception as e:
        logging.error("extract_message failed [%s]: %s", msg_path.name, e)
        return None
def create_indexes(col):
    print(" Creating indexes...")
    col.create_index([("received_at", ASCENDING)])
    col.create_index([("sent_at", ASCENDING)])
    col.create_index([("sender.email", ASCENDING)])
    col.create_index([("filename", ASCENDING)], unique=True, sparse=True)
    col.create_index([("conversation_topic", ASCENDING)])
    col.create_index([("has_attachments", ASCENDING)])
    col.create_index([("categories", ASCENDING)])
    col.create_index([("importance", ASCENDING)])
    col.create_index([("flag_status", ASCENDING)])
    col.create_index([
        ("subject", TEXT),
        ("body_text", TEXT),
        ("to", TEXT),
        ("cc", TEXT),
    ], name="text_search", default_language="none")
    print(" Indexes done.")
def run_parse(col, state_col, args, now) -> dict:
    """PHASE 1: incremental parse .msg -> emaily. Returns statistics."""
    stats = {"mode": None, "total_files": 0, "candidates": 0, "ok": 0, "err": 0}
    print("\n=== PHASE 1: PARSE (.msg -> emaily) ===")
    all_files = sorted(MSGS_DIR.glob("*.msg"))
    stats["total_files"] = len(all_files)
    if not all_files:
        print(" No .msg files in the source -> skipping.")
        return stats
    max_mtime = max(f.stat().st_mtime for f in all_files)
    ps = state_col.find_one({"_id": "parse_state"}) or {}
    last_mtime = ps.get("last_parse_mtime")
    if args.full:
        candidates = all_files
        mode = "full"
    elif last_mtime is None:
        print(" First run (no mtime watermark) -> seeding by filename in Mongo...")
        existing = set(col.distinct("filename"))
        candidates = [f for f in all_files if f.name not in existing]
        mode = "seed"
        print(f" Mongo already has {len(existing)} filenames; new to parse: {len(candidates)}")
    else:
        candidates = [f for f in all_files if f.stat().st_mtime > last_mtime]
        mode = "incremental"
    if args.limit:
        candidates = candidates[:args.limit]
    stats["mode"] = mode
    stats["candidates"] = len(candidates)
    wm_str = datetime.fromtimestamp(last_mtime).strftime("%Y-%m-%d %H:%M:%S") if last_mtime else "(none)"
    print(f" Mode: {mode} | .msg total {len(all_files)} | watermark {wm_str} | to process {len(candidates)}")
    if not candidates:
        print(" Nothing new to parse.")
        # Still advance the watermark to the newest file (except for --full and dry-run)
        if not args.dry_run and mode != "full":
            state_col.update_one({"_id": "parse_state"},
                                 {"$set": {"last_parse_mtime": max_mtime, "last_parse_at": now}}, upsert=True)
        return stats
    if args.dry_run:
        print(f" DRY-RUN: would parse {len(candidates)} files (Mongo is not modified). Sample:")
        for f in candidates[:10]:
            mt = datetime.fromtimestamp(f.stat().st_mtime).strftime("%Y-%m-%d %H:%M:%S")
            print(f" + {f.name} (mtime {mt})")
        if len(candidates) > 10:
            print(f" ... and {len(candidates) - 10} more")
        return stats
    batch = []
    verbose = len(candidates) <= 30

    def flush():
        if not batch:
            return
        try:
            col.bulk_write(batch, ordered=False)
        except Exception as e:
            logging.error("bulk_write failed (%s) -- falling back to per-document writes", e)
            print(f" ERROR bulk_write: {e} -- retrying per document")
            for op in batch:
                try:
                    col.bulk_write([op], ordered=False)
                except Exception as e2:
                    try:
                        bad_id = getattr(op, "_filter", {}).get("_id", "?")
                    except Exception:
                        bad_id = "?"
                    logging.error("per-document write failed [_id=%s]: %s", bad_id, e2)
                    print(f" DROPPED _id={bad_id}: {e2}")
                    stats["ok"] -= 1
                    stats["err"] += 1
        batch.clear()

    for i, msg_path in enumerate(candidates, 1):
        doc = extract_message(msg_path)
        if doc is None:
            stats["err"] += 1
        else:
            batch.append(UpdateOne({"_id": doc["_id"]}, {"$set": doc}, upsert=True))
            stats["ok"] += 1
        if len(batch) >= BATCH_SIZE:
            flush()
        if verbose:
            status = "ERR " if doc is None else "OK "
            subj = (doc.get("subject") or "")[:60] if doc else "?"
            print(f" {i:>5}/{len(candidates)} {status} {subj}")
        elif i % 500 == 0:
            print(f" progress {i}/{len(candidates)} ok={stats['ok']} err={stats['err']}")
    flush()
    # Indexes only on full/seed/--reindex (they already exist in incremental runs)
    if mode in ("full", "seed") or args.reindex:
        create_indexes(col)
    # Advance the watermark to the newest file
    state_col.update_one({"_id": "parse_state"},
                         {"$set": {"last_parse_mtime": max_mtime, "last_parse_at": now,
                                   "last_parsed_count": stats["ok"], "last_parse_mode": mode}},
                         upsert=True)
    print(f" PARSE done: ok={stats['ok']} err={stats['err']} "
          f"watermark={datetime.fromtimestamp(max_mtime):%Y-%m-%d %H:%M:%S}")
    return stats
# ══════════════════════════════════════════════════════════════════════════════
# PHASE 2: SYNC (SQLite -> Mongo jnj_messages + emaily path)
# [formerly sync_jnj_state_v1.0.py]
# ══════════════════════════════════════════════════════════════════════════════
def norm_mid(s: str) -> str:
    return (s or "").strip().strip("<>").strip()


def coalesce_path(jnjf, fld) -> str:
    return jnjf if (jnjf and jnjf.strip()) else (fld or "")


def newest_db():
    cands = glob.glob(os.path.join(DB_DIR, "jnjemails_*.db")) or glob.glob(os.path.join(DB_DIR, "*.db"))
    return max(cands, key=os.path.getmtime) if cands else None
def run_sync(db, args, now) -> dict:
    """PHASE 2: SQLite -> jnj_messages (mirror) + emaily (path/state)."""
    stats = {"total": 0, "matched": 0, "skipped": False}
    print("\n=== PHASE 2: SYNC (SQLite -> jnj_messages + emaily path) ===")
    emails = db[EMAILS_COL]
    state_col = db[STATE_COL]
    db_path = newest_db()
    if not db_path:
        print(f" No .db in {DB_DIR} -> skipping.")
        stats["skipped"] = True
        return stats
    db_name = os.path.basename(db_path)
    print(f" SQLite: {db_name}")
    st = state_col.find_one({"_id": "watermark"}) or {}
    # ── Shortcut: has this DB been processed already? (incremental mode only) ──
    if not args.full and not args.force and st.get("last_db") == db_name:
        print(f" DB {db_name} was already processed (last_db) -> nothing to do.")
        stats["skipped"] = True
        return stats
    wm = None if args.full else st.get("last_updated_at")
    print(f" Watermark: {wm or '(none -> everything)'}")
    # ── SQLite (read-only) ────────────────────────────────────────────────
    con = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
    con.row_factory = sqlite3.Row
    available = {row[1] for row in con.execute("PRAGMA table_info(messages)")}
    sel_cols = [c for c in ROW_COLS if c in available]
    missing = [c for c in ROW_COLS if c not in available]
    if missing:
        print(f" (DB lacks columns: {', '.join(missing)} -> default None/0)")
    has_updated = "updated_at" in available
    q = f"SELECT {', '.join(sel_cols)} FROM messages"
    params = ()
    if wm and has_updated:
        q += " WHERE updated_at > ?"
        params = (wm,)
    elif wm and not has_updated:
        print(" (DB has no updated_at -> watermark ignored, taking everything)")
        wm = None
    rows = [dict(row) for row in con.execute(q, params).fetchall()]
    con.close()
    if args.limit:
        rows = rows[:args.limit]
    total = len(rows)
    stats["total"] = total
    print(f" Rows to process: {total}")
    if total == 0:
        print(" Nothing to synchronize (no new rows).")
        if not args.dry_run:
            state_col.update_one({"_id": "watermark"},
                                 {"$set": {"last_db": db_name, "synced_at": now}}, upsert=True)
        return stats
    # ── Lookup indexes from Mongo ─────────────────────────────────────────
    print(" Loading _id + filename + jnj_folder from Mongo...")
    ids_exact = set()
    ids_norm = {}
    fnames = {}
    has_path = set()
    for d in emails.find({}, {"_id": 1, "filename": 1, "jnj_folder": 1}):
        _id = d["_id"]
        ids_exact.add(_id)
        ids_norm.setdefault(norm_mid(_id), _id)
        fn = d.get("filename")
        if fn:
            fnames[fn] = _id
        if d.get("jnj_folder"):
            has_path.add(_id)
    print(f" Mongo documents in {EMAILS_COL}: {len(ids_exact)} (of which with jnj_folder: {len(has_path)})")
    # ── Plan ──────────────────────────────────────────────────────────────
    m_exact = m_norm = m_fname = unmatched = 0
    examples = []
    mirror_ops = []
    emaily_ops = []
    max_wm = wm or ""
    for r in rows:
        mid = r.get("message_id")
        uv = r.get("updated_at")
        if uv and uv > max_wm:
            max_wm = uv
        # Step A: mirror (always)
        doc = {k: r.get(k) for k in ROW_COLS}
        doc["mirrored_at"] = now
        mirror_ops.append(UpdateOne({"_id": mid}, {"$set": doc}, upsert=True))
        # Step B: match into emaily
        target = None
        if mid in ids_exact:
            target = mid
            m_exact += 1
        elif norm_mid(mid) in ids_norm:
            target = ids_norm[norm_mid(mid)]
            m_norm += 1
        else:
            eid = r.get("entry_id")
            fn = (eid[-20:] + ".msg") if eid else None
            if fn and fn in fnames:
                target = fnames[fn]
                m_fname += 1
            else:
                unmatched += 1
                if len(examples) < 6:
                    examples.append(mid)
        if target is not None:
            setdoc = {
                "jnj_folder": coalesce_path(r.get("jnj_folder"), r.get("folder")),
                "jnj_is_read": bool(r.get("is_read")),
                "jnj_not_in_mailbox": bool(r.get("not_in_mailbox_anymore")),
                "jnj_left_mailbox_at": r.get("left_mailbox_at"),
                "jnj_folder_synced_at": now,
            }
            # No upsert here: documents missing from emaily are not created as stubs
            emaily_ops.append(UpdateOne({"_id": target}, {"$set": setdoc}))
    matched = m_exact + m_norm + m_fname
    stats["matched"] = matched
    print(" --- PLAN ---")
    print(f" Mirror -> {MIRROR_COL}: {len(mirror_ops)} upsert")
    print(f" Emaily match exact (_id): {m_exact}")
    print(f" Emaily match norm (<>): {m_norm}")
    print(f" Emaily match filename: {m_fname}")
    print(f" Emaily match TOTAL: {matched}/{total} ({100.0*matched/total:.1f}%)")
    print(f" UNmatched: {unmatched}")
    if examples:
        print(" Examples of unmatched message_id values:")
        for e in examples:
            print(f" {str(e)[:72]}")
    # ── Write ─────────────────────────────────────────────────────────────
    if args.dry_run:
        print(" DRY-RUN: Mongo is NOT modified.")
        return stats
    print(" Writing...")
    if mirror_ops:
        db[MIRROR_COL].bulk_write(mirror_ops, ordered=False)
    if emaily_ops:
        emails.bulk_write(emaily_ops, ordered=False)
    state_col.update_one(
        {"_id": "watermark"},
        {"$set": {"last_updated_at": max_wm, "synced_at": now, "last_db": db_name,
                  "last_total": total, "last_matched": matched}},
        upsert=True,
    )
    print(f" SYNC done: mirror={len(mirror_ops)} emaily={len(emaily_ops)} watermark={max_wm}")
    return stats
# ══════════════════════════════════════════════════════════════════════════════
# PHASE 3: ENRICH (Mongo -> PG full-text, delegates to the shared 5_enrich)
# [formerly jnj_emails_to_fulltext_v1.0.py]
# ══════════════════════════════════════════════════════════════════════════════
def newest_enrich():
    """Finds the newest /scripts/5_enrich_fulltext_emails_v*.py by its vX.Y version."""
    cands = glob.glob(ENRICH_GLOB)
    if not cands:
        return None

    def ver(p):
        m = re.search(r"_v(\d+)\.(\d+)", os.path.basename(p))
        return (int(m.group(1)), int(m.group(2))) if m else (0, 0)

    return max(cands, key=ver)
def run_enrich(args, new_docs, force) -> dict:
    """PHASE 3: indexes the JNJ mailbox into PG full-text via the shared enrich.
    Runs only when parse added new documents (or force/enrich-only)."""
    stats = {"ran": False, "rc": None, "skipped_reason": None}
    print("\n=== PHASE 3: ENRICH (PG fulltext) ===")
    if args.no_enrich:
        stats["skipped_reason"] = "--no-enrich"
        print(" Skipped [--no-enrich].")
        return stats
    if args.dry_run:
        enrich = newest_enrich()
        stats["skipped_reason"] = "dry-run"
        print(f" DRY-RUN: would call {enrich or '(enrich not found!)'} --mailbox {EMAILS_COL}"
              f" (new docs from parse: {new_docs}, force={force})")
        return stats
    if not force and new_docs <= 0:
        stats["skipped_reason"] = "no new docs"
        print(" No new mails from parse -> enrich skipped "
              "(the pipeline enriches JNJ at 6:00/18:00 anyway; --enrich-always forces it).")
        return stats
    enrich = newest_enrich()
    if not enrich:
        stats["skipped_reason"] = "enrich script not found"
        print(f" ERROR: no enrich script ({ENRICH_GLOB}) -> skipping.")
        return stats
    cmd = [sys.executable, enrich, "--mailbox", EMAILS_COL]
    print(f" Running: {' '.join(cmd)}")
    sys.stdout.flush()
    r = subprocess.run(cmd)
    stats["ran"] = True
    stats["rc"] = r.returncode
    print(f" ENRICH done: exit code {r.returncode}")
    return stats
# ══════════════════════════════════════════════════════════════════════════════
# MAIN
# ══════════════════════════════════════════════════════════════════════════════
def main():
    ap = argparse.ArgumentParser(description=f"jnj_tower_ingest v{SCRIPT_VERSION}")
    ap.add_argument("--dry-run", action="store_true", help="write nothing, only print the plan")
    ap.add_argument("--full", action="store_true",
                    help="parse: reparse everything; sync: ignore the watermark")
    ap.add_argument("--limit", type=int, default=0, help="max N files/rows (test)")
    ap.add_argument("--reindex", action="store_true", help="force index creation after parse")
    ap.add_argument("--force", action="store_true",
                    help="sync: ignore the last_db shortcut")
    ap.add_argument("--parse-only", action="store_true", help="PARSE phase only")
    ap.add_argument("--sync-only", action="store_true", help="SYNC phase only")
    ap.add_argument("--enrich-only", action="store_true", help="ENRICH phase only")
    ap.add_argument("--no-enrich", action="store_true", help="skip the ENRICH phase")
    ap.add_argument("--enrich-always", action="store_true",
                    help="run enrich even without new documents from parse")
    args = ap.parse_args()
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    print(f"=== jnj_tower_ingest v{SCRIPT_VERSION} {'[DRY-RUN]' if args.dry_run else ''} ===")
    print(f"Start: {datetime.now():%Y-%m-%d %H:%M:%S}")
    print(f"MongoDB: {MONGO_URI} -> {MONGO_DB}")
    client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
    try:
        client.admin.command("ping")
        print(" MongoDB OK")
    except Exception as e:
        print(f"ERROR: MongoDB unreachable -- {e}")
        sys.exit(1)
    db = client[MONGO_DB]
    col = db[EMAILS_COL]
    state_col = db[STATE_COL]
    p_stats = s_stats = e_stats = None
    if not args.sync_only and not args.enrich_only:
        p_stats = run_parse(col, state_col, args, now)
    if not args.parse_only and not args.enrich_only:
        s_stats = run_sync(db, args, now)
    if not args.parse_only and not args.sync_only:
        new_docs = p_stats["ok"] if p_stats else 0
        force = args.enrich_only or args.enrich_always or args.full
        e_stats = run_enrich(args, new_docs, force)
    # ── Summary ───────────────────────────────────────────────────────────
    print("\n=== SUMMARY ===")
    if p_stats is not None:
        print(f" PARSE: mode={p_stats['mode']} candidates={p_stats['candidates']} "
              f"ok={p_stats['ok']} err={p_stats['err']}")
    if s_stats is not None:
        if s_stats.get("skipped"):
            print(" SYNC: skipped (no new DB / already processed)")
        else:
            print(f" SYNC: rows={s_stats['total']} match={s_stats['matched']}")
    if e_stats is not None:
        if e_stats.get("ran"):
            print(f" ENRICH: ran, exit code {e_stats['rc']}")
        else:
            print(f" ENRICH: skipped ({e_stats.get('skipped_reason')})")
    print(f"End: {datetime.now():%Y-%m-%d %H:%M:%S}")
    client.close()


if __name__ == "__main__":
    main()