# Source: janssen/Python-runner/3_download_attachments_v1.3.py
# Listing metadata: 2026-06-05 21:21:30 +02:00 · 547 lines · 20 KiB · Python

"""
download_attachments_v1.3.py
Nazev: download_attachments_v1.3.py
Verze: 1.3
Datum: 2026-06-02
Autor: vladimir.buzalka
Popis:
Stahuje skutecne prilohy (is_inline=False) vsech emailu z MongoDB
pres Microsoft Graph API a uklada je do adresare
/mnt/Emails/<schránka>/Attachments/.
Schránka se predava jako volitelny parametr --mailbox; bez nej se zpracuji vsechny schranky mimo SKIP_MAILBOXES.
Deduplikace podle SHA256 hashe obsahu:
- stejny hash = soubor uz existuje -> preskoci
- prvni vyskyt souboru: ulozi se pod puvodnim nazvem
- kolize nazvu (stejny nazev, jiny hash): faktura_2.pdf, faktura_3.pdf ...
Po ulozeni aktualizuje MongoDB:
- v email dokumentu: kazda priloha dostane file_hash + local_path
- kolekce emaily.attachments_index: _id=hash, filename, local_path, size_bytes,
mime_type, mailbox, first_seen_at, ref_count
Bezpecne prerusit a opakovat — emaily kde vsechny prilohy maji file_hash
se preskoci. --force-recheck znovu overi i uz stazene.
POZOR: Skript pouze CIST ze schranky — zadny zapis do schranky!
Spousteni:
python download_attachments_v1.3.py # VSECHNY schranky (mimo SKIP_MAILBOXES)
python download_attachments_v1.3.py --mailbox ordinace@buzalkova.cz # jedna schranka
python download_attachments_v1.3.py --mailbox ordinace@buzalkova.cz --limit 50
python download_attachments_v1.3.py --mailbox ordinace@buzalkova.cz --force-recheck
SKIP_MAILBOXES (hardcoded):
vbuzalka@its.jnj.com — JNJ tenant, nemame Graph API pristup.
Docker:
docker exec -it python-runner python /scripts/3_download_attachments_v1.3.py
Zavislosti:
msal, requests, pymongo
Python 3.10+
Historie verzi:
1.0 2026-06-02 Inicialni verze
1.1 2026-06-02 Schránka jako parametr --mailbox
1.2 2026-06-02 Oprava: Graph attachment mapa vcetne inline; normalizace nazvu;
preskoceni S/MIME; inline z Graphu -> SKIP ne ERR
1.3 2026-06-02 Primarni stazeni pres graph_att_id (prime ID bez name-matchingu);
oprava $select na attachment listu (odstranen contentId ktery
zpusoboval BadRequest a vracel prazdny seznam); name-matching
zustava jako fallback pro stare emaily bez graph_att_id
"""
import argparse
import hashlib
import logging
import os
import re
import sys
import unicodedata
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

import msal
import requests
from pymongo import MongoClient, UpdateOne
# Switch stdout to UTF-8 when the stream object offers reconfigure();
# characters that still cannot be encoded are replaced instead of raising.
if hasattr(sys.stdout, "reconfigure"):
    sys.stdout.reconfigure(encoding="utf-8", errors="replace")
# ─── CONFIGURATION ────────────────────────────────────────────────────────────
GRAPH_TENANT_ID = "7d269944-37a4-43a1-8140-c7517dc426e9"
GRAPH_CLIENT_ID = "4b222bfd-78c9-4239-a53f-43006b3ed07f"
# SECURITY: a client secret stored in source control must be treated as leaked.
# The GRAPH_CLIENT_SECRET environment variable takes precedence when it is set
# to a non-empty value; the literal below is only a backward-compatible
# fallback and should be rotated and then removed.
GRAPH_CLIENT_SECRET = (
    os.environ.get("GRAPH_CLIENT_SECRET")
    or "Txg8Q~MjhocuopxsJyJBhPmDfMxZ2r5WpTFj1dfk"
)
GRAPH_URL = "https://graph.microsoft.com/v1.0"
MONGO_URI = "mongodb://192.168.1.76:27017"
MONGO_DB = "emaily"
MONGO_COL_INDEX = "attachments_index"
EMAILS_BASE_DIR = Path("/mnt/Emails")
# NOTE(review): the log file name refers to "parse_emails" although this is the
# attachment downloader — confirm the shared log file is intended.
LOG_FILE = Path(__file__).parent / "parse_emails_errors.log"
SCRIPT_VERSION = "1.3"
BATCH_SIZE = 50
# Attachment types that are skipped (S/MIME signatures, certificates)
SKIP_EXTENSIONS = {".p7m", ".p7s", ".p7c", ".p7b"}
# Collections in `emaily` that are NOT mailboxes
NON_MAILBOX_COLLECTIONS = {"attachments_index", "sync_state"}
# Mailboxes without Graph API access — skipped when running without --mailbox
SKIP_MAILBOXES = {
    "vbuzalka@its.jnj.com",  # JNJ tenant — no Graph credentials
}
# ──────────────────────────────────────────────────────────────────────────────
# Root logger writes to LOG_FILE (UTF-8); only ERROR and above are recorded,
# each as "<timestamp> | <message>".
logging.basicConfig(
filename=str(LOG_FILE),
level=logging.ERROR,
format="%(asctime)s | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
encoding="utf-8",
)
# Cached Graph access token; None until the first successful acquisition.
_graph_token: Optional[str] = None
# ─── Graph API ────────────────────────────────────────────────────────────────
def get_token() -> str:
    """Acquire a new Graph access token and cache it in ``_graph_token``.

    A fresh MSAL confidential client is built from the module-level tenant,
    client id and client secret, and ``acquire_token_for_client`` is called
    for the ``https://graph.microsoft.com/.default`` scope.

    Returns:
        The access token string.

    Raises:
        RuntimeError: when the MSAL response contains no ``access_token``;
            the whole response is included in the message.
    """
    global _graph_token
    authority_url = f"https://login.microsoftonline.com/{GRAPH_TENANT_ID}"
    confidential_app = msal.ConfidentialClientApplication(
        GRAPH_CLIENT_ID,
        authority=authority_url,
        client_credential=GRAPH_CLIENT_SECRET,
    )
    token_response = confidential_app.acquire_token_for_client(
        scopes=["https://graph.microsoft.com/.default"]
    )
    try:
        _graph_token = token_response["access_token"]
    except KeyError:
        raise RuntimeError(f"Graph auth failed: {token_response}") from None
    return _graph_token
def graph_get_bytes(url: str) -> bytes:
    """GET ``url`` with the cached bearer token and return the raw body.

    A token is acquired first when none is cached. On HTTP 401 the token is
    refreshed and the request is repeated once.

    Args:
        url: absolute Graph URL to download.

    Returns:
        The complete response body.

    Raises:
        requests.HTTPError: for any non-401 error status.
        RuntimeError: when both attempts were answered with 401.
    """
    if not _graph_token:
        get_token()
    for _ in range(2):
        # stream=True defers the body download, so the response must be closed
        # explicitly; the context manager releases the connection on every
        # path (401 retry, HTTP error, success).
        with requests.get(
            url,
            headers={"Authorization": f"Bearer {_graph_token}"},
            timeout=120,
            stream=True,
        ) as response:
            if response.status_code == 401:
                get_token()
                continue
            response.raise_for_status()
            return response.content
    raise RuntimeError(f"Graph GET bytes failed: {url}")
def graph_get_json(url: str, params: Optional[dict] = None) -> dict:
    """GET ``url`` with the cached bearer token and return the decoded JSON.

    A token is acquired first when none is cached. On HTTP 401 the token is
    refreshed and the request is repeated once.

    Args:
        url: absolute Graph URL.
        params: optional query-string parameters (for example ``$select``).

    Returns:
        The parsed JSON body.

    Raises:
        requests.HTTPError: for any non-401 error status.
        RuntimeError: when both attempts were answered with 401.
    """
    if not _graph_token:
        get_token()
    for _ in range(2):
        response = requests.get(
            url,
            headers={"Authorization": f"Bearer {_graph_token}"},
            params=params,
            timeout=30,
        )
        if response.status_code == 401:
            get_token()
            continue
        response.raise_for_status()
        return response.json()
    raise RuntimeError(f"Graph GET json failed: {url}")
def fetch_message_attachments(mailbox: str, graph_message_id: str) -> list[dict]:
    """Return the attachment metadata of one message (without contentBytes).

    Any exception is written to the error log and reported to the caller as
    an empty list.
    """
    endpoint = f"{GRAPH_URL}/users/{mailbox}/messages/{graph_message_id}/attachments"
    # contentId is NOT part of the base attachment type, so it must stay out
    # of $select.
    select_params = {"$select": "id,name,contentType,size,isInline"}
    try:
        payload = graph_get_json(endpoint, select_params)
        return payload.get("value", [])
    except Exception as exc:
        logging.error("fetch_message_attachments failed [%s]: %s", graph_message_id, exc)
        return []
def fetch_attachment_content(mailbox: str, graph_message_id: str, attachment_id: str) -> Optional[bytes]:
    """Download the raw content of one attachment via its ``$value`` endpoint.

    Returns:
        The attachment bytes, or ``None`` when the download raised; the
        failure is written to the error log.
    """
    endpoint = (
        f"{GRAPH_URL}/users/{mailbox}/messages/{graph_message_id}"
        f"/attachments/{attachment_id}/$value"
    )
    try:
        payload = graph_get_bytes(endpoint)
    except Exception as exc:
        logging.error("fetch_attachment_content failed [msg=%s att=%s]: %s",
                      graph_message_id, attachment_id, exc)
        return None
    return payload
# ─── Pomocné funkce ───────────────────────────────────────────────────────────
def normalize_name(name: str) -> str:
    """Normalize a file name for comparison.

    The name is lower-cased and stripped, combining marks (diacritics) are
    removed after NFKD decomposition, and every character other than a word
    character, ``.`` or ``-`` is replaced with ``_``.
    """
    decomposed = unicodedata.normalize("NFKD", name.lower().strip())
    without_marks = "".join(ch for ch in decomposed if not unicodedata.combining(ch))
    return re.sub(r"[^\w.\-]", "_", without_marks)


def find_graph_att(att_name: str, att_size: int, graph_atts: list[dict]) -> Optional[dict]:
    """Fallback lookup of an attachment in a Graph attachment list by name.

    Used for e-mails whose stored attachments carry no ``graph_att_id``.
    Candidates are tried in this order; the first hit wins:

    1. exact name match,
    2. normalized name match whose size is within 10 % of ``att_size``
       (a size of 0 / unknown on either side counts as compatible),
    3. normalized name match regardless of size,
    4. normalized Graph name ending with the last 20 characters of the
       normalized wanted name.

    The size-aware tier runs before the plain normalized tier so that it can
    pick the right one among several attachments sharing a normalized name.

    Args:
        att_name: attachment file name stored with the e-mail.
        att_size: stored attachment size in bytes; 0 or ``None`` if unknown.
        graph_atts: attachment metadata dicts (``name``, ``size``, ...).

    Returns:
        The matching metadata dict, or ``None`` when nothing matches.
    """
    att_name = att_name or ""
    att_size = att_size or 0

    # 1. Exact match on the raw name.
    for ga in graph_atts:
        if ga.get("name") == att_name:
            return ga

    norm_want = normalize_name(att_name)
    # Normalize every Graph name once; a missing/None name is treated as "".
    normalized = [(normalize_name(ga.get("name") or ""), ga) for ga in graph_atts]
    same_name = [ga for norm, ga in normalized if norm == norm_want]

    # 2. Normalized match + compatible size (±10 %).
    for ga in same_name:
        ga_size = ga.get("size") or 0
        if (
            att_size == 0
            or ga_size == 0
            or abs(ga_size - att_size) / max(ga_size, att_size) < 0.1
        ):
            return ga

    # 3. Normalized match only.
    if same_name:
        return same_name[0]

    # 4. Partial match on the last 20 characters of the normalized name.
    tail = norm_want[-20:]
    if tail:
        for norm, ga in normalized:
            if norm.endswith(tail):
                return ga
    return None
def sha256(data: bytes) -> str:
    """Return the hexadecimal SHA-256 digest of ``data``."""
    digest = hashlib.sha256()
    digest.update(data)
    return digest.hexdigest()
def safe_filename(name: str, max_bytes: int = 240) -> str:
    """Turn an attachment name from an e-mail into a safe local file name.

    Every character that is not alphanumeric and not one of ``._- ()`` is
    replaced with ``_`` and surrounding whitespace is stripped. Results that
    cannot be used as a file name (empty, ``.``, ``..``) become
    ``"attachment"``. Names longer than ``max_bytes`` in UTF-8 are shortened
    by cutting the stem while keeping the extension.

    Args:
        name: attachment name as received (untrusted); ``None`` is accepted.
        max_bytes: maximum UTF-8 length of the result. The default stays
            below 255 so that a ``_N`` collision suffix can still be added.

    Returns:
        The sanitized file name, never empty.
    """
    cleaned = "".join(
        ch if ch.isalnum() or ch in "._- ()" else "_" for ch in (name or "")
    ).strip()
    # "." and ".." would resolve to a directory instead of a file.
    if not cleaned or cleaned in {".", ".."}:
        return "attachment"
    if len(cleaned.encode("utf-8")) <= max_bytes:
        return cleaned

    suffix = Path(cleaned).suffix
    if len(suffix.encode("utf-8")) > max_bytes // 2:
        # Not a real extension; truncate the name as a whole.
        suffix = ""
    stem = cleaned[: len(cleaned) - len(suffix)] if suffix else cleaned
    budget = max_bytes - len(suffix.encode("utf-8"))
    # errors="ignore" drops a multi-byte character cut in half by the slice.
    stem = stem.encode("utf-8")[:budget].decode("utf-8", errors="ignore")
    shortened = (stem + suffix).strip()
    return shortened if shortened not in {"", ".", ".."} else "attachment"
def _file_matches_hash(path: Path, hash_val: str) -> bool:
    """Return True when ``path`` is readable and its SHA-256 equals ``hash_val``."""
    try:
        return hashlib.sha256(path.read_bytes()).hexdigest() == hash_val
    except OSError:
        return False


def resolve_filename(desired_name: str, att_dir: Path, hash_val: str, col_index) -> str:
    """Pick the file name under which content with ``hash_val`` is stored.

    Candidates are tried in the order ``desired_name``, ``<stem>_2<suffix>``,
    ``<stem>_3<suffix>``, ... A candidate is accepted when

    * the index already maps that file name to ``hash_val``, or
    * the index has no entry for it and either no such file exists in
      ``att_dir`` or the existing file already has exactly this content.

    The on-disk check also applies to ``desired_name`` itself, so a file that
    is present in ``att_dir`` but missing from the index is not overwritten
    by different content.

    Args:
        desired_name: sanitized preferred file name.
        att_dir: directory the file will be written to.
        hash_val: SHA-256 hex digest of the content to store.
        col_index: index collection; only ``find_one({"filename": ...})`` is
            used and the returned document's ``_id`` is read.

    Returns:
        The file name to use inside ``att_dir``.
    """
    wanted = Path(desired_name)
    stem, suffix = wanted.stem, wanted.suffix
    n = 1
    while True:
        candidate = desired_name if n == 1 else f"{stem}_{n}{suffix}"
        entry = col_index.find_one({"filename": candidate})
        if entry:
            # Name is indexed: reusable only for the very same content.
            if entry["_id"] == hash_val:
                return candidate
        else:
            on_disk = att_dir / candidate
            if not on_disk.exists() or _file_matches_hash(on_disk, hash_val):
                return candidate
        n += 1
def save_attachment(
    content: bytes,
    original_name: str,
    mime_type: str,
    mailbox: str,
    att_dir: Path,
    col_index,
) -> tuple[str, str, bool]:
    """Store attachment content once per SHA-256 hash and index it.

    When the hash is already present in the index, nothing is written: the
    entry's ``ref_count`` is incremented and its stored ``local_path`` is
    returned. Otherwise a collision-free file name is resolved, the content
    is written into ``att_dir`` and a new index document is inserted.

    The file is first written under a temporary name and then renamed, so an
    interrupted run cannot leave a truncated file under the final name.

    Args:
        content: raw attachment bytes.
        original_name: attachment name from the e-mail (sanitized here).
        mime_type: MIME type stored in the index document.
        mailbox: mailbox recorded as the first owner of the file.
        att_dir: target directory; must already exist.
        col_index: index collection keyed by hash (``_id``).

    Returns:
        ``(hash, local_path, was_new)``; ``was_new`` is ``False`` for a
        deduplicated hit.

    Raises:
        OSError: when the file cannot be written.
    """
    hash_val = sha256(content)
    existing = col_index.find_one({"_id": hash_val})
    if existing:
        col_index.update_one({"_id": hash_val}, {"$inc": {"ref_count": 1}})
        return hash_val, existing["local_path"], False

    filename = resolve_filename(safe_filename(original_name), att_dir, hash_val, col_index)
    file_path = att_dir / filename
    # The temporary name is derived from the hash so it cannot clash with an
    # attachment name; replace() within one directory swaps it in atomically.
    tmp_path = att_dir / f".{hash_val}.part"
    try:
        tmp_path.write_bytes(content)
        tmp_path.replace(file_path)
    except BaseException:
        tmp_path.unlink(missing_ok=True)
        raise

    col_index.insert_one({
        "_id": hash_val,
        "filename": filename,
        # NOTE(review): local_path holds only the file name, not a full path;
        # readers presumably combine it with the entry's mailbox — confirm.
        "local_path": filename,
        "size_bytes": len(content),
        "mime_type": mime_type,
        "mailbox": mailbox,
        # Naive datetime carrying the UTC wall-clock time.
        "first_seen_at": datetime.now(timezone.utc).replace(tzinfo=None),
        "ref_count": 1,
    })
    return hash_val, filename, True
# ─── MAIN ─────────────────────────────────────────────────────────────────────
def process_mailbox(client, mailbox: str, args) -> dict:
    """Download the pending real (non-inline) attachments of one mailbox.

    E-mail documents are read from the collection named after the mailbox in
    ``MONGO_DB``. Every non-inline attachment that has no ``file_hash`` yet
    (or every non-inline attachment when ``args.force_recheck`` is set) is
    downloaded from Graph and stored through ``save_attachment``; the
    resulting ``file_hash`` / ``local_path`` are written back to the e-mail
    document in batches of ``BATCH_SIZE``.

    Failures of a single attachment (not found, download failed, saving
    failed) are counted as errors and do not stop the run. Queued updates
    are flushed even when the loop is left by an exception.

    Args:
        client: connected ``MongoClient``.
        mailbox: mailbox address; also the name of its e-mail collection and
            of its directory below ``EMAILS_BASE_DIR``.
        args: parsed CLI arguments; ``limit`` and ``force_recheck`` are read.

    Returns:
        Statistics dict with the keys ``mailbox``, ``ok``, ``new``, ``dup``,
        ``skip``, ``err`` and ``elapsed`` (seconds).
    """
    att_dir = EMAILS_BASE_DIR / mailbox / "Attachments"
    start = datetime.now()
    print(f"\n========== {mailbox} ==========")
    print(f"Cilovy adresar: {att_dir}")
    att_dir.mkdir(parents=True, exist_ok=True)

    col_emails = client[MONGO_DB][mailbox]
    col_index = client[MONGO_DB][MONGO_COL_INDEX]

    if args.force_recheck:
        query = {"has_attachments": True}
    else:
        # Only e-mails with at least one non-inline attachment lacking file_hash.
        query = {
            "has_attachments": True,
            "attachments": {
                "$elemMatch": {
                    "is_inline": False,
                    "file_hash": {"$exists": False},
                }
            }
        }

    total = col_emails.count_documents(query)
    print(f"Emailu ke zpracovani: {total}")
    if total == 0:
        print(" Neni co stahnout.")
        return {"mailbox": mailbox, "ok": 0, "new": 0, "dup": 0, "skip": 0, "err": 0,
                "elapsed": 0.0}

    cursor = col_emails.find(query, {"_id": 1, "graph_id": 1, "subject": 1, "attachments": 1})
    if args.limit:
        cursor = cursor.limit(args.limit)

    ok_count = 0
    new_count = 0
    dup_count = 0
    skip_count = 0
    err_count = 0
    email_i = 0
    batch = []

    def flush():
        """Write the queued e-mail updates; a failure is logged, not raised."""
        if not batch:
            return
        try:
            col_emails.bulk_write(batch, ordered=False)
        except Exception as e:
            logging.error("bulk_write: %s", e)
            print(f" CHYBA bulk_write: {e}")
        batch.clear()

    def is_pending(att: dict) -> bool:
        """True for a non-inline attachment that still has to be handled."""
        if att.get("is_inline", False):
            return False
        return bool(args.force_recheck or not att.get("file_hash"))

    try:
        for email_doc in cursor:
            email_i += 1
            email_id = email_doc["_id"]
            graph_id = email_doc.get("graph_id", "")
            subject = (email_doc.get("subject") or "")[:60]
            att_list = email_doc.get("attachments") or []

            # Nothing to do for e-mails that only carry inline attachments.
            if all(a.get("is_inline", False) for a in att_list):
                continue

            print(f"\n {email_i:>5}/{total} {subject}")

            # The Graph attachment list is needed only for the name-matching
            # fallback, i.e. when a pending attachment has no graph_att_id.
            need_listing = any(
                is_pending(a) and not a.get("graph_att_id") for a in att_list
            )
            graph_atts = fetch_message_attachments(mailbox, graph_id) if need_listing else []

            updated_atts = list(att_list)
            email_ok = True

            for i, att in enumerate(att_list):
                if not is_pending(att):
                    continue

                # "or" also covers keys that are present but hold None.
                att_name = att.get("filename") or ""
                att_size = att.get("size_bytes") or 0
                graph_att_id = att.get("graph_att_id")

                # Skip S/MIME signatures.
                if Path(att_name).suffix.lower() in SKIP_EXTENSIONS:
                    updated_atts[i] = {**att, "file_hash": "skip", "local_path": ""}
                    skip_count += 1
                    print(f" SKIP {att_name} (S/MIME)")
                    continue

                if graph_att_id:
                    # Direct access through the stored Graph attachment id.
                    target_id = graph_att_id
                    mime_type = att.get("mime_type", "")
                else:
                    # Fallback: name matching for e-mails without graph_att_id.
                    graph_att = find_graph_att(att_name, att_size, graph_atts)
                    if not graph_att:
                        logging.error("attachment not found [email=%s att=%s]", email_id, att_name)
                        print(f" ERR {att_name} (nenalezeno)")
                        err_count += 1
                        email_ok = False
                        continue

                    # Graph reports the attachment as inline — mark and skip it.
                    if graph_att.get("isInline", False):
                        updated_atts[i] = {**att, "is_inline": True, "file_hash": "skip", "local_path": ""}
                        skip_count += 1
                        print(f" SKIP {att_name} (inline obrazek)")
                        continue

                    target_id = graph_att["id"]
                    mime_type = att.get("mime_type") or graph_att.get("contentType", "")

                content = fetch_attachment_content(mailbox, graph_id, target_id)
                if content is None:
                    err_count += 1
                    email_ok = False
                    print(f" ERR {att_name} (stazeni selhalo)")
                    continue

                try:
                    hash_val, local_path, was_new = save_attachment(
                        content, att_name, mime_type, mailbox, att_dir, col_index
                    )
                except Exception as e:
                    # One attachment that cannot be stored must not abort the
                    # whole mailbox; it is retried on the next run.
                    logging.error("save_attachment failed [email=%s att=%s]: %s",
                                  email_id, att_name, e)
                    print(f" ERR {att_name} (ulozeni selhalo)")
                    err_count += 1
                    email_ok = False
                    continue

                updated_atts[i] = {**att, "file_hash": hash_val, "local_path": local_path}
                if was_new:
                    new_count += 1
                    print(f" NEW {local_path} ({len(content):,} B)")
                else:
                    dup_count += 1
                    print(f" DUP {att_name} -> {local_path}")

            if email_ok:
                ok_count += 1

            # Queued for every processed e-mail so partial progress and skip
            # markers are persisted as well.
            batch.append(UpdateOne({"_id": email_id}, {"$set": {"attachments": updated_atts}}))
            if len(batch) >= BATCH_SIZE:
                flush()

            if email_i % 100 == 0:
                print(f" {'-'*60}")
                print(f" Průběh: emaily={email_i}/{total} nove={new_count} dup={dup_count} skip={skip_count} err={err_count}")
                print(f" {'-'*60}")
    finally:
        # Files and index entries already exist for the queued updates, so
        # they are written even when the loop is interrupted.
        flush()

    elapsed = (datetime.now() - start).total_seconds()
    print(f" -> mailbox total: emaily={ok_count} nove={new_count} dup={dup_count} "
          f"skip={skip_count} err={err_count} ({elapsed:.1f} s)")
    return {"mailbox": mailbox, "ok": ok_count, "new": new_count, "dup": dup_count,
            "skip": skip_count, "err": err_count, "elapsed": elapsed}
def discover_mailboxes(db) -> list[str]:
    """List the mailbox collections of ``db`` in sorted order.

    Collections named in ``NON_MAILBOX_COLLECTIONS`` are dropped silently;
    those named in ``SKIP_MAILBOXES`` are dropped with a console notice.
    """
    mailboxes: list[str] = []
    for coll_name in sorted(db.list_collection_names()):
        if coll_name in NON_MAILBOX_COLLECTIONS:
            continue
        if coll_name not in SKIP_MAILBOXES:
            mailboxes.append(coll_name)
        else:
            print(f" [skip] {coll_name} — v SKIP_MAILBOXES (neni Graph pristup)")
    return mailboxes
def _parse_args() -> argparse.Namespace:
    """Define and parse the command-line arguments."""
    ap = argparse.ArgumentParser(description=f"download_attachments v{SCRIPT_VERSION}")
    ap.add_argument("--mailbox", default="",
                    help="Emailova schranka. Bez argumentu projede vsechny schranky "
                         "v `emaily` mimo SKIP_MAILBOXES.")
    ap.add_argument("--limit", type=int, default=0,
                    help="Zpracovat max N emailu (0 = vse) — per schranka")
    ap.add_argument("--force-recheck", action="store_true",
                    help="Znovu overi i emaily kde prilohy uz maji file_hash")
    ap.add_argument("--no-indexes", action="store_true",
                    help="Nevytvorit indexy na attachments_index kolekci")
    return ap.parse_args()


def _print_summary(results: list[dict], col_index, elapsed_total: float) -> None:
    """Print per-mailbox and total statistics plus the size of the index."""
    files_total = col_index.count_documents({})
    size_total = sum(d.get("size_bytes", 0) for d in col_index.find({}, {"size_bytes": 1}))
    grand = {k: sum(r[k] for r in results) for k in ("ok", "new", "dup", "skip", "err")}

    print(f"\n{'='*60}")
    print("=== SHRNUTI ===")
    for r in results:
        print(f" {r['mailbox']:40} ok={r['ok']:>5} nove={r['new']:>4} "
              f"dup={r['dup']:>4} skip={r['skip']:>3} err={r['err']:>3}")
    print(f" {'TOTAL':40} ok={grand['ok']:>5} nove={grand['new']:>4} "
          f"dup={grand['dup']:>4} skip={grand['skip']:>3} err={grand['err']:>3}")
    print(f"Souboru v indexu: {files_total} ({size_total / 1024 / 1024:.1f} MB)")
    print(f"Celkovy cas: {int(elapsed_total//3600)}h {int((elapsed_total%3600)//60)}m {int(elapsed_total%60)}s")
    print(f"\nKonec: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    if grand['err']:
        print(f"Chyby logovany do: {LOG_FILE}")


def main():
    """Command-line entry point.

    Verifies Graph authentication and the MongoDB connection, optionally
    creates the indexes on the attachment index collection, processes either
    the mailbox given with ``--mailbox`` or every discovered mailbox, and
    prints a summary.

    Exits with status 1 when Graph or MongoDB is unavailable and with status
    2 when the requested mailbox is listed in ``SKIP_MAILBOXES``. The MongoDB
    client is closed on every path.
    """
    args = _parse_args()
    start_all = datetime.now()
    print(f"=== download_attachments v{SCRIPT_VERSION} ===")
    print(f"Start: {start_all.strftime('%Y-%m-%d %H:%M:%S')}")

    print("\nPřipojuji se k Graph API...")
    try:
        get_token()
        print(" Graph API OK")
    except Exception as e:
        print(f" CHYBA: {e}")
        sys.exit(1)

    client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
    try:
        try:
            client.admin.command("ping")
            print(" MongoDB OK")
        except Exception as e:
            print(f" CHYBA: MongoDB neni dostupna -- {e}")
            sys.exit(1)

        col_index = client[MONGO_DB][MONGO_COL_INDEX]
        if not args.no_indexes:
            for field in ("filename", "mime_type", "mailbox"):
                col_index.create_index(field)

        db = client[MONGO_DB]
        if args.mailbox:
            if args.mailbox in SKIP_MAILBOXES:
                print(f" CHYBA: {args.mailbox} je v SKIP_MAILBOXES — neni Graph pristup.")
                sys.exit(2)
            mailboxes = [args.mailbox]
        else:
            mailboxes = discover_mailboxes(db)
            print(f" Schranky ke zpracovani: {len(mailboxes)}")
            for m in mailboxes:
                print(f" {m}")

        results = []
        for mb in mailboxes:
            try:
                results.append(process_mailbox(client, mb, args))
            except Exception as e:
                # A failing mailbox must not stop the others; the traceback
                # goes to the error log.
                logging.exception("process_mailbox %s: %s", mb, e)
                print(f" FATAL pri zpracovani {mb}: {e}")
                results.append({"mailbox": mb, "ok": 0, "new": 0, "dup": 0,
                                "skip": 0, "err": 1, "elapsed": 0.0})

        elapsed_total = (datetime.now() - start_all).total_seconds()
        _print_summary(results, col_index, elapsed_total)
    finally:
        # Also runs when sys.exit() is raised above.
        client.close()
# Run the downloader only when the file is executed as a script.
if __name__ == "__main__":
    main()