""" ======================================================================= Název: import_emails_to_mongo_v1.0.py Verze: 1.0 Datum: 2026-06-04 Popis: Stáhne emaily z OWA složek, zparsuje EML a uloží do MongoDB OperativniEmailyJNJ.messages. ONLY_NEW = False → stáhne vše od DATE_FROM (28.05.2026) zastav se při emailu starším než DATE_FROM ONLY_NEW = True → stáhne jen nové (nezávislé na DATE_FROM) zastav se při prvním emailu už v DB (emaily jsou od nejnovějšího, takže vše starší už máme) Přílohy do MAX_ATTACHMENT_SIZE uloží jako BinData, větší označí downloaded=False. Deduplikace přes message_id + sha256. Používá persistent profil z outlook_login_v1.0.py. ======================================================================= """ import email as email_lib import email.utils import hashlib from datetime import datetime, timezone from email.header import decode_header from pathlib import Path from playwright.sync_api import sync_playwright from pymongo import MongoClient, ASCENDING # ── Konfigurace ──────────────────────────────────────────────────────── BASE_DIR = Path(__file__).resolve().parent PROFILE_DIR = BASE_DIR / "outlook_profile" START_URL = "https://outlook.cloud.microsoft/mail/" MONGO_URI = "mongodb://192.168.1.76:27017" DB_NAME = "OperativniEmailyJNJ" COL_NAME = "messages" # ── Hlavní přepínač ─────────────────────────────────────────────────── ONLY_NEW = True # False = od DATE_FROM; True = jen nové (zastav při 1. duplikátu) DATE_FROM = datetime(2026, 5, 28, tzinfo=timezone.utc) # platí jen pro ONLY_NEW=False MAX_PER_FOLDER = 2000 # pojistka — max emailů na složku MAX_ATTACHMENT_SIZE = 5 * 1024 * 1024 # 5 MB — větší přílohy se neuloží # (zobrazovaný název, způsob navigace, hodnota) FOLDERS = [ ("Inbox", "url", "https://outlook.cloud.microsoft/mail/"), ("TMP", "click", "TMP"), ("Sent Items", "url", "https://outlook.cloud.microsoft/mail/sentitems"), ("Deleted Items", "url", "https://outlook.cloud.microsoft/mail/deleteditems"), ("Archive", "url", "https://outlook.cloud.microsoft/mail/archive"), ] SEARCH_READY = ( '[placeholder*="Search"], [aria-label*="Search"], ' '[placeholder*="Hledat"], [aria-label*="Hledat"]' ) # ── EML parsování ────────────────────────────────────────────────────── def _decode_str(value): """Dekóduje encoded-word hlavičky (=?utf-8?...) na čistý string.""" if value is None: return None parts = decode_header(value) result = [] for part, charset in parts: if isinstance(part, bytes): result.append(part.decode(charset or "utf-8", errors="replace")) else: result.append(part) return " ".join(result).strip() def _parse_addresses(header_value): if not header_value: return [] pairs = email.utils.getaddresses([header_value]) return [{"name": name.strip(), "email": addr.strip()} for name, addr in pairs] def parse_eml(data: bytes, folder_name: str) -> dict: msg = email_lib.message_from_bytes(data) # Datum date_dt = None date_str = msg.get("date") if date_str: try: date_dt = email.utils.parsedate_to_datetime(date_str) except Exception: pass # Tělo + přílohy body_plain = None body_html = None attachments = [] for part in msg.walk(): ctype = part.get_content_type() disposition = part.get_content_disposition() or "" filename = _decode_str(part.get_filename()) is_attachment = ( "attachment" in disposition or ("inline" in disposition and filename) or (filename and ctype not in ("text/plain", "text/html")) ) if is_attachment: payload = part.get_payload(decode=True) or b"" size = len(payload) att = { "filename": filename or "unknown", "content_type": ctype, "size": size, "downloaded": size <= MAX_ATTACHMENT_SIZE, } if att["downloaded"] and payload: att["data"] = payload attachments.append(att) elif ctype == "text/plain" and body_plain is None and "attachment" not in disposition: raw = part.get_payload(decode=True) if raw: cs = part.get_content_charset() or "utf-8" body_plain = raw.decode(cs, errors="replace") elif ctype == "text/html" and body_html is None and "attachment" not in disposition: raw = part.get_payload(decode=True) if raw: cs = part.get_content_charset() or "utf-8" body_html = raw.decode(cs, errors="replace") from_parsed = _parse_addresses(msg.get("from", "")) return { "message_id": (msg.get("message-id") or "").strip(), "sha256": hashlib.sha256(data).hexdigest(), "folder": folder_name, "eml_size": len(data), "imported_at": datetime.now(timezone.utc), "subject": _decode_str(msg.get("subject")), "date": date_dt, "from": from_parsed[0] if from_parsed else {"name": "", "email": ""}, "to": _parse_addresses(msg.get("to", "")), "cc": _parse_addresses(msg.get("cc", "")), "bcc": _parse_addresses(msg.get("bcc", "")), "in_reply_to": (msg.get("in-reply-to") or "").strip() or None, "references": [r.strip() for r in (msg.get("references") or "").split() if r.strip()], "importance": (msg.get("importance") or msg.get("x-priority") or "normal").strip().lower(), "body_plain": body_plain, "body_html": body_html, "has_attachments": bool(attachments), "attachments": attachments, } # ── Playwright helpers ───────────────────────────────────────────────── def wait_ready(page): page.wait_for_load_state("domcontentloaded") page.wait_for_selector(SEARCH_READY, timeout=30_000) def navigate_to_folder(page, nav_type, value): if nav_type == "url": page.goto(value) wait_ready(page) else: loc = page.locator(f'div[role="treeitem"]:has-text("{value}")').last loc.wait_for(state="visible", timeout=10_000) loc.click() page.wait_for_timeout(1_500) def download_email_at_index(page, idx): """Stáhne email na pozici idx. Vrátí bytes nebo None.""" msgs = page.locator('div[role="option"]') # Zkus načíst dostatek položek scrollováním last_count = -1 while True: count = msgs.count() if count > idx: break if count == last_count: return None # konec složky last_count = count if count > 0: msgs.last.scroll_into_view_if_needed() page.wait_for_timeout(800) else: try: page.wait_for_selector('div[role="option"]', timeout=5_000) except Exception: return None item = msgs.nth(idx) item.scroll_into_view_if_needed() page.wait_for_timeout(400) item.click() # nejdřív vyber email page.wait_for_timeout(600) item.click(button="right") # pak kontextové menu page.wait_for_timeout(700) # Najdi Download v kontextovém menu download_parent = None for name in ("Download", "Stáhnout"): loc = page.get_by_role("menuitem", name=name).first if loc.count() and loc.is_visible(): download_parent = loc break if download_parent is None: items = page.get_by_role("menuitem").all() print(f" ! 'Download' nenalezen. Menu: {[i.inner_text() for i in items[:8]]}") page.keyboard.press("Escape") return None download_parent.hover() page.wait_for_timeout(600) eml_item = None for name in ("Download as EML", "Stáhnout jako EML", "Stáhnout jako .eml"): loc = page.get_by_role("menuitem", name=name).first if loc.count() and loc.is_visible(): eml_item = loc break try: target = eml_item if eml_item else download_parent with page.expect_download(timeout=20_000) as dl_info: target.click() dl = dl_info.value path = dl.path() if path: return Path(path).read_bytes() return None except Exception as e: print(f" ! Stažení selhalo: {e}") page.keyboard.press("Escape") return None # ── MongoDB helpers ──────────────────────────────────────────────────── def ensure_indexes(col): col.create_index([("message_id", ASCENDING)], unique=True, sparse=True, name="message_id_unique") col.create_index([("sha256", ASCENDING)], unique=True, name="sha256_unique") col.create_index([("folder", ASCENDING)], name="folder") col.create_index([("date", ASCENDING)], name="date") col.create_index([("from.email", ASCENDING)], name="from_email") col.create_index([("subject", "text"), ("body_plain", "text")], name="fulltext") def save_doc(col, doc) -> str: """Uloží dokument. Vrátí 'saved', 'duplicate_mid', 'duplicate_sha', nebo 'error:...'""" # Deduplikace přes message_id if doc["message_id"] and col.find_one({"message_id": doc["message_id"]}): return "duplicate_mid" # Deduplikace přes sha256 if col.find_one({"sha256": doc["sha256"]}): return "duplicate_sha" try: col.insert_one(doc) return "saved" except Exception as e: return f"error: {e}" # ── Hlavní smyčka ────────────────────────────────────────────────────── def main(): if not PROFILE_DIR.exists(): print(f"Profil nenalezen: {PROFILE_DIR}") print("Nejprve spusť outlook_login_v1.0.py.") return client = MongoClient(MONGO_URI) col = client[DB_NAME][COL_NAME] ensure_indexes(col) mode_label = "ONLY_NEW (zastav pri duplikatu)" if ONLY_NEW else f"od {DATE_FROM.date()} (zastav pri starsim emailu)" print(f"MongoDB: {MONGO_URI} -> {DB_NAME}.{COL_NAME}") print(f"Rezim: {mode_label} | Max attachment: {MAX_ATTACHMENT_SIZE // 1024 // 1024} MB\n") with sync_playwright() as p: context = p.chromium.launch_persistent_context( user_data_dir=str(PROFILE_DIR), headless=False, no_viewport=True, accept_downloads=True, args=[ "--disable-blink-features=AutomationControlled", "--start-maximized", ], ) page = context.pages[0] if context.pages else context.new_page() print("Otevírám Outlook...") page.goto(START_URL) wait_ready(page) results = [] for folder_name, nav_type, value in FOLDERS: print(f"\n[{folder_name}]") folder_stats = {"saved": 0, "duplicate": 0, "skip": 0, "error": 0} try: navigate_to_folder(page, nav_type, value) except Exception as e: print(f" ! Navigace selhala: {e}") results.append((folder_name, f"nav error: {e}")) continue mode_info = "ONLY_NEW" if ONLY_NEW else f"od {DATE_FROM.date()}" print(f" rezim: {mode_info}") for idx in range(MAX_PER_FOLDER): print(f" email #{idx + 1} ... ", end="", flush=True) try: data = download_email_at_index(page, idx) if data is None: print("konec slozky") break doc = parse_eml(data, folder_name) email_date = doc.get("date") # ── ONLY_NEW = False: zastav při emailu starším než DATE_FROM ── if not ONLY_NEW: if email_date: # normalizuj na aware datetime if email_date.tzinfo is None: email_date = email_date.replace(tzinfo=timezone.utc) if email_date < DATE_FROM: date_str = email_date.strftime("%Y-%m-%d") print(f"prilis stary ({date_str}) -> stop") break status = save_doc(col, doc) att_info = "" if doc["has_attachments"]: total = len(doc["attachments"]) saved_att = sum(1 for a in doc["attachments"] if a["downloaded"]) att_info = f" [{saved_att}/{total} priloh]" date_str = email_date.strftime("%Y-%m-%d") if email_date else "?" print(f"{status} {date_str} {doc['eml_size']:,} B {(doc['subject'] or '')[:45]}{att_info}") if status == "saved": folder_stats["saved"] += 1 elif status.startswith("duplicate"): folder_stats["duplicate"] += 1 # ── ONLY_NEW = True: zastav při prvním duplikátu ── if ONLY_NEW: print(f" -> prvni duplikat na #{idx + 1}, stop") break else: folder_stats["error"] += 1 except Exception as e: print(f"chyba: {e}") folder_stats["error"] += 1 page.keyboard.press("Escape") results.append((folder_name, folder_stats)) context.close() print("\n=== Výsledky ===") total_saved = 0 for name, stats in results: if isinstance(stats, dict): print(f" {name:<25} saved={stats['saved']} dup={stats['duplicate']} skip={stats['skip']} err={stats['error']}") total_saved += stats["saved"] else: print(f" {name:<25} {stats}") total_db = col.count_documents({}) print(f"\nNově uloženo: {total_saved} | Celkem v DB: {total_db}") client.close() if __name__ == "__main__": main()