Files
janssen/Feasibility/analyze_sent_suspects_v1.0.py
2026-06-17 15:05:10 +02:00

197 lines
6.8 KiB
Python

# -*- coding: utf-8 -*-
# =============================================================================
# Nazev: analyze_sent_suspects_v1.0.py
# Verze: 1.0
# Datum: 2026-06-16
# Popis: LOKALNI (Z230) analyzator .msg souboru prenesenych z JNJ (vystup
# jnj_scan_failed_sent). Pres olefile precte u kazdeho .msg klicove
# MAPI vlastnosti a klasifikuje, zda jde o NEODESLANY e-mail:
# FAIL_BODY = telo/report obsahuje "could not be sent"/"SendAsDenied"
# SENDAS_BUZ = send-account / sentrep / sender obsahuje "buzalka.cz"
# NO_MSGID = chybi Internet Message-ID (0x1035)
# Vytahne prijemce (externi = lekar), subjekt, send-account, Message-ID.
# Vystup: prehled do konzole + timestampovany .xlsx.
# Pouziti: python analyze_sent_suspects_v1.0.py [SLOZKA_S_MSG]
# (default INPUT_DIR nize). Vyzaduje olefile + openpyxl.
# =============================================================================
import os
import re
import sys
import glob
import datetime
import olefile
import openpyxl
INPUT_DIR = r"U:\Dropbox\!!!Days\Downloads Z230\sent_suspects"
OUT_DIR = r"U:\Dropbox\!!!Days\Downloads Z230"
FAIL_SIGNS = [
"could not be sent", "sendasdenied",
"permission to send the message on behalf",
"transportsend operation has failed", "mapiexceptionsendasdenied",
]
INTERNAL = ("its.jnj.com",) # interni = ne-lekar (vc. cc Kocourkova/Bartosova)
def rd(o, tag):
"""Precti string stream __substg1.0_<tag> (zkousi 001F unicode i 001E ansi)."""
for t in (tag, tag[:-1] + "F", tag[:-1] + "E"):
name = "__substg1.0_" + t
if o.exists(name):
b = o.openstream(name).read()
if t.endswith("001F"):
try:
return b.decode("utf-16-le")
except Exception:
pass
for enc in ("cp1250", "latin-1", "utf-8"):
try:
return b.decode(enc)
except Exception:
pass
return ""
def read_body(o):
txt = rd(o, "1000001F") # PR_BODY
if not txt:
txt = rd(o, "1001001F") # ReportText
# PR_HTML (binary) jako fallback
if not txt and o.exists("__substg1.0_10130102"):
try:
txt = o.openstream("__substg1.0_10130102").read().decode("latin-1", "ignore")
except Exception:
pass
return txt or ""
def recipients_smtp(o):
"""Posbira SMTP vsech prijemcu z __recip_version1.0_#xxxx storages."""
out = []
seen = set()
for entry in o.listdir():
# entry je list segmentu cesty; zajima nas prvni segment recip storage
if entry and entry[0].startswith("__recip_version1.0_#") and len(entry) == 2:
top = entry[0]
if top in seen:
continue
seen.add(top)
smtp = ""
for tag in ("39FE001F", "39FE001E", "3003001F", "3003001E", "0C1F001F"):
nm = top + "/__substg1.0_" + tag
if o.exists(nm):
b = o.openstream(nm).read()
try:
s = b.decode("utf-16-le") if tag.endswith("1F") else b.decode("cp1250")
except Exception:
s = b.decode("latin-1", "ignore")
s = s.strip()
if "@" in s:
smtp = s
break
if smtp:
out.append(smtp)
return out
def analyze_file(path):
o = olefile.OleFileIO(path)
try:
subject = rd(o, "0037001F")
msgid = rd(o, "1035001F")
sendacct = rd(o, "0E28001F")
sentrep = rd(o, "0065001F")
sender = rd(o, "0C1F001F")
body = read_body(o)
recs = recipients_smtp(o)
finally:
o.close()
low = body.lower()
flags = []
if any(s in low for s in FAIL_SIGNS):
flags.append("FAIL_BODY")
joined = " ".join([sendacct, sentrep, sender]).lower()
if "buzalka.cz" in joined:
flags.append("SENDAS_BUZ")
if not msgid:
flags.append("NO_MSGID")
# prijemce-lekar = externi (ne its.jnj.com)
ext = [r for r in recs if not any(d in r.lower() for d in INTERNAL)]
recipient = ext[0] if ext else (recs[0] if recs else "")
# datum z nazvu souboru (STRONG_YYYY-MM-DD_... / weak_YYYY-MM-DD_...)
m = re.search(r"(\d{4}-\d{2}-\d{2})", os.path.basename(path))
date = m.group(1) if m else ""
return {
"file": os.path.basename(path),
"date": date,
"recipient": recipient,
"subject": subject.strip(),
"msgid": msgid.strip(),
"send_account": sendacct.strip(),
"sentrep": sentrep.strip(),
"flags": "+".join(flags),
"failed": "ANO" if ("FAIL_BODY" in flags or "SENDAS_BUZ" in flags) else "?",
}
def main():
indir = sys.argv[1] if len(sys.argv) > 1 else INPUT_DIR
files = sorted(glob.glob(os.path.join(indir, "*.msg")))
if not files:
print("Zadne .msg v:", indir)
return
rows = []
for f in files:
try:
rows.append(analyze_file(f))
except Exception as e:
rows.append({"file": os.path.basename(f), "date": "", "recipient": "",
"subject": "<chyba cteni>", "msgid": "", "send_account": "",
"sentrep": "", "flags": "ERR:" + str(e), "failed": "?"})
# serad: nejdriv jiste selhane, pak dle data
rows.sort(key=lambda r: (r["failed"] != "ANO", r["date"]))
n_fail = sum(1 for r in rows if r["failed"] == "ANO")
n_sendas = sum(1 for r in rows if "SENDAS_BUZ" in r["flags"])
n_failbody = sum(1 for r in rows if "FAIL_BODY" in r["flags"])
n_nomid = sum(1 for r in rows if "NO_MSGID" in r["flags"])
print(f"Souboru: {len(rows)}")
print(f" jiste selhane (FAIL_BODY/SENDAS_BUZ): {n_fail}")
print(f" z toho SENDAS_BUZ (buzalka.cz): {n_sendas} | FAIL_BODY: {n_failbody}")
print(f" jen NO_MSGID (slabe): {n_nomid - n_fail if n_nomid>=n_fail else n_nomid}")
print("=" * 110)
print(f"{'datum':10} {'prijemce':32} {'fail':4} {'flags':22} subjekt")
print("-" * 110)
for r in rows:
print(f"{r['date']:10} {r['recipient'][:32]:32} {r['failed']:4} {r['flags']:22} {r['subject'][:40]}")
# xlsx
wb = openpyxl.Workbook()
ws = wb.active
ws.title = "suspects"
cols = ["file", "date", "recipient", "subject", "msgid", "send_account", "sentrep", "flags", "failed"]
from openpyxl.cell.cell import ILLEGAL_CHARACTERS_RE
def clean(v):
return ILLEGAL_CHARACTERS_RE.sub("", str(v)) if v is not None else ""
ws.append(cols)
for r in rows:
ws.append([clean(r[c]) for c in cols])
stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
out = os.path.join(OUT_DIR, f"sent_suspects_analyza_{stamp}.xlsx")
wb.save(out)
print("\nXLSX:", out)
if __name__ == "__main__":
main()