"""Back up the Medicus Firebird databases with gbak, ZIP them and mail a report.

The main database gets its own ZIP archive; every ``MEDICUS_FILES_*.fdb``
database found in ``EXT_DIR`` is dumped separately and all of those dumps are
packed into one shared ZIP. A plain-text report is mailed and printed at the
end, and the script raises if any backup failed.
"""

import os
import subprocess
import time
import traceback
import zipfile
from datetime import datetime
from pathlib import Path

from EmailMessagingGraph import send_mail

# ============================================================
# CONFIG
# ============================================================
GBAK = r"C:\Program Files\Firebird\Firebird_2_5_CGM\bin\gbak.exe"
FB_USER = "SYSDBA"
# NOTE(review): credentials are hard-coded in source; consider moving them
# to the environment or a protected config file.
FB_PASS = "masterkey"
FB_PORT = "3050"
# Built from FB_PORT so both connection strings always use the same port.
MAIN_DB = rf"localhost/{FB_PORT}:C:\medicus 3\data\MEDICUS.FDB"
EXT_DIR = Path(r"c:\medicusext")
BACKUP_DIR = Path(r"U:\medicusbackup")
MAIL_TO = "vladimir.buzalka@buzalka.cz"
CHUNK = 8 * 1024 * 1024  # 8 MB


# ============================================================
# HELPERS
# ============================================================
def _discard(*paths: Path) -> None:
    """Best-effort removal of files; a failure is printed, never raised.

    Used on error paths, where a failing cleanup must not mask the original
    problem or prevent the report from being sent.
    """
    for path in paths:
        try:
            path.unlink(missing_ok=True)
        except OSError as exc:
            print(f"WARNING: could not remove {path}: {exc}")


def _log_tail(log: Path, max_lines: int = 20) -> str:
    """Return the last ``max_lines`` lines of the gbak log, for error messages."""
    try:
        text = log.read_bytes().decode("utf-8", errors="replace")
    except OSError as exc:
        return f"(gbak log not readable: {exc})"
    return "\n".join(text.splitlines()[-max_lines:])


def _failure(label: str) -> dict:
    """Build an error record for ``label`` from the exception being handled.

    Must be called from inside an ``except`` block.
    """
    return {
        "label": label,
        "fbk_size": 0,
        "zip_size": 0,
        "t_gbak": 0,
        "t_zip": 0,
        "error": traceback.format_exc(),
    }


def run_gbak(label: str, db_conn: str, fbk: Path, log: Path) -> dict:
    """Run gbak, return result dict (without zip info).

    Args:
        label: Name used in console output and in the report.
        db_conn: Firebird connection string of the database to back up.
        fbk: Path of the backup file gbak should create.
        log: Path of the file that receives gbak's verbose output.

    Returns:
        A result dict with ``ok``, ``fbk``, ``fbk_size`` and ``t_gbak`` filled
        in; ``zip_size`` and ``t_zip`` are left at 0 for the caller.

    Raises:
        RuntimeError: gbak exited with a non-zero status. The message carries
            the exit code and the tail of the gbak log.
        OSError: gbak could not be started or the log could not be written.
    """
    result = {
        "label": label,
        "ok": False,
        "fbk": fbk,
        "fbk_size": 0,
        "zip_size": 0,
        "t_gbak": 0,
        "t_zip": 0,
        "error": None,
    }
    print(f"GBAK: {label} ... ", end="", flush=True)
    t0 = time.time()

    cmd = [GBAK, "-b", "-user", FB_USER, "-pas", FB_PASS, db_conn, str(fbk), "-v"]
    with open(log, "w", encoding="utf-8") as f:
        # check=False on purpose: CalledProcessError embeds the full command
        # line (including the password) in its message, and that message
        # would end up in the mailed report.
        proc = subprocess.run(cmd, stdout=f, stderr=subprocess.STDOUT, check=False)
    if proc.returncode != 0:
        print("FAILED")
        raise RuntimeError(
            f"gbak failed for {label} (exit code {proc.returncode}); "
            f"last gbak output:\n{_log_tail(log)}"
        )

    result["t_gbak"] = time.time() - t0
    result["fbk_size"] = fbk.stat().st_size
    print(f"OK ({result['t_gbak']:.0f}s, {result['fbk_size']/1024/1024:.1f} MB)")
    result["ok"] = True
    return result


def _zip_files(
    members: list[tuple[str, Path]], total_size: int, zipf: Path
) -> tuple[int, float]:
    """Deflate the given files into a new ZIP at ``zipf`` with console progress.

    Args:
        members: ``(progress_label, source_path)`` pairs; each file is stored
            under its bare file name.
        total_size: Combined size of all sources, used for the percentage.
        zipf: Path of the archive to create.

    Returns:
        ``(zip_size, t_zip)``: archive size in bytes and elapsed seconds.

    A partially written archive is removed before the error propagates, so a
    truncated ZIP can never be mistaken for a valid backup.
    """
    started = time.time()
    processed = 0
    try:
        with zipfile.ZipFile(
            zipf, "w", compression=zipfile.ZIP_DEFLATED, compresslevel=9
        ) as zf:
            for label, src_path in members:
                # Open the member by NAME: when a ready-made ZipInfo is passed,
                # CPython's zipfile does not apply the archive's compresslevel
                # to that member.
                with zf.open(src_path.name, "w", force_zip64=True) as member, open(
                    src_path, "rb"
                ) as src:
                    while buf := src.read(CHUNK):
                        member.write(buf)
                        processed += len(buf)
                        pct = processed * 100 / total_size if total_size else 100.0
                        print(f"\r ZIP {label}: {pct:6.2f}%", end="", flush=True)
                print()
    except BaseException:
        _discard(zipf)
        raise
    return zipf.stat().st_size, time.time() - started


def zip_single(label: str, fbk: Path, zipf: Path) -> tuple[int, float]:
    """ZIP one FBK into its own ZIP. Returns (zip_size, t_zip)."""
    return _zip_files([(label, fbk)], fbk.stat().st_size, zipf)


def zip_multiple(fbk_results: list[dict], zipf: Path) -> tuple[int, float]:
    """ZIP multiple FBK files into one ZIP. Returns (zip_size, t_zip).

    ``fbk_results`` are result dicts as returned by ``run_gbak``; the progress
    percentage is cumulative over the sum of their ``fbk_size`` values.
    """
    total_fbk_size = sum(r["fbk_size"] for r in fbk_results)
    members = [(r["fbk"].name, r["fbk"]) for r in fbk_results]
    return _zip_files(members, total_fbk_size, zipf)


def format_result(r: dict) -> str:
    """Format one result dict as a single report line (sizes in MB)."""
    ratio = 100 * (1 - r["zip_size"] / r["fbk_size"]) if r["fbk_size"] else 0
    return (
        f" {r['label']}: "
        f"FBK {r['fbk_size']/1024/1024:.1f} MB → "
        f"ZIP {r['zip_size']/1024/1024:.1f} MB "
        f"({ratio:.0f}% komprese, "
        f"gbak {r['t_gbak']:.0f}s, zip {r['t_zip']:.0f}s)"
    )


def _build_report(
    now: datetime, total: int, backed_up: list[dict], errors: list[dict]
) -> list[str]:
    """Build the report lines from the successful results and error records."""
    report = [
        f"Backup Medicus – {now.strftime('%d.%m.%Y %H:%M')}",
        f"Celkem DB: {total} | OK: {len(backed_up)} | Chyby: {len(errors)}",
        f"Výstupní adresář: {BACKUP_DIR}",
        "",
    ]

    if backed_up:
        report.append("--- Zálohováno ---")
        main_results = [r for r in backed_up if r["label"] == "MEDICUS"]
        ext_backed = [r for r in backed_up if r["label"] != "MEDICUS"]

        # Main DB
        report.extend(format_result(r) for r in main_results)

        # External DBs share one archive, so every record carries the same
        # zip_size / t_zip; read them from the first one.
        ext_zip_size = ext_backed[0]["zip_size"] if ext_backed else 0
        if ext_backed:
            total_ext_fbk = sum(r["fbk_size"] for r in ext_backed)
            ratio = 100 * (1 - ext_zip_size / total_ext_fbk) if total_ext_fbk else 0
            report.append(f" Externí DB ({len(ext_backed)} souborů):")
            for r in ext_backed:
                report.append(
                    f" {r['label']}: FBK {r['fbk_size']/1024/1024:.1f} MB "
                    f"(gbak {r['t_gbak']:.0f}s)"
                )
            report.append(
                f" → společný ZIP: {ext_zip_size/1024/1024:.1f} MB "
                f"({ratio:.0f}% komprese, zip {ext_backed[0]['t_zip']:.0f}s)"
            )

        total_zip = sum(r["zip_size"] for r in main_results) + ext_zip_size
        report.append(f" Celková velikost ZIP: {total_zip/1024/1024:.1f} MB")
        report.append("")

    if errors:
        report.append("--- CHYBY ---")
        for e in errors:
            report.append(f" {e['label']}:\n{e['error']}")
        report.append("")

    return report


# ============================================================
# MAIN
# ============================================================
def main() -> None:
    """Back up all databases, mail and print the report.

    Raises:
        RuntimeError: at least one backup step failed (raised after the
            report has been sent).
    """
    BACKUP_DIR.mkdir(parents=True, exist_ok=True)
    now = datetime.now()
    ts = now.strftime("%Y-%m-%d_%H-%M-%S")

    backed_up = []
    errors = []

    # ----------------------------------------------------------
    # 1) Main DB - MEDICUS.FDB -> its own ZIP
    # ----------------------------------------------------------
    fbk = BACKUP_DIR / f"MEDICUS_{ts}.fbk"
    zipf = BACKUP_DIR / f"MEDICUS_{ts}.zip"
    log = BACKUP_DIR / f"MEDICUS_{ts}.log"
    try:
        r = run_gbak("MEDICUS", MAIN_DB, fbk, log)
        log.unlink()
        r["zip_size"], r["t_zip"] = zip_single("MEDICUS", fbk, zipf)
        fbk.unlink()
        backed_up.append(r)
    except Exception:
        errors.append(_failure("MEDICUS"))
        _discard(fbk, log)

    # ----------------------------------------------------------
    # 2) External DBs - MEDICUS_FILES_*.fdb -> all into one ZIP
    # ----------------------------------------------------------
    fdb_all = sorted(
        set(EXT_DIR.glob("MEDICUS_FILES_*.fdb"))
        | set(EXT_DIR.glob("MEDICUS_FILES_*.FDB")),
        key=lambda p: p.name.lower(),
    )

    ext_results = []
    for fdb in fdb_all:
        name = fdb.stem
        fbk = BACKUP_DIR / f"{name}_{ts}.fbk"
        log = BACKUP_DIR / f"{name}_{ts}.log"
        db_conn = f"localhost/{FB_PORT}:{fdb}"
        try:
            r = run_gbak(name, db_conn, fbk, log)
            log.unlink()
            ext_results.append(r)
        except Exception:
            errors.append(_failure(name))
            _discard(fbk, log)

    # ZIP all external FBK dumps into a single archive.
    if ext_results:
        ext_zip = BACKUP_DIR / f"MEDICUS_FILES_{ts}.zip"
        try:
            print(f"\nZIP externích DB → {ext_zip.name}")
            zip_size, t_zip = zip_multiple(ext_results, ext_zip)
            for r in ext_results:
                r["zip_size"] = zip_size  # shared size of the resulting ZIP
                r["t_zip"] = t_zip
                r["fbk"].unlink()
                backed_up.append(r)
        except Exception:
            errors.append(_failure("MEDICUS_FILES (zip)"))
            _discard(*(r["fbk"] for r in ext_results))

    # ----------------------------------------------------------
    # Report
    # ----------------------------------------------------------
    total = 1 + len(fdb_all)
    report = _build_report(now, total, backed_up, errors)

    has_errors = bool(errors)
    subject = (
        f"{'X' if has_errors else 'OK'} MEDICUS backup "
        f"{len(backed_up)}/{total}"
        + (f" – {len(errors)} chyb" if has_errors else "")
    )

    body = "\n".join(report)
    try:
        send_mail(MAIL_TO, subject, body)
    finally:
        # Print the report even when mailing fails, so the outcome is not lost.
        print("\n" + body)

    if errors:
        raise RuntimeError(f"{len(errors)} backup(s) failed")


if __name__ == "__main__":
    main()