Files
janssen/Python-runner/0_run_pipeline_v1.0.py
T
2026-06-08 07:20:37 +02:00

256 lines
8.6 KiB
Python

"""
==============================================================================
Skript: 0_run_pipeline_v1.0.py
Verze: 1.0
Datum: 2026-06-04
Autor: vladimir.buzalka
Popis:
Wrapper kolem cele emailove pipeline. Spousti postupne:
1b. parse_emails_graph_delta -> delta sync z Graph API do Mongo
3. download_attachments -> stahne pripojene soubory
4. unwrap_smime -> rozbali S/MIME wrapper zpravy
5. enrich_fulltext_emails -> doindexuje do PG fulltext
Vzdy projizdi VSECHNY schranky (mimo SKIP_MAILBOXES v jednotlivych skriptech).
Pro kazdy krok meri cas a exit code. Pokud krok selze, ve vychozim stavu
pipeline pokracuje dal (aby se downstream nezasekl) — viz --stop-on-error.
Vsechny vystupy a chyby kazdeho kroku jsou ulozeny do /scripts/pipeline_<step>.log
Na konci se odesle souhrnny report emailem (logy selhanych kroku jako prilohy).
Spousteni:
python 0_run_pipeline_v1.0.py # vse, vsechny schranky
python 0_run_pipeline_v1.0.py --only 5 # jen krok 5 (enrich)
python 0_run_pipeline_v1.0.py --skip 4 # vse mimo smime unwrap
python 0_run_pipeline_v1.0.py --stop-on-error # zastavit pri prvni chybe
python 0_run_pipeline_v1.0.py --quiet # bez tee na konzoli, jen logy
Docker:
docker exec -it python-runner python /scripts/0_run_pipeline_v1.0.py
==============================================================================
"""
from __future__ import annotations
import argparse
import subprocess
import sys
import time
from datetime import datetime
from pathlib import Path
SCRIPTS_DIR = Path("/scripts")
# Per-step logs are written next to the scripts (everything lives in /scripts/).
LOGS_DIR = SCRIPTS_DIR


def _install_requirements(req_file: Path) -> None:
    """Best-effort ``pip install -q -r req_file``; only warns when pip fails.

    Does nothing if *req_file* does not exist.
    """
    if not req_file.exists():
        return
    pip_cmd = [sys.executable, "-m", "pip", "install", "-q", "-r", str(req_file)]
    outcome = subprocess.run(pip_cmd, capture_output=True, text=True)
    if outcome.returncode != 0:
        print(f"[WARN] pip install selhal:\n{outcome.stderr.strip()}")


# --- Auto-install dependencies (executed once, at import time) ---
_REQ_FILE = SCRIPTS_DIR / "requirements.txt"
_install_requirements(_REQ_FILE)

# Pipeline definition, executed top to bottom:
# (step_id, label, script file name inside SCRIPTS_DIR)
STEPS = [
    ("1b", "Graph delta sync",     "1b_parse_emails_graph_delta_v1.0.py"),
    ("3",  "Download attachments", "3_download_attachments_v1.4.py"),
    ("4",  "Unwrap S/MIME",        "4_unwrap_smime_v1.0.py"),
    ("5",  "Enrich fulltext (PG)", "5_enrich_fulltext_emails_v1.3.py"),
]
def fmt_dur(s: float) -> str:
    """Format a duration given in seconds as a compact human-readable string.

    Below one minute the value keeps one decimal (``"12.3s"``). From one
    minute up, the seconds are truncated to an integer and rendered as
    ``"4m05s"`` or, from one hour up, ``"1h02m03s"``.
    """
    if s < 60:
        return f"{s:.1f}s"
    total_minutes, secs = divmod(int(s), 60)
    hours, minutes = divmod(total_minutes, 60)
    if hours == 0:
        return f"{minutes}m{secs:02d}s"
    return f"{hours}h{minutes:02d}m{secs:02d}s"
def run_step(step_id: str, label: str, script: str, *,
             quiet: bool = False) -> tuple[int, float]:
    """Run one pipeline step as a child Python process and log its output.

    The child's stdout and stderr are merged and written line by line to
    ``LOGS_DIR / f"pipeline_{step_id}.log"`` (overwritten on every run).
    Unless *quiet* is set, each line is also echoed to the console.

    Args:
        step_id: Short step identifier, used in the log file name and messages.
        label: Human-readable step name printed in the header.
        script: File name of the step script inside ``SCRIPTS_DIR``.
        quiet: If true, do not echo the child's output to the console.

    Returns:
        ``(exit_code, duration_seconds)``. A missing script yields
        ``(127, 0.0)`` without starting any process.
    """
    script_path = SCRIPTS_DIR / script
    log_path = LOGS_DIR / f"pipeline_{step_id}.log"

    if not script_path.exists():
        msg = f" CHYBA: {script_path} neexistuje!"
        print(msg)
        # Overwrite any log left from a previous run. Otherwise the failure
        # report would attach stale output as if it belonged to this run.
        try:
            log_path.write_text(msg.strip() + "\n", encoding="utf-8")
        except OSError as exc:
            print(f" [WARN] nelze zapsat {log_path}: {exc}")
        return 127, 0.0

    print(f"\n{'='*70}")
    print(f" KROK {step_id}: {label}")
    print(f" script: {script_path}")
    print(f" log: {log_path}")
    print(f" start: {datetime.now().strftime('%H:%M:%S')}")
    print(f"{'='*70}")

    # Monotonic clock: durations are immune to wall-clock adjustments (NTP).
    t0 = time.monotonic()
    # Tee: merged child output goes to the log and, unless quiet, the console.
    # "-u" makes the child Python unbuffered. Lines then arrive as they are
    # produced, and stdout/stderr keep their real relative order in the log
    # (a piped stdout would otherwise be block-buffered while stderr is not).
    with open(log_path, "w", encoding="utf-8") as logf, subprocess.Popen(
        [sys.executable, "-u", str(script_path)],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        bufsize=1,
        encoding="utf-8",
        errors="replace",
    ) as proc:
        # Using Popen as a context manager closes the pipe and waits for the
        # child when the block is left, including on an exception.
        for line in proc.stdout:
            logf.write(line)
            if not quiet:
                print(line, end="", flush=True)
        ret = proc.wait()
    dur = time.monotonic() - t0

    print(f"\n KROK {step_id} {'OK' if ret == 0 else f'FAILED ({ret})'} za {fmt_dur(dur)}")
    return ret, dur
def main() -> int:
    """Parse CLI options, run the selected pipeline steps and report.

    Steps run in ``STEPS`` order, filtered by ``--only`` / ``--skip``. By
    default a failing step does not stop the pipeline. ``--stop-on-error``
    stops after the first non-zero exit code. At the end a summary is
    printed and handed to ``_send_report``.

    Returns:
        Process exit code: 0 if every executed step succeeded, 1 if at
        least one failed, 2 if the filters selected no step at all.
    """
    known_ids = [sid for sid, _label, _script in STEPS]

    ap = argparse.ArgumentParser(description="Email pipeline wrapper v1.0")
    # choices= turns an unknown or mistyped step id (e.g. "1" instead of
    # "1b") into a usage error, instead of a filter that silently matches
    # nothing.
    ap.add_argument("--only", nargs="+", default=None, choices=known_ids,
                    help="Spustit jen tyto kroky (napr. --only 3 4 5)")
    ap.add_argument("--skip", nargs="+", default=None, choices=known_ids,
                    help="Preskocit tyto kroky")
    ap.add_argument("--stop-on-error", action="store_true",
                    help="Zastavit pipeline pri prvni nenulovem exit kodu")
    ap.add_argument("--quiet", action="store_true",
                    help="Necpat stdout kroku na konzoli, jen do logu")
    args = ap.parse_args()

    # Step selection: --only restricts (when given), --skip always excludes.
    only_set = set(args.only) if args.only else None
    skip_set = set(args.skip or ())
    to_run = [
        (sid, label, script)
        for sid, label, script in STEPS
        if (only_set is None or sid in only_set) and sid not in skip_set
    ]
    if not to_run:
        print("Zadny krok k spusteni.")
        return 2

    print("=== Email Pipeline Wrapper v1.0 ===")
    print(f"Start: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"Kroku k spusteni: {len(to_run)}")
    for sid, label, _ in to_run:
        print(f" {sid}: {label}")
    if args.stop_on_error:
        print("Politika: stop-on-error")
    else:
        print("Politika: continue-on-error (default)")

    # Monotonic clock so the total is not skewed by wall-clock changes.
    t_all = time.monotonic()
    results = []
    for sid, label, script in to_run:
        ret, dur = run_step(sid, label, script, quiet=args.quiet)
        results.append((sid, label, ret, dur))
        if ret != 0 and args.stop_on_error:
            print(f"\n!!! Pipeline zastavena na kroku {sid} (--stop-on-error)")
            break
    total_dur = time.monotonic() - t_all

    print(f"\n{'='*70}")
    print("=== SHRNUTI PIPELINE ===")
    print(f"{'='*70}")
    for sid, label, ret, dur in results:
        status = "OK" if ret == 0 else f"FAIL({ret})"
        print(f" [{sid:>2}] {label:30} {status:>8} {fmt_dur(dur):>10}")
    failed = sum(1 for _sid, _label, ret, _dur in results if ret != 0)
    print(f"{'='*70}")
    print(f" Celkem: {len(results)} kroku, {failed} chyb, {fmt_dur(total_dur)}")
    print(f" Konec: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f" Per-krok logy: {LOGS_DIR}/pipeline_<id>.log")

    _send_report(results, failed, total_dur)
    return 1 if failed else 0
def _send_report(results: list, failed: int, total_dur: float, *,
                 recipient: str = "vladimir.buzalka@buzalka.cz") -> None:
    """Email an HTML summary of the pipeline run (best effort).

    Loads ``SCRIPTS_DIR / "EmailMessagingGraph.py"`` dynamically and calls
    its ``send_mail``. Non-empty logs of failed steps are attached. Failure
    to load the mailer or to send is printed and otherwise ignored, so
    reporting never changes the pipeline's exit code.

    Args:
        results: ``(step_id, label, exit_code, duration_s)`` tuples, one per
            executed step.
        failed: Number of steps with a non-zero exit code.
        total_dur: Duration of the whole run in seconds.
        recipient: Address the report is sent to.
    """
    try:
        import importlib.util

        lib_path = SCRIPTS_DIR / "EmailMessagingGraph.py"
        spec = importlib.util.spec_from_file_location("EmailMessagingGraph", lib_path)
        mod = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(mod)
    except Exception as e:
        print(f"[report] Nelze nacist EmailMessagingGraph: {e}")
        return

    # NOTE(review): both icons are empty strings in this copy of the file —
    # possibly emoji lost in transit; confirm the intended markers.
    ok_icon = ""
    err_icon = ""
    overall = ok_icon if failed == 0 else err_icon
    # A single timestamp so the subject and the body cannot disagree.
    stamp = datetime.now().strftime('%Y-%m-%d %H:%M')

    row_parts = []
    for sid, label, ret, dur in results:
        ok = ret == 0
        icon = ok_icon if ok else err_icon
        color = "#d4edda" if ok else "#f8d7da"
        status = "OK" if ok else f"FAIL ({ret})"
        row_parts.append(
            f"<tr style='background:{color}'>"
            f"<td style='padding:4px 10px'>{icon} {label}</td>"
            f"<td style='padding:4px 10px;text-align:center'>{status}</td>"
            f"<td style='padding:4px 10px;text-align:right'>{fmt_dur(dur)}</td>"
            f"</tr>"
        )
    rows = "".join(row_parts)

    body = f"""
<html><body style="font-family:sans-serif;font-size:14px">
<p>{overall} <b>Email pipeline</b> — {stamp}
&nbsp;|&nbsp; celkem {fmt_dur(total_dur)}
&nbsp;|&nbsp; {len(results)} kroků, {failed} chyb</p>
<table border="0" cellspacing="1" cellpadding="0" style="border-collapse:collapse">
<tr style="background:#343a40;color:white">
<th style="padding:4px 10px;text-align:left">Krok</th>
<th style="padding:4px 10px">Status</th>
<th style="padding:4px 10px;text-align:right">Čas</th>
</tr>
{rows}
</table>
</body></html>
"""

    # Attach the (non-empty) logs of failed steps.
    attachments = []
    for sid, _label, ret, _dur in results:
        if ret != 0:
            log_path = LOGS_DIR / f"pipeline_{sid}.log"
            if log_path.exists() and log_path.stat().st_size > 0:
                attachments.append(log_path)

    # strip(): with an empty icon the subject would otherwise start with a space.
    subject = f"{overall} Email pipeline — {stamp}".strip()
    try:
        mod.send_mail(
            recipient,
            subject,
            body,
            html=True,
            attachments=attachments or None,
        )
        print(f"[report] Email odeslan na {recipient}")
    except Exception as e:
        print(f"[report] Chyba pri odesilani: {e}")
if __name__ == "__main__":
    # Ctrl+C anywhere in the pipeline ends with the conventional exit code 130.
    try:
        exit_code = main()
    except KeyboardInterrupt:
        print("\nPreruseno uzivatelem")
        exit_code = 130
    sys.exit(exit_code)