"""
janssenpc_email_send_new v1.5

Version: 1.5.2
Date: 2026-06-01

Description:
    Walks the Inbox, Deleted Items and Sent Items folders in Outlook (MAPI),
    saves e-mail messages as .msg files and uploads them to
    https://msgs.buzalka.cz.
    Records processed messages in a SQLite DB (jnjemails.db) and uploads the
    DB to the server once every 24 hours (not on every run).
    Supports resuming from the last processed e-mail.
    The folder path contains the full mailbox name
    (e.g. /vbuzalka@its.jnj.com/Inbox).
    Errors are logged to jnjemails_errors.log.

    NOTE(review): the end-of-run upload in the main section calls
    upload_db(..., force=True), which bypasses the 24-hour throttle, so the DB
    is in fact uploaded at the end of every run — confirm which is intended.

v1.5:   tracking of entry_id, graph_id, is_read, jnj_folder.
        Sync pass over the last 30 days: detection of deletions,
        read-status changes and folder moves.
v1.5.2: SQLite logging — tables runs + log (flat event log per run).
"""

import logging
import sqlite3
import tempfile
from datetime import datetime, timedelta
from pathlib import Path

import requests
import urllib3
import win32com.client

# NOTE(review): none of the requests calls in this file pass verify=False, so
# this suppression looks unnecessary — kept to preserve existing behaviour.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# NOTE(review): bearer token is committed in source; consider loading it from
# the environment or a secret store.
TOKEN = "13e1bb01-9fd5-44a8-8ce9-4ee27133d340"
UPLOAD_URL = "https://msgs.buzalka.cz/upload"
DB_PATH = r"C:\Users\vbuzalka\SQLITE\jnjemails.db"
DB_UPLOAD_MARKER = r"C:\Users\vbuzalka\SQLITE\jnjemails_last_db_upload.txt"
DB_UPLOAD_INTERVAL_H = 24
LOG_PATH = r"C:\Users\vbuzalka\SQLITE\jnjemails_errors.log"
PR_INTERNET_MESSAGE_ID = "http://schemas.microsoft.com/mapi/proptag/0x1035001E"
SYNC_DAYS = 30
SCRIPT_NAME = "email_send"
SCRIPT_VERSION = "1.5.2"

# olFolderInbox=6, olFolderDeletedItems=3, olFolderSentMail=5
FOLDERS_TO_PROCESS = [6, 3, 5]

UPLOAD_LOG_PATH = r"C:\Users\vbuzalka\SQLITE\jnjemails_uploads.log"

# Both log handlers open their files immediately, so the target directories
# must exist before logging is configured (the main section only creates the
# DB directory later, which is too late for a first run on a clean machine).
for _log_file in (LOG_PATH, UPLOAD_LOG_PATH):
    Path(_log_file).parent.mkdir(parents=True, exist_ok=True)

logging.basicConfig(
    filename=LOG_PATH,
    level=logging.ERROR,
    format="%(asctime)s | %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    encoding="utf-8",
)

_upload_log = logging.getLogger("uploads")
_upload_log.setLevel(logging.DEBUG)
# Without this, INFO records from the upload logger propagate to the root
# logger's file handler (handlers of ancestor loggers ignore the ancestor's
# level) and every UPLOAD/RESPONSE line ends up in the error log as well.
_upload_log.propagate = False
_uh = logging.FileHandler(UPLOAD_LOG_PATH, encoding="utf-8")
_uh.setFormatter(logging.Formatter("%(asctime)s | %(message)s", datefmt="%Y-%m-%d %H:%M:%S"))
_upload_log.addHandler(_uh)


def init_db(conn):
    """Create the messages, runs and log tables (plus indexes) if missing.

    Also migrates databases created by older versions by adding the
    entry_id, graph_id, is_read and jnj_folder columns to ``messages`` when
    they are not present yet.

    Args:
        conn: Open sqlite3 connection.

    Raises:
        sqlite3.Error: If any DDL statement fails.
    """
    conn.execute("""
        CREATE TABLE IF NOT EXISTS messages (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            message_id TEXT NOT NULL,
            subject TEXT,
            sender TEXT,
            received_at TEXT,
            folder TEXT,
            source TEXT,
            uploaded_at TEXT DEFAULT (datetime('now')),
            entry_id TEXT,
            graph_id TEXT,
            is_read INTEGER DEFAULT 0,
            jnj_folder TEXT
        )
    """)
    conn.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_message_id ON messages(message_id)")
    conn.execute("""
        CREATE TABLE IF NOT EXISTS runs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            script TEXT NOT NULL,
            version TEXT,
            started_at TEXT NOT NULL,
            finished_at TEXT,
            transferred INTEGER DEFAULT 0,
            skipped INTEGER DEFAULT 0,
            sync_updated INTEGER DEFAULT 0,
            sync_deleted INTEGER DEFAULT 0,
            errors INTEGER DEFAULT 0
        )
    """)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS log (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            run_id INTEGER REFERENCES runs(id),
            level TEXT NOT NULL,
            event TEXT NOT NULL,
            subject TEXT,
            folder TEXT,
            graph_id TEXT,
            detail TEXT,
            created_at TEXT DEFAULT (datetime('now'))
        )
    """)
    conn.execute("CREATE INDEX IF NOT EXISTS idx_log_run_id ON log(run_id)")

    # Migrate existing DB: add only the columns that are really missing,
    # instead of attempting every ALTER and swallowing whatever it raises.
    existing_columns = {row[1] for row in conn.execute("PRAGMA table_info(messages)")}
    for col, definition in (
        ("entry_id", "TEXT"),
        ("graph_id", "TEXT"),
        ("is_read", "INTEGER DEFAULT 0"),
        ("jnj_folder", "TEXT"),
    ):
        if col not in existing_columns:
            conn.execute(f"ALTER TABLE messages ADD COLUMN {col} {definition}")
    conn.commit()


def start_run(conn):
    """Insert a new row into ``runs`` for this script/version and return its id."""
    cur = conn.execute(
        "INSERT INTO runs (script, version, started_at) VALUES (?, ?, datetime('now'))",
        (SCRIPT_NAME, SCRIPT_VERSION),
    )
    conn.commit()
    return cur.lastrowid


def finish_run(conn, run_id, transferred, skipped, sync_updated=0, sync_deleted=0, errors=0):
    """Stamp ``finished_at`` and store the final counters on the given run row."""
    conn.execute("""
        UPDATE runs
        SET finished_at=datetime('now'),
            transferred=?, skipped=?, sync_updated=?, sync_deleted=?, errors=?
        WHERE id=?
    """, (transferred, skipped, sync_updated, sync_deleted, errors, run_id))
    conn.commit()


def db_log(conn, run_id, level, event, subject=None, folder=None, graph_id=None, detail=None):
    """Append one event row to the ``log`` table and commit immediately."""
    conn.execute("""
        INSERT INTO log (run_id, level, event, subject, folder, graph_id, detail)
        VALUES (?, ?, ?, ?, ?, ?, ?)
    """, (run_id, level, event, subject, folder, graph_id, detail))
    conn.commit()


def is_uploaded(conn, message_id):
    """Return True when ``messages`` already holds a row with this message_id."""
    row = conn.execute(
        "SELECT 1 FROM messages WHERE message_id = ? LIMIT 1", (message_id,)
    ).fetchone()
    return row is not None


def save_to_db(conn, message_id, subject, sender, received_at, folder, source,
               entry_id=None, graph_id=None, is_read=0):
    """Record an uploaded message; a duplicate message_id is silently ignored.

    ``jnj_folder`` is initialised to the same value as ``folder``.
    """
    conn.execute("""
        INSERT OR IGNORE INTO messages
            (message_id, subject, sender, received_at, folder, source,
             entry_id, graph_id, is_read, jnj_folder)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """, (message_id, subject, sender, received_at, folder, source,
          entry_id, graph_id, is_read, folder))
    conn.commit()


def _db_upload_due() -> bool:
    """Return True when the last DB upload is at least DB_UPLOAD_INTERVAL_H hours old.

    A missing, unreadable or unparsable marker file counts as "due".
    """
    marker = Path(DB_UPLOAD_MARKER)
    if not marker.exists():
        return True
    try:
        last = datetime.fromisoformat(marker.read_text().strip())
    except (OSError, ValueError):
        # Best effort: a broken marker must not block uploads forever.
        return True
    return (datetime.now() - last).total_seconds() >= DB_UPLOAD_INTERVAL_H * 3600


def _db_upload_mark():
    """Write the current local time (ISO format) into the upload marker file."""
    Path(DB_UPLOAD_MARKER).write_text(datetime.now().isoformat())


def upload_db(db_path, force=False):
    """Upload the SQLite file to the server under a timestamped name.

    Args:
        db_path: Path of the database file to send.
        force: When False the upload is skipped unless ``_db_upload_due()``
            says the interval has elapsed; when True it is always performed.

    Raises:
        requests.RequestException: On network failure or a non-success HTTP
            status. The marker file is only written after a successful upload.
    """
    if not force and not _db_upload_due():
        return
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"jnjemails_{timestamp}.db"
    with open(db_path, "rb") as f:
        resp = requests.post(
            "https://msgs.buzalka.cz/upload-db",
            headers={"Authorization": f"Bearer {TOKEN}"},
            files={"file": (filename, f, "application/octet-stream")},
            timeout=60,
        )
    # Do not record a failed upload as done.
    resp.raise_for_status()
    print(f" DB upload: {resp.json()}")
    _db_upload_mark()


def upload_msg(msg_path, filename, folder=""):
    """POST one .msg file to UPLOAD_URL and return the decoded JSON response.

    The request and the response are both written to the upload log.

    Raises:
        requests.RequestException: On network failure or a non-success status.
    """
    _upload_log.info("UPLOAD %s | folder=%s", filename, folder)
    with open(msg_path, "rb") as f:
        resp = requests.post(
            UPLOAD_URL,
            headers={"Authorization": f"Bearer {TOKEN}"},
            files={"file": (filename, f, "application/octet-stream")},
            data={"folder": folder},
            timeout=60,
        )
    resp.raise_for_status()
    result = resp.json()
    _upload_log.info("RESPONSE %s | %s", filename, result)
    return result


def get_folder_resume_date(conn, folder_path):
    """Return the newest stored ``received_at`` for the folder minus one hour.

    Returns None when no message with a received date is stored for the folder.
    """
    row = conn.execute(
        "SELECT MAX(received_at) FROM messages WHERE folder = ?", (folder_path,)
    ).fetchone()
    if not row or not row[0]:
        return None
    last_dt = datetime.fromisoformat(row[0])
    # One-hour overlap; already-stored messages are skipped via is_uploaded().
    return last_dt - timedelta(hours=1)


def get_item_folder_path(item):
    """Reconstruct full folder path from an Outlook item, e.g. /vbuzalka@its.jnj.com/Inbox/Sub."""
    parts = []
    obj = item.Parent
    # Walk up the parent chain until an object without Name/Parent is reached.
    while True:
        try:
            parts.insert(0, obj.Name)
            obj = obj.Parent
        except Exception:
            break
    return "/" + "/".join(parts)


def _message_key(item):
    """Return the item's Internet Message-ID, or ``entryid:<EntryID>`` when it has none."""
    try:
        mid = item.PropertyAccessor.GetProperty(PR_INTERNET_MESSAGE_ID)
    except Exception:
        mid = None
    if not mid:
        mid = f"entryid:{item.EntryID}"
    return mid


def _transfer_item(conn, run_id, item, mid, subject, current_path, source):
    """Save one item as .msg, upload it, record it in the DB; return the server status."""
    with tempfile.TemporaryDirectory() as tmp:
        safe_name = f"{item.EntryID[-20:]}.msg"
        tmp_path = Path(tmp) / safe_name
        item.SaveAs(str(tmp_path), 3)  # 3 = olMSG in Outlook's OlSaveAsType
        result = upload_msg(tmp_path, safe_name, current_path)
    status = result.get("status", "?")
    graph_id = result.get("graph_id")
    is_read = 0 if item.UnRead else 1
    received = item.ReceivedTime.isoformat() if item.ReceivedTime else None
    save_to_db(conn, mid, subject, item.SenderEmailAddress, received,
               current_path, source, entry_id=item.EntryID,
               graph_id=graph_id, is_read=is_read)
    db_log(conn, run_id, "INFO", f"upload_{status}",
           subject=subject, folder=current_path, graph_id=graph_id)
    return status


def process_folder(conn, run_id, folder, source, folder_path="",
                   counter=None, error_counter=None, skipped_counter=None):
    """Upload all not-yet-stored mail items of a folder, then recurse into subfolders.

    Args:
        conn: Open sqlite3 connection.
        run_id: Id of the current ``runs`` row, used for event logging.
        folder: Outlook folder object to process.
        source: Value stored in ``messages.source`` for every saved message.
        folder_path: Path prefix of the parent; the folder name is appended.
        counter: One-element list holding the running total of transferred
            messages (shared across recursive calls).
        error_counter: One-element list holding the running total of errors.
        skipped_counter: One-element list holding the running total of
            messages skipped because they were already stored.

    Per-item and per-folder failures are logged and counted, not raised.
    """
    if counter is None:
        counter = [0]
    if error_counter is None:
        error_counter = [0]
    if skipped_counter is None:
        skipped_counter = [0]

    current_path = f"{folder_path}/{folder.Name}"
    try:
        resume_dt = get_folder_resume_date(conn, current_path)
        items = folder.Items
        if resume_dt:
            resume_str = resume_dt.strftime("%Y/%m/%d %H:%M:%S")
            filter_str = f"@SQL=\"urn:schemas:httpmail:datereceived\" > '{resume_str}'"
            items = folder.Items.Restrict(filter_str)
            print(f"\n Složka: {current_path} | pokračuji od: {resume_str}")
        else:
            print(f"\n Složka: {current_path} | od začátku")
        items.Sort("[ReceivedTime]", False)

        count = 0
        skipped = 0
        for item in items:
            subject = getattr(item, 'Subject', '?')
            try:
                if not item.MessageClass.upper().startswith("IPM.NOTE"):
                    continue
                mid = _message_key(item)
                if is_uploaded(conn, mid):
                    skipped += 1
                    skipped_counter[0] += 1
                    continue

                try:
                    status = _transfer_item(conn, run_id, item, mid, subject,
                                            current_path, source)
                except Exception as e:
                    db_log(conn, run_id, "ERROR", "upload_error",
                           subject=subject, folder=current_path, detail=str(e))
                    raise

                counter[0] += 1
                count += 1
                if counter[0] % 1000 == 0:
                    print(f" → celkem {counter[0]} emailů přeneseno, uploaduji DB...")
                    # A failed DB upload must not be reported as a failed
                    # upload of a message that was already sent and stored.
                    try:
                        upload_db(DB_PATH)
                    except Exception as e:
                        db_log(conn, run_id, "ERROR", "db_upload_error", detail=str(e))
                        logging.error("periodic DB upload failed | error=%s", e)
                        error_counter[0] += 1
                print(f" {str(status).upper():6} | {subject[:60]}")
            except Exception as e:
                sender = getattr(item, 'SenderEmailAddress', '?')
                received = getattr(item, 'ReceivedTime', '?')
                print(f" CHYBA | {subject[:40]} | {e}")
                logging.error("folder=%s | sender=%s | received=%s | subject=%s | error=%s",
                              current_path, sender, received, subject, e)
                error_counter[0] += 1
        print(f" → složka hotova: přeneseno {count} | skip {skipped}")
    except Exception as e:
        print(f" CHYBA složka {current_path}: {e}")
        logging.error("folder=%s | CHYBA SLOŽKY | error=%s", current_path, e)
        error_counter[0] += 1

    for subfolder in folder.Folders:
        process_folder(conn, run_id, subfolder, source, current_path,
                       counter, error_counter, skipped_counter)


def sync_recent(conn, run_id, ns):
    """Sync last SYNC_DAYS days: detect deleted emails, read-status changes, folder moves.

    For every stored message with a graph_id and entry_id received after the
    cutoff, the item is looked up in Outlook by entry id. Changed read status
    or folder is pushed to the server and stored; an item that cannot be
    loaded is deleted on the server and from the ``messages`` table.

    Returns:
        Tuple ``(updated, deleted, errors)``.
    """
    cutoff = (datetime.now() - timedelta(days=SYNC_DAYS)).isoformat()
    rows = conn.execute(
        """SELECT message_id, entry_id, graph_id, is_read, jnj_folder
           FROM messages
           WHERE graph_id IS NOT NULL AND entry_id IS NOT NULL AND received_at > ?""",
        (cutoff,),
    ).fetchall()
    print(f"\n=== Sync průchod: {len(rows)} záznamů za posledních {SYNC_DAYS} dní ===")

    deleted = 0
    updated = 0
    errors = 0
    auth_headers = {"Authorization": f"Bearer {TOKEN}"}

    for message_id, entry_id, graph_id, is_read, jnj_folder in rows:
        found = False
        current_read = None
        current_folder = None
        # NOTE(review): ANY failure here (not only "item not found") makes the
        # message look deleted and triggers a server-side delete — confirm this
        # is acceptable when Outlook or the store is temporarily unavailable.
        try:
            item = ns.GetItemFromID(entry_id)
            current_read = 0 if item.UnRead else 1
            current_folder = get_item_folder_path(item)
            found = True
        except Exception:
            pass

        if found:
            read_changed = current_read != (is_read or 0)
            folder_changed = current_folder != (jnj_folder or "")
            if not read_changed and not folder_changed:
                continue

            payload = {"graph_id": graph_id}
            if read_changed:
                payload["is_read"] = bool(current_read)
            if folder_changed:
                payload["folder"] = current_folder
            try:
                resp = requests.post(
                    "https://msgs.buzalka.cz/message-update",
                    headers=auth_headers,
                    json=payload,
                    timeout=30,
                )
                resp.raise_for_status()
                result = resp.json()
                # The server may hand back a new graph_id; keep the old one otherwise.
                new_graph_id = result.get("graph_id", graph_id)
                conn.execute(
                    "UPDATE messages SET is_read=?, jnj_folder=?, graph_id=? WHERE message_id=?",
                    (current_read, current_folder, new_graph_id, message_id),
                )
                conn.commit()
                updated += 1
                if read_changed:
                    db_log(conn, run_id, "INFO", "sync_read_update",
                           graph_id=new_graph_id, detail=f"{is_read} → {current_read}")
                if folder_changed:
                    db_log(conn, run_id, "INFO", "sync_folder_move",
                           graph_id=new_graph_id, folder=current_folder,
                           detail=f"{jnj_folder} → {current_folder}")
                changes = []
                if read_changed:
                    changes.append(f"read={current_read}")
                if folder_changed:
                    changes.append(f"folder={current_folder}")
                print(f" UPDATE | {', '.join(changes)}")
            except Exception as e:
                db_log(conn, run_id, "ERROR", "sync_update_failed",
                       graph_id=graph_id, detail=str(e))
                logging.error("sync update failed | graph_id=%s | error=%s", graph_id, e)
                errors += 1
        else:
            try:
                resp = requests.post(
                    "https://msgs.buzalka.cz/message-delete",
                    headers=auth_headers,
                    json={"graph_id": graph_id},
                    timeout=30,
                )
                resp.raise_for_status()
                db_log(conn, run_id, "INFO", "sync_delete",
                       graph_id=graph_id, folder=jnj_folder)
                conn.execute("DELETE FROM messages WHERE message_id=?", (message_id,))
                conn.commit()
                deleted += 1
                print(f" DELETE | graph_id={graph_id[:20]}...")
            except Exception as e:
                db_log(conn, run_id, "ERROR", "sync_delete_failed",
                       graph_id=graph_id, detail=str(e))
                logging.error("sync delete failed | graph_id=%s | error=%s", graph_id, e)
                errors += 1

    print(f" → sync hotov: {updated} aktualizováno | {deleted} smazáno | {errors} chyb")
    return updated, deleted, errors


# --- MAIN ---
def main():
    """Run one full pass: upload new mail, sync recent changes, upload the DB.

    The run row is always finalised and the DB connection always closed, even
    when processing aborts with an exception (which is re-raised).
    """
    Path(DB_PATH).parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(DB_PATH)
    try:
        init_db(conn)
        run_id = start_run(conn)

        counter = [0]
        error_counter = [0]
        skipped_total = [0]
        sync_updated = 0
        sync_deleted = 0
        try:
            outlook = win32com.client.Dispatch("Outlook.Application")
            ns = outlook.GetNamespace("MAPI")

            for folder_id in FOLDERS_TO_PROCESS:
                folder = ns.GetDefaultFolder(folder_id)
                mailbox_name = folder.Parent.Name
                print(f"\n=== {folder.Name} ({mailbox_name}) ===")
                process_folder(conn, run_id, folder, "mailbox", f"/{mailbox_name}",
                               counter, error_counter, skipped_total)

            sync_updated, sync_deleted, sync_errors = sync_recent(conn, run_id, ns)
            error_counter[0] += sync_errors
        except Exception as e:
            error_counter[0] += 1
            logging.error("run aborted | error=%s", e)
            raise
        finally:
            # Record whatever was achieved, even if the run was cut short.
            finish_run(conn, run_id,
                       transferred=counter[0],
                       skipped=skipped_total[0],
                       sync_updated=sync_updated,
                       sync_deleted=sync_deleted,
                       errors=error_counter[0])

        print("\nFinální upload DB...")
        upload_db(DB_PATH, force=True)
    finally:
        conn.close()
    print(f"\nHotovo. Chyby logovány do: {LOG_PATH}")


if __name__ == "__main__":
    main()