197 lines
6.8 KiB
Python
197 lines
6.8 KiB
Python
# -*- coding: utf-8 -*-
# =============================================================================
# Name:        analyze_sent_suspects_v1.0.py
# Version:     1.0
# Date:        2026-06-16
# Description: LOCAL (Z230) analyzer of .msg files transferred from JNJ (the
#              output of jnj_scan_failed_sent). Uses olefile to read the key
#              MAPI properties of every .msg and classifies whether it is an
#              UNSENT e-mail:
#                FAIL_BODY  = body/report contains "could not be sent"/"SendAsDenied"
#                SENDAS_BUZ = send-account / sentrep / sender contains "buzalka.cz"
#                NO_MSGID   = the Internet Message-ID (0x1035) is missing
#              Extracts the recipient (external = physician), subject,
#              send-account and Message-ID.
# Output:      overview on the console + a timestamped .xlsx.
# Usage:       python analyze_sent_suspects_v1.0.py [FOLDER_WITH_MSG]
#              (default: INPUT_DIR below). Requires olefile + openpyxl.
# =============================================================================
|
|
|
|
import os
|
|
import re
|
|
import sys
|
|
import glob
|
|
import datetime
|
|
import olefile
|
|
import openpyxl
|
|
|
|
# Default folder holding the .msg files to analyze.
INPUT_DIR = r"U:\Dropbox\!!!Days\Downloads Z230\sent_suspects"
# Folder that receives the timestamped .xlsx report.
OUT_DIR = r"U:\Dropbox\!!!Days\Downloads Z230"

# Lower-case fragments; any of them inside a message body marks a failed send.
FAIL_SIGNS = [
    "could not be sent",
    "sendasdenied",
    "permission to send the message on behalf",
    "transportsend operation has failed",
    "mapiexceptionsendasdenied",
]

# Internal domains = non-physician recipients (incl. cc Kocourkova/Bartosova).
INTERNAL = ("its.jnj.com",)
|
|
|
|
|
|
def rd(o, tag):
    """Read the string stream ``__substg1.0_<tag>`` from an open .msg container.

    The tag is probed exactly as given first; its last character is then
    swapped so that both the Unicode (``...001F``) and the ANSI (``...001E``)
    variant of the property are tried.

    Args:
        o: Open OLE container exposing ``exists(name)`` and
            ``openstream(name)`` (e.g. ``olefile.OleFileIO``).
        tag: Property tag plus type suffix, e.g. ``"0037001F"``.

    Returns:
        The decoded text of the first variant that exists, or ``""`` when
        none of the variants is present.
    """
    # dict.fromkeys keeps the probing order while dropping the duplicate that
    # appears whenever ``tag`` already ends in "F" or "E".
    candidates = dict.fromkeys((tag, tag[:-1] + "F", tag[:-1] + "E"))
    for t in candidates:
        name = "__substg1.0_" + t
        if not o.exists(name):
            continue
        raw = o.openstream(name).read()
        if t.endswith("001F"):
            try:
                return raw.decode("utf-16-le")
            except UnicodeDecodeError:
                pass  # malformed UTF-16: fall back to the 8-bit codecs below
        # Strict UTF-8 goes first: it rejects almost all genuine cp1250 text,
        # whereas cp1250/latin-1 accept UTF-8 bytes and return mojibake.
        for enc in ("utf-8", "cp1250"):
            try:
                return raw.decode(enc)
            except UnicodeDecodeError:
                pass
        # latin-1 maps every byte, so this last resort cannot fail.
        return raw.decode("latin-1")
    return ""
|
|
|
|
|
|
def read_body(o):
    """Return the text used for failure detection, or ``""`` if none is found.

    Sources are tried in order: the ``1000001F`` stream (PR_BODY), then the
    ``1001001F`` stream (report text), and finally the binary ``10130102``
    stream (PR_HTML) decoded as latin-1.

    Args:
        o: Open OLE container exposing ``exists`` and ``openstream``.

    Returns:
        The first non-empty text found, otherwise an empty string.
    """
    html_stream = "__substg1.0_10130102"

    text = rd(o, "1000001F") or rd(o, "1001001F")
    if text:
        return text

    # PR_HTML (binary) as the last fallback.
    if o.exists(html_stream):
        try:
            return o.openstream(html_stream).read().decode("latin-1", "ignore") or ""
        except Exception:
            # Best effort only: an unreadable HTML stream counts as "no body".
            return ""
    return ""
|
|
|
|
|
|
def recipients_smtp(o):
    """Collect one address per ``__recip_version1.0_#xxxx`` recipient storage.

    For every recipient storage the candidate property streams are probed in
    a fixed order and the first value containing ``"@"`` is kept. Storages
    without such a value contribute nothing.

    Args:
        o: Open OLE container exposing ``listdir``, ``exists`` and
            ``openstream``.

    Returns:
        List of addresses in the order the storages are first seen in
        ``o.listdir()``.
    """
    prefix = "__recip_version1.0_#"
    address_tags = ("39FE001F", "39FE001E", "3003001F", "3003001E", "0C1F001F")

    addresses = []
    visited = set()
    for path in o.listdir():
        # Each path is a list of segments; only streams sitting directly
        # inside a recipient storage are of interest.
        if not path or not path[0].startswith(prefix) or len(path) != 2:
            continue
        storage = path[0]
        if storage in visited:
            continue
        visited.add(storage)

        for tag in address_tags:
            stream = storage + "/__substg1.0_" + tag
            if not o.exists(stream):
                continue
            raw = o.openstream(stream).read()
            try:
                if tag.endswith("1F"):
                    text = raw.decode("utf-16-le")
                else:
                    text = raw.decode("cp1250")
            except Exception:
                text = raw.decode("latin-1", "ignore")
            text = text.strip()
            if "@" in text:
                addresses.append(text)
                break
    return addresses
|
|
|
|
|
|
def analyze_file(path):
    """Open one .msg file and classify whether it looks like an unsent e-mail.

    Flags that can be raised:
        FAIL_BODY   the body contains one of ``FAIL_SIGNS``
        SENDAS_BUZ  send-account / sentrep / sender contains "buzalka.cz"
        NO_MSGID    the Message-ID property (0x1035) is empty

    Args:
        path: Path of the .msg file.

    Returns:
        Dict with the keys ``file``, ``date``, ``recipient``, ``subject``,
        ``msgid``, ``send_account``, ``sentrep``, ``flags`` (joined with "+")
        and ``failed`` ("ANO" for FAIL_BODY or SENDAS_BUZ, otherwise "?").
    """
    ole = olefile.OleFileIO(path)
    try:
        subject = rd(ole, "0037001F")
        message_id = rd(ole, "1035001F")
        send_account = rd(ole, "0E28001F")
        sent_representing = rd(ole, "0065001F")
        sender = rd(ole, "0C1F001F")
        body_text = read_body(ole)
        recipients = recipients_smtp(ole)
    finally:
        ole.close()

    body_lower = body_text.lower()
    identity = " ".join([send_account, sent_representing, sender]).lower()
    checks = (
        ("FAIL_BODY", any(sign in body_lower for sign in FAIL_SIGNS)),
        ("SENDAS_BUZ", "buzalka.cz" in identity),
        ("NO_MSGID", not message_id),
    )
    flags = [label for label, hit in checks if hit]

    # Physician recipient = external one (not its.jnj.com); fall back to the
    # first recipient of any kind.
    external = [
        addr for addr in recipients
        if not any(domain in addr.lower() for domain in INTERNAL)
    ]
    if external:
        recipient = external[0]
    elif recipients:
        recipient = recipients[0]
    else:
        recipient = ""

    # Date taken from the file name (STRONG_YYYY-MM-DD_... / weak_YYYY-MM-DD_...).
    file_name = os.path.basename(path)
    found = re.search(r"(\d{4}-\d{2}-\d{2})", file_name)
    date = found.group(1) if found else ""

    if "FAIL_BODY" in flags or "SENDAS_BUZ" in flags:
        failed = "ANO"
    else:
        failed = "?"

    return {
        "file": file_name,
        "date": date,
        "recipient": recipient,
        "subject": subject.strip(),
        "msgid": message_id.strip(),
        "send_account": send_account.strip(),
        "sentrep": sent_representing.strip(),
        "flags": "+".join(flags),
        "failed": failed,
    }
|
|
|
|
|
|
def main():
    """Analyze every .msg in the input folder, print a table, write an .xlsx.

    The input folder is ``sys.argv[1]`` when given, otherwise ``INPUT_DIR``.
    A file that cannot be analyzed still produces a row, with the error text
    stored in its ``flags`` column. The workbook is saved into ``OUT_DIR``
    under a timestamped name.
    """
    indir = sys.argv[1] if len(sys.argv) > 1 else INPUT_DIR
    files = sorted(glob.glob(os.path.join(indir, "*.msg")))
    if not files:
        print("Zadne .msg v:", indir)
        return

    rows = []
    for f in files:
        try:
            rows.append(analyze_file(f))
        except Exception as e:
            # Keep going: one unreadable file must not abort the whole batch.
            rows.append({"file": os.path.basename(f), "date": "", "recipient": "",
                         "subject": "<chyba cteni>", "msgid": "", "send_account": "",
                         "sentrep": "", "flags": "ERR:" + str(e), "failed": "?"})

    # Order: certainly-failed messages first, then by date.
    rows.sort(key=lambda r: (r["failed"] != "ANO", r["date"]))

    n_fail = sum(1 for r in rows if r["failed"] == "ANO")
    n_sendas = sum(1 for r in rows if "SENDAS_BUZ" in r["flags"])
    n_failbody = sum(1 for r in rows if "FAIL_BODY" in r["flags"])
    # Rows whose ONLY flag is NO_MSGID. Subtracting n_fail from the NO_MSGID
    # total was wrong, because failed rows need not carry NO_MSGID at all.
    n_only_nomid = sum(1 for r in rows if r["flags"] == "NO_MSGID")

    print(f"Souboru: {len(rows)}")
    print(f" jiste selhane (FAIL_BODY/SENDAS_BUZ): {n_fail}")
    print(f" z toho SENDAS_BUZ (buzalka.cz): {n_sendas} | FAIL_BODY: {n_failbody}")
    print(f" jen NO_MSGID (slabe): {n_only_nomid}")
    print("=" * 110)
    print(f"{'datum':10} {'prijemce':32} {'fail':4} {'flags':22} subjekt")
    print("-" * 110)
    for r in rows:
        print(f"{r['date']:10} {r['recipient'][:32]:32} {r['failed']:4} {r['flags']:22} {r['subject'][:40]}")

    # xlsx
    wb = openpyxl.Workbook()
    ws = wb.active
    ws.title = "suspects"
    cols = ["file", "date", "recipient", "subject", "msgid", "send_account", "sentrep", "flags", "failed"]
    from openpyxl.cell.cell import ILLEGAL_CHARACTERS_RE

    def clean(v):
        # Remove the characters openpyxl refuses to store in a cell.
        return ILLEGAL_CHARACTERS_RE.sub("", str(v)) if v is not None else ""

    ws.append(cols)
    for r in rows:
        ws.append([clean(r[c]) for c in cols])
    stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    out = os.path.join(OUT_DIR, f"sent_suspects_analyza_{stamp}.xlsx")
    wb.save(out)
    print("\nXLSX:", out)


if __name__ == "__main__":
    main()
|