Files
ordinaceprojekt/Medicus/DBrestore/restore.py
T
2026-05-16 07:58:18 +02:00

222 lines
7.7 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
r"""
Rozbalí nejnovější zálohu Medicus z \\tower\ordinacesynology\MedicusBackup\ a obnoví Firebird DB do C:\Medicus\medicus.fdb
"""
import re
import sys
import zipfile
import subprocess
import time
import fdb
from io import StringIO
from pathlib import Path
from datetime import datetime
import socket
if socket.gethostname().upper() != "REPORTER":
raise SystemExit(f"Tento skript lze spustit pouze na pocitaci REPORTER (aktualni: {socket.gethostname()})")
# --- zachytavani logu ---
class Tee:
"""Pise na stdout i do bufferu zaroven."""
def __init__(self):
self.buffer = StringIO()
def write(self, msg):
sys.__stdout__.write(msg)
self.buffer.write(msg)
def flush(self):
sys.__stdout__.flush()
def getvalue(self):
return self.buffer.getvalue()
tee = Tee()
sys.stdout = tee
# -------------------------
start_time = time.time()
BACKUP_DIR = Path(r"\\tower\ordinacesynology\MedicusBackup")
EXTRACT_DIR = Path(r"C:\Medicus\restore")
TARGET_DB = Path(r"C:\Medicus\medicus.fdb")
GBAK = Path(r"C:\Program Files\Firebird\Firebird_2_5_CGM\bin\gbak.exe")
LOG_FILE = Path(r"C:\Medicus\restore\restore.log")
LAST_RESTORED = Path(r"C:\Medicus\restore\last_restored.txt")
EMAIL_TO = "vladimir.buzalka@buzalka.cz"
USER = "SYSDBA"
PASSWORD = "masterkey"
success = False
pocet_pacientu = None
try:
# 1) Najdi nejnovejsi ZIP
pattern = re.compile(r"MEDICUS_(\d{6})_(\d{4})\.zip$", re.IGNORECASE)
candidates = []
for f in BACKUP_DIR.iterdir():
m = pattern.match(f.name)
if m:
ts = datetime.strptime("20" + m.group(1) + m.group(2), "%Y%m%d%H%M")
candidates.append((ts, f))
if not candidates:
raise SystemExit("Zadny MEDICUS_*.zip nenalezen v " + str(BACKUP_DIR))
candidates.sort(reverse=True)
latest_ts, latest_zip = candidates[0]
print(f"Nejnovejsi zaloha: {latest_zip.name} ({latest_ts})")
# Zkontroluj jestli uz byl tento ZIP obnoven
if LAST_RESTORED.exists():
last = LAST_RESTORED.read_text(encoding="utf-8").strip()
if last == latest_zip.name:
print("Tento backup byl jiz obnoven, novy zip nenalezen. Konec.")
sys.stdout = sys.__stdout__
raise SystemExit(0)
# Zkontroluj jestli je ZIP kompletne prenesen (rsync)
size1 = latest_zip.stat().st_size
print(f"Velikost ZIP: {size1 / 1024 / 1024:.1f} MB - cekam 60s...")
sys.stdout.flush()
time.sleep(60)
size2 = latest_zip.stat().st_size
if size1 != size2:
print(f"ZIP se stale meni ({size2 / 1024 / 1024:.1f} MB), prenos nedokoncen. Konec.")
sys.stdout = sys.__stdout__
raise SystemExit(0)
print(f"Velikost stabilni, prenos dokoncen.")
# 2) Rozbal ZIP - hledej .fbk
EXTRACT_DIR.mkdir(parents=True, exist_ok=True)
with zipfile.ZipFile(latest_zip) as zf:
fbk_entries = [n for n in zf.namelist() if n.lower().endswith(".fbk")]
if not fbk_entries:
raise SystemExit("ZIP neobsahuje .fbk soubor!")
fbk_entry = fbk_entries[0]
fbk_name = Path(fbk_entry).name
zf.extract(fbk_entry, EXTRACT_DIR)
extracted = next(EXTRACT_DIR.rglob(fbk_name))
print(f"Rozbaleno: {extracted}")
# 3) Odpoj aktivni klienty a obnov DB
try:
conn = fdb.connect(
dsn=r'localhost:c:\medicus\medicus.fdb',
user='SYSDBA', password='masterkey', charset='win1250'
)
cur = conn.cursor()
cur.execute("""
SELECT MON$ATTACHMENT_ID, MON$USER, MON$REMOTE_PROCESS
FROM MON$ATTACHMENTS
WHERE MON$ATTACHMENT_ID <> CURRENT_CONNECTION
AND MON$USER <> 'GARBAGE COLLECTOR'
""")
attachments = cur.fetchall()
if attachments:
print(f"Aktivni pripojeni ({len(attachments)}), odpojuji:")
for att_id, user, process in attachments:
print(f" - ID={att_id} user={user} process={process}")
cur.execute("DELETE FROM MON$ATTACHMENTS WHERE MON$ATTACHMENT_ID = ?", (att_id,))
conn.commit()
print("Vsechna pripojeni odpojena.\n")
else:
print("Zadna aktivni pripojeni.\n")
conn.close()
except Exception as e:
print(f"Varovani: nepodarilo se zkontrolovat pripojeni ({e})\n")
TARGET_DB.parent.mkdir(parents=True, exist_ok=True)
if TARGET_DB.exists():
TARGET_DB.unlink()
print(f"Smazana stara DB: {TARGET_DB}")
cmd = [str(GBAK), "-rep", "-v", str(extracted), str(TARGET_DB), "-user", USER, "-pas", PASSWORD]
print(f"\nSpoustim: {' '.join(cmd)}\n")
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
for line in proc.stdout:
print(line, end="")
proc.wait()
returncode = proc.returncode
elapsed = time.time() - start_time
minutes, seconds = divmod(int(elapsed), 60)
if returncode == 0:
success = True
print(f"\nObnova dokoncena. Cas: {minutes}:{seconds:02d} (min:sec)")
LAST_RESTORED.write_text(latest_zip.name, encoding="utf-8")
latest_zip.unlink()
print(f"Smazan ZIP: {latest_zip.name}")
extracted.unlink()
print(f"Smazan FBK: {extracted.name}")
# Test DB - pocet registrovanych pacientu
try:
conn_test = fdb.connect(
dsn=r'localhost:c:\medicus\medicus.fdb',
user='SYSDBA', password='masterkey', charset='win1250'
)
cur_test = conn_test.cursor()
cur_test.execute("""
SELECT COUNT(*) FROM KAR
WHERE vyrazen = 'N'
AND EXISTS (
SELECT id FROM registr r
JOIN icp i ON r.idicp = i.idicp
WHERE r.idpac = kar.idpac
AND r.datum <= CURRENT_DATE
AND (r.datum_zruseni IS NULL OR r.datum_zruseni >= CURRENT_DATE)
AND r.priznak IN ('V','D','A')
AND i.icp = '09305001'
AND i.odb = '001'
)
""")
pocet_pacientu = cur_test.fetchone()[0]
conn_test.close()
print(f"Test DB OK - registrovanych pacientu: {pocet_pacientu}")
except Exception as e:
pocet_pacientu = None
print(f"Test DB selhal: {e}")
else:
print(f"\nCHYBA (navratovy kod {returncode}). Cas: {minutes}:{seconds:02d} (min:sec)")
except Exception as e:
elapsed = time.time() - start_time
minutes, seconds = divmod(int(elapsed), 60)
print(f"\nVYJIMKA: {e}. Cas: {minutes}:{seconds:02d} (min:sec)")
finally:
sys.stdout = sys.__stdout__
log_text = tee.getvalue()
elapsed = time.time() - start_time
minutes, seconds = divmod(int(elapsed), 60)
# Uloz log
LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
LOG_FILE.write_text(log_text, encoding="utf-8")
# Odeslat email
try:
sys.path.insert(0, r"C:\Reporting\knihovny")
from EmailMessagingGraph import send_mail
status = "OK" if success else "CHYBA"
subject = f"Medicus DB restore [{status}] - {minutes}:{seconds:02d} min"
pac_info = f"\nRegistrovanych pacientu: {pocet_pacientu}" if success and pocet_pacientu is not None else ""
body = f"Restore databaze Medicus na Reporteru\n\nVysledek: {status}\nDoba: {minutes}:{seconds:02d} (min:sec){pac_info}\n\nPodrobny log v priloze."
send_mail(
to=EMAIL_TO,
subject=subject,
body=body,
attachments=LOG_FILE,
)
print("Email odeslan.")
LOG_FILE.unlink(missing_ok=True)
except Exception as e:
print(f"Chyba pri odesilani emailu: {e}")