714 lines
27 KiB
Python
714 lines
27 KiB
Python
"""
|
|
download_attachments_v1.4.py
|
|
Nazev: download_attachments_v1.4.py
|
|
Verze: 1.4
|
|
Datum: 2026-06-04
|
|
Autor: vladimir.buzalka
|
|
|
|
Popis:
|
|
Stahuje skutecne prilohy (is_inline=False) vsech emailu z MongoDB
|
|
pres Microsoft Graph API a uklada je do adresare
|
|
/mnt/Emails/<schranka>/Attachments/.
|
|
|
|
Bez argumentu --mailbox projede vsechny kolekce v `emaily` mimo
|
|
NON_MAILBOX_COLLECTIONS a SKIP_MAILBOXES.
|
|
|
|
Deduplikace podle SHA256 hashe obsahu:
|
|
- stejny hash = soubor uz existuje -> preskoci
|
|
- prvni vyskyt: ulozi pod puvodnim nazvem
|
|
- kolize nazvu: faktura_2.pdf, faktura_3.pdf ...
|
|
|
|
Po ulozeni aktualizuje MongoDB:
|
|
- v email dokumentu: kazda priloha dostane file_hash + local_path
|
|
- kolekce emaily.attachments_index: _id=hash, filename, ...
|
|
|
|
NOVE v 1.4:
|
|
- Spravne zpracovani vsech typu priloh:
|
|
* fileAttachment -> /$value (jako predtim)
|
|
* itemAttachment -> /$expand=microsoft.graph.itemAttachment/item
|
|
-> sestavi .eml z vnitrni zpravy
|
|
* referenceAttachment -> ulozi jen URL, neexistuje content
|
|
- Retry s exponencialnim backoffem (1s, 2s, 4s) na 429/5xx
|
|
- Permanentni tagging chyb v Mongo per-attachment:
|
|
* attachment_missing: True (404, email/att uz neexistuje)
|
|
* attachment_reference: True (referenceAttachment, jen URL)
|
|
* reference_url, attachment_type — diagnosticke metadata
|
|
- Tagovane prilohy se pri dalsim behu preskoci (bez --force-recheck)
|
|
|
|
POZOR: Skript pouze CIST ze schranky — zadny zapis do schranky!
|
|
|
|
Spousteni:
|
|
python download_attachments_v1.4.py
|
|
python download_attachments_v1.4.py --mailbox ordinace@buzalkova.cz
|
|
python download_attachments_v1.4.py --mailbox ordinace@buzalkova.cz --limit 50
|
|
python download_attachments_v1.4.py --mailbox ordinace@buzalkova.cz --force-recheck
|
|
|
|
SKIP_MAILBOXES (hardcoded):
|
|
vbuzalka@its.jnj.com — JNJ tenant, nemame Graph API pristup.
|
|
|
|
Docker:
|
|
docker exec -it python-runner python /scripts/3_download_attachments_v1.4.py
|
|
|
|
Zavislosti:
|
|
msal, requests, pymongo
|
|
Python 3.10+
|
|
|
|
Historie verzi:
|
|
1.0 2026-06-02 Inicialni verze
|
|
1.1 2026-06-02 Schranka jako parametr --mailbox
|
|
1.2 2026-06-02 Oprava: Graph attachment mapa vcetne inline; normalizace nazvu
|
|
1.3 2026-06-02 Primarni stazeni pres graph_att_id; --mailbox volitelny
|
|
1.4 2026-06-04 itemAttachment/referenceAttachment handling; retry s backoffem;
|
|
permanentni tagging chyb (attachment_missing / attachment_reference)
|
|
"""
|
|
|
|
import argparse
import hashlib
import json
import logging
import os
import re
import sys
import time
import unicodedata
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

import msal
import requests
from pymongo import MongoClient, UpdateOne
|
|
|
|
# Switch stdout to UTF-8 where the stream supports reconfiguration, so the
# non-ASCII progress output of this script is written with replacement
# characters instead of failing on a narrower console encoding.
if hasattr(sys.stdout, "reconfigure"):
    sys.stdout.reconfigure(encoding="utf-8", errors="replace")
|
|
|
|
# ─── CONFIGURATION ────────────────────────────────────────────────────────────
# Connection settings can be overridden with an environment variable of the
# same name. The literals are the previously hard-coded values, kept only as
# fallbacks so existing deployments keep working unchanged.
#
# SECURITY: the Graph client secret below is stored in plain text in the
# source. Treat it as compromised: rotate it, provide the new value through the
# GRAPH_CLIENT_SECRET environment variable, and then delete the literal.
GRAPH_TENANT_ID = os.environ.get("GRAPH_TENANT_ID") or "7d269944-37a4-43a1-8140-c7517dc426e9"
GRAPH_CLIENT_ID = os.environ.get("GRAPH_CLIENT_ID") or "4b222bfd-78c9-4239-a53f-43006b3ed07f"
GRAPH_CLIENT_SECRET = (os.environ.get("GRAPH_CLIENT_SECRET")
                       or "Txg8Q~MjhocuopxsJyJBhPmDfMxZ2r5WpTFj1dfk")
GRAPH_URL = "https://graph.microsoft.com/v1.0"

MONGO_URI = os.environ.get("MONGO_URI") or "mongodb://192.168.1.76:27017"
MONGO_DB = "emaily"
MONGO_COL_INDEX = "attachments_index"

EMAILS_BASE_DIR = Path("/mnt/Emails")
LOG_FILE = Path(__file__).parent / "parse_emails_errors.log"
SCRIPT_VERSION = "1.4"
BATCH_SIZE = 50

# Attachment types that are skipped (S/MIME signatures, certificates)
SKIP_EXTENSIONS = {".p7m", ".p7s", ".p7c", ".p7b"}

# Collections in `emaily` that are NOT mailboxes
NON_MAILBOX_COLLECTIONS = {"attachments_index", "sync_state"}

# Mailboxes for which we have NO Graph API access
SKIP_MAILBOXES = {
    "vbuzalka@its.jnj.com",
}

# Retry configuration for transient errors
RETRY_STATUSES = {429, 500, 502, 503, 504}
RETRY_BACKOFF_S = [1, 2, 4]  # at most 3 retries

# Sentinel values returned by fetch_attachment_smart
FETCH_MISSING = "__MISSING__"      # 404
FETCH_REFERENCE = "__REFERENCE__"  # referenceAttachment
# ──────────────────────────────────────────────────────────────────────────────
|
|
|
|
# Root logger writes to LOG_FILE; only records of level ERROR and above are kept.
logging.basicConfig(
    filename=str(LOG_FILE),
    level=logging.ERROR,
    format="%(asctime)s | %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    encoding="utf-8",
)

# Cached Graph bearer token; None until a token has been acquired.
_graph_token: Optional[str] = None
|
|
|
|
|
|
# ─── Graph API ────────────────────────────────────────────────────────────────
|
|
|
|
def get_token() -> str:
    """Acquire a Graph access token via client credentials and cache it.

    The token is stored in the module-level ``_graph_token`` and also returned.

    Raises:
        RuntimeError: when the MSAL result carries no ``access_token``.
    """
    global _graph_token
    authority_url = f"https://login.microsoftonline.com/{GRAPH_TENANT_ID}"
    confidential_app = msal.ConfidentialClientApplication(
        GRAPH_CLIENT_ID,
        authority=authority_url,
        client_credential=GRAPH_CLIENT_SECRET,
    )
    auth_result = confidential_app.acquire_token_for_client(
        scopes=["https://graph.microsoft.com/.default"]
    )
    if "access_token" in auth_result:
        _graph_token = auth_result["access_token"]
        return _graph_token
    raise RuntimeError(f"Graph auth failed: {auth_result}")
|
|
|
|
|
|
def _graph_request(method: str, url: str, *, params: Optional[dict] = None,
                   stream: bool = False, timeout: int = 60):
    """Low-level Graph HTTP call with retry on 429/5xx and re-auth on 401.

    Args:
        method: HTTP verb passed to ``requests.request``.
        url: absolute request URL.
        params: optional query parameters.
        stream: passed through to ``requests``; with ``stream=True`` the body
            is not downloaded until the caller reads ``.content``.
        timeout: per-request timeout in seconds.

    Returns:
        The ``requests.Response`` of the first attempt that is neither a 401
        nor a retryable status. After the last retry a retryable status is
        returned as-is, so the caller decides (404 is always returned, never
        raised).

    Raises:
        requests.ConnectionError, requests.Timeout: when the last attempt
            still fails at the transport level.
        RuntimeError: when every attempt ended in a 401 re-auth.
    """
    global _graph_token
    if not _graph_token:
        get_token()

    max_retries = len(RETRY_BACKOFF_S)
    last_exc = None
    for attempt in range(max_retries + 1):
        try:
            r = requests.request(
                method, url,
                headers={"Authorization": f"Bearer {_graph_token}"},
                params=params, timeout=timeout, stream=stream,
            )
            if r.status_code == 401:
                # Discard the rejected response before re-authenticating;
                # with stream=True an unread response keeps its connection.
                r.close()
                get_token()
                continue
            if r.status_code in RETRY_STATUSES and attempt < max_retries:
                sleep_s = RETRY_BACKOFF_S[attempt]
                # A numeric Retry-After header takes precedence over our own
                # backoff. Any other form (HTTP-date, garbage, negative, NaN)
                # falls back to the backoff instead of raising.
                retry_after = r.headers.get("Retry-After")
                if retry_after:
                    try:
                        hinted = float(retry_after)
                    except (TypeError, ValueError):
                        hinted = None
                    if hinted is not None and 0 <= hinted < float("inf"):
                        sleep_s = hinted
                r.close()
                time.sleep(sleep_s)
                continue
            return r
        except (requests.ConnectionError, requests.Timeout) as e:
            last_exc = e
            if attempt < max_retries:
                time.sleep(RETRY_BACKOFF_S[attempt])
                continue
            raise
    raise RuntimeError(f"Graph request exhausted retries: {url} (last_exc={last_exc})")
|
|
|
|
|
|
def graph_get_json(url: str, params: dict = None) -> dict:
    """GET ``url`` through ``_graph_request`` and return the decoded JSON body.

    Raises whatever ``Response.raise_for_status`` raises for an error status.
    """
    response = _graph_request("GET", url, params=params, timeout=30)
    response.raise_for_status()
    payload = response.json()
    return payload
|
|
|
|
|
|
def graph_get_bytes(url: str) -> bytes:
    """GET ``url`` through ``_graph_request`` (streamed) and return the raw body.

    Raises whatever ``Response.raise_for_status`` raises for an error status.
    """
    response = _graph_request("GET", url, stream=True, timeout=120)
    response.raise_for_status()
    raw = response.content
    return raw
|
|
|
|
|
|
def fetch_message_attachments(mailbox: str, graph_message_id: str) -> list[dict]:
    """Load the metadata of every attachment of one message.

    Returns the ``value`` list of the Graph response. Any failure is logged
    and yields an empty list instead of raising.
    """
    endpoint = f"{GRAPH_URL}/users/{mailbox}/messages/{graph_message_id}/attachments"
    # The original author notes that @odata.type is returned automatically
    # even though it is not part of the base $select.
    wanted_fields = {"$select": "id,name,contentType,size,isInline"}
    try:
        listing = graph_get_json(endpoint, wanted_fields)
        attachments = listing.get("value", [])
    except Exception as exc:
        logging.error("fetch_message_attachments failed [%s]: %s", graph_message_id, exc)
        attachments = []
    return attachments
|
|
|
|
|
|
def _build_eml_from_item(item: dict) -> bytes:
|
|
"""Sestavi minimalni RFC822 .eml z itemAttachment.item (message)."""
|
|
def hdr(name, val):
|
|
return f"{name}: {val}\r\n" if val else ""
|
|
|
|
def addrs(field):
|
|
rec = item.get(field) or []
|
|
out = []
|
|
for r in rec:
|
|
ea = r.get("emailAddress") or {}
|
|
name = ea.get("name", "")
|
|
addr = ea.get("address", "")
|
|
if name and addr:
|
|
out.append(f'"{name}" <{addr}>')
|
|
elif addr:
|
|
out.append(addr)
|
|
return ", ".join(out)
|
|
|
|
subj = item.get("subject", "")
|
|
sender = item.get("from") or item.get("sender") or {}
|
|
sender_ea = sender.get("emailAddress") or {}
|
|
from_str = (f'"{sender_ea.get("name","")}" <{sender_ea.get("address","")}>'
|
|
if sender_ea.get("address") else "")
|
|
sent = item.get("sentDateTime") or item.get("receivedDateTime") or ""
|
|
|
|
body = item.get("body") or {}
|
|
content_type = body.get("contentType", "text") # 'text' | 'html'
|
|
body_content = body.get("content", "") or ""
|
|
|
|
mime_type = "text/html" if content_type.lower() == "html" else "text/plain"
|
|
|
|
headers = (
|
|
hdr("From", from_str)
|
|
+ hdr("To", addrs("toRecipients"))
|
|
+ hdr("Cc", addrs("ccRecipients"))
|
|
+ hdr("Subject", subj)
|
|
+ hdr("Date", sent)
|
|
+ f"Content-Type: {mime_type}; charset=utf-8\r\n"
|
|
+ "MIME-Version: 1.0\r\n"
|
|
+ "\r\n"
|
|
)
|
|
return (headers + body_content).encode("utf-8", errors="replace")
|
|
|
|
|
|
def fetch_attachment_smart(mailbox: str, graph_message_id: str,
                           attachment_id: str, odata_type: str = "") -> tuple:
    """Fetch one attachment, choosing the request by its Graph type.

    Args:
        mailbox: mailbox address used in the ``/users/{mailbox}`` path.
        graph_message_id: Graph id of the parent message.
        attachment_id: Graph id of the attachment.
        odata_type: ``@odata.type`` if already known; empty to auto-detect.

    Returns:
        A ``(content, kind, extra)`` tuple:
          * ``(bytes, "file", None)`` for a fileAttachment,
          * ``(bytes, "item", subject)`` for an itemAttachment, where the
            bytes are an .eml built from the embedded item,
          * ``(None, FETCH_REFERENCE, url_or_name)`` for a referenceAttachment,
          * ``(None, FETCH_MISSING, None)`` when Graph answers 404.

    Raises:
        Exception: for any other failure once ``_graph_request`` has run out
            of retries (HTTP error status, unknown attachment type, ...).
    """
    import base64  # needed only by the JSON fallback; not imported at file level

    file_type = "#microsoft.graph.fileAttachment"
    item_type = "#microsoft.graph.itemAttachment"
    reference_type = "#microsoft.graph.referenceAttachment"

    base = f"{GRAPH_URL}/users/{mailbox}/messages/{graph_message_id}/attachments/{attachment_id}"

    # Known type -> go straight to the matching request
    if odata_type == file_type:
        r = _graph_request("GET", base + "/$value", stream=True, timeout=120)
        if r.status_code == 404:
            r.close()  # streamed body is never read on this path
            return (None, FETCH_MISSING, None)
        r.raise_for_status()
        return (r.content, "file", None)

    if odata_type == item_type:
        r = _graph_request("GET", base,
                           params={"$expand": "microsoft.graph.itemAttachment/item"},
                           timeout=60)
        if r.status_code == 404:
            return (None, FETCH_MISSING, None)
        r.raise_for_status()
        obj = r.json()
        item = obj.get("item") or {}
        return (_build_eml_from_item(item), "item", item.get("subject"))

    if odata_type == reference_type:
        r = _graph_request("GET", base, timeout=30)
        if r.status_code == 404:
            return (None, FETCH_MISSING, None)
        r.raise_for_status()
        obj = r.json()
        return (None, FETCH_REFERENCE, obj.get("sourceUrl") or obj.get("name"))

    # Unknown type: try $value; on 405 look up the type and dispatch on it
    r = _graph_request("GET", base + "/$value", stream=True, timeout=120)
    if r.status_code == 404:
        r.close()
        return (None, FETCH_MISSING, None)
    if r.status_code == 405:
        # Method Not Allowed -> not a plain fileAttachment. Release the unread
        # streamed response before issuing the follow-up request.
        r.close()
        r2 = _graph_request("GET", base, timeout=30)
        if r2.status_code == 404:
            return (None, FETCH_MISSING, None)
        r2.raise_for_status()
        obj = r2.json()
        ot = obj.get("@odata.type", "")
        if ot == item_type:
            # The plain object has no `item` without $expand -> second request
            return fetch_attachment_smart(mailbox, graph_message_id, attachment_id, ot)
        if ot == reference_type:
            return (None, FETCH_REFERENCE, obj.get("sourceUrl") or obj.get("name"))
        if ot == file_type:
            # Fallback: the content is carried base64-encoded in the JSON.
            # An empty string is a valid zero-byte file, so test for None.
            cb = obj.get("contentBytes")
            if cb is not None:
                return (base64.b64decode(cb), "file", None)
            raise RuntimeError(
                f"fileAttachment without contentBytes (attachment_id={attachment_id})")
        raise RuntimeError(f"unknown attachment odata.type={ot}")
    r.raise_for_status()
    return (r.content, "file", None)
|
|
|
|
|
|
# ─── Pomocne funkce ───────────────────────────────────────────────────────────
|
|
|
|
def normalize_name(name: str) -> str:
    """Return a comparison key for a file name.

    Lower-cases and strips the name, removes combining marks after NFKD
    decomposition, and replaces every character other than word characters,
    ``.`` and ``-`` with ``_``.
    """
    nfkd = unicodedata.normalize("NFKD", name.lower().strip())
    ascii_str = "".join(c for c in nfkd if not unicodedata.combining(c))
    return re.sub(r"[^\w.\-]", "_", ascii_str)


def find_graph_att(att_name: str, att_size: int, graph_atts: list[dict]) -> Optional[dict]:
    """Find the Graph attachment that corresponds to a stored attachment.

    Matching is tried from most to least specific:
      1. exact name,
      2. same normalized name AND compatible size (either size unknown/0,
         or the sizes differ by less than 10 %),
      3. same normalized name,
      4. Graph name ends with the last 20 characters of the normalized name.

    Step 2 used to sit after step 3 and could therefore never decide
    anything; running it first lets the size pick between several
    attachments that share a normalized name.

    Args:
        att_name: file name stored with the email document (None is treated
            as an empty name).
        att_size: stored size in bytes; 0 or None means unknown.
        graph_atts: attachment dicts from the Graph listing; a missing or
            null ``name`` is treated as an empty name.

    Returns:
        The matching Graph attachment dict, or None when nothing matches.
    """
    att_name = att_name or ""
    att_size = att_size or 0
    named = [(ga, ga.get("name") or "") for ga in graph_atts]

    # 1. exact name
    for ga, name in named:
        if name == att_name:
            return ga

    norm_want = normalize_name(att_name)
    normalized = [(ga, normalize_name(name)) for ga, name in named]
    same_norm = [ga for ga, norm in normalized if norm == norm_want]

    # 2. normalized name + compatible size
    for ga in same_norm:
        ga_size = ga.get("size", 0) or 0
        if (att_size == 0 or ga_size == 0
                or abs(ga_size - att_size) / max(ga_size, att_size) < 0.1):
            return ga

    # 3. normalized name only
    if same_norm:
        return same_norm[0]

    # 4. tail match on the last 20 normalized characters
    tail = norm_want[-20:]
    if tail:
        for ga, norm in normalized:
            if norm.endswith(tail):
                return ga
    return None
|
|
|
|
|
|
def sha256(data: bytes) -> str:
    """Return the SHA-256 digest of ``data`` as a lowercase hex string."""
    digest = hashlib.sha256()
    digest.update(data)
    return digest.hexdigest()
|
|
|
|
|
|
def safe_filename(name: str, max_bytes: int = 240) -> str:
    """Turn an attachment name into a name that is safe to create on disk.

    Every character that is not alphanumeric and not one of ``._- ()`` is
    replaced with ``_``, then surrounding whitespace is stripped.

    Args:
        name: original attachment name (None is treated as empty).
        max_bytes: upper bound for the UTF-8 length of the result. The
            default stays below the common 255-byte file-name limit and
            leaves room for a ``_N`` collision suffix added later.

    Returns:
        The sanitized name, or ``"attachment"`` when nothing usable is left.
        Names made only of dots and spaces (``.``, ``..``) count as unusable
        because they denote directories.
    """
    safe = "".join(c if c.isalnum() or c in "._- ()" else "_" for c in (name or "")).strip()
    if not safe.strip(". "):
        return "attachment"
    if len(safe.encode("utf-8")) <= max_bytes:
        return safe

    # Too long: shorten the stem and keep the extension so the type survives.
    suffix = Path(safe).suffix
    if len(suffix.encode("utf-8")) >= max_bytes:
        suffix = ""  # pathological "extension": truncate the whole name instead
    stem = safe[: len(safe) - len(suffix)]
    budget = max_bytes - len(suffix.encode("utf-8"))
    # Cut on bytes, then drop a possibly split trailing multi-byte character.
    stem = stem.encode("utf-8")[:budget].decode("utf-8", errors="ignore")
    return (stem + suffix) or "attachment"
|
|
|
|
|
|
def resolve_filename(desired_name: str, att_dir: Path, hash_val: str, col_index) -> str:
    """Pick the file name under which content ``hash_val`` is stored.

    Tries ``desired_name`` first, then ``<stem>_2<suffix>``,
    ``<stem>_3<suffix>``, ... and returns the first usable name. A name is
    usable when
      * the index maps it to the same hash, or
      * the index does not know it and no such file exists in ``att_dir``, or
      * the index does not know it but the file on disk has exactly this
        content (for example left behind by an interrupted run).

    Previously ``desired_name`` was accepted whenever the index had no entry
    for it, without looking at the disk, so an unindexed file with different
    content was overwritten.

    Args:
        desired_name: preferred (already sanitized) file name.
        att_dir: directory the file will be written to.
        hash_val: SHA-256 hex digest of the content to store.
        col_index: index collection; only ``find_one({"filename": ...})`` is
            used, and matches are expected to carry the hash in ``_id``.

    Returns:
        The chosen file name (no directory part).
    """
    def usable(candidate: str) -> bool:
        entry = col_index.find_one({"filename": candidate})
        if entry:
            return entry["_id"] == hash_val
        path = att_dir / candidate
        if not path.exists():
            return True
        # Present on disk but unknown to the index: reuse the name only if
        # the bytes are identical, never overwrite foreign content.
        try:
            return hashlib.sha256(path.read_bytes()).hexdigest() == hash_val
        except OSError:
            return False  # unreadable or a directory -> treat the name as taken

    if usable(desired_name):
        return desired_name

    stem = Path(desired_name).stem
    suffix = Path(desired_name).suffix
    n = 2
    while not usable(f"{stem}_{n}{suffix}"):
        n += 1
    return f"{stem}_{n}{suffix}"
|
|
|
|
|
|
def save_attachment(content: bytes, original_name: str, mime_type: str,
                    mailbox: str, att_dir: Path, col_index) -> tuple[str, str, bool]:
    """Store attachment bytes once per distinct content and index them.

    The index is keyed by the SHA-256 of the content. Known content only
    gets its ``ref_count`` incremented; new content is written to
    ``att_dir`` under a collision-free name and inserted into the index.

    Returns:
        ``(hash, local_path, was_new)`` where ``local_path`` is the stored
        file name and ``was_new`` tells whether a file was written.
    """
    digest = sha256(content)
    known = col_index.find_one({"_id": digest})
    if known:
        # Same bytes already stored: just count one more reference.
        col_index.update_one({"_id": digest}, {"$inc": {"ref_count": 1}})
        return digest, known["local_path"], False

    stored_name = resolve_filename(safe_filename(original_name), att_dir, digest, col_index)
    target = att_dir / stored_name
    target.write_bytes(content)

    index_entry = {
        "_id": digest,
        "filename": stored_name,
        "local_path": stored_name,
        "size_bytes": len(content),
        "mime_type": mime_type,
        "mailbox": mailbox,
        "first_seen_at": datetime.now(timezone.utc).replace(tzinfo=None),
        "ref_count": 1,
    }
    col_index.insert_one(index_entry)
    return digest, stored_name, True
|
|
|
|
|
|
# ─── MAIN ─────────────────────────────────────────────────────────────────────
|
|
|
|
def process_mailbox(client, mailbox: str, args) -> dict:
    """Download, store and index the pending attachments of one mailbox.

    For every matching email document the non-inline attachments are fetched
    from Graph, saved through ``save_attachment`` and written back into the
    document's ``attachments`` array (batched ``bulk_write``).

    Changes compared with the previous behavior:
      * ``--force-recheck`` now really re-checks attachments tagged
        ``attachment_missing`` / ``attachment_reference`` (they used to be
        skipped unconditionally); stale tags are dropped when a re-fetch
        yields a new result.
      * A failure while saving one attachment is logged and counted as an
        error instead of aborting the whole mailbox.
      * Pending document updates are flushed even when the loop is left by
        an exception.
      * Progress lines show the number of emails limited by ``--limit``.

    Args:
        client: connected MongoClient.
        mailbox: mailbox address; also the name of the email collection.
        args: parsed CLI arguments; ``force_recheck`` and ``limit`` are read.

    Returns:
        Counter dict with the keys ``mailbox, ok, new, dup, skip, miss, ref,
        err, elapsed``.
    """
    att_dir = EMAILS_BASE_DIR / mailbox / "Attachments"
    force = bool(args.force_recheck)
    # Tags written by earlier runs that must not survive a successful re-check
    stale_tags = ("attachment_missing", "attachment_missing_at",
                  "attachment_reference", "reference_url")

    def is_pending(a: dict) -> bool:
        """True when attachment `a` still has to be fetched in this run."""
        if a.get("is_inline", False):
            return False
        if force:
            return True
        return not (a.get("file_hash")
                    or a.get("attachment_missing")
                    or a.get("attachment_reference"))

    start = datetime.now()
    print(f"\n========== {mailbox} ==========")
    print(f"Cilovy adresar: {att_dir}")

    att_dir.mkdir(parents=True, exist_ok=True)

    col_emails = client[MONGO_DB][mailbox]
    col_index = client[MONGO_DB][MONGO_COL_INDEX]

    if force:
        query = {"has_attachments": True}
    else:
        # An attachment is "to be processed" when it is not inline, has no
        # file_hash and is not tagged as missing/reference.
        query = {
            "has_attachments": True,
            "attachments": {
                "$elemMatch": {
                    "is_inline": False,
                    "file_hash": {"$exists": False},
                    "attachment_missing": {"$ne": True},
                    "attachment_reference": {"$ne": True},
                }
            }
        }

    total = col_emails.count_documents(query)
    print(f"Emailu ke zpracovani: {total}")
    if total == 0:
        print(" Neni co stahnout.")
        return {"mailbox": mailbox, "ok": 0, "new": 0, "dup": 0, "skip": 0,
                "miss": 0, "ref": 0, "err": 0, "elapsed": 0.0}

    cursor = col_emails.find(query, {"_id": 1, "graph_id": 1, "subject": 1, "attachments": 1})
    shown_total = total
    if args.limit:
        cursor = cursor.limit(args.limit)
        if args.limit > 0:
            # Progress should count against what will actually be iterated.
            shown_total = min(total, args.limit)

    ok_count = 0
    new_count = 0
    dup_count = 0
    skip_count = 0
    miss_count = 0
    ref_count = 0
    err_count = 0
    email_i = 0
    batch = []

    def flush():
        """Write the queued document updates; failures are logged, not raised."""
        if not batch:
            return
        try:
            col_emails.bulk_write(batch, ordered=False)
        except Exception as e:
            logging.error("bulk_write: %s", e)
            print(f" CHYBA bulk_write: {e}")
        batch.clear()

    try:
        for email_doc in cursor:
            email_i += 1
            email_id = email_doc["_id"]
            graph_id = email_doc.get("graph_id", "")
            subject = (email_doc.get("subject") or "")[:60]
            att_list = email_doc.get("attachments") or []

            if not any(is_pending(a) for a in att_list):
                continue

            print(f"\n {email_i:>5}/{shown_total} {subject}")

            # The Graph listing is only needed to resolve attachments that
            # have no stored graph_att_id.
            need_listing = any(
                is_pending(a) and not a.get("graph_att_id") for a in att_list
            )
            graph_atts = fetch_message_attachments(mailbox, graph_id) if need_listing else []

            # graph_att_id -> @odata.type (from the listing, if there was one)
            type_map = {ga["id"]: ga.get("@odata.type", "") for ga in graph_atts}

            updated_atts = list(att_list)
            email_ok = True

            for i, att in enumerate(updated_atts):
                if not is_pending(att):
                    continue

                att_name = att.get("filename") or ""
                att_size = att.get("size_bytes") or 0
                graph_att_id = att.get("graph_att_id")

                if Path(att_name).suffix.lower() in SKIP_EXTENSIONS:
                    updated_atts[i] = {**att, "file_hash": "skip", "local_path": ""}
                    skip_count += 1
                    print(f" SKIP {att_name} (S/MIME)")
                    continue

                # Resolve graph_att_id + odata_type
                resolved_id = graph_att_id
                odata_type = type_map.get(graph_att_id, "") if graph_att_id else ""

                if not resolved_id:
                    # Fallback: name matching (legacy documents)
                    graph_att = find_graph_att(att_name, att_size, graph_atts)
                    if not graph_att:
                        logging.error("attachment not found [email=%s att=%s]", email_id, att_name)
                        print(f" ERR {att_name} (nenalezeno)")
                        err_count += 1
                        email_ok = False
                        continue
                    if graph_att.get("isInline", False):
                        updated_atts[i] = {**att, "is_inline": True, "file_hash": "skip", "local_path": ""}
                        skip_count += 1
                        print(f" SKIP {att_name} (inline obrazek)")
                        continue
                    resolved_id = graph_att["id"]
                    odata_type = graph_att.get("@odata.type", "")

                # Smart fetch
                try:
                    content, kind, extra = fetch_attachment_smart(
                        mailbox, graph_id, resolved_id, odata_type
                    )
                except Exception as e:
                    logging.error("fetch_attachment_smart failed [msg=%s att=%s type=%s]: %s",
                                  graph_id, resolved_id, odata_type, e)
                    err_count += 1
                    email_ok = False
                    print(f" ERR {att_name} (stazeni selhalo)")
                    continue

                if force:
                    # The re-check produced a fresh result: forget old tags so
                    # the entry cannot end up both downloaded and "missing".
                    att = {k: v for k, v in att.items() if k not in stale_tags}

                now_utc = datetime.now(timezone.utc).replace(tzinfo=None)

                if kind == FETCH_MISSING:
                    updated_atts[i] = {
                        **att,
                        "attachment_missing": True,
                        "attachment_missing_at": now_utc,
                    }
                    miss_count += 1
                    print(f" MISS {att_name} (404 — oznaceno jako missing)")
                    continue

                if kind == FETCH_REFERENCE:
                    updated_atts[i] = {
                        **att,
                        "attachment_reference": True,
                        "attachment_type": "reference",
                        "reference_url": extra,
                    }
                    ref_count += 1
                    print(f" REF {att_name} -> {extra}")
                    continue

                # kind in ('file', 'item') -> we have bytes
                mime_type = att.get("mime_type") or (
                    "message/rfc822" if kind == "item" else "application/octet-stream"
                )

                # An itemAttachment is stored as .eml; add the extension if missing
                save_name = att_name
                if kind == "item" and not save_name.lower().endswith(".eml"):
                    save_name = (save_name or "embedded_email") + ".eml"

                try:
                    hash_val, local_path, was_new = save_attachment(
                        content, save_name, mime_type, mailbox, att_dir, col_index
                    )
                except Exception as e:
                    # Disk or index failure for one file must not abort the mailbox.
                    logging.error("save_attachment failed [email=%s att=%s]: %s",
                                  email_id, att_name, e)
                    err_count += 1
                    email_ok = False
                    print(f" ERR {att_name} (ulozeni selhalo)")
                    continue

                updated_atts[i] = {
                    **att,
                    "file_hash": hash_val,
                    "local_path": local_path,
                    "attachment_type": kind,
                }

                if was_new:
                    new_count += 1
                    tag = "NEW(eml)" if kind == "item" else "NEW"
                    print(f" {tag} {local_path} ({len(content):,} B)")
                else:
                    dup_count += 1
                    print(f" DUP {att_name} -> {local_path}")

            if email_ok:
                ok_count += 1

            batch.append(UpdateOne({"_id": email_id}, {"$set": {"attachments": updated_atts}}))

            if len(batch) >= BATCH_SIZE:
                flush()

            if email_i % 100 == 0:
                print(f" {'─'*60}")
                print(f" Průběh: emaily={email_i}/{shown_total} nove={new_count} dup={dup_count} "
                      f"skip={skip_count} miss={miss_count} ref={ref_count} err={err_count}")
                print(f" {'─'*60}")
    finally:
        # Persist what was already downloaded even if the loop was interrupted.
        flush()

    elapsed = (datetime.now() - start).total_seconds()
    print(f" -> mailbox total: emaily={ok_count} nove={new_count} dup={dup_count} "
          f"skip={skip_count} miss={miss_count} ref={ref_count} err={err_count} ({elapsed:.1f} s)")
    return {"mailbox": mailbox, "ok": ok_count, "new": new_count, "dup": dup_count,
            "skip": skip_count, "miss": miss_count, "ref": ref_count, "err": err_count,
            "elapsed": elapsed}
|
|
|
|
|
|
def discover_mailboxes(db) -> list[str]:
    """List the mailbox collections of ``db`` in sorted order.

    Collections named in NON_MAILBOX_COLLECTIONS are dropped silently;
    those in SKIP_MAILBOXES are dropped with a printed notice.
    """
    selected: list[str] = []
    for coll_name in sorted(db.list_collection_names()):
        if coll_name in NON_MAILBOX_COLLECTIONS:
            continue
        if coll_name not in SKIP_MAILBOXES:
            selected.append(coll_name)
        else:
            print(f" [skip] {coll_name} — v SKIP_MAILBOXES (neni Graph pristup)")
    return selected
|
|
|
|
|
|
def main() -> int:
    """CLI entry point: process one mailbox or all of them and print a summary.

    Exits with status 1 when Graph authentication or the MongoDB ping fails,
    and with status 2 when ``--mailbox`` names a mailbox from SKIP_MAILBOXES.

    Returns:
        1 if any error was counted, otherwise 0.
    """
    ap = argparse.ArgumentParser(description=f"download_attachments v{SCRIPT_VERSION}")
    ap.add_argument("--mailbox", default="",
                    help="Emailova schranka. Bez argumentu projede vsechny schranky.")
    ap.add_argument("--limit", type=int, default=0,
                    help="Zpracovat max N emailu (0 = vse) — per schranka")
    ap.add_argument("--force-recheck", action="store_true",
                    help="Znovu overi i emaily kde prilohy uz maji file_hash / missing / reference")
    ap.add_argument("--no-indexes", action="store_true",
                    help="Nevytvorit indexy na attachments_index kolekci")
    args = ap.parse_args()

    start_all = datetime.now()
    print(f"=== download_attachments v{SCRIPT_VERSION} ===")
    print(f"Start: {start_all.strftime('%Y-%m-%d %H:%M:%S')}")

    print("\nPřipojuji se k Graph API...")
    try:
        get_token()
        print(" Graph API OK")
    except Exception as e:
        print(f" CHYBA: {e}")
        sys.exit(1)

    client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
    # try/finally: the client is closed on every way out, including the
    # sys.exit() calls below and unexpected exceptions.
    try:
        try:
            client.admin.command("ping")
            print(" MongoDB OK")
        except Exception as e:
            print(f" CHYBA: MongoDB neni dostupna -- {e}")
            sys.exit(1)

        col_index = client[MONGO_DB][MONGO_COL_INDEX]
        if not args.no_indexes:
            col_index.create_index("filename")
            col_index.create_index("mime_type")
            col_index.create_index("mailbox")

        db = client[MONGO_DB]
        if args.mailbox:
            if args.mailbox in SKIP_MAILBOXES:
                print(f" CHYBA: {args.mailbox} je v SKIP_MAILBOXES — neni Graph pristup.")
                sys.exit(2)
            mailboxes = [args.mailbox]
        else:
            mailboxes = discover_mailboxes(db)
            print(f" Schranky ke zpracovani: {len(mailboxes)}")
            for m in mailboxes:
                print(f" {m}")

        results = []
        for mb in mailboxes:
            try:
                results.append(process_mailbox(client, mb, args))
            except Exception as e:
                # One failing mailbox is recorded as a single error; the
                # remaining mailboxes are still processed.
                logging.error("process_mailbox %s: %s", mb, e)
                print(f" FATAL pri zpracovani {mb}: {e}")
                results.append({"mailbox": mb, "ok": 0, "new": 0, "dup": 0,
                                "skip": 0, "miss": 0, "ref": 0, "err": 1, "elapsed": 0.0})

        elapsed_total = (datetime.now() - start_all).total_seconds()
        files_total = col_index.count_documents({})
        size_total = sum(d.get("size_bytes", 0) for d in col_index.find({}, {"size_bytes": 1}))

        grand = {k: sum(r.get(k, 0) for r in results)
                 for k in ("ok", "new", "dup", "skip", "miss", "ref", "err")}

        print(f"\n{'='*60}")
        print("=== SHRNUTI ===")
        for r in results:
            print(f" {r['mailbox']:40} ok={r['ok']:>5} nove={r['new']:>4} "
                  f"dup={r['dup']:>4} skip={r['skip']:>3} miss={r.get('miss',0):>3} "
                  f"ref={r.get('ref',0):>3} err={r['err']:>3}")
        print(f" {'TOTAL':40} ok={grand['ok']:>5} nove={grand['new']:>4} "
              f"dup={grand['dup']:>4} skip={grand['skip']:>3} miss={grand['miss']:>3} "
              f"ref={grand['ref']:>3} err={grand['err']:>3}")
        print(f"Souboru v indexu: {files_total} ({size_total / 1024 / 1024:.1f} MB)")
        print(f"Celkovy cas: {int(elapsed_total//3600)}h {int((elapsed_total%3600)//60)}m {int(elapsed_total%60)}s")
        print(f"\nKonec: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        if grand['err']:
            print(f"Chyby logovany do: {LOG_FILE}")

        return 1 if grand['err'] > 0 else 0
    finally:
        client.close()
|
|
|
|
|
|
# Script entry point: the return value of main() becomes the process exit
# status (a falsy return is mapped to 0).
if __name__ == "__main__":
    sys.exit(main() or 0)
|