Backup/BackupAll: přidány verze 01 a 02

01 = původní (každá DB vlastní ZIP)
02 = nový (hlavní DB vlastní ZIP, všechny externí DB → jeden ZIP)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-03 08:29:15 +02:00
parent 91edb6f084
commit e04bf6172a
2 changed files with 420 additions and 0 deletions
+185
View File
@@ -0,0 +1,185 @@
import subprocess
import os
from pathlib import Path
from datetime import datetime
import zipfile
import time
import traceback
from EmailMessagingGraph import send_mail
# ============================================================
# CONFIG
# ============================================================
# Path to the Firebird 2.5 gbak backup utility.
GBAK = r"C:\Program Files\Firebird\Firebird_2_5_CGM\bin\gbak.exe"
FB_USER = "SYSDBA"
# NOTE(review): hard-coded default SYSDBA password in source — consider
# moving it to an environment variable or a file outside version control.
FB_PASS = "masterkey"
FB_PORT = "3050"
# Connection string of the main Medicus database (host/port:path).
MAIN_DB = r"localhost/3050:C:\medicus 3\data\MEDICUS.FDB"
# Directory scanned for external MEDICUS_FILES_*.fdb databases.
EXT_DIR = Path(r"U:\externi")
# Directory where the .fbk/.zip/.log backup artifacts are written.
BACKUP_DIR = Path(r"U:\medicusbackup")
# Recipient of the backup report e-mail.
MAIL_TO = "vladimir.buzalka@buzalka.cz"
CHUNK = 8 * 1024 * 1024  # 8 MB read block for streaming ZIP compression
# ============================================================
# HELPERS
# ============================================================
def gbak_and_zip(label: str, db_conn: str, fbk: Path, zipf: Path, log: Path) -> dict:
    """
    Run a gbak backup of *db_conn* into *fbk*, then compress *fbk* into *zipf*.

    gbak console output is captured in *log*.  On full success the
    intermediate FBK and LOG files are deleted, leaving only the ZIP.
    On failure, FBK/LOG are left for the caller to clean up, but a
    partially written ZIP is removed here before re-raising.

    Args:
        label: Name shown in progress output and stored in the result.
        db_conn: Firebird connection string (host/port:path).
        fbk: Target path for the intermediate gbak backup file.
        zipf: Target path for the final ZIP archive.
        log: Target path for the captured gbak output.

    Returns:
        Result dict: label, ok flag, FBK/ZIP sizes (bytes) and phase timings.

    Raises:
        subprocess.CalledProcessError: gbak exited non-zero (check=True).
        OSError: on file-system failures.
    """
    result = {
        "label": label,
        "ok": False,
        "fbk_size": 0,
        "zip_size": 0,
        "t_gbak": 0,
        "t_zip": 0,
        "error": None,
    }
    # 1) GBAK
    print(f"GBAK: {label} ... ", end="", flush=True)
    t0 = time.time()
    cmd = [GBAK, "-b", "-user", FB_USER, "-pas", FB_PASS, db_conn, str(fbk), "-v"]
    with open(log, "w", encoding="utf-8") as f:
        subprocess.run(cmd, stdout=f, stderr=subprocess.STDOUT, check=True)
    result["t_gbak"] = time.time() - t0
    result["fbk_size"] = fbk.stat().st_size
    print(f"OK ({result['t_gbak']:.0f}s, {result['fbk_size']/1024/1024:.1f} MB)")
    # 2) ZIP — stream the FBK in CHUNK-sized blocks with a progress line
    t1 = time.time()
    processed = 0
    fbk_size = result["fbk_size"]
    try:
        with zipfile.ZipFile(zipf, "w", compression=zipfile.ZIP_DEFLATED, compresslevel=9) as zf:
            zi = zipfile.ZipInfo(fbk.name)
            zi.compress_type = zipfile.ZIP_DEFLATED
            with zf.open(zi, "w", force_zip64=True) as z:
                with open(fbk, "rb") as src:
                    while buf := src.read(CHUNK):
                        z.write(buf)
                        processed += len(buf)
                        pct = processed * 100 / fbk_size
                        print(f"\r ZIP {label}: {pct:6.2f}%", end="", flush=True)
    except Exception:
        # Bug fix: never leave a truncated ZIP behind — it could be
        # mistaken for a valid backup by whoever restores it.
        if zipf.exists():
            zipf.unlink()
        raise
    print()
    result["t_zip"] = time.time() - t1
    result["zip_size"] = zipf.stat().st_size
    # 3) Delete intermediate FBK + LOG (only the ZIP remains)
    fbk.unlink()
    log.unlink()
    result["ok"] = True
    return result
def format_result(r: dict) -> str:
    """Render one backup result dict as a single indented report line."""
    fbk_mb = r["fbk_size"] / 1024 / 1024
    zip_mb = r["zip_size"] / 1024 / 1024
    if r["fbk_size"]:
        ratio = 100 * (1 - r["zip_size"] / r["fbk_size"])
    else:
        ratio = 0
    return (
        f" {r['label']}: "
        f"FBK {fbk_mb:.1f} MB → "
        f"ZIP {zip_mb:.1f} MB "
        f"({ratio:.0f}% komprese, "
        f"gbak {r['t_gbak']:.0f}s, zip {r['t_zip']:.0f}s)"
    )
# ============================================================
# MAIN
# ============================================================
def main():
    """Back up the main Medicus DB and every external MEDICUS_FILES_* DB
    (each into its own ZIP), then e-mail a summary report.

    Raises:
        RuntimeError: when at least one backup failed (raised after the
        report e-mail is sent, so a scheduler sees a non-zero exit).
    """
    BACKUP_DIR.mkdir(parents=True, exist_ok=True)
    now = datetime.now()
    ts = now.strftime("%Y-%m-%d_%H-%M-%S")
    backed_up = []
    errors = []
    # ----------------------------------------------------------
    # 1) Main DB MEDICUS.FDB
    # ----------------------------------------------------------
    fbk = BACKUP_DIR / f"MEDICUS_{ts}.fbk"
    zipf = BACKUP_DIR / f"MEDICUS_{ts}.zip"
    log = BACKUP_DIR / f"MEDICUS_{ts}.log"
    try:
        r = gbak_and_zip("MEDICUS", MAIN_DB, fbk, zipf, log)
        backed_up.append(r)
    except Exception:
        errors.append({"label": "MEDICUS", "error": traceback.format_exc()})
        # Bug fix: also remove a partially written ZIP (not only FBK/LOG)
        # so a truncated archive is never mistaken for a valid backup.
        for f in (fbk, zipf, log):
            if f.exists():
                f.unlink()
    # ----------------------------------------------------------
    # 2) External DBs MEDICUS_FILES_*.fdb
    # ----------------------------------------------------------
    # Both glob spellings are needed on case-sensitive file systems;
    # the set union removes duplicates on case-insensitive ones.
    fdb_all = sorted(
        set(EXT_DIR.glob("MEDICUS_FILES_*.fdb")) | set(EXT_DIR.glob("MEDICUS_FILES_*.FDB")),
        key=lambda p: p.name.lower(),
    )
    for fdb in fdb_all:
        name = fdb.stem
        fbk = BACKUP_DIR / f"{name}_{ts}.fbk"
        zipf = BACKUP_DIR / f"{name}_{ts}.zip"
        log = BACKUP_DIR / f"{name}_{ts}.log"
        db_conn = f"localhost/{FB_PORT}:{fdb}"
        try:
            r = gbak_and_zip(name, db_conn, fbk, zipf, log)
            backed_up.append(r)
        except Exception:
            errors.append({"label": name, "error": traceback.format_exc()})
            # Bug fix: include the partial ZIP in the cleanup here as well.
            for f in (fbk, zipf, log):
                if f.exists():
                    f.unlink()
    # ----------------------------------------------------------
    # Report
    # ----------------------------------------------------------
    total = 1 + len(fdb_all)  # main DB + all discovered external DBs
    report = [
        f"Backup Medicus {now.strftime('%d.%m.%Y %H:%M')}",
        f"Celkem DB: {total} | OK: {len(backed_up)} | Chyby: {len(errors)}",
        f"Výstupní adresář: {BACKUP_DIR}",
        "",
    ]
    if backed_up:
        report.append("--- Zálohováno ---")
        total_zip = sum(r["zip_size"] for r in backed_up)
        for r in backed_up:
            report.append(format_result(r))
        report.append(f" Celková velikost ZIP: {total_zip/1024/1024:.1f} MB")
        report.append("")
    if errors:
        report.append("--- CHYBY ---")
        for e in errors:
            report.append(f" {e['label']}:\n{e['error']}")
        report.append("")
    has_errors = bool(errors)
    subject = (
        f"{'X' if has_errors else 'OK'} MEDICUS backup "
        f"{len(backed_up)}/{total}"
        + (f" {len(errors)} chyb" if has_errors else "")
    )
    send_mail(MAIL_TO, subject, "\n".join(report))
    print("\n" + "\n".join(report))
    if errors:
        # Non-zero exit code for the Task Scheduler / calling wrapper.
        raise RuntimeError(f"{len(errors)} backup(s) failed")


if __name__ == "__main__":
    main()
+235
View File
@@ -0,0 +1,235 @@
import subprocess
import os
from pathlib import Path
from datetime import datetime
import zipfile
import time
import traceback
from EmailMessagingGraph import send_mail
# ============================================================
# CONFIG
# ============================================================
# Path to the Firebird 2.5 gbak backup utility.
GBAK = r"C:\Program Files\Firebird\Firebird_2_5_CGM\bin\gbak.exe"
FB_USER = "SYSDBA"
# NOTE(review): hard-coded default SYSDBA password in source — consider
# moving it to an environment variable or a file outside version control.
FB_PASS = "masterkey"
FB_PORT = "3050"
# Connection string of the main Medicus database (host/port:path).
MAIN_DB = r"localhost/3050:C:\medicus 3\data\MEDICUS.FDB"
# Directory scanned for external MEDICUS_FILES_*.fdb databases.
EXT_DIR = Path(r"U:\externi")
# Directory where the .fbk/.zip/.log backup artifacts are written.
BACKUP_DIR = Path(r"U:\medicusbackup")
# Recipient of the backup report e-mail.
MAIL_TO = "vladimir.buzalka@buzalka.cz"
CHUNK = 8 * 1024 * 1024  # 8 MB read block for streaming ZIP compression
# ============================================================
# HELPERS
# ============================================================
def run_gbak(label: str, db_conn: str, fbk: Path, log: Path) -> dict:
    """Back up *db_conn* into *fbk* with gbak, capturing output in *log*.

    Returns a result dict whose ZIP-related fields are left at their
    zero defaults (the caller fills them in after compression).
    """
    print(f"GBAK: {label} ... ", end="", flush=True)
    started = time.time()
    args = [GBAK, "-b", "-user", FB_USER, "-pas", FB_PASS, db_conn, str(fbk), "-v"]
    with open(log, "w", encoding="utf-8") as logfile:
        subprocess.run(args, stdout=logfile, stderr=subprocess.STDOUT, check=True)
    elapsed = time.time() - started
    size = fbk.stat().st_size
    print(f"OK ({elapsed:.0f}s, {size/1024/1024:.1f} MB)")
    return {
        "label": label,
        "ok": True,
        "fbk": fbk,
        "fbk_size": size,
        "zip_size": 0,
        "t_gbak": elapsed,
        "t_zip": 0,
        "error": None,
    }
def zip_single(label: str, fbk: Path, zipf: Path) -> tuple[int, float]:
    """Compress *fbk* into its own ZIP at *zipf*, printing live progress.

    Returns (zip_size_in_bytes, seconds_spent_zipping).
    """
    started = time.time()
    done = 0
    total = fbk.stat().st_size
    with zipfile.ZipFile(zipf, "w", compression=zipfile.ZIP_DEFLATED, compresslevel=9) as archive:
        entry = zipfile.ZipInfo(fbk.name)
        entry.compress_type = zipfile.ZIP_DEFLATED
        with archive.open(entry, "w", force_zip64=True) as dst, open(fbk, "rb") as src:
            while chunk := src.read(CHUNK):
                dst.write(chunk)
                done += len(chunk)
                print(f"\r ZIP {label}: {done * 100 / total:6.2f}%", end="", flush=True)
    print()
    return zipf.stat().st_size, time.time() - started
def zip_multiple(fbk_results: list[dict], zipf: Path) -> tuple[int, float]:
    """Compress every FBK listed in *fbk_results* into the single ZIP *zipf*.

    The progress percentage is computed over the combined size of all FBKs.
    Returns (zip_size_in_bytes, seconds_spent_zipping).
    """
    started = time.time()
    grand_total = sum(item["fbk_size"] for item in fbk_results)
    done = 0
    with zipfile.ZipFile(zipf, "w", compression=zipfile.ZIP_DEFLATED, compresslevel=9) as archive:
        for item in fbk_results:
            source = item["fbk"]
            entry = zipfile.ZipInfo(source.name)
            entry.compress_type = zipfile.ZIP_DEFLATED
            with archive.open(entry, "w", force_zip64=True) as dst, open(source, "rb") as src:
                while chunk := src.read(CHUNK):
                    dst.write(chunk)
                    done += len(chunk)
                    print(f"\r ZIP {source.name}: {done * 100 / grand_total:6.2f}%", end="", flush=True)
    print()
    return zipf.stat().st_size, time.time() - started
def format_result(r: dict) -> str:
    """Render one backup result dict as a single indented report line."""
    fbk_mb = r["fbk_size"] / 1024 / 1024
    zip_mb = r["zip_size"] / 1024 / 1024
    if r["fbk_size"]:
        ratio = 100 * (1 - r["zip_size"] / r["fbk_size"])
    else:
        ratio = 0
    return (
        f" {r['label']}: "
        f"FBK {fbk_mb:.1f} MB → "
        f"ZIP {zip_mb:.1f} MB "
        f"({ratio:.0f}% komprese, "
        f"gbak {r['t_gbak']:.0f}s, zip {r['t_zip']:.0f}s)"
    )
# ============================================================
# MAIN
# ============================================================
def main():
    """Back up the main Medicus DB into its own ZIP and all external
    MEDICUS_FILES_* DBs into one shared ZIP, then e-mail a report.

    Raises:
        RuntimeError: when at least one backup failed (raised after the
        report e-mail is sent, so a scheduler sees a non-zero exit).
    """
    BACKUP_DIR.mkdir(parents=True, exist_ok=True)
    now = datetime.now()
    ts = now.strftime("%Y-%m-%d_%H-%M-%S")
    backed_up = []
    errors = []
    # ----------------------------------------------------------
    # 1) Main DB MEDICUS.FDB -> its own ZIP
    # ----------------------------------------------------------
    fbk = BACKUP_DIR / f"MEDICUS_{ts}.fbk"
    zipf = BACKUP_DIR / f"MEDICUS_{ts}.zip"
    log = BACKUP_DIR / f"MEDICUS_{ts}.log"
    try:
        r = run_gbak("MEDICUS", MAIN_DB, fbk, log)
        log.unlink()
        zip_size, t_zip = zip_single("MEDICUS", fbk, zipf)
        fbk.unlink()
        r["zip_size"] = zip_size
        r["t_zip"] = t_zip
        backed_up.append(r)
    except Exception:
        errors.append({"label": "MEDICUS", "fbk_size": 0, "zip_size": 0, "t_gbak": 0, "t_zip": 0, "error": traceback.format_exc()})
        # Bug fix: also remove a partially written ZIP (not only FBK/LOG)
        # so a truncated archive is never mistaken for a valid backup.
        for f in (fbk, zipf, log):
            if f.exists():
                f.unlink()
    # ----------------------------------------------------------
    # 2) External DBs MEDICUS_FILES_*.fdb -> all into one shared ZIP
    # ----------------------------------------------------------
    # Both glob spellings are needed on case-sensitive file systems;
    # the set union removes duplicates on case-insensitive ones.
    fdb_all = sorted(
        set(EXT_DIR.glob("MEDICUS_FILES_*.fdb")) | set(EXT_DIR.glob("MEDICUS_FILES_*.FDB")),
        key=lambda p: p.name.lower(),
    )
    ext_results = []
    for fdb in fdb_all:
        name = fdb.stem
        fbk = BACKUP_DIR / f"{name}_{ts}.fbk"
        log = BACKUP_DIR / f"{name}_{ts}.log"
        db_conn = f"localhost/{FB_PORT}:{fdb}"
        try:
            r = run_gbak(name, db_conn, fbk, log)
            log.unlink()
            ext_results.append(r)
        except Exception:
            errors.append({"label": name, "fbk_size": 0, "zip_size": 0, "t_gbak": 0, "t_zip": 0, "error": traceback.format_exc()})
            for f in (fbk, log):
                if f.exists():
                    f.unlink()
    # ZIP all successfully dumped external FBKs into one archive
    if ext_results:
        ext_zip = BACKUP_DIR / f"MEDICUS_FILES_{ts}.zip"
        print(f"\nZIP externích DB → {ext_zip.name}")
        try:
            zip_size, t_zip = zip_multiple(ext_results, ext_zip)
            for r in ext_results:
                r["zip_size"] = zip_size  # shared size of the combined ZIP
                r["t_zip"] = t_zip
                r["fbk"].unlink()
                backed_up.append(r)
        except Exception:
            errors.append({"label": "MEDICUS_FILES (zip)", "fbk_size": 0, "zip_size": 0, "t_gbak": 0, "t_zip": 0, "error": traceback.format_exc()})
            # Bug fix: remove the partially written combined ZIP too, so a
            # truncated archive is never mistaken for a valid backup.
            if ext_zip.exists():
                ext_zip.unlink()
            for r in ext_results:
                if r["fbk"].exists():
                    r["fbk"].unlink()
    # ----------------------------------------------------------
    # Report
    # ----------------------------------------------------------
    total = 1 + len(fdb_all)  # main DB + all discovered external DBs
    report = [
        f"Backup Medicus {now.strftime('%d.%m.%Y %H:%M')}",
        f"Celkem DB: {total} | OK: {len(backed_up)} | Chyby: {len(errors)}",
        f"Výstupní adresář: {BACKUP_DIR}",
        "",
    ]
    if backed_up:
        report.append("--- Zálohováno ---")
        # Main DB first, then the external DBs that share one ZIP.
        main_results = [r for r in backed_up if r["label"] == "MEDICUS"]
        ext_backed = [r for r in backed_up if r["label"] != "MEDICUS"]
        for r in main_results:
            report.append(format_result(r))
        if ext_backed:
            total_ext_fbk = sum(r["fbk_size"] for r in ext_backed)
            # All ext results carry the same shared ZIP size; use the first.
            ext_zip_size = ext_backed[0]["zip_size"] if ext_backed else 0
            ratio = 100 * (1 - ext_zip_size / total_ext_fbk) if total_ext_fbk else 0
            report.append(f" Externí DB ({len(ext_backed)} souborů):")
            for r in ext_backed:
                report.append(f" {r['label']}: FBK {r['fbk_size']/1024/1024:.1f} MB (gbak {r['t_gbak']:.0f}s)")
            report.append(
                f" → společný ZIP: {ext_zip_size/1024/1024:.1f} MB "
                f"({ratio:.0f}% komprese, zip {ext_backed[0]['t_zip']:.0f}s)"
            )
        # Count the shared external ZIP once, not per external DB.
        total_zip = sum(r["zip_size"] for r in main_results) + (ext_backed[0]["zip_size"] if ext_backed else 0)
        report.append(f" Celková velikost ZIP: {total_zip/1024/1024:.1f} MB")
        report.append("")
    if errors:
        report.append("--- CHYBY ---")
        for e in errors:
            report.append(f" {e['label']}:\n{e['error']}")
        report.append("")
    has_errors = bool(errors)
    subject = (
        f"{'X' if has_errors else 'OK'} MEDICUS backup "
        f"{len(backed_up)}/{total}"
        + (f" {len(errors)} chyb" if has_errors else "")
    )
    send_mail(MAIL_TO, subject, "\n".join(report))
    print("\n" + "\n".join(report))
    if errors:
        # Non-zero exit code for the Task Scheduler / calling wrapper.
        raise RuntimeError(f"{len(errors)} backup(s) failed")


if __name__ == "__main__":
    main()