"""
==============================================================================
Script:  0_run_pipeline_v1.0.py
Version: 1.0
Date:    2026-06-04
Author:  vladimir.buzalka

Description:
    Wrapper around the whole e-mail pipeline. Runs, in order:
      1b. parse_emails_graph_delta -> delta sync from Graph API into Mongo
      3.  download_attachments     -> downloads attached files
      4.  unwrap_smime             -> unwraps S/MIME wrapper messages
      5.  enrich_fulltext_emails   -> indexes into PG fulltext

    Always processes ALL mailboxes (except SKIP_MAILBOXES defined in the
    individual scripts). Measures time and exit code per step. If a step
    fails, the default is to continue (so downstream steps do not get
    stuck) -- see --stop-on-error.

    All output (stdout and stderr) of every step is stored in
    /scripts/pipeline_<step_id>.log

Usage:
    python 0_run_pipeline_v1.0.py                  # everything, all mailboxes
    python 0_run_pipeline_v1.0.py --only 5         # only step 5 (enrich)
    python 0_run_pipeline_v1.0.py --skip 4         # everything but S/MIME unwrap
    python 0_run_pipeline_v1.0.py --stop-on-error  # stop at the first failure
    python 0_run_pipeline_v1.0.py --quiet          # no console tee, logs only

Docker:
    docker exec -it python-runner python /scripts/0_run_pipeline_v1.0.py
==============================================================================
"""
from __future__ import annotations

import argparse
import html
import subprocess
import sys
import time
from datetime import datetime
from pathlib import Path

SCRIPTS_DIR = Path("/scripts")
LOGS_DIR = SCRIPTS_DIR  # everything goes to /scripts/

# Recipient of the summary report e-mail.
REPORT_RECIPIENT = "vladimir.buzalka@buzalka.cz"

# --- Auto-install dependencies ---
# Runs at module load so that the step scripts find their packages installed.
# A pip failure is only reported; the pipeline still starts.
_REQ_FILE = SCRIPTS_DIR / "requirements.txt"
if _REQ_FILE.exists():
    _ret = subprocess.run(
        [sys.executable, "-m", "pip", "install", "-q", "-r", str(_REQ_FILE)],
        capture_output=True,
        text=True,
    )
    if _ret.returncode != 0:
        print(f"[WARN] pip install selhal:\n{_ret.stderr.strip()}")
# ---------------------------------

# Pipeline definition: (step_id, label, executable filename)
STEPS = [
    ("1b", "Graph delta sync", "1b_parse_emails_graph_delta_v1.1.py"),
    ("3", "Download attachments", "3_download_attachments_v1.5.py"),
    ("4", "Unwrap S/MIME", "4_unwrap_smime_v1.0.py"),
    ("5", "Enrich fulltext (PG)", "5_enrich_fulltext_emails_v1.4.py"),
]


def fmt_dur(s: float) -> str:
    """Format a duration in seconds as ``12.3s``, ``4m05s`` or ``1h02m03s``."""
    if s < 60:
        return f"{s:.1f}s"
    m, s = divmod(int(s), 60)
    if m < 60:
        return f"{m}m{s:02d}s"
    h, m = divmod(m, 60)
    return f"{h}h{m:02d}m{s:02d}s"


def run_step(step_id: str, label: str, script: str, *, quiet: bool = False) -> tuple[int, float]:
    """Run one pipeline script as a child process and tee its output.

    The child's stdout and stderr are merged, written line by line to
    ``LOGS_DIR / pipeline_<step_id>.log`` (overwritten on each run) and,
    unless *quiet* is set, echoed to the console.

    Args:
        step_id: Short step identifier; used in messages and the log name.
        label: Human-readable step name for the banner.
        script: File name of the script, relative to ``SCRIPTS_DIR``.
        quiet: When true, do not echo the child's output to the console.

    Returns:
        ``(exit_code, duration_seconds)``. A missing script yields
        ``(127, 0.0)`` without starting a process or touching the log.
    """
    script_path = SCRIPTS_DIR / script
    log_path = LOGS_DIR / f"pipeline_{step_id}.log"

    if not script_path.exists():
        print(f" CHYBA: {script_path} neexistuje!")
        return 127, 0.0

    print(f"\n{'='*70}")
    print(f" KROK {step_id}: {label}")
    print(f" script: {script_path}")
    print(f" log: {log_path}")
    print(f" start: {datetime.now().strftime('%H:%M:%S')}")
    print(f"{'='*70}")

    # Monotonic clock: the measured duration is immune to wall-clock jumps.
    started = time.monotonic()

    # Tee: write to the log and (unless --quiet) to the console as well.
    with open(log_path, "w", encoding="utf-8") as logf:
        proc = subprocess.Popen(
            # -u: a piped child would otherwise block-buffer stdout, which
            # delays the tee and lets stderr overtake earlier stdout lines.
            [sys.executable, "-u", str(script_path)],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            bufsize=1,
            encoding="utf-8",
            errors="replace",
        )
        try:
            for line in proc.stdout:
                logf.write(line)
                if not quiet:
                    print(line, end="", flush=True)
            ret = proc.wait()
        except BaseException:
            # Interrupted (e.g. Ctrl+C) or failed while reading: do not
            # leave the child running behind our back.
            proc.kill()
            proc.wait()
            raise
        finally:
            proc.stdout.close()

    dur = time.monotonic() - started
    print(f"\n KROK {step_id} {'OK' if ret == 0 else f'FAILED ({ret})'} za {fmt_dur(dur)}")
    return ret, dur


def main(argv: list[str] | None = None) -> int:
    """Parse the command line, run the selected steps and print a summary.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.

    Returns:
        Process exit code: ``0`` when every executed step succeeded, ``1``
        when at least one failed, ``2`` when the filters left nothing to run.
    """
    ap = argparse.ArgumentParser(description="Email pipeline wrapper v1.0")
    ap.add_argument("--only", nargs="+", default=None,
                    help="Spustit jen tyto kroky (napr. --only 3 4 5)")
    ap.add_argument("--skip", nargs="+", default=None,
                    help="Preskocit tyto kroky")
    ap.add_argument("--stop-on-error", action="store_true",
                    help="Zastavit pipeline pri prvni nenulovem exit kodu")
    ap.add_argument("--quiet", action="store_true",
                    help="Necpat stdout kroku na konzoli, jen do logu")
    args = ap.parse_args(argv)

    # Filter the step set.
    only_set = set(args.only) if args.only else None
    skip_set = set(args.skip) if args.skip else set()

    # A mistyped step id would otherwise be ignored without any hint.
    known_ids = {sid for sid, _, _ in STEPS}
    unknown_ids = sorted(((only_set or set()) | skip_set) - known_ids)
    if unknown_ids:
        print(f"[WARN] Nezname kroky ignorovany: {', '.join(unknown_ids)}")

    to_run = [
        (sid, label, script)
        for sid, label, script in STEPS
        if (only_set is None or sid in only_set) and sid not in skip_set
    ]

    if not to_run:
        print("Zadny krok k spusteni.")
        return 2

    print("=== Email Pipeline Wrapper v1.0 ===")
    print(f"Start: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"Kroku k spusteni: {len(to_run)}")
    for sid, label, _ in to_run:
        print(f" {sid}: {label}")
    if args.stop_on_error:
        print("Politika: stop-on-error")
    else:
        print("Politika: continue-on-error (default)")

    t_all = time.monotonic()
    results = []
    for sid, label, script in to_run:
        ret, dur = run_step(sid, label, script, quiet=args.quiet)
        results.append((sid, label, ret, dur))
        if ret != 0 and args.stop_on_error:
            print(f"\n!!! Pipeline zastavena na kroku {sid} (--stop-on-error)")
            break
    total_dur = time.monotonic() - t_all

    print(f"\n{'='*70}")
    print("=== SHRNUTI PIPELINE ===")
    print(f"{'='*70}")
    failed = 0
    for sid, label, ret, dur in results:
        status = "OK" if ret == 0 else f"FAIL({ret})"
        if ret != 0:
            failed += 1
        print(f" [{sid:>2}] {label:30} {status:>8} {fmt_dur(dur):>10}")
    print(f"{'='*70}")
    print(f" Celkem: {len(results)} kroku, {failed} chyb, {fmt_dur(total_dur)}")
    print(f" Konec: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f" Per-krok logy: {LOGS_DIR}/pipeline_<krok>.log")

    _send_report(results, failed, total_dur)
    return 1 if failed else 0


def _send_report(results: list, failed: int, total_dur: float) -> None:
    """E-mail an HTML summary of the run; best effort, never raises.

    Loads ``EmailMessagingGraph.py`` from ``SCRIPTS_DIR`` by path and calls
    its ``send_mail``. Logs of failed steps are attached when they exist and
    are non-empty. Any loading or sending problem is only printed.

    Args:
        results: ``(step_id, label, exit_code, duration_seconds)`` tuples.
        failed: Number of steps with a non-zero exit code.
        total_dur: Total pipeline duration in seconds.
    """
    try:
        import importlib.util

        _lib = SCRIPTS_DIR / "EmailMessagingGraph.py"
        spec = importlib.util.spec_from_file_location("EmailMessagingGraph", _lib)
        if spec is None or spec.loader is None:
            raise ImportError(f"no import spec for {_lib}")
        mod = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(mod)
    except Exception as e:
        print(f"[report] Nelze nacist EmailMessagingGraph: {e}")
        return

    ok_icon = "✅"
    err_icon = "❌"
    overall = ok_icon if failed == 0 else err_icon
    # One timestamp so the body heading and the subject always agree.
    stamp = datetime.now().strftime('%Y-%m-%d %H:%M')

    row_parts = []
    for sid, label, ret, dur in results:
        icon = ok_icon if ret == 0 else err_icon
        color = "#d4edda" if ret == 0 else "#f8d7da"
        status = "OK" if ret == 0 else f"FAIL ({ret})"
        row_parts.append(
            f'<tr style="background:{color}">'
            f"<td>{icon} {html.escape(label)}</td>"
            f"<td>{status}</td>"
            f"<td>{fmt_dur(dur)}</td>"
            f"</tr>"
        )
    rows = "\n".join(row_parts)

    body = f"""
<h3>{overall} Email pipeline — {stamp}&nbsp;|&nbsp;celkem {fmt_dur(total_dur)}&nbsp;|&nbsp;{len(results)} kroků, {failed} chyb</h3>
<table border="1" cellpadding="6" cellspacing="0" style="border-collapse:collapse">
<thead><tr><th>Krok</th><th>Status</th><th>Čas</th></tr></thead>
<tbody>
{rows}
</tbody>
</table>
"""

    # Attach logs of failed steps.
    attachments = []
    for sid, label, ret, dur in results:
        if ret != 0:
            log_path = LOGS_DIR / f"pipeline_{sid}.log"
            if log_path.exists() and log_path.stat().st_size > 0:
                attachments.append(log_path)

    subject = f"{overall} Email pipeline — {stamp}"
    try:
        mod.send_mail(
            REPORT_RECIPIENT,
            subject,
            body,
            html=True,
            attachments=attachments or None,
        )
        print(f"[report] Email odeslan na {REPORT_RECIPIENT}")
    except Exception as e:
        print(f"[report] Chyba pri odesilani: {e}")


if __name__ == "__main__":
    try:
        raise SystemExit(main())
    except KeyboardInterrupt:
        print("\nPreruseno uzivatelem")
        sys.exit(130)