446 lines
16 KiB
Python
446 lines
16 KiB
Python
"""
|
|
==============================================================================
|
|
Skript: unwrap_smime_v1.0.py
|
|
Verze: 1.0
|
|
Datum: 2026-06-03
|
|
Autor: vladimir.buzalka
|
|
|
|
Popis:
|
|
Najde v Mongo emaily s prilohou smime.p7m (S/MIME signed-data),
|
|
stahne binarni obsah prilohy z Microsoft Graph API, rozbali PKCS7
|
|
SignedData (CMS), extrahuje vnitrni MIME message, a ulozi do Mongo:
|
|
- smime_unwrapped: True
|
|
- smime_body_text : plain text vnitrniho tela
|
|
- smime_body_html : HTML vnitrniho tela (kdyz je)
|
|
- smime_subject : Subject vnitrni MIME hlavicky
|
|
- smime_inner_attachments : [{filename, content_type, size_bytes}]
|
|
|
|
Tyto pole pak pouzije enrich_fulltext_emails_v1.2 a doplni jejich
|
|
obsah do PG fulltext indexu.
|
|
|
|
Typicke S/MIME odesilatele:
|
|
notifikace@mojedatovaschranka.cz (844 emailu)
|
|
kontakt@mbank.cz (226)
|
|
payments@comgate.cz, service@payu.com (~250)
|
|
info.postsignum@cpost.cz
|
|
|
|
Architekturalni poznamka:
|
|
S/MIME priloha smime.p7m ma Content-Type application/pkcs7-mime
|
|
s parametrem smime-type=signed-data. Vnitrni obsah je v PKCS7
|
|
ContentInfo -> SignedData -> encapContentInfo.eContent. To uz je
|
|
primo MIME zprava (multipart nebo single body).
|
|
|
|
Zavislosti (instalovat v kontejneru):
|
|
pip install asn1crypto
|
|
|
|
Spusteni:
|
|
python unwrap_smime_v1.0.py # vsechny schranky (mimo SKIP_MAILBOXES)
|
|
python unwrap_smime_v1.0.py --mailbox vladimir.buzalka@buzalka.cz
|
|
python unwrap_smime_v1.0.py --limit 10 # test
|
|
|
|
SKIP_MAILBOXES (hardcoded):
|
|
vbuzalka@its.jnj.com — JNJ tenant, nemame Graph API pristup. Pri behu
|
|
bez --mailbox se tise preskoci, s --mailbox skript
|
|
skonci s exit kodem 2.
|
|
==============================================================================
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
import email
import email.policy
import logging
import os
import sys
import time
import traceback
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

import msal
import requests
from asn1crypto import cms
from pymongo import MongoClient, UpdateOne
|
|
|
|
# Switch stdout to UTF-8 and replace unencodable characters instead of raising.
# The hasattr guard leaves stream objects without reconfigure() untouched.
if hasattr(sys.stdout, "reconfigure"):
    sys.stdout.reconfigure(encoding="utf-8", errors="replace")

# --- configuration ----------------------------------------------------------
GRAPH_TENANT_ID = "7d269944-37a4-43a1-8140-c7517dc426e9"
GRAPH_CLIENT_ID = "4b222bfd-78c9-4239-a53f-43006b3ed07f"
# SECURITY: a client secret is hard-coded in this file. The environment
# variable GRAPH_CLIENT_SECRET, when set and non-empty, takes precedence; the
# literal is kept only as a backward-compatible fallback and should be
# rotated and removed from source control.
GRAPH_CLIENT_SECRET = (
    os.environ.get("GRAPH_CLIENT_SECRET")
    or "Txg8Q~MjhocuopxsJyJBhPmDfMxZ2r5WpTFj1dfk"
)
GRAPH_URL = "https://graph.microsoft.com/v1.0"

MONGO_URI = "mongodb://192.168.1.76:27017"
MONGO_DB = "emaily"
# Collections in MONGO_DB that are not mailboxes; every other collection name
# is treated as a mailbox address when no --mailbox is given.
SKIP_COLLECTIONS = {"attachments_index", "sync_state"}

# Mailboxes for which we have NO Graph API access - skipped during a normal run.
SKIP_MAILBOXES = {
    "vbuzalka@its.jnj.com",  # JNJ tenant - no Graph credentials
}

MAX_BODY_BYTES = 2 * 1024 * 1024  # 2 MB cap for the extracted text
BATCH_SIZE = 25  # number of pending Mongo updates that triggers a bulk write
LOG_FILE = Path(__file__).parent / "unwrap_smime_errors.log"

# Only ERROR and above are written to the log file next to the script.
logging.basicConfig(
    filename=str(LOG_FILE),
    level=logging.ERROR,
    format="%(asctime)s | %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    encoding="utf-8",
)
|
|
|
|
|
|
# --- Graph auth -------------------------------------------------------------
|
|
# Most recently acquired Graph access token; None until get_token() has run.
_token: Optional[str] = None


def get_token() -> str:
    """Acquire a Graph access token with the client-credentials flow.

    The token is stored in the module-level ``_token`` and also returned.

    Raises:
        RuntimeError: if the MSAL response carries no ``access_token``.
    """
    global _token
    authority_url = f"https://login.microsoftonline.com/{GRAPH_TENANT_ID}"
    client = msal.ConfidentialClientApplication(
        GRAPH_CLIENT_ID,
        authority=authority_url,
        client_credential=GRAPH_CLIENT_SECRET,
    )
    result = client.acquire_token_for_client(
        scopes=["https://graph.microsoft.com/.default"]
    )
    if "access_token" in result:
        _token = result["access_token"]
        return _token
    raise RuntimeError(f"Graph auth failed: {result}")
|
|
|
|
|
|
def graph_get_raw(url: str) -> Optional[bytes]:
    """GET a Graph endpoint and return the raw response body.

    Used for the attachment ``$value`` endpoint. Up to three attempts are made:
    a 401 refreshes the token, a 429 waits for the advertised ``Retry-After``
    interval, and a transport or HTTP error is retried after two seconds.

    Returns:
        The response body, or None when Graph answers 404 (resource missing).

    Raises:
        requests.RequestException: if the last attempt fails with a transport
            or HTTP error.
        RuntimeError: if every attempt was consumed by 401/429 responses.
            Returning None here would make the caller record a throttled or
            unauthorized request as a missing attachment.
    """
    global _token
    max_attempts = 3
    last_status = None
    if not _token:
        get_token()
    for attempt in range(max_attempts):
        is_last = attempt == max_attempts - 1
        try:
            r = requests.get(url, headers={"Authorization": f"Bearer {_token}"}, timeout=60)
            last_status = r.status_code
            if r.status_code == 404:
                return None
            if r.status_code == 401:
                # Token expired or rejected: fetch a fresh one and retry.
                get_token()
                continue
            if r.status_code == 429:
                # Retry-After may be missing or not a plain integer; fall
                # back to 5 seconds rather than failing on the header parse.
                try:
                    wait = max(int(r.headers.get("Retry-After", "5")), 0)
                except (TypeError, ValueError):
                    wait = 5
                if not is_last:  # no point sleeping when no retry follows
                    time.sleep(wait)
                continue
            r.raise_for_status()
            return r.content
        except requests.RequestException:
            if is_last:
                raise
            time.sleep(2)
    raise RuntimeError(
        f"Graph request gave up after {max_attempts} attempts "
        f"(last HTTP status {last_status})"
    )
|
|
|
|
|
|
# --- PKCS7 / MIME unwrap ----------------------------------------------------
|
|
|
|
def extract_inner_mime(content_bytes: bytes) -> bytes:
    """Extract the inner (signed) MIME message from an S/MIME attachment.

    Two layouts are handled:
      A) multipart/signed (detached signature) - the bytes start with a
         'Content-Type: multipart/signed' header. The content is directly
         readable in the first MIME part that is not the PKCS7 signature.
      B) application/pkcs7-mime (opaque, smime-type=signed-data) - the inner
         MIME is stored in PKCS7 SignedData -> encapContentInfo.eContent.
         The input may be DER or base64 (optionally with '-----' armor lines).

    Returns:
        Raw MIME bytes ready for email.message_from_bytes.

    Raises:
        RuntimeError: if the input cannot be parsed, is not signed-data, or
            carries no encapsulated content.
    """
    head = content_bytes[:300].lower()

    # A) multipart/signed (detached) - the most common form for mail from Graph
    if b"content-type:" in head and b"multipart/signed" in head:
        try:
            outer = email.message_from_bytes(content_bytes, policy=email.policy.default)
        except Exception as e:
            raise RuntimeError(f"MIME parse failed: {e}") from e
        signed_payload = None
        if outer.is_multipart():
            # The first part that is not the signature is the signed payload.
            # "pkcs7-signature" also matches "x-pkcs7-signature".
            signed_payload = next(
                (part for part in outer.iter_parts()
                 if "pkcs7-signature" not in (part.get_content_type() or "").lower()),
                None,
            )
        if signed_payload is None:
            raise RuntimeError("multipart/signed: no signed payload found")
        return signed_payload.as_bytes()

    # B) opaque PKCS7 SignedData - DER first, then base64
    try:
        ci = cms.ContentInfo.load(content_bytes)
    except Exception:
        try:
            import base64
            stripped = b"".join(line for line in content_bytes.splitlines()
                                if not line.startswith(b"-----"))
            ci = cms.ContentInfo.load(base64.b64decode(stripped, validate=False))
        except Exception as e:
            raise RuntimeError(f"PKCS7/MIME parse failed: {e}") from e

    if ci["content_type"].native != "signed_data":
        raise RuntimeError(f"Not signed-data, got {ci['content_type'].native}")
    inner = ci["content"]["encap_content_info"]["content"]
    # asn1crypto is understood to represent an absent optional field with a
    # Void object whose .native is None (not with Python None), so the check
    # is made on the native value rather than on the object itself.
    payload = inner.native if hasattr(inner, "native") else inner
    if payload is None:
        raise RuntimeError("encapContentInfo.content is null (detached without MIME wrapper)")
    return bytes(payload)
|
|
|
|
|
|
def _truncate_utf8(text: str, max_bytes: int) -> str:
    """Cut *text* so that its UTF-8 encoding is at most *max_bytes* long."""
    encoded = text.encode("utf-8", errors="replace")
    if len(encoded) <= max_bytes:
        return text
    # errors="ignore" drops a multi-byte character split by the cut.
    return encoded[:max_bytes].decode("utf-8", errors="ignore")


def _decode_text_part(part) -> Optional[str]:
    """Return the decoded text of a text/* leaf part, or None if unavailable."""
    try:
        return part.get_content()
    except Exception:
        pass  # best effort: fall through to manual decoding below
    try:
        raw = part.get_payload(decode=True)
        charset = part.get_content_charset() or "utf-8"
    except Exception:
        return None
    if not isinstance(raw, bytes):
        return None
    try:
        return raw.decode(charset, errors="replace")
    except (LookupError, ValueError):
        # Unknown or bogus charset label: keep the text instead of dropping it.
        return raw.decode("utf-8", errors="replace")


def _attachment_size(part) -> int:
    """Return the transfer-decoded payload size of a leaf part in bytes (0 on failure)."""
    try:
        raw = part.get_payload(decode=True)
    except Exception:
        return 0
    return len(raw) if isinstance(raw, bytes) else 0


def parse_inner_mime(mime_bytes: bytes) -> dict:
    """Extract text, HTML and attachment metadata from raw MIME bytes.

    Leaf parts with an ``attachment`` disposition or a filename are listed as
    attachments (metadata only); remaining text/plain and text/html leaves
    are collected into the bodies. Each body is capped at MAX_BODY_BYTES of
    UTF-8.

    Returns:
        dict with keys ``subject``, ``from``, ``to``, ``date`` (header values
        as stripped strings), ``body_text`` and ``body_html`` (str or None)
        and ``inner_attachments`` (list of dicts with ``filename``,
        ``content_type``, ``size_bytes``).
    """
    msg = email.message_from_bytes(mime_bytes, policy=email.policy.default)

    text_parts: list[str] = []
    html_parts: list[str] = []
    inner_attachments: list[dict] = []

    def walk(part):
        # Containers are only descended into; they never count as content.
        if part.is_multipart():
            for sub in part.iter_parts():
                walk(sub)
            return

        ctype = part.get_content_type()
        disp = (part.get_content_disposition() or "").lower()
        filename = part.get_filename()

        if disp == "attachment" or filename:
            inner_attachments.append({
                "filename": filename or "(unnamed)",
                "content_type": ctype,
                "size_bytes": _attachment_size(part),
            })
            return

        if ctype == "text/plain":
            target = text_parts
        elif ctype == "text/html":
            target = html_parts
        else:
            return
        decoded = _decode_text_part(part)
        if decoded is not None:
            target.append(decoded)

    walk(msg)

    body_text = "\n\n".join(t.strip() for t in text_parts if t and t.strip())
    body_html = "\n".join(h for h in html_parts if h and h.strip())
    # The cap is expressed in bytes, so measure the UTF-8 size, not characters.
    body_text = _truncate_utf8(body_text, MAX_BODY_BYTES)
    body_html = _truncate_utf8(body_html, MAX_BODY_BYTES)

    return {
        "subject": str(msg.get("Subject") or "").strip(),
        "from": str(msg.get("From") or "").strip(),
        "to": str(msg.get("To") or "").strip(),
        "date": str(msg.get("Date") or "").strip(),
        "body_text": body_text or None,
        "body_html": body_html or None,
        "inner_attachments": inner_attachments,
    }
|
|
|
|
|
|
# --- hlavni smycka ----------------------------------------------------------
|
|
|
|
# Mongo query: messages that carry an attachment named smime.p7m (any letter
# case) and whose smime_unwrapped flag is not True, so never-processed and
# previously failed messages both match.
SMIME_FILTER = {
    "$and": [
        {"attachments.filename": {"$regex": r"^smime\.p7m$", "$options": "i"}},
        {"smime_unwrapped": {"$ne": True}},
    ],
}
|
|
|
|
|
|
def find_p7m_graph_att_id(doc: dict) -> Optional[str]:
    """Return the ``graph_att_id`` of the first attachment named smime.p7m.

    The filename comparison ignores letter case. None is returned when the
    document has no such attachment, or when that attachment carries no
    ``graph_att_id``.
    """
    attachments = doc.get("attachments") or []
    candidate_ids = (
        entry.get("graph_att_id")
        for entry in attachments
        if (entry.get("filename") or "").lower() == "smime.p7m"
    )
    return next(candidate_ids, None)
|
|
|
|
|
|
def _smime_failure_op(mid, reason: str) -> "UpdateOne":
    """Build the Mongo update that records a failed unwrap attempt for message *mid*."""
    return UpdateOne({"_id": mid}, {"$set": {
        "smime_unwrapped": False,
        "smime_error": reason,
        "smime_processed_at": datetime.now(timezone.utc).replace(tzinfo=None),
    }})


def _unwrap_message(mailbox: str, doc: dict) -> tuple:
    """Download and unwrap the smime.p7m attachment of one message document.

    Returns a tuple ``(outcome, op, inner_att_count)`` where *outcome* is one
    of ``"no_att_id"``, ``"errors"``, ``"missing"`` or ``"unwrapped"``, *op*
    is the UpdateOne to queue (None for ``"no_att_id"``) and
    *inner_att_count* is the number of inner attachments found.
    """
    mid = doc["_id"]
    gid = doc.get("graph_id")
    att_id = find_p7m_graph_att_id(doc)
    if not gid or not att_id:
        return "no_att_id", None, 0

    url = f"{GRAPH_URL}/users/{mailbox}/messages/{gid}/attachments/{att_id}/$value"
    try:
        p7m_bytes = graph_get_raw(url)
    except Exception as e:
        logging.error("[%s] graph fetch %s: %s", mailbox, gid, e)
        return "errors", _smime_failure_op(mid, f"fetch: {type(e).__name__}: {e}"[:300]), 0
    if p7m_bytes is None:
        return "missing", _smime_failure_op(mid, "attachment_404"), 0

    try:
        inner_bytes = extract_inner_mime(p7m_bytes)
        parsed = parse_inner_mime(inner_bytes)
    except Exception as e:
        logging.error("[%s] unwrap %s: %s", mailbox, mid, e)
        return "errors", _smime_failure_op(mid, f"unwrap: {type(e).__name__}: {e}"[:300]), 0

    inner_atts = parsed["inner_attachments"]
    update = {
        "smime_unwrapped": True,
        "smime_processed_at": datetime.now(timezone.utc).replace(tzinfo=None),
        "smime_body_text": parsed["body_text"],
        "smime_body_html": parsed["body_html"],
        "smime_subject": parsed["subject"],
        "smime_from": parsed["from"],
        "smime_to": parsed["to"],
        "smime_date": parsed["date"],
        "smime_inner_attachments": inner_atts,
        "smime_error": None,
    }
    return "unwrapped", UpdateOne({"_id": mid}, {"$set": update}), len(inner_atts)


def process_mailbox(col, mailbox: str, limit: Optional[int]) -> dict:
    """Unwrap all pending S/MIME messages of one mailbox collection.

    Args:
        col: Mongo collection holding the mailbox's messages.
        mailbox: mailbox address, used in the Graph URL and in log output.
        limit: optional maximum number of messages to process (falsy = all).

    Returns:
        dict with ``mailbox``, ``candidates`` (messages matching SMIME_FILTER,
        regardless of *limit*), ``unwrapped``, ``errors``, ``no_att_id``,
        ``missing``, ``with_inner_att`` and ``inner_att_total``.
    """
    total = col.count_documents(SMIME_FILTER)
    print(f"[{mailbox}] S/MIME k rozbaleni: {total}"
          + (f" (limit {limit})" if limit else ""))

    counts = {"unwrapped": 0, "errors": 0, "no_att_id": 0, "missing": 0}
    n = with_inner = inner_total = 0

    def summary(candidates: int) -> dict:
        return {"mailbox": mailbox, "candidates": candidates,
                "unwrapped": counts["unwrapped"], "errors": counts["errors"],
                "no_att_id": counts["no_att_id"], "missing": counts["missing"],
                "with_inner_att": with_inner, "inner_att_total": inner_total}

    if total == 0:
        return summary(0)

    cursor = col.find(SMIME_FILTER, {"_id": 1, "graph_id": 1, "attachments": 1},
                      no_cursor_timeout=True)
    if limit:
        cursor = cursor.limit(limit)

    bulk: list[UpdateOne] = []

    try:
        for doc in cursor:
            n += 1
            outcome, op, inner_count = _unwrap_message(mailbox, doc)
            counts[outcome] += 1
            if op is not None:
                bulk.append(op)
            if outcome == "unwrapped":
                inner_total += inner_count
                if inner_count:
                    with_inner += 1

            # Flush and report for every document, failures included, so a
            # run of failures can neither grow the pending list without bound
            # nor silence the progress output.
            if len(bulk) >= BATCH_SIZE:
                col.bulk_write(bulk, ordered=False)
                bulk.clear()

            if n % 50 == 0 or n == 1:
                print(f"  [{n:>5}/{total}] unwrapped={counts['unwrapped']} err={counts['errors']} "
                      f"no_att_id={counts['no_att_id']} missing={counts['missing']} "
                      f"inner_atts_total={inner_total}", flush=True)
    finally:
        # no_cursor_timeout cursors must be closed explicitly.
        cursor.close()
        # Persist whatever is still pending, also when the loop was interrupted.
        if bulk:
            col.bulk_write(bulk, ordered=False)

    print(f"  [{n}/{total}] DONE unwrapped={counts['unwrapped']} err={counts['errors']} "
          f"no_att_id={counts['no_att_id']} missing={counts['missing']} "
          f"with_inner_atts={with_inner} inner_atts_total={inner_total}")
    return summary(total)
|
|
|
|
|
|
def main() -> int:
    """Command-line entry point: unwrap S/MIME messages for one or all mailboxes.

    Options:
        --mailbox  process only this mailbox (default: every collection that
                   is in neither SKIP_COLLECTIONS nor SKIP_MAILBOXES)
        --limit    maximum number of messages per mailbox (for testing)

    Returns:
        0 when no errors occurred, 1 when at least one message failed,
        2 when --mailbox names a mailbox listed in SKIP_MAILBOXES.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--mailbox", help="Jedna konkretni schranka (default: vsechny)")
    ap.add_argument("--limit", type=int, help="Limit emailu na schranku (test)")
    args = ap.parse_args()

    t0 = time.time()
    print("Pripojuji se k MongoDB...")
    mongo = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
    try:
        # Fail fast when the server is unreachable.
        mongo.admin.command("ping")
        db = mongo[MONGO_DB]

        print("Token Graph API...")
        get_token()
        print("OK\n")

        if args.mailbox:
            if args.mailbox in SKIP_MAILBOXES:
                print(f"CHYBA: {args.mailbox} je v SKIP_MAILBOXES — neni Graph pristup.")
                return 2
            mailboxes = [args.mailbox]
        else:
            mailboxes = []
            for c in db.list_collection_names():
                if c in SKIP_COLLECTIONS:
                    continue
                if c in SKIP_MAILBOXES:
                    print(f"  [skip] {c} — v SKIP_MAILBOXES (neni Graph pristup)")
                    continue
                mailboxes.append(c)
        print(f"Schranky ({len(mailboxes)}): {mailboxes}\n")

        results = []
        for mb in mailboxes:
            results.append(process_mailbox(db[mb], mb, limit=args.limit))
            print()

        print("=== SHRNUTI ===")
        for r in results:
            print(f"  {r['mailbox']}: candidates={r['candidates']} unwrapped={r['unwrapped']} "
                  f"errors={r['errors']} no_att_id={r['no_att_id']} missing={r['missing']} "
                  f"with_inner_atts={r['with_inner_att']} inner_atts_total={r['inner_att_total']}")
        print(f"\nCelkem trvalo: {time.time() - t0:.1f} s")
        total_errors = sum(r.get("errors", 0) for r in results)
        return 1 if total_errors > 0 else 0
    finally:
        # Release the client's connections on every exit path.
        mongo.close()
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the job and turn main()'s return value into the process exit code.
    # A user interrupt only prints a notice; any other exception prints its
    # traceback and exits with status 1.
    try:
        exit_code = main()
    except KeyboardInterrupt:
        print("\nPreruseno uzivatelem")
    except Exception:
        traceback.print_exc()
        sys.exit(1)
    else:
        raise SystemExit(exit_code)
|