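# (Repository web-view header captured together with the source; kept as comments so the file parses.)
# Files
# janssen/Python-runner/4_unwrap_smime_v1.0.py
# T
# 2026-06-05 21:21:30 +02:00
#
# 446 lines
# 16 KiB
# Python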

"""
==============================================================================
Skript: unwrap_smime_v1.0.py
Verze: 1.0
Datum: 2026-06-03
Autor: vladimir.buzalka
Popis:
Najde v Mongo emaily s prilohou smime.p7m (S/MIME signed-data),
stahne binarni obsah prilohy z Microsoft Graph API, rozbali PKCS7
SignedData (CMS), extrahuje vnitrni MIME message, a ulozi do Mongo:
- smime_unwrapped: True
- smime_body_text : plain text vnitrniho tela
- smime_body_html : HTML vnitrniho tela (kdyz je)
- smime_subject : Subject vnitrni MIME hlavicky
- smime_inner_attachments : [{filename, content_type, size_bytes}]
Tyto pole pak pouzije enrich_fulltext_emails_v1.2 a doplni jejich
obsah do PG fulltext indexu.
Typicke S/MIME odesilatele:
notifikace@mojedatovaschranka.cz (844 emailu)
kontakt@mbank.cz (226)
payments@comgate.cz, service@payu.com (~250)
info.postsignum@cpost.cz
Architekturalni poznamka:
S/MIME priloha smime.p7m ma Content-Type application/pkcs7-mime
s parametrem smime-type=signed-data. Vnitrni obsah je v PKCS7
ContentInfo -> SignedData -> encapContentInfo.eContent. To uz je
primo MIME zprava (multipart nebo single body).
Zavislosti (instalovat v kontejneru):
pip install asn1crypto
(skript dale importuje msal, requests a pymongo - musi byt take nainstalovany)
Spusteni:
python unwrap_smime_v1.0.py # vsechny schranky (mimo SKIP_MAILBOXES)
python unwrap_smime_v1.0.py --mailbox vladimir.buzalka@buzalka.cz
python unwrap_smime_v1.0.py --limit 10 # test
SKIP_MAILBOXES (hardcoded):
vbuzalka@its.jnj.com — JNJ tenant, nemame Graph API pristup. Pri behu
bez --mailbox se tise preskoci, s --mailbox skript
skonci s exit kodem 2.
==============================================================================
"""
from __future__ import annotations

import argparse
import email
import email.policy
import logging
import os
import sys
import time
import traceback
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

import msal
import requests
from asn1crypto import cms
from pymongo import MongoClient, UpdateOne
# Switch console output to UTF-8 (unencodable characters are replaced) when the
# stream supports reconfigure(); several status messages contain non-ASCII text.
if getattr(sys.stdout, "reconfigure", None) is not None:
    sys.stdout.reconfigure(encoding="utf-8", errors="replace")
# --- configuration ------------------------------------------------------------
# The Graph credentials and the Mongo URI can be overridden by environment
# variables of the same name; the literals are the defaults used when the
# variable is not set, so existing deployments behave exactly as before.
# NOTE(review): GRAPH_CLIENT_SECRET is committed here in plain text. It should be
# rotated and the fallback removed so the secret only comes from the environment.
GRAPH_TENANT_ID = os.environ.get("GRAPH_TENANT_ID", "7d269944-37a4-43a1-8140-c7517dc426e9")
GRAPH_CLIENT_ID = os.environ.get("GRAPH_CLIENT_ID", "4b222bfd-78c9-4239-a53f-43006b3ed07f")
GRAPH_CLIENT_SECRET = os.environ.get("GRAPH_CLIENT_SECRET", "Txg8Q~MjhocuopxsJyJBhPmDfMxZ2r5WpTFj1dfk")
GRAPH_URL = "https://graph.microsoft.com/v1.0"
MONGO_URI = os.environ.get("MONGO_URI", "mongodb://192.168.1.76:27017")
MONGO_DB = "emaily"
# Collections in MONGO_DB that are not mailboxes; main() never processes them.
SKIP_COLLECTIONS = {"attachments_index", "sync_state"}
# Mailboxes we have NO Graph API access to; skipped during a normal run.
SKIP_MAILBOXES = {
    "vbuzalka@its.jnj.com",  # JNJ tenant - we have no Graph credentials there
}
MAX_BODY_BYTES = 2 * 1024 * 1024  # 2 MB cap for each extracted body field
BATCH_SIZE = 25  # number of queued Mongo updates sent per bulk_write
LOG_FILE = Path(__file__).parent / "unwrap_smime_errors.log"
# Only ERROR-level records are kept; they go to LOG_FILE as "timestamp | message".
logging.basicConfig(
    level=logging.ERROR,
    filename=str(LOG_FILE),
    encoding="utf-8",
    datefmt="%Y-%m-%d %H:%M:%S",
    format="%(asctime)s | %(message)s",
)
# --- Graph auth ---------------------------------------------------------------
# Bearer token shared with graph_get_raw(); None until get_token() has succeeded.
_token: Optional[str] = None


def get_token() -> str:
    """Obtain an app-only Microsoft Graph access token (client-credentials flow).

    A new ``msal.ConfidentialClientApplication`` is built on every call, so no
    MSAL token cache is carried over between calls. The token for the
    ``https://graph.microsoft.com/.default`` scope is stored in the
    module-level ``_token`` and also returned.

    Raises:
        RuntimeError: when the MSAL response has no ``access_token``.
    """
    global _token
    client_app = msal.ConfidentialClientApplication(
        GRAPH_CLIENT_ID,
        client_credential=GRAPH_CLIENT_SECRET,
        authority=f"https://login.microsoftonline.com/{GRAPH_TENANT_ID}",
    )
    result = client_app.acquire_token_for_client(
        scopes=["https://graph.microsoft.com/.default"],
    )
    if "access_token" in result:
        _token = result["access_token"]
        return _token
    raise RuntimeError(f"Graph auth failed: {result}")
def _retry_after_seconds(value: Optional[str], default: float = 5.0) -> float:
    """Convert a Retry-After header value into a sleep duration in seconds.

    Only the delta-seconds form is understood. A missing value, a non-numeric
    value (e.g. an HTTP-date), a negative value or a non-finite value falls
    back to *default* instead of raising.
    """
    try:
        delay = float(value)
    except (TypeError, ValueError):
        return default
    # NaN fails every comparison, so it also falls back to the default.
    if not 0.0 <= delay < float("inf"):
        return default
    return delay


def graph_get_raw(url: str) -> Optional[bytes]:
    """GET a Graph endpoint and return the raw response body.

    Used for the attachment ``$value`` endpoint. At most three attempts are
    made:

    - 401 refreshes the token and retries.
    - 429 sleeps for Retry-After (default 5 s) and retries.
    - Transport errors and other non-2xx statuses (via ``raise_for_status``)
      sleep 2 s and retry.

    Returns:
        The response bytes, or None only when Graph answers 404.

    Raises:
        requests.RequestException: when the third attempt fails with a
            transport error or an HTTP status other than 401/404/429.
        RuntimeError: when every attempt ended in 401/429. Persistent auth
            failure or throttling is thus reported as an error, not as None,
            which callers treat as a missing attachment.
    """
    if not _token:
        get_token()
    last_status: Optional[int] = None
    for attempt in range(3):
        try:
            r = requests.get(url, headers={"Authorization": f"Bearer {_token}"}, timeout=60)
            if r.status_code == 401:
                last_status = 401
                get_token()
                continue
            if r.status_code == 404:
                return None
            if r.status_code == 429:
                last_status = 429
                time.sleep(_retry_after_seconds(r.headers.get("Retry-After")))
                continue
            r.raise_for_status()
            return r.content
        except requests.RequestException:
            if attempt == 2:
                raise
            time.sleep(2)
    raise RuntimeError(
        f"Graph GET failed after 3 attempts (last status {last_status}): {url}"
    )
# --- PKCS7 / MIME unwrap --------------------------------------------------------
def extract_inner_mime(content_bytes: bytes) -> bytes:
    """Return the signed inner MIME message contained in an S/MIME attachment.

    Graph API delivers the smime.p7m attachment in one of these shapes:

    A) multipart/signed (detached signature): a MIME message whose top-level
       Content-Type is multipart/signed. The readable content is the first
       part that is not the PKCS7 signature.
    B) application/pkcs7-mime (opaque, smime-type=signed-data): the inner MIME
       sits in PKCS7 SignedData -> encapContentInfo.eContent. The blob is raw
       DER or base64/PEM text, or (B') it is wrapped in its own MIME headers
       whose transfer encoding is undone first.

    Args:
        content_bytes: raw bytes of the smime.p7m attachment.

    Returns:
        Raw MIME bytes ready for ``email.message_from_bytes``.

    Raises:
        RuntimeError: when no signed payload can be extracted.
    """
    data = content_bytes

    # Only attempt MIME parsing when header-like text appears near the start.
    # Raw DER may also contain this text (the inner MIME is stored in eContent
    # as plain bytes); presumably it does not parse as multipart/signed or
    # pkcs7-mime, so it falls through to the PKCS7 path below.
    if b"content-type:" in content_bytes[:4096].lower():
        try:
            outer = email.message_from_bytes(content_bytes, policy=email.policy.default)
        except Exception:
            outer = None  # not usable as MIME; try the PKCS7 path below
        ctype = outer.get_content_type() if outer is not None else ""

        if ctype == "multipart/signed":
            # A) detached signature: the first non-signature part is the payload.
            # A multipart without a usable boundary has a str payload, not parts.
            parts = outer.get_payload() if outer.is_multipart() else []
            for part in parts:
                # "pkcs7-signature" also matches "x-pkcs7-signature"
                if "pkcs7-signature" in (part.get_content_type() or "").lower():
                    continue
                return part.as_bytes()
            raise RuntimeError("multipart/signed: no signed payload found")

        if ctype in ("application/pkcs7-mime", "application/x-pkcs7-mime"):
            # B') opaque PKCS7 inside a MIME wrapper: decode the transfer encoding
            # instead of feeding the header text into the base64 decoder.
            decoded = outer.get_payload(decode=True)
            if decoded:
                data = decoded

    # B) opaque PKCS7 SignedData: try DER first, then base64/PEM text
    try:
        ci = cms.ContentInfo.load(data)
    except Exception:
        import base64
        stripped = b"".join(line for line in data.splitlines()
                            if not line.startswith(b"-----"))
        try:
            ci = cms.ContentInfo.load(base64.b64decode(stripped, validate=False))
        except Exception as e:
            raise RuntimeError(f"PKCS7/MIME parse failed: {e}") from e

    if ci["content_type"].native != "signed_data":
        raise RuntimeError(f"Not signed-data, got {ci['content_type'].native}")
    inner = ci["content"]["encap_content_info"]["content"]
    # asn1crypto represents an absent optional field as a Void object whose
    # .native is None (not as Python None), so test the native value.
    native = getattr(inner, "native", inner)
    if native is None:
        raise RuntimeError("encapContentInfo.content is null (detached without MIME wrapper)")
    return bytes(native)
def _decode_text_part(part) -> Optional[str]:
    """Decode a text/* leaf part to str; return None when it cannot be decoded.

    Tries the policy's ``get_content()`` first. Otherwise decodes the
    transfer-decoded payload with the declared charset (UTF-8 when it is
    absent or unknown), replacing undecodable bytes.
    """
    try:
        return part.get_content()
    except Exception:
        pass
    try:
        raw = part.get_payload(decode=True)
    except Exception:
        return None
    if raw is None:
        return None
    charset = part.get_content_charset() or "utf-8"
    try:
        return raw.decode(charset, errors="replace")
    except LookupError:
        # Unknown charset label: best effort as UTF-8 rather than dropping the text.
        return raw.decode("utf-8", errors="replace")


def _truncate_utf8(text: str, limit: int) -> str:
    """Shorten *text* so that its UTF-8 encoding fits in *limit* bytes.

    The cut is on a byte boundary; a trailing partial character is dropped,
    so the result is always decodable text.
    """
    encoded = text.encode("utf-8", errors="replace")
    if len(encoded) <= limit:
        return text
    return encoded[:limit].decode("utf-8", errors="ignore")


def parse_inner_mime(mime_bytes: bytes, max_body_bytes: Optional[int] = None) -> dict:
    """Extract headers, text/HTML bodies and attachment metadata from MIME bytes.

    Args:
        mime_bytes: raw MIME message (e.g. the output of extract_inner_mime).
        max_body_bytes: cap in UTF-8 bytes, applied separately to the joined
            text body and the joined HTML body. Defaults to MAX_BODY_BYTES.

    Returns:
        A dict with these keys:

        - ``subject``, ``from``, ``to``, ``date``: stripped header strings,
          "" when the header is absent.
        - ``body_text``, ``body_html``: str, or None when empty.
        - ``inner_attachments``: list of
          ``{filename, content_type, size_bytes}``, where size_bytes is the
          length of the transfer-decoded payload.

        Parts inside message/rfc822 (forwarded or attached e-mails) are walked
        as well.
    """
    limit = MAX_BODY_BYTES if max_body_bytes is None else max_body_bytes
    msg = email.message_from_bytes(mime_bytes, policy=email.policy.default)
    text_parts: list[str] = []
    html_parts: list[str] = []
    inner_attachments: list[dict] = []

    def walk(part) -> None:
        if part.is_multipart():
            # get_payload() returns the children of multipart/* and the embedded
            # message of message/rfc822. iter_parts() yields only for multipart/*,
            # so forwarded or attached e-mails would otherwise be skipped entirely.
            for sub in part.get_payload():
                walk(sub)
            return
        ctype = part.get_content_type()
        disp = (part.get_content_disposition() or "").lower()
        filename = part.get_filename()
        if disp == "attachment" or filename:
            try:
                payload = part.get_payload(decode=True)
            except Exception:
                payload = None
            inner_attachments.append({
                "filename": filename or "(unnamed)",
                "content_type": ctype,
                "size_bytes": len(payload) if isinstance(payload, bytes) else 0,
            })
            return
        if ctype == "text/plain":
            target = text_parts
        elif ctype == "text/html":
            target = html_parts
        else:
            return
        text = _decode_text_part(part)
        if text is not None:
            target.append(text)

    walk(msg)
    body_text = "\n\n".join(t.strip() for t in text_parts if t and t.strip())
    body_html = "\n".join(h for h in html_parts if h and h.strip())
    # The cap is meant in bytes (see MAX_BODY_BYTES), not characters.
    body_text = _truncate_utf8(body_text, limit)
    body_html = _truncate_utf8(body_html, limit)
    return {
        "subject": str(msg.get("Subject") or "").strip(),
        "from": str(msg.get("From") or "").strip(),
        "to": str(msg.get("To") or "").strip(),
        "date": str(msg.get("Date") or "").strip(),
        "body_text": body_text or None,
        "body_html": body_html or None,
        "inner_attachments": inner_attachments,
    }
# --- main loop ----------------------------------------------------------------
# Mongo query for the work items: documents that have an attachment named
# smime.p7m (case-insensitive) and whose smime_unwrapped is not True, e.g. it is
# absent, or False after an earlier failed attempt (so failures are retried).
SMIME_FILTER = {"$and": [
    {"attachments.filename": {"$regex": r"^smime\.p7m$", "$options": "i"}},
    {"smime_unwrapped": {"$ne": True}},
]}
def find_p7m_graph_att_id(doc: dict) -> Optional[str]:
    """Return the Graph attachment id of the document's smime.p7m attachment.

    Scans ``doc["attachments"]`` (missing or None is treated as empty) for
    entries whose filename is ``smime.p7m`` (case-insensitive). Returns the
    first non-empty ``graph_att_id`` among them, skipping matching entries
    that lack an id. Returns None when no such id exists.
    """
    for att in doc.get("attachments") or []:
        if (att.get("filename") or "").lower() != "smime.p7m":
            continue
        att_id = att.get("graph_att_id")
        if att_id:
            return att_id
    return None
def _utc_now_naive() -> datetime:
    """Current UTC time as a naive datetime (tzinfo stripped), as stored in smime_processed_at."""
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _failure_fields(error: str) -> dict:
    """Build the ``$set`` payload that records a failed unwrap attempt."""
    return {
        "smime_unwrapped": False,
        "smime_error": error,
        "smime_processed_at": _utc_now_naive(),
    }


def _unwrap_doc(mailbox: str, doc: dict) -> tuple[str, Optional[dict]]:
    """Fetch and unwrap one candidate document.

    Returns:
        ``(outcome, fields)``. outcome is one of "unwrapped", "error",
        "missing" or "no_att_id". fields is the ``$set`` payload to write, or
        None for "no_att_id", which leaves the document untouched.
    """
    mid = doc["_id"]
    gid = doc.get("graph_id")
    att_id = find_p7m_graph_att_id(doc)
    if not gid or not att_id:
        return "no_att_id", None

    url = f"{GRAPH_URL}/users/{mailbox}/messages/{gid}/attachments/{att_id}/$value"
    try:
        p7m_bytes = graph_get_raw(url)
    except Exception as e:
        logging.error("[%s] graph fetch %s: %s", mailbox, gid, e)
        return "error", _failure_fields(f"fetch: {type(e).__name__}: {e}"[:300])
    if p7m_bytes is None:
        return "missing", _failure_fields("attachment_404")

    try:
        parsed = parse_inner_mime(extract_inner_mime(p7m_bytes))
    except Exception as e:
        logging.error("[%s] unwrap %s: %s", mailbox, mid, e)
        return "error", _failure_fields(f"unwrap: {type(e).__name__}: {e}"[:300])

    return "unwrapped", {
        "smime_unwrapped": True,
        "smime_processed_at": _utc_now_naive(),
        "smime_body_text": parsed["body_text"],
        "smime_body_html": parsed["body_html"],
        "smime_subject": parsed["subject"],
        "smime_from": parsed["from"],
        "smime_to": parsed["to"],
        "smime_date": parsed["date"],
        "smime_inner_attachments": parsed["inner_attachments"],
        "smime_error": None,
    }


def process_mailbox(col, mailbox: str, limit: Optional[int]) -> dict:
    """Unwrap every pending S/MIME document in one mailbox collection.

    Iterates the documents matching SMIME_FILTER (at most *limit* when it is
    truthy). For each one it fetches the smime.p7m from Graph, unwraps it and
    queues a ``$set`` with either the result or the error.

    Queued updates are written in batches of BATCH_SIZE regardless of outcome.
    Whatever is still queued is written when the loop ends, including when it
    is left through an exception. Progress is printed for the first document
    and every 50th.

    Returns:
        Summary dict with keys mailbox, candidates, unwrapped, errors,
        no_att_id, missing, with_inner_att and inner_att_total.
    """
    total = col.count_documents(SMIME_FILTER)
    print(f"[{mailbox}] S/MIME k rozbaleni: {total}"
          + (f" (limit {limit})" if limit else ""))
    if total == 0:
        return {"mailbox": mailbox, "candidates": 0, "unwrapped": 0,
                "errors": 0, "no_att_id": 0, "missing": 0,
                "with_inner_att": 0, "inner_att_total": 0}

    cursor = col.find(SMIME_FILTER, {"_id": 1, "graph_id": 1, "attachments": 1},
                      no_cursor_timeout=True)
    if limit:
        cursor = cursor.limit(limit)

    counts = {"unwrapped": 0, "error": 0, "missing": 0, "no_att_id": 0}
    n = with_inner = inner_total = 0
    pending: list[UpdateOne] = []

    def flush() -> None:
        # Take the batch off the queue before writing, so that a failing
        # bulk_write is not re-sent by the flush in the finally clause.
        batch = pending[:]
        pending.clear()
        if batch:
            col.bulk_write(batch, ordered=False)

    try:
        for doc in cursor:
            n += 1
            outcome, fields = _unwrap_doc(mailbox, doc)
            counts[outcome] += 1
            if fields is not None:
                pending.append(UpdateOne({"_id": doc["_id"]}, {"$set": fields}))
            if outcome == "unwrapped":
                inner_atts = fields["smime_inner_attachments"]
                inner_total += len(inner_atts)
                if inner_atts:
                    with_inner += 1
            # Batch flushing and progress now apply to every outcome, not only
            # to successful unwraps.
            if len(pending) >= BATCH_SIZE:
                flush()
            if n % 50 == 0 or n == 1:
                print(f" [{n:>5}/{total}] unwrapped={counts['unwrapped']} err={counts['error']} "
                      f"no_att_id={counts['no_att_id']} missing={counts['missing']} "
                      f"inner_atts_total={inner_total}", flush=True)
    finally:
        cursor.close()
        # Persist the already processed documents even if the loop was interrupted.
        flush()

    print(f" [{n}/{total}] DONE unwrapped={counts['unwrapped']} err={counts['error']} "
          f"no_att_id={counts['no_att_id']} missing={counts['missing']} "
          f"with_inner_atts={with_inner} inner_atts_total={inner_total}")
    return {"mailbox": mailbox, "candidates": total, "unwrapped": counts["unwrapped"],
            "errors": counts["error"], "no_att_id": counts["no_att_id"],
            "missing": counts["missing"], "with_inner_att": with_inner,
            "inner_att_total": inner_total}
def main() -> int:
    """Command-line entry point: unwrap S/MIME e-mails in one or all mailboxes.

    Without --mailbox, every collection in MONGO_DB is processed except
    SKIP_COLLECTIONS, MongoDB ``system.*`` collections and SKIP_MAILBOXES.

    Returns:
        The process exit code:

        - 0 when no document failed.
        - 1 when at least one document ended with an error.
        - 2 when --mailbox names a mailbox from SKIP_MAILBOXES. This is
          checked before any Mongo or Graph connection is made.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--mailbox", help="Jedna konkretni schranka (default: vsechny)")
    ap.add_argument("--limit", type=int, help="Limit emailu na schranku (test)")
    args = ap.parse_args()
    t0 = time.time()

    # E-mail addresses are effectively case-insensitive, so compare the skip list that way.
    skipped = {m.lower() for m in SKIP_MAILBOXES}
    if args.mailbox and args.mailbox.lower() in skipped:
        print(f"CHYBA: {args.mailbox} je v SKIP_MAILBOXES — neni Graph pristup.")
        return 2

    print("Pripojuji se k MongoDB...")
    # The context manager closes the client and its connection pool on exit.
    with MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000) as mongo:
        mongo.admin.command("ping")
        db = mongo[MONGO_DB]
        print("Token Graph API...")
        get_token()
        print("OK\n")

        if args.mailbox:
            mailboxes = [args.mailbox]
        else:
            mailboxes = []
            for c in db.list_collection_names():
                # system.* collections are MongoDB internals, never mailboxes.
                if c in SKIP_COLLECTIONS or c.startswith("system."):
                    continue
                if c.lower() in skipped:
                    print(f" [skip] {c} — v SKIP_MAILBOXES (neni Graph pristup)")
                    continue
                mailboxes.append(c)
        print(f"Schranky ({len(mailboxes)}): {mailboxes}\n")

        results = []
        for mb in mailboxes:
            results.append(process_mailbox(db[mb], mb, limit=args.limit))
            print()

    print("=== SHRNUTI ===")
    for r in results:
        print(f" {r['mailbox']}: candidates={r['candidates']} unwrapped={r['unwrapped']} "
              f"errors={r['errors']} no_att_id={r['no_att_id']} missing={r['missing']} "
              f"with_inner_atts={r['with_inner_att']} inner_atts_total={r['inner_att_total']}")
    print(f"\nCelkem trvalo: {time.time() - t0:.1f} s")
    total_errors = sum(r.get("errors", 0) for r in results)
    return 1 if total_errors > 0 else 0
if __name__ == "__main__":
    # main() returns the exit code; any unexpected exception is printed and
    # turned into exit status 1.
    try:
        raise SystemExit(main())
    except KeyboardInterrupt:
        print("\nPreruseno uzivatelem")
        # 130 = 128 + SIGINT: the conventional status for Ctrl+C. Previously the
        # interrupted run fell through and exited with 0, i.e. "success".
        sys.exit(130)
    except Exception:
        traceback.print_exc()
        sys.exit(1)