Files
medicus/Backup/BackupAll02.py
2026-04-03 08:55:10 +02:00

236 lines
8.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import subprocess
import os
from pathlib import Path
from datetime import datetime
import zipfile
import time
import traceback
from EmailMessagingGraph import send_mail
# ============================================================
# CONFIG
# ============================================================
GBAK = r"C:\Program Files\Firebird\Firebird_2_5_CGM\bin\gbak.exe"
FB_USER = "SYSDBA"
FB_PASS = "masterkey"
FB_PORT = "3050"
MAIN_DB = r"localhost/3050:C:\medicus 3\data\MEDICUS.FDB"
EXT_DIR = Path(r"c:\medicusext")
BACKUP_DIR = Path(r"U:\medicusbackup")
MAIL_TO = "vladimir.buzalka@buzalka.cz"
CHUNK = 8 * 1024 * 1024 # 8 MB
# ============================================================
# HELPERS
# ============================================================
def run_gbak(label: str, db_conn: str, fbk: Path, log: Path) -> dict:
"""Run gbak, return result dict (without zip info)."""
result = {
"label": label,
"ok": False,
"fbk": fbk,
"fbk_size": 0,
"zip_size": 0,
"t_gbak": 0,
"t_zip": 0,
"error": None,
}
print(f"GBAK: {label} ... ", end="", flush=True)
t0 = time.time()
cmd = [GBAK, "-b", "-user", FB_USER, "-pas", FB_PASS, db_conn, str(fbk), "-v"]
with open(log, "w", encoding="utf-8") as f:
subprocess.run(cmd, stdout=f, stderr=subprocess.STDOUT, check=True)
result["t_gbak"] = time.time() - t0
result["fbk_size"] = fbk.stat().st_size
print(f"OK ({result['t_gbak']:.0f}s, {result['fbk_size']/1024/1024:.1f} MB)")
result["ok"] = True
return result
def zip_single(label: str, fbk: Path, zipf: Path) -> tuple[int, float]:
"""ZIP one FBK into its own ZIP. Returns (zip_size, t_zip)."""
t1 = time.time()
processed = 0
fbk_size = fbk.stat().st_size
with zipfile.ZipFile(zipf, "w", compression=zipfile.ZIP_DEFLATED, compresslevel=9) as zf:
zi = zipfile.ZipInfo(fbk.name)
zi.compress_type = zipfile.ZIP_DEFLATED
with zf.open(zi, "w", force_zip64=True) as z:
with open(fbk, "rb") as src:
while buf := src.read(CHUNK):
z.write(buf)
processed += len(buf)
pct = processed * 100 / fbk_size
print(f"\r ZIP {label}: {pct:6.2f}%", end="", flush=True)
print()
return zipf.stat().st_size, time.time() - t1
def zip_multiple(fbk_results: list[dict], zipf: Path) -> tuple[int, float]:
"""ZIP multiple FBK files into one ZIP. Returns (zip_size, t_zip)."""
t1 = time.time()
total_fbk_size = sum(r["fbk_size"] for r in fbk_results)
total_processed = 0
with zipfile.ZipFile(zipf, "w", compression=zipfile.ZIP_DEFLATED, compresslevel=9) as zf:
for r in fbk_results:
fbk = r["fbk"]
zi = zipfile.ZipInfo(fbk.name)
zi.compress_type = zipfile.ZIP_DEFLATED
with zf.open(zi, "w", force_zip64=True) as z:
with open(fbk, "rb") as src:
while buf := src.read(CHUNK):
z.write(buf)
total_processed += len(buf)
pct = total_processed * 100 / total_fbk_size
print(f"\r ZIP {fbk.name}: {pct:6.2f}%", end="", flush=True)
print()
return zipf.stat().st_size, time.time() - t1
def format_result(r: dict) -> str:
ratio = 100 * (1 - r["zip_size"] / r["fbk_size"]) if r["fbk_size"] else 0
return (
f" {r['label']}: "
f"FBK {r['fbk_size']/1024/1024:.1f} MB → "
f"ZIP {r['zip_size']/1024/1024:.1f} MB "
f"({ratio:.0f}% komprese, "
f"gbak {r['t_gbak']:.0f}s, zip {r['t_zip']:.0f}s)"
)
# ============================================================
# MAIN
# ============================================================
def main():
BACKUP_DIR.mkdir(parents=True, exist_ok=True)
now = datetime.now()
ts = now.strftime("%Y-%m-%d_%H-%M-%S")
backed_up = []
errors = []
# ----------------------------------------------------------
# 1) Hlavní DB MEDICUS.FDB → vlastní ZIP
# ----------------------------------------------------------
fbk = BACKUP_DIR / f"MEDICUS_{ts}.fbk"
zipf = BACKUP_DIR / f"MEDICUS_{ts}.zip"
log = BACKUP_DIR / f"MEDICUS_{ts}.log"
try:
r = run_gbak("MEDICUS", MAIN_DB, fbk, log)
log.unlink()
zip_size, t_zip = zip_single("MEDICUS", fbk, zipf)
fbk.unlink()
r["zip_size"] = zip_size
r["t_zip"] = t_zip
backed_up.append(r)
except Exception:
errors.append({"label": "MEDICUS", "fbk_size": 0, "zip_size": 0, "t_gbak": 0, "t_zip": 0, "error": traceback.format_exc()})
for f in (fbk, log):
if f.exists():
f.unlink()
# ----------------------------------------------------------
# 2) Externí DB MEDICUS_FILES_*.fdb → všechny do jednoho ZIP
# ----------------------------------------------------------
fdb_all = sorted(
set(EXT_DIR.glob("MEDICUS_FILES_*.fdb")) | set(EXT_DIR.glob("MEDICUS_FILES_*.FDB")),
key=lambda p: p.name.lower(),
)
ext_results = []
for fdb in fdb_all:
name = fdb.stem
fbk = BACKUP_DIR / f"{name}_{ts}.fbk"
log = BACKUP_DIR / f"{name}_{ts}.log"
db_conn = f"localhost/{FB_PORT}:{fdb}"
try:
r = run_gbak(name, db_conn, fbk, log)
log.unlink()
ext_results.append(r)
except Exception:
errors.append({"label": name, "fbk_size": 0, "zip_size": 0, "t_gbak": 0, "t_zip": 0, "error": traceback.format_exc()})
for f in (fbk, log):
if f.exists():
f.unlink()
# ZIP všechny externí FBK do jednoho souboru
if ext_results:
ext_zip = BACKUP_DIR / f"MEDICUS_FILES_{ts}.zip"
print(f"\nZIP externích DB → {ext_zip.name}")
try:
zip_size, t_zip = zip_multiple(ext_results, ext_zip)
for r in ext_results:
r["zip_size"] = zip_size # sdílená velikost výsledného ZIPu
r["t_zip"] = t_zip
r["fbk"].unlink()
backed_up.append(r)
except Exception:
errors.append({"label": "MEDICUS_FILES (zip)", "fbk_size": 0, "zip_size": 0, "t_gbak": 0, "t_zip": 0, "error": traceback.format_exc()})
for r in ext_results:
if r["fbk"].exists():
r["fbk"].unlink()
# ----------------------------------------------------------
# Report
# ----------------------------------------------------------
total = 1 + len(fdb_all)
report = [
f"Backup Medicus {now.strftime('%d.%m.%Y %H:%M')}",
f"Celkem DB: {total} | OK: {len(backed_up)} | Chyby: {len(errors)}",
f"Výstupní adresář: {BACKUP_DIR}",
"",
]
if backed_up:
report.append("--- Zálohováno ---")
# Hlavní DB
main_results = [r for r in backed_up if r["label"] == "MEDICUS"]
ext_backed = [r for r in backed_up if r["label"] != "MEDICUS"]
for r in main_results:
report.append(format_result(r))
if ext_backed:
total_ext_fbk = sum(r["fbk_size"] for r in ext_backed)
ext_zip_size = ext_backed[0]["zip_size"] if ext_backed else 0
ratio = 100 * (1 - ext_zip_size / total_ext_fbk) if total_ext_fbk else 0
report.append(f" Externí DB ({len(ext_backed)} souborů):")
for r in ext_backed:
report.append(f" {r['label']}: FBK {r['fbk_size']/1024/1024:.1f} MB (gbak {r['t_gbak']:.0f}s)")
report.append(
f" → společný ZIP: {ext_zip_size/1024/1024:.1f} MB "
f"({ratio:.0f}% komprese, zip {ext_backed[0]['t_zip']:.0f}s)"
)
total_zip = sum(r["zip_size"] for r in main_results) + (ext_backed[0]["zip_size"] if ext_backed else 0)
report.append(f" Celková velikost ZIP: {total_zip/1024/1024:.1f} MB")
report.append("")
if errors:
report.append("--- CHYBY ---")
for e in errors:
report.append(f" {e['label']}:\n{e['error']}")
report.append("")
has_errors = bool(errors)
subject = (
f"{'X' if has_errors else 'OK'} MEDICUS backup "
f"{len(backed_up)}/{total}"
+ (f" {len(errors)} chyb" if has_errors else "")
)
send_mail(MAIL_TO, subject, "\n".join(report))
print("\n" + "\n".join(report))
if errors:
raise RuntimeError(f"{len(errors)} backup(s) failed")
if __name__ == "__main__":
main()