450 lines
16 KiB
Python
450 lines
16 KiB
Python
"""
|
|
download_attachments_v1.0.py
|
|
Nazev: download_attachments_v1.0.py
|
|
Verze: 1.0
|
|
Datum: 2026-06-02
|
|
Autor: vladimir.buzalka
|
|
|
|
Popis:
|
|
Stahuje skutecne prilohy (is_inline=False) vsech emailu z MongoDB kolekce
|
|
ordinace@buzalkova.cz primo pres Microsoft Graph API a uklada je do
|
|
adresare /mnt/Emails/ordinace@buzalkova.cz/Attachments/.
|
|
|
|
Deduplikace podle SHA256 hashe obsahu:
|
|
- stejny hash = soubor uz existuje -> preskoci
|
|
- prvni vyskyt souboru: ulozi pod puvodnim nazvem
|
|
- kolize nazvu (stejny nazev, jiny hash): faktura_2.pdf, faktura_3.pdf ...
|
|
|
|
Po ulozeni aktualizuje MongoDB:
|
|
- v email dokumentu: kazda priloha dostane file_hash + local_path
|
|
- kolekce emaily.attachments_index: _id=hash, filename, local_path, size_bytes,
|
|
mime_type, first_seen_at, ref_count (pocet emailu ktery ji obsahuje)
|
|
|
|
Bezpecne prerusit a opakovat:
|
|
- zpravy kde jsou vsechny prilohy uz stazene (maji file_hash) se preskoci
|
|
- --force-recheck znovu overi i uz stazene (pro pripad zmen na disku)
|
|
|
|
POZOR: Skript pouze CIST ze schranky — zadny zapis do schranky!
|
|
|
|
Spousteni:
|
|
python download_attachments_v1.0.py # stahni vse co chybi
|
|
python download_attachments_v1.0.py --limit 50 # test na prvnich 50 emailech
|
|
python download_attachments_v1.0.py --force-recheck # overi i uz stazene
|
|
|
|
Docker (po pridani mountu /mnt/user/Emails -> /mnt/Emails):
|
|
docker exec -it python-runner python /scripts/download_attachments_v1.0.py
|
|
|
|
Zavislosti:
|
|
msal, requests, pymongo, python-dateutil
|
|
Python 3.10+
|
|
|
|
Struktura na disku:
|
|
/mnt/Emails/
|
|
└── ordinace@buzalkova.cz/
|
|
└── Attachments/
|
|
├── faktura_2026.pdf
|
|
├── vysledky_lab.pdf
|
|
├── vysledky_lab_2.pdf <- kolize nazvu, jiny obsah
|
|
└── ...
|
|
|
|
Kolekce emaily.attachments_index:
|
|
_id SHA256 hash (hex)
|
|
filename nazev souboru na disku (prvni vyskyt)
|
|
local_path relativni cesta od Attachments/ (zatim = filename)
|
|
size_bytes velikost souboru
|
|
mime_type MIME typ
|
|
first_seen_at datetime UTC
|
|
ref_count v kolika emailech se tato priloha vyskytuje
|
|
|
|
Aktualizace v email dokumentu (kolekce ordinace@buzalkova.cz):
|
|
attachments[i].file_hash SHA256 hash
|
|
attachments[i].local_path cesta relativni od Attachments/
|
|
|
|
Historie verzi:
|
|
1.0 2026-06-02 Inicialni verze
|
|
"""
|
|
|
|
import argparse
import hashlib
import logging
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

import msal
import requests
from pymongo import MongoClient, UpdateOne
|
|
|
|
# Switch console output to UTF-8 (undecodable characters are replaced) when the
# stdout object offers reconfigure(); streams without it are left untouched.
_reconfigure_stdout = getattr(sys.stdout, "reconfigure", None)
if _reconfigure_stdout is not None:
    _reconfigure_stdout(encoding="utf-8", errors="replace")
|
|
|
|
# ─── CONFIGURATION ────────────────────────────────────────────────────────────
# Azure AD application used for app-only (client credentials) Graph access.
GRAPH_TENANT_ID = "7d269944-37a4-43a1-8140-c7517dc426e9"
GRAPH_CLIENT_ID = "4b222bfd-78c9-4239-a53f-43006b3ed07f"
# SECURITY: the client secret must never live in source control. It is read
# from the GRAPH_CLIENT_SECRET environment variable; when the variable is
# missing the value is empty and Graph authentication will fail.
# The secret that used to be hard-coded on this line has to be treated as
# leaked and rotated in Azure AD.
GRAPH_CLIENT_SECRET = os.environ.get("GRAPH_CLIENT_SECRET", "")
GRAPH_MAILBOX = "ordinace@buzalkova.cz"
GRAPH_URL = "https://graph.microsoft.com/v1.0"

# MongoDB: one collection with the e-mail documents, one with the file index.
MONGO_URI = "mongodb://192.168.1.76:27017"
MONGO_DB = "emaily"
MONGO_COL_EMAILS = "ordinace@buzalkova.cz"
MONGO_COL_INDEX = "attachments_index"

# Target directory for the downloaded files and the error log location.
ATTACHMENTS_DIR = Path("/mnt/Emails/ordinace@buzalkova.cz/Attachments")
LOG_FILE = Path(__file__).parent / "parse_emails_errors.log"
SCRIPT_VERSION = "1.0"
# Number of queued e-mail updates that triggers a bulk write.
BATCH_SIZE = 50
# ──────────────────────────────────────────────────────────────────────────────
|
|
|
|
# Only ERROR and more severe records are written to the log file.
logging.basicConfig(
    level=logging.ERROR,
    filename=str(LOG_FILE),
    encoding="utf-8",
    format="%(asctime)s | %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)

# Bearer token shared by the Graph helpers; stays None until get_token() stores one.
_graph_token: Optional[str] = None
|
|
|
|
|
|
# ─── Graph API ────────────────────────────────────────────────────────────────
|
|
|
|
def get_token() -> str:
    """Acquire an app-only Graph access token and store it in ``_graph_token``.

    Returns:
        The access token string (also kept in the module-level cache).

    Raises:
        RuntimeError: if the MSAL response contains no ``access_token``.
    """
    global _graph_token
    authority_url = f"https://login.microsoftonline.com/{GRAPH_TENANT_ID}"
    confidential_app = msal.ConfidentialClientApplication(
        GRAPH_CLIENT_ID,
        authority=authority_url,
        client_credential=GRAPH_CLIENT_SECRET,
    )
    token_response = confidential_app.acquire_token_for_client(
        scopes=["https://graph.microsoft.com/.default"]
    )
    if "access_token" in token_response:
        _graph_token = token_response["access_token"]
        return _graph_token
    raise RuntimeError(f"Graph auth failed: {token_response}")
|
|
|
|
|
|
def graph_get_bytes(url: str) -> bytes:
    """Download the binary body at ``url`` using the cached Graph bearer token.

    A token is acquired first when none is cached. A 401 response triggers one
    token refresh followed by a single retry.

    Args:
        url: Absolute Graph API URL to fetch.

    Returns:
        The complete response body.

    Raises:
        requests.HTTPError: for any non-401 error status.
        RuntimeError: when both attempts were answered with 401.
    """
    global _graph_token
    if not _graph_token:
        get_token()
    # Two attempts at most; the second happens only after a 401 refreshed the token.
    for _ in range(2):
        # No stream=True: the whole body is needed anyway, and a streamed
        # response that is never read (the 401 path) keeps its connection open.
        r = requests.get(
            url,
            headers={"Authorization": f"Bearer {_graph_token}"},
            timeout=120,
        )
        if r.status_code == 401:
            get_token()
            continue
        r.raise_for_status()
        return r.content
    raise RuntimeError(f"Graph GET bytes failed: {url}")
|
|
|
|
|
|
def graph_get_json(url: str, params: dict = None) -> dict:
    """GET ``url`` from Graph and return the decoded JSON body.

    A token is acquired first when none is cached; a 401 answer refreshes the
    token and the request is repeated once.

    Raises:
        requests.HTTPError: for any non-401 error status.
        RuntimeError: when both attempts were answered with 401.
    """
    global _graph_token
    if not _graph_token:
        get_token()
    tries_left = 2
    while tries_left:
        tries_left -= 1
        auth_header = {"Authorization": f"Bearer {_graph_token}"}
        response = requests.get(url, headers=auth_header, params=params, timeout=30)
        if response.status_code != 401:
            response.raise_for_status()
            return response.json()
        # Unauthorized: fetch a fresh token before the next attempt.
        get_token()
    raise RuntimeError(f"Graph GET json failed: {url}")
|
|
|
|
|
|
def fetch_attachment_content(graph_message_id: str, attachment_id: str) -> Optional[bytes]:
    """Download the raw bytes of one attachment via its ``$value`` endpoint.

    Returns:
        The attachment content, or ``None`` when the download raised; the
        failure is written to the error log.
    """
    value_url = (
        f"{GRAPH_URL}/users/{GRAPH_MAILBOX}"
        f"/messages/{graph_message_id}/attachments/{attachment_id}/$value"
    )
    try:
        payload = graph_get_bytes(value_url)
    except Exception as exc:
        logging.error("fetch_attachment_content failed [msg=%s att=%s]: %s", graph_message_id, attachment_id, exc)
        return None
    return payload
|
|
|
|
|
|
def fetch_message_attachments(graph_message_id: str) -> list[dict]:
    """Load the attachment metadata (including attachment IDs) of one message.

    Follows ``@odata.nextLink`` so that a paged attachment collection is
    returned completely instead of only its first page.

    Args:
        graph_message_id: Graph ID of the message.

    Returns:
        List of attachment metadata dicts; an empty list when any request
        failed (the failure is written to the error log).
    """
    url = f"{GRAPH_URL}/users/{GRAPH_MAILBOX}/messages/{graph_message_id}/attachments"
    collected: list[dict] = []
    try:
        page = graph_get_json(url, {"$select": "id,name,contentType,size,isInline,contentId"})
        while True:
            collected.extend(page.get("value", []))
            next_link = page.get("@odata.nextLink")
            if not next_link:
                return collected
            # The next link already carries the query options of the first request.
            page = graph_get_json(next_link)
    except Exception as e:
        logging.error("fetch_message_attachments failed [%s]: %s", graph_message_id, e)
        return []
|
|
|
|
|
|
# ─── Dedup + ukládání ─────────────────────────────────────────────────────────
|
|
|
|
def sha256(data: bytes) -> str:
    """Return the SHA-256 digest of ``data`` as a lowercase hex string."""
    digest = hashlib.sha256()
    digest.update(data)
    return digest.hexdigest()
|
|
|
|
|
|
def resolve_filename(desired_name: str, att_dir: Path, hash_val: str, index_col) -> str:
    """Pick the file name under which content with ``hash_val`` is stored.

    A name is usable when
      * the index maps it to the same hash, or
      * it is neither in the index nor on disk, or
      * it is not in the index but the file on disk has exactly this content
        (an orphan, e.g. from a run interrupted between write and index insert).
    If ``desired_name`` is not usable, ``<stem>_2<suffix>``, ``<stem>_3<suffix>``
    ... are tried until a usable name is found.

    Args:
        desired_name: Preferred (already sanitized) file name.
        att_dir: Directory that holds the stored attachments.
        hash_val: SHA-256 hex digest of the content to be stored.
        index_col: Index collection; only ``find_one({"filename": ...})`` is used.

    Returns:
        The file name to use, relative to ``att_dir``.
    """

    def usable(name: str) -> bool:
        entry = index_col.find_one({"filename": name})
        if entry:
            return entry["_id"] == hash_val
        path = att_dir / name
        if not path.exists():
            return True
        # On disk without an index entry: never overwrite different content.
        try:
            return hashlib.sha256(path.read_bytes()).hexdigest() == hash_val
        except OSError:
            # Unreadable entry or a directory: treat the name as taken.
            return False

    if usable(desired_name):
        return desired_name

    # Name collision with different content: look for a free numbered variant.
    stem = Path(desired_name).stem
    suffix = Path(desired_name).suffix
    n = 2
    while True:
        candidate = f"{stem}_{n}{suffix}"
        if usable(candidate):
            return candidate
        n += 1
|
|
|
|
|
|
def _write_atomically(target: Path, content: bytes) -> None:
    """Write ``content`` to ``target`` through a temporary sibling file.

    The data is written to ``<name>.part`` and then renamed, so an interrupted
    run does not leave a truncated file under the final name.
    """
    tmp = target.with_name(target.name + ".part")
    try:
        tmp.write_bytes(content)
        tmp.replace(target)
    except BaseException:
        tmp.unlink(missing_ok=True)
        raise


def save_attachment(content: bytes, original_name: str, att_dir: Path, index_col) -> tuple[str, str, bool]:
    """Store an attachment with content-hash deduplication.

    Args:
        content: Raw attachment bytes.
        original_name: File name from the e-mail; may be empty or ``None``.
        att_dir: Directory that holds the stored attachments.
        index_col: Index collection keyed by SHA-256 hash.

    Returns:
        ``(hash, local_path, was_new)``:
          was_new=True  -> a new index entry was created and the file written
          was_new=False -> the hash was already indexed; only ``ref_count`` was
                           incremented (a file missing on disk is restored)

    Raises:
        OSError: if the file cannot be written.
    """
    hash_val = sha256(content)

    # Known content: reuse the existing record.
    existing = index_col.find_one({"_id": hash_val})
    if existing:
        stored = att_dir / existing["local_path"]
        if not stored.exists():
            # The index says we have it but the file vanished from disk.
            _write_atomically(stored, content)
        # NOTE(review): this counts every call, so re-processing the same e-mail
        # (e.g. with --force-recheck) inflates ref_count; fixing that would need
        # the e-mail id, which this function does not receive.
        index_col.update_one({"_id": hash_val}, {"$inc": {"ref_count": 1}})
        return hash_val, existing["local_path"], False

    # New content: derive a file-system-safe name.
    safe_name = "".join(
        c if c.isalnum() or c in "._- " else "_" for c in (original_name or "")
    ).strip()
    # Empty names and names made only of dots/spaces ("." and "..") are unusable.
    if not safe_name.strip(". "):
        safe_name = f"attachment_{hash_val[:8]}"

    filename = resolve_filename(safe_name, att_dir, hash_val, index_col)
    file_path = att_dir / filename

    # Write the file first, then record it in the index.
    _write_atomically(file_path, content)

    index_col.insert_one({
        "_id": hash_val,
        "filename": filename,
        "local_path": filename,
        "size_bytes": len(content),
        "mime_type": "",
        # Stored as a naive datetime holding the UTC wall-clock time.
        "first_seen_at": datetime.now(timezone.utc).replace(tzinfo=None),
        "ref_count": 1,
    })

    return hash_val, filename, True
|
|
|
|
|
|
# ─── MAIN ─────────────────────────────────────────────────────────────────────
|
|
|
|
def _take_graph_attachment(att_name: str, remaining: list[dict], *, allow_partial: bool = True) -> Optional[dict]:
    """Remove and return the Graph attachment from ``remaining`` that matches ``att_name``.

    An exact name match is preferred. When ``allow_partial`` is set and
    ``att_name`` is non-empty, a case-insensitive substring match is tried next.
    The match is removed from ``remaining`` so that several attachments with the
    same name are paired with distinct Graph attachments, in list order.
    Returns ``None`` when nothing matches.
    """
    for pos, cand in enumerate(remaining):
        if (cand.get("name") or "") == att_name:
            return remaining.pop(pos)
    if allow_partial and att_name:
        wanted = att_name.lower()
        for pos, cand in enumerate(remaining):
            if wanted in (cand.get("name") or "").lower():
                return remaining.pop(pos)
    return None


def main():
    """Download all missing non-inline attachments and record them in MongoDB.

    Steps: parse the CLI arguments, check the target directory, Graph and
    MongoDB, select the e-mails that still need processing, download and
    deduplicate each attachment, write ``file_hash``/``local_path`` back into
    the e-mail documents in batches, and print a summary.

    Exits with status 1 when Graph authentication or the MongoDB ping fails.
    """
    ap = argparse.ArgumentParser(description=f"download_attachments v{SCRIPT_VERSION}")
    ap.add_argument("--limit", type=int, default=0,
                    help="Zpracovat max N emailu (0 = vse)")
    ap.add_argument("--force-recheck", action="store_true",
                    help="Znovu overi i emaily kde prilohy uz maji file_hash")
    ap.add_argument("--no-indexes", action="store_true",
                    help="Nevytvorit indexy na konci")
    args = ap.parse_args()

    start = datetime.now()
    print(f"=== download_attachments v{SCRIPT_VERSION} ===")
    print(f"Start: {start.strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"Schránka: {GRAPH_MAILBOX}")
    print(f"Cilovy adresar: {ATTACHMENTS_DIR}")
    print(f"MongoDB: {MONGO_URI} -> {MONGO_DB}")

    # Target directory
    ATTACHMENTS_DIR.mkdir(parents=True, exist_ok=True)
    print(f" Adresar OK")

    # Graph
    print("\nPřipojuji se k Graph API...")
    try:
        get_token()
        print(" Graph API OK")
    except Exception as e:
        print(f" CHYBA: {e}")
        sys.exit(1)

    # MongoDB
    client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
    try:
        client.admin.command("ping")
        print(" MongoDB OK")
    except Exception as e:
        print(f" CHYBA: MongoDB neni dostupna -- {e}")
        client.close()
        sys.exit(1)

    try:
        col_emails = client[MONGO_DB][MONGO_COL_EMAILS]
        col_index = client[MONGO_DB][MONGO_COL_INDEX]

        # Indexes on the attachment index collection
        if not args.no_indexes:
            col_index.create_index("filename")
            col_index.create_index("mime_type")

        # Select e-mails that have at least one real attachment without file_hash
        # (or every e-mail with attachments when --force-recheck is given).
        if args.force_recheck:
            query = {"has_attachments": True}
        else:
            query = {
                "has_attachments": True,
                "attachments": {
                    "$elemMatch": {
                        "is_inline": False,
                        "file_hash": {"$exists": False},
                    }
                }
            }

        total = col_emails.count_documents(query)
        print(f"\nEmailu ke zpracovani: {total}")
        if total == 0:
            print("Neni co stahnout.")
            return

        cursor = col_emails.find(query, {"_id": 1, "graph_id": 1, "subject": 1, "attachments": 1})
        if args.limit:
            cursor = cursor.limit(args.limit)
        # Read the (small, projected) documents up front: downloads between two
        # cursor fetches can outlast the server-side cursor timeout, and the
        # loop below updates documents that match the query being iterated.
        pending = list(cursor)
        total = len(pending)

        ok_count = 0
        new_count = 0
        skip_count = 0
        err_count = 0
        email_i = 0
        batch = []

        def flush():
            """Write the queued e-mail updates; errors are logged, not raised."""
            if not batch:
                return
            try:
                col_emails.bulk_write(batch, ordered=False)
            except Exception as e:
                logging.error("bulk_write: %s", e)
                print(f" CHYBA bulk_write: {e}")
            batch.clear()

        try:
            for email_doc in pending:
                email_i += 1
                email_id = email_doc["_id"]
                graph_id = email_doc.get("graph_id", "")
                subject = (email_doc.get("subject") or "")[:60]
                att_list = email_doc.get("attachments") or []

                # Real attachments only
                real_atts = [a for a in att_list if not a.get("is_inline", False)]
                if not real_atts:
                    continue

                print(f"\n {email_i:>5}/{total} {subject}")

                # Load the attachment IDs from Graph. Entries are consumed as
                # they are matched so that same-named attachments in one e-mail
                # do not all resolve to a single Graph attachment.
                # NOTE(review): this pairs duplicates by list order, which
                # assumes MongoDB and Graph list attachments in the same order.
                graph_atts = fetch_message_attachments(graph_id)
                graph_pool = [a for a in graph_atts if not a.get("isInline", False)]

                updated_atts = list(att_list)
                email_ok = True
                changed = False

                for i, att in enumerate(updated_atts):
                    if att.get("is_inline", False):
                        continue

                    att_name = att.get("filename", "") or ""

                    if not args.force_recheck and att.get("file_hash"):
                        # Already stored; still consume its Graph counterpart so
                        # a later duplicate name gets the next Graph attachment.
                        _take_graph_attachment(att_name, graph_pool, allow_partial=False)
                        skip_count += 1
                        print(f" SKIP {att_name}")
                        continue

                    graph_att = _take_graph_attachment(att_name, graph_pool)
                    if not graph_att:
                        logging.error("attachment not found in Graph [email=%s att=%s]", email_id, att_name)
                        print(f" ERR {att_name} (nenalezeno v Graph)")
                        err_count += 1
                        email_ok = False
                        continue

                    # Download the content
                    content = fetch_attachment_content(graph_id, graph_att["id"])
                    if content is None:
                        err_count += 1
                        email_ok = False
                        print(f" ERR {att_name} (stazeni selhalo)")
                        continue

                    # Store with deduplication; one bad file must not end the run.
                    try:
                        hash_val, local_path, was_new = save_attachment(content, att_name, ATTACHMENTS_DIR, col_index)
                    except Exception as e:
                        logging.error("save_attachment failed [email=%s att=%s]: %s", email_id, att_name, e)
                        print(f" ERR {att_name} (ulozeni selhalo)")
                        err_count += 1
                        email_ok = False
                        continue

                    # Update the MIME type in the index
                    col_index.update_one(
                        {"_id": hash_val},
                        {"$set": {"mime_type": att.get("mime_type", graph_att.get("contentType", ""))}},
                    )

                    # Record the result in the e-mail's attachment entry
                    updated_atts[i] = {**att, "file_hash": hash_val, "local_path": local_path}
                    changed = True

                    if was_new:
                        new_count += 1
                        print(f" NEW {local_path} ({len(content):,} B)")
                    else:
                        skip_count += 1
                        print(f" DUP {att_name} -> {local_path}")

                if email_ok:
                    ok_count += 1

                # Queue the write-back only when an attachment entry changed.
                if changed:
                    batch.append(UpdateOne(
                        {"_id": email_id},
                        {"$set": {"attachments": updated_atts}}
                    ))

                if len(batch) >= BATCH_SIZE:
                    flush()

                if email_i % 100 == 0:
                    print(f" {'─'*60}")
                    print(f" Průběh: emaily={email_i}/{total} nove={new_count} dup={skip_count} err={err_count}")
                    print(f" {'─'*60}")
        finally:
            # Persist finished work even on Ctrl+C or an unexpected error, so a
            # re-run does not download the same attachments again.
            flush()

        elapsed_total = (datetime.now() - start).total_seconds()
        files_total = col_index.count_documents({})
        size_total = sum(d.get("size_bytes", 0) for d in col_index.find({}, {"size_bytes": 1}))

        print(f"\n{'='*52}")
        print(f"Vysledek: emaily={ok_count} | nove soubory={new_count} | duplikaty={skip_count} | err={err_count}")
        print(f"Souboru v indexu: {files_total} ({size_total/1024/1024:.1f} MB)")
        print(f"Celkovy cas: {int(elapsed_total//3600)}h {int((elapsed_total%3600)//60)}m {int(elapsed_total%60)}s")
        print(f"\nKonec: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        if err_count:
            print(f"Chyby logovany do: {LOG_FILE}")
    finally:
        client.close()
|
|
|
|
|
|
# Run the downloader only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|