e04bf6172a
01 = původní (každá DB vlastní ZIP) 02 = nový (hlavní DB vlastní ZIP, všechny externí DB → jeden ZIP) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
236 lines
8.5 KiB
Python
236 lines
8.5 KiB
Python
import subprocess
|
||
import os
|
||
from pathlib import Path
|
||
from datetime import datetime
|
||
import zipfile
|
||
import time
|
||
import traceback
|
||
|
||
from EmailMessagingGraph import send_mail
|
||
|
||
# ============================================================
|
||
# CONFIG
|
||
# ============================================================
|
||
|
||
GBAK = r"C:\Program Files\Firebird\Firebird_2_5_CGM\bin\gbak.exe"
|
||
FB_USER = "SYSDBA"
|
||
FB_PASS = "masterkey"
|
||
FB_PORT = "3050"
|
||
|
||
MAIN_DB = r"localhost/3050:C:\medicus 3\data\MEDICUS.FDB"
|
||
EXT_DIR = Path(r"U:\externi")
|
||
BACKUP_DIR = Path(r"U:\medicusbackup")
|
||
|
||
MAIL_TO = "vladimir.buzalka@buzalka.cz"
|
||
|
||
CHUNK = 8 * 1024 * 1024 # 8 MB
|
||
|
||
|
||
# ============================================================
|
||
# HELPERS
|
||
# ============================================================
|
||
|
||
def run_gbak(label: str, db_conn: str, fbk: Path, log: Path) -> dict:
|
||
"""Run gbak, return result dict (without zip info)."""
|
||
result = {
|
||
"label": label,
|
||
"ok": False,
|
||
"fbk": fbk,
|
||
"fbk_size": 0,
|
||
"zip_size": 0,
|
||
"t_gbak": 0,
|
||
"t_zip": 0,
|
||
"error": None,
|
||
}
|
||
print(f"GBAK: {label} ... ", end="", flush=True)
|
||
t0 = time.time()
|
||
cmd = [GBAK, "-b", "-user", FB_USER, "-pas", FB_PASS, db_conn, str(fbk), "-v"]
|
||
with open(log, "w", encoding="utf-8") as f:
|
||
subprocess.run(cmd, stdout=f, stderr=subprocess.STDOUT, check=True)
|
||
result["t_gbak"] = time.time() - t0
|
||
result["fbk_size"] = fbk.stat().st_size
|
||
print(f"OK ({result['t_gbak']:.0f}s, {result['fbk_size']/1024/1024:.1f} MB)")
|
||
result["ok"] = True
|
||
return result
|
||
|
||
|
||
def zip_single(label: str, fbk: Path, zipf: Path) -> tuple[int, float]:
|
||
"""ZIP one FBK into its own ZIP. Returns (zip_size, t_zip)."""
|
||
t1 = time.time()
|
||
processed = 0
|
||
fbk_size = fbk.stat().st_size
|
||
with zipfile.ZipFile(zipf, "w", compression=zipfile.ZIP_DEFLATED, compresslevel=9) as zf:
|
||
zi = zipfile.ZipInfo(fbk.name)
|
||
zi.compress_type = zipfile.ZIP_DEFLATED
|
||
with zf.open(zi, "w", force_zip64=True) as z:
|
||
with open(fbk, "rb") as src:
|
||
while buf := src.read(CHUNK):
|
||
z.write(buf)
|
||
processed += len(buf)
|
||
pct = processed * 100 / fbk_size
|
||
print(f"\r ZIP {label}: {pct:6.2f}%", end="", flush=True)
|
||
print()
|
||
return zipf.stat().st_size, time.time() - t1
|
||
|
||
|
||
def zip_multiple(fbk_results: list[dict], zipf: Path) -> tuple[int, float]:
|
||
"""ZIP multiple FBK files into one ZIP. Returns (zip_size, t_zip)."""
|
||
t1 = time.time()
|
||
total_fbk_size = sum(r["fbk_size"] for r in fbk_results)
|
||
total_processed = 0
|
||
with zipfile.ZipFile(zipf, "w", compression=zipfile.ZIP_DEFLATED, compresslevel=9) as zf:
|
||
for r in fbk_results:
|
||
fbk = r["fbk"]
|
||
zi = zipfile.ZipInfo(fbk.name)
|
||
zi.compress_type = zipfile.ZIP_DEFLATED
|
||
with zf.open(zi, "w", force_zip64=True) as z:
|
||
with open(fbk, "rb") as src:
|
||
while buf := src.read(CHUNK):
|
||
z.write(buf)
|
||
total_processed += len(buf)
|
||
pct = total_processed * 100 / total_fbk_size
|
||
print(f"\r ZIP {fbk.name}: {pct:6.2f}%", end="", flush=True)
|
||
print()
|
||
return zipf.stat().st_size, time.time() - t1
|
||
|
||
|
||
def format_result(r: dict) -> str:
|
||
ratio = 100 * (1 - r["zip_size"] / r["fbk_size"]) if r["fbk_size"] else 0
|
||
return (
|
||
f" {r['label']}: "
|
||
f"FBK {r['fbk_size']/1024/1024:.1f} MB → "
|
||
f"ZIP {r['zip_size']/1024/1024:.1f} MB "
|
||
f"({ratio:.0f}% komprese, "
|
||
f"gbak {r['t_gbak']:.0f}s, zip {r['t_zip']:.0f}s)"
|
||
)
|
||
|
||
|
||
# ============================================================
|
||
# MAIN
|
||
# ============================================================
|
||
|
||
def main():
|
||
BACKUP_DIR.mkdir(parents=True, exist_ok=True)
|
||
now = datetime.now()
|
||
ts = now.strftime("%Y-%m-%d_%H-%M-%S")
|
||
|
||
backed_up = []
|
||
errors = []
|
||
|
||
# ----------------------------------------------------------
|
||
# 1) Hlavní DB – MEDICUS.FDB → vlastní ZIP
|
||
# ----------------------------------------------------------
|
||
fbk = BACKUP_DIR / f"MEDICUS_{ts}.fbk"
|
||
zipf = BACKUP_DIR / f"MEDICUS_{ts}.zip"
|
||
log = BACKUP_DIR / f"MEDICUS_{ts}.log"
|
||
try:
|
||
r = run_gbak("MEDICUS", MAIN_DB, fbk, log)
|
||
log.unlink()
|
||
zip_size, t_zip = zip_single("MEDICUS", fbk, zipf)
|
||
fbk.unlink()
|
||
r["zip_size"] = zip_size
|
||
r["t_zip"] = t_zip
|
||
backed_up.append(r)
|
||
except Exception:
|
||
errors.append({"label": "MEDICUS", "fbk_size": 0, "zip_size": 0, "t_gbak": 0, "t_zip": 0, "error": traceback.format_exc()})
|
||
for f in (fbk, log):
|
||
if f.exists():
|
||
f.unlink()
|
||
|
||
# ----------------------------------------------------------
|
||
# 2) Externí DB – MEDICUS_FILES_*.fdb → všechny do jednoho ZIP
|
||
# ----------------------------------------------------------
|
||
fdb_all = sorted(
|
||
set(EXT_DIR.glob("MEDICUS_FILES_*.fdb")) | set(EXT_DIR.glob("MEDICUS_FILES_*.FDB")),
|
||
key=lambda p: p.name.lower(),
|
||
)
|
||
|
||
ext_results = []
|
||
for fdb in fdb_all:
|
||
name = fdb.stem
|
||
fbk = BACKUP_DIR / f"{name}_{ts}.fbk"
|
||
log = BACKUP_DIR / f"{name}_{ts}.log"
|
||
db_conn = f"localhost/{FB_PORT}:{fdb}"
|
||
try:
|
||
r = run_gbak(name, db_conn, fbk, log)
|
||
log.unlink()
|
||
ext_results.append(r)
|
||
except Exception:
|
||
errors.append({"label": name, "fbk_size": 0, "zip_size": 0, "t_gbak": 0, "t_zip": 0, "error": traceback.format_exc()})
|
||
for f in (fbk, log):
|
||
if f.exists():
|
||
f.unlink()
|
||
|
||
# ZIP všechny externí FBK do jednoho souboru
|
||
if ext_results:
|
||
ext_zip = BACKUP_DIR / f"MEDICUS_FILES_{ts}.zip"
|
||
print(f"\nZIP externích DB → {ext_zip.name}")
|
||
try:
|
||
zip_size, t_zip = zip_multiple(ext_results, ext_zip)
|
||
for r in ext_results:
|
||
r["zip_size"] = zip_size # sdílená velikost výsledného ZIPu
|
||
r["t_zip"] = t_zip
|
||
r["fbk"].unlink()
|
||
backed_up.append(r)
|
||
except Exception:
|
||
errors.append({"label": "MEDICUS_FILES (zip)", "fbk_size": 0, "zip_size": 0, "t_gbak": 0, "t_zip": 0, "error": traceback.format_exc()})
|
||
for r in ext_results:
|
||
if r["fbk"].exists():
|
||
r["fbk"].unlink()
|
||
|
||
# ----------------------------------------------------------
|
||
# Report
|
||
# ----------------------------------------------------------
|
||
total = 1 + len(fdb_all)
|
||
report = [
|
||
f"Backup Medicus – {now.strftime('%d.%m.%Y %H:%M')}",
|
||
f"Celkem DB: {total} | OK: {len(backed_up)} | Chyby: {len(errors)}",
|
||
f"Výstupní adresář: {BACKUP_DIR}",
|
||
"",
|
||
]
|
||
|
||
if backed_up:
|
||
report.append("--- Zálohováno ---")
|
||
# Hlavní DB
|
||
main_results = [r for r in backed_up if r["label"] == "MEDICUS"]
|
||
ext_backed = [r for r in backed_up if r["label"] != "MEDICUS"]
|
||
for r in main_results:
|
||
report.append(format_result(r))
|
||
if ext_backed:
|
||
total_ext_fbk = sum(r["fbk_size"] for r in ext_backed)
|
||
ext_zip_size = ext_backed[0]["zip_size"] if ext_backed else 0
|
||
ratio = 100 * (1 - ext_zip_size / total_ext_fbk) if total_ext_fbk else 0
|
||
report.append(f" Externí DB ({len(ext_backed)} souborů):")
|
||
for r in ext_backed:
|
||
report.append(f" {r['label']}: FBK {r['fbk_size']/1024/1024:.1f} MB (gbak {r['t_gbak']:.0f}s)")
|
||
report.append(
|
||
f" → společný ZIP: {ext_zip_size/1024/1024:.1f} MB "
|
||
f"({ratio:.0f}% komprese, zip {ext_backed[0]['t_zip']:.0f}s)"
|
||
)
|
||
total_zip = sum(r["zip_size"] for r in main_results) + (ext_backed[0]["zip_size"] if ext_backed else 0)
|
||
report.append(f" Celková velikost ZIP: {total_zip/1024/1024:.1f} MB")
|
||
report.append("")
|
||
|
||
if errors:
|
||
report.append("--- CHYBY ---")
|
||
for e in errors:
|
||
report.append(f" {e['label']}:\n{e['error']}")
|
||
report.append("")
|
||
|
||
has_errors = bool(errors)
|
||
subject = (
|
||
f"{'X' if has_errors else 'OK'} MEDICUS backup "
|
||
f"{len(backed_up)}/{total}"
|
||
+ (f" – {len(errors)} chyb" if has_errors else "")
|
||
)
|
||
|
||
send_mail(MAIL_TO, subject, "\n".join(report))
|
||
print("\n" + "\n".join(report))
|
||
|
||
if errors:
|
||
raise RuntimeError(f"{len(errors)} backup(s) failed")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|