""" ============================================================================== Skript: unwrap_smime_v1.0.py Verze: 1.0 Datum: 2026-06-03 Autor: vladimir.buzalka Popis: Najde v Mongo emaily s prilohou smime.p7m (S/MIME signed-data), stahne binarni obsah prilohy z Microsoft Graph API, rozbali PKCS7 SignedData (CMS), extrahuje vnitrni MIME message, a ulozi do Mongo: - smime_unwrapped: True - smime_body_text : plain text vnitrniho tela - smime_body_html : HTML vnitrniho tela (kdyz je) - smime_subject : Subject vnitrni MIME hlavicky - smime_inner_attachments : [{filename, content_type, size_bytes}] Tyto pole pak pouzije enrich_fulltext_emails_v1.2 a doplni jejich obsah do PG fulltext indexu. Typicke S/MIME odesilatele: notifikace@mojedatovaschranka.cz (844 emailu) kontakt@mbank.cz (226) payments@comgate.cz, service@payu.com (~250) info.postsignum@cpost.cz Architekturalni poznamka: S/MIME priloha smime.p7m ma Content-Type application/pkcs7-mime s parametrem smime-type=signed-data. Vnitrni obsah je v PKCS7 ContentInfo -> SignedData -> encapContentInfo.eContent. To uz je primo MIME zprava (multipart nebo single body). Zavislosti (instalovat v kontejneru): pip install asn1crypto Spusteni: python unwrap_smime_v1.0.py # vsechny schranky (mimo SKIP_MAILBOXES) python unwrap_smime_v1.0.py --mailbox vladimir.buzalka@buzalka.cz python unwrap_smime_v1.0.py --limit 10 # test SKIP_MAILBOXES (hardcoded): vbuzalka@its.jnj.com — JNJ tenant, nemame Graph API pristup. Pri behu bez --mailbox se tise preskoci, s --mailbox skript skonci s exit kodem 2. 
============================================================================== """ from __future__ import annotations import argparse import email import email.policy import logging import sys import time import traceback from datetime import datetime, timezone from pathlib import Path from typing import Optional import msal import requests from asn1crypto import cms from pymongo import MongoClient, UpdateOne if hasattr(sys.stdout, "reconfigure"): sys.stdout.reconfigure(encoding="utf-8", errors="replace") # --- konfigurace ------------------------------------------------------------ GRAPH_TENANT_ID = "7d269944-37a4-43a1-8140-c7517dc426e9" GRAPH_CLIENT_ID = "4b222bfd-78c9-4239-a53f-43006b3ed07f" GRAPH_CLIENT_SECRET = "Txg8Q~MjhocuopxsJyJBhPmDfMxZ2r5WpTFj1dfk" GRAPH_URL = "https://graph.microsoft.com/v1.0" MONGO_URI = "mongodb://192.168.1.76:27017" MONGO_DB = "emaily" SKIP_COLLECTIONS = {"attachments_index", "sync_state"} # Schranky kde NEMAME Graph API pristup — pri bezne behu se preskocia. 
SKIP_MAILBOXES = {
    "vbuzalka@its.jnj.com",  # JNJ tenant - we have no Graph credentials
}

MAX_BODY_BYTES = 2 * 1024 * 1024  # 2 MB cap for each extracted body
BATCH_SIZE = 25
LOG_FILE = Path(__file__).parent / "unwrap_smime_errors.log"

logging.basicConfig(
    filename=str(LOG_FILE),
    level=logging.ERROR,
    format="%(asctime)s | %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    encoding="utf-8",
)


# --- Graph auth -------------------------------------------------------------
_token: Optional[str] = None

# How many HTTP attempts graph_get_raw makes before it gives up.
_GRAPH_MAX_ATTEMPTS = 3


def get_token() -> str:
    """Acquire a Graph app-only access token and cache it in ``_token``.

    A new MSAL confidential client is built on every call.

    Returns:
        The access token string (also stored in the module-level ``_token``).

    Raises:
        RuntimeError: the MSAL response contains no ``access_token``.
    """
    global _token
    app = msal.ConfidentialClientApplication(
        GRAPH_CLIENT_ID,
        authority=f"https://login.microsoftonline.com/{GRAPH_TENANT_ID}",
        client_credential=GRAPH_CLIENT_SECRET,
    )
    res = app.acquire_token_for_client(scopes=["https://graph.microsoft.com/.default"])
    if "access_token" not in res:
        raise RuntimeError(f"Graph auth failed: {res}")
    _token = res["access_token"]
    return _token


def _retry_after_seconds(header_value: Optional[str], default: float = 5.0) -> float:
    """Turn a Retry-After header value into a sleep duration in seconds.

    Anything that is not a finite, non-negative number (a missing header,
    an HTTP-date, garbage) yields ``default`` instead of raising.
    """
    try:
        seconds = float(header_value)
    except (TypeError, ValueError):
        return default
    if not 0 <= seconds < float("inf"):
        return default
    return seconds


def graph_get_raw(url: str) -> Optional[bytes]:
    """GET a Graph endpoint and return the raw response body.

    Intended for the attachment ``$value`` endpoint. A 401 triggers a token
    refresh and a retry, a 429 waits for the Retry-After interval and
    retries, and ``requests`` errors (including non-2xx statuses raised by
    ``raise_for_status``) are retried after a 2 s pause.

    Args:
        url: Fully built Graph URL.

    Returns:
        The response bytes, or ``None`` only when Graph answers 404.

    Raises:
        requests.RequestException: the last attempt failed at the HTTP level.
        RuntimeError: every attempt ended in 401/429. This used to fall
            through to ``return None``, which the caller records as a
            missing attachment.
    """
    if not _token:
        get_token()

    last_status: Optional[int] = None
    for attempt in range(_GRAPH_MAX_ATTEMPTS):
        is_last = attempt == _GRAPH_MAX_ATTEMPTS - 1
        try:
            # _token is re-read on every attempt so a refreshed token is used.
            r = requests.get(url, headers={"Authorization": f"Bearer {_token}"}, timeout=60)
            last_status = r.status_code
            if r.status_code == 404:
                return None
            if r.status_code == 401:
                get_token()
                continue
            if r.status_code == 429:
                if not is_last:  # no point sleeping when we will not retry
                    time.sleep(_retry_after_seconds(r.headers.get("Retry-After")))
                continue
            r.raise_for_status()
            return r.content
        except requests.RequestException:
            if is_last:
                raise
            time.sleep(2)

    raise RuntimeError(
        f"Graph request failed after {_GRAPH_MAX_ATTEMPTS} attempts "
        f"(last HTTP status {last_status})"
    )


# --- PKCS7 / MIME unwrap ----------------------------------------------------
def extract_inner_mime(content_bytes: bytes) -> bytes:
    """Pull the inner (signed) MIME message out of an S/MIME attachment.

    Two formats are handled:
      A) multipart/signed (detached signature) - the bytes start with a
         'Content-Type: multipart/signed' header. The content is directly
         readable in the first MIME part (the other part is the PKCS7
         signature).
      B) application/pkcs7-mime (opaque, smime-type=signed-data) - the inner
         MIME is inside PKCS7 SignedData -> encapContentInfo.eContent. The
         input may be DER, or base64 / PEM text.

    Args:
        content_bytes: Raw bytes of the smime.p7m attachment.

    Returns:
        Raw MIME bytes suitable for ``email.message_from_bytes``.

    Raises:
        RuntimeError: the input cannot be parsed, is not signed-data, or
            carries no embedded content.
    """
    import base64

    # Only the first 300 bytes are inspected to recognise format A.
    head = content_bytes[:300].lower()

    # A) multipart/signed (detached)
    if b"content-type:" in head and b"multipart/signed" in head:
        try:
            outer = email.message_from_bytes(content_bytes, policy=email.policy.default)
        except Exception as e:
            raise RuntimeError(f"MIME parse failed: {e}") from e
        if outer.is_multipart():
            for part in outer.iter_parts():
                ctype = (part.get_content_type() or "").lower()
                # The substring test also matches application/x-pkcs7-signature.
                if "pkcs7-signature" in ctype:
                    continue
                # The first non-signature part is the signed payload.
                return part.as_bytes()
        raise RuntimeError("multipart/signed: no signed payload found")

    # B) opaque PKCS7 SignedData - try DER first, then base64 / PEM text
    try:
        ci = cms.ContentInfo.load(content_bytes)
    except Exception:
        try:
            # Drop PEM armour lines ("-----BEGIN ...-----") before decoding.
            stripped = b"".join(
                line for line in content_bytes.splitlines()
                if not line.startswith(b"-----")
            )
            ci = cms.ContentInfo.load(base64.b64decode(stripped, validate=False))
        except Exception as e:
            raise RuntimeError(f"PKCS7/MIME parse failed: {e}") from e

    if ci["content_type"].native != "signed_data":
        raise RuntimeError(f"Not signed-data, got {ci['content_type'].native}")

    inner = ci["content"]["encap_content_info"]["content"]
    payload = inner.native if hasattr(inner, "native") else inner
    # Check the unwrapped value too, so an absent eContent yields the
    # intended error instead of a TypeError from bytes(None).
    if payload is None:
        raise RuntimeError("encapContentInfo.content is null (detached without MIME wrapper)")
    return bytes(payload)


def _truncate_utf8(text: str, max_bytes: int) -> str:
    """Return ``text`` cut so that its UTF-8 encoding is at most ``max_bytes``.

    The cut never leaves a partial multi-byte character at the end.
    """
    if len(text) * 4 <= max_bytes:  # UTF-8 needs at most 4 bytes per character
        return text
    encoded = text.encode("utf-8", errors="replace")
    if len(encoded) <= max_bytes:
        return text
    return encoded[:max_bytes].decode("utf-8", errors="ignore")


def _decode_text_part(part) -> Optional[str]:
    """Decode a text/* MIME part to ``str``, best effort.

    ``get_content()`` is tried first. If it fails, the transfer-decoded
    payload is decoded with the declared charset, and with UTF-8 when that
    charset is unknown to Python. Returns ``None`` (after logging) when the
    part cannot be decoded at all.
    """
    try:
        return part.get_content()
    except Exception:
        try:
            raw = part.get_payload(decode=True)
            if not isinstance(raw, bytes):
                return None
            charset = part.get_content_charset() or "utf-8"
            try:
                return raw.decode(charset, errors="replace")
            except LookupError:
                # Unknown charset label: keep the text rather than drop it.
                return raw.decode("utf-8", errors="replace")
        except Exception as e:
            logging.error("inner MIME part %s could not be decoded: %s",
                          part.get_content_type(), e)
            return None


def _attachment_size(part) -> int:
    """Return the decoded size in bytes of an attachment part (0 if unknown)."""
    try:
        payload = part.get_content()
    except Exception:
        return 0
    if isinstance(payload, str):
        return len(payload.encode("utf-8", errors="replace"))
    if isinstance(payload, bytes):
        return len(payload)
    return 0


def parse_inner_mime(mime_bytes: bytes) -> dict:
    """Extract headers, text, HTML and attachment metadata from MIME bytes.

    Args:
        mime_bytes: A complete MIME message (headers and body).

    Returns:
        A dict with the keys ``subject``, ``from``, ``to``, ``date`` (stripped
        header strings, empty when absent), ``body_text`` and ``body_html``
        (``None`` when empty, each capped at ``MAX_BODY_BYTES`` of UTF-8) and
        ``inner_attachments`` (a list of
        ``{"filename", "content_type", "size_bytes"}`` dicts).
    """
    msg = email.message_from_bytes(mime_bytes, policy=email.policy.default)
    text_parts: list[str] = []
    html_parts: list[str] = []
    inner_attachments: list[dict] = []

    def walk(part) -> None:
        # Containers are descended into and never treated as leaves.
        if part.is_multipart():
            for sub in part.iter_parts():
                walk(sub)
            return

        ctype = part.get_content_type()
        disp = (part.get_content_disposition() or "").lower()
        filename = part.get_filename()

        # Anything with a filename counts as an attachment, even inline.
        if disp == "attachment" or filename:
            inner_attachments.append({
                "filename": filename or "(unnamed)",
                "content_type": ctype,
                "size_bytes": _attachment_size(part),
            })
            return

        if ctype == "text/plain":
            target = text_parts
        elif ctype == "text/html":
            target = html_parts
        else:
            return
        decoded = _decode_text_part(part)
        if decoded is not None:
            target.append(decoded)

    walk(msg)

    body_text = "\n\n".join(t.strip() for t in text_parts if t and t.strip())
    body_html = "\n".join(h for h in html_parts if h and h.strip())
    # The cap is in bytes, so measure the encoded size, not the character
    # count (non-ASCII text is larger once stored).
    body_text = _truncate_utf8(body_text, MAX_BODY_BYTES)
    body_html = _truncate_utf8(body_html, MAX_BODY_BYTES)

    return {
        "subject": str(msg.get("Subject") or "").strip(),
        "from": str(msg.get("From") or "").strip(),
        "to": str(msg.get("To") or "").strip(),
        "date": str(msg.get("Date") or "").strip(),
        "body_text": body_text or None,
        "body_html": body_html or None,
        "inner_attachments": inner_attachments,
    }


# --- main loop --------------------------------------------------------------
# Documents that have a smime.p7m attachment and are not yet marked as
# successfully unwrapped (earlier failures are therefore selected again).
SMIME_FILTER = {
    "$and": [
        {"attachments.filename": {"$regex": "^smime\\.p7m$", "$options": "i"}},
        {"smime_unwrapped": {"$ne": True}},
    ]
}


def find_p7m_graph_att_id(doc: dict) -> Optional[str]:
    """Return the ``graph_att_id`` of the document's smime.p7m attachment.

    The filename match is case-insensitive. If several smime.p7m entries
    exist, the first one that has an id wins. Returns ``None`` when no such
    attachment, or no id, is present.
    """
    for att in doc.get("attachments") or []:
        if (att.get("filename") or "").lower() != "smime.p7m":
            continue
        att_id = att.get("graph_att_id")
        if att_id:
            return att_id
    return None


def _utc_now_naive() -> datetime:
    """Return the current UTC time as a naive datetime."""
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _failure_update(mid, error: str):
    """Build the UpdateOne that records a failed unwrap attempt for ``mid``."""
    return UpdateOne({"_id": mid}, {"$set": {
        "smime_unwrapped": False,
        "smime_error": error,
        "smime_processed_at": _utc_now_naive(),
    }})


def _unwrap_one(mailbox: str, mid, gid, att_id):
    """Fetch and unwrap one smime.p7m attachment.

    Returns:
        ``(outcome, update_op, inner_attachment_count)``, where ``outcome``
        is ``"unwrapped"``, ``"missing"`` (Graph answered 404) or
        ``"errors"`` (fetch or unwrap failed; the reason is logged and stored
        in ``smime_error``).
    """
    url = f"{GRAPH_URL}/users/{mailbox}/messages/{gid}/attachments/{att_id}/$value"
    try:
        p7m_bytes = graph_get_raw(url)
    except Exception as e:
        logging.error("[%s] graph fetch %s: %s", mailbox, gid, e)
        return "errors", _failure_update(mid, f"fetch: {type(e).__name__}: {e}"[:300]), 0

    if p7m_bytes is None:
        return "missing", _failure_update(mid, "attachment_404"), 0

    try:
        parsed = parse_inner_mime(extract_inner_mime(p7m_bytes))
    except Exception as e:
        logging.error("[%s] unwrap %s: %s", mailbox, mid, e)
        return "errors", _failure_update(mid, f"unwrap: {type(e).__name__}: {e}"[:300]), 0

    inner_atts = parsed["inner_attachments"]
    update = {
        "smime_unwrapped": True,
        "smime_processed_at": _utc_now_naive(),
        "smime_body_text": parsed["body_text"],
        "smime_body_html": parsed["body_html"],
        "smime_subject": parsed["subject"],
        "smime_from": parsed["from"],
        "smime_to": parsed["to"],
        "smime_date": parsed["date"],
        "smime_inner_attachments": inner_atts,
        "smime_error": None,  # clears the error left by an earlier failed run
    }
    return "unwrapped", UpdateOne({"_id": mid}, {"$set": update}), len(inner_atts)


def process_mailbox(col, mailbox: str, limit: Optional[int]) -> dict:
    """Unwrap every pending S/MIME email in one mailbox collection.

    Args:
        col: The mailbox's Mongo collection.
        mailbox: Mailbox address, used in the Graph URL and in log output.
        limit: Maximum number of documents to process; falsy means no limit.

    Returns:
        A stats dict with the keys ``mailbox``, ``candidates``, ``unwrapped``,
        ``errors``, ``no_att_id``, ``missing``, ``with_inner_att`` and
        ``inner_att_total``.
    """
    counts = {
        "unwrapped": 0,
        "errors": 0,
        "no_att_id": 0,
        "missing": 0,
        "with_inner_att": 0,
        "inner_att_total": 0,
    }

    total = col.count_documents(SMIME_FILTER)
    print(f"[{mailbox}] S/MIME k rozbaleni: {total}" + (f" (limit {limit})" if limit else ""))
    if total == 0:
        return {"mailbox": mailbox, "candidates": 0, **counts}

    cursor = col.find(
        SMIME_FILTER,
        {"_id": 1, "graph_id": 1, "attachments": 1},
        no_cursor_timeout=True,
    )
    if limit:
        cursor = cursor.limit(limit)

    n = 0
    bulk: list = []
    try:
        for doc in cursor:
            n += 1
            gid = doc.get("graph_id")
            att_id = find_p7m_graph_att_id(doc)
            if not gid or not att_id:
                # Nothing to fetch; the document is left untouched.
                counts["no_att_id"] += 1
            else:
                outcome, op, inner_count = _unwrap_one(mailbox, doc["_id"], gid, att_id)
                bulk.append(op)
                counts[outcome] += 1
                counts["inner_att_total"] += inner_count
                if inner_count:
                    counts["with_inner_att"] += 1

            # Flush and report on EVERY iteration, so a run of failures
            # neither grows the batch without bound nor hides progress.
            if len(bulk) >= BATCH_SIZE:
                col.bulk_write(bulk, ordered=False)
                bulk.clear()
            if n % 50 == 0 or n == 1:
                print(f" [{n:>5}/{total}] unwrapped={counts['unwrapped']} err={counts['errors']} "
                      f"no_att_id={counts['no_att_id']} missing={counts['missing']} "
                      f"inner_atts_total={counts['inner_att_total']}", flush=True)
    finally:
        # The cursor was opened with no_cursor_timeout, so close it explicitly.
        cursor.close()

    if bulk:
        col.bulk_write(bulk, ordered=False)

    print(f" [{n}/{total}] DONE unwrapped={counts['unwrapped']} err={counts['errors']} "
          f"no_att_id={counts['no_att_id']} missing={counts['missing']} "
          f"with_inner_atts={counts['with_inner_att']} inner_atts_total={counts['inner_att_total']}")
    return {"mailbox": mailbox, "candidates": total, **counts}


def main() -> int:
    """Command-line entry point.

    Returns:
        0 on success, 1 when any mailbox recorded errors, 2 when ``--mailbox``
        names a mailbox listed in ``SKIP_MAILBOXES``.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--mailbox", help="Jedna konkretni schranka (default: vsechny)")
    ap.add_argument("--limit", type=int, help="Limit emailu na schranku (test)")
    args = ap.parse_args()

    t0 = time.time()
    print("Pripojuji se k MongoDB...")
    mongo = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
    try:
        mongo.admin.command("ping")
        db = mongo[MONGO_DB]

        print("Token Graph API...")
        get_token()
        print("OK\n")

        if args.mailbox:
            if args.mailbox in SKIP_MAILBOXES:
                print(f"CHYBA: {args.mailbox} je v SKIP_MAILBOXES — neni Graph pristup.")
                return 2
            mailboxes = [args.mailbox]
        else:
            mailboxes = []
            for c in db.list_collection_names():
                # Helper collections and MongoDB's own system.* collections
                # are not mailboxes.
                if c in SKIP_COLLECTIONS or c.startswith("system."):
                    continue
                if c in SKIP_MAILBOXES:
                    print(f" [skip] {c} — v SKIP_MAILBOXES (neni Graph pristup)")
                    continue
                mailboxes.append(c)

        print(f"Schranky ({len(mailboxes)}): {mailboxes}\n")

        results = []
        for mb in mailboxes:
            results.append(process_mailbox(db[mb], mb, limit=args.limit))
            print()

        print("=== SHRNUTI ===")
        for r in results:
            print(f" {r['mailbox']}: candidates={r['candidates']} unwrapped={r['unwrapped']} "
                  f"errors={r['errors']} no_att_id={r['no_att_id']} missing={r['missing']} "
                  f"with_inner_atts={r['with_inner_att']} inner_atts_total={r['inner_att_total']}")
        print(f"\nCelkem trvalo: {time.time() - t0:.1f} s")

        total_errors = sum(r.get("errors", 0) for r in results)
        return 1 if total_errors > 0 else 0
    finally:
        mongo.close()


if __name__ == "__main__":
    try:
        raise SystemExit(main())
    except KeyboardInterrupt:
        print("\nPreruseno uzivatelem")
        # An aborted run must not report success; 130 is the usual SIGINT code.
        sys.exit(130)
    except Exception:
        traceback.print_exc()
        sys.exit(1)