Compare commits

...

16 Commits

Author SHA1 Message Date
administrator 894720140f z230 2026-04-21 08:42:33 +02:00
administrator 95927c304d notebook vb 2026-04-11 08:54:47 +02:00
administrator c6d7479967 notebook vb 2026-04-09 07:07:53 +02:00
administrator 8955d33260 notebook vb 2026-04-05 07:58:12 +02:00
administrator 8ba7bae707 notebook vb 2026-04-04 08:57:00 +02:00
administrator 8782ec1bde notebook vb 2026-04-03 08:55:10 +02:00
administrator 240bc0d83f MedicusWithClaude: detailní poznámky k test_import_FINAL.py
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-03 08:53:59 +02:00
administrator 05b98a5cec MedicusWithClaude: přejmenování test_import_single → test_import_FINAL
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-03 08:43:14 +02:00
administrator e04bf6172a Backup/BackupAll: přidány verze 01 a 02
01 = původní (každá DB vlastní ZIP)
02 = nový (hlavní DB vlastní ZIP, všechny externí DB → jeden ZIP)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-03 08:29:15 +02:00
administrator 91edb6f084 Merge: BackupExterniDB01 + BackupExterniDB02 do master
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-03 08:22:03 +02:00
administrator 3eaf8709cf Backup/BackupExterniDB: přidány dva skripty pro zálohu ext. DB
01 = původní (každý FBK → vlastní ZIP)
02 = nový (všechny FBK → jeden ZIP)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-03 08:21:50 +02:00
administrator 1dd3c1d9ae notebook vb 2026-04-03 08:08:46 +02:00
administrator ba594c373b notebook vb 2026-04-01 06:05:21 +02:00
administrator 3b30c35400 notebook vb 2026-04-01 06:04:40 +02:00
administrator 4950c00309 z230 2026-03-31 14:08:53 +02:00
administrator 3141875629 notebook vb 2026-03-31 07:47:17 +02:00
29 changed files with 5595 additions and 4 deletions
+185
View File
@@ -0,0 +1,185 @@
import subprocess
import os
from pathlib import Path
from datetime import datetime
import zipfile
import time
import traceback
from EmailMessagingGraph import send_mail
# ============================================================
# CONFIG
# ============================================================
GBAK = r"C:\Program Files\Firebird\Firebird_2_5_CGM\bin\gbak.exe"
FB_USER = "SYSDBA"
FB_PASS = "masterkey"
FB_PORT = "3050"
MAIN_DB = r"localhost/3050:C:\medicus 3\data\MEDICUS.FDB"
EXT_DIR = Path(r"U:\externi")
BACKUP_DIR = Path(r"U:\medicusbackup")
MAIL_TO = "vladimir.buzalka@buzalka.cz"
CHUNK = 8 * 1024 * 1024 # 8 MB
# ============================================================
# HELPERS
# ============================================================
def gbak_and_zip(label: str, db_conn: str, fbk: Path, zipf: Path, log: Path) -> dict:
"""
Run gbak backup and ZIP the result.
Returns a result dict.
"""
result = {
"label": label,
"ok": False,
"fbk_size": 0,
"zip_size": 0,
"t_gbak": 0,
"t_zip": 0,
"error": None,
}
# 1) GBAK
print(f"GBAK: {label} ... ", end="", flush=True)
t0 = time.time()
cmd = [GBAK, "-b", "-user", FB_USER, "-pas", FB_PASS, db_conn, str(fbk), "-v"]
with open(log, "w", encoding="utf-8") as f:
subprocess.run(cmd, stdout=f, stderr=subprocess.STDOUT, check=True)
result["t_gbak"] = time.time() - t0
result["fbk_size"] = fbk.stat().st_size
print(f"OK ({result['t_gbak']:.0f}s, {result['fbk_size']/1024/1024:.1f} MB)")
# 2) ZIP
t1 = time.time()
processed = 0
fbk_size = result["fbk_size"]
with zipfile.ZipFile(zipf, "w", compression=zipfile.ZIP_DEFLATED, compresslevel=9) as zf:
zi = zipfile.ZipInfo(fbk.name)
zi.compress_type = zipfile.ZIP_DEFLATED
with zf.open(zi, "w", force_zip64=True) as z:
with open(fbk, "rb") as src:
while buf := src.read(CHUNK):
z.write(buf)
processed += len(buf)
pct = processed * 100 / fbk_size
print(f"\r ZIP {label}: {pct:6.2f}%", end="", flush=True)
print()
result["t_zip"] = time.time() - t1
result["zip_size"] = zipf.stat().st_size
# 3) Delete FBK + LOG
fbk.unlink()
log.unlink()
result["ok"] = True
return result
def format_result(r: dict) -> str:
ratio = 100 * (1 - r["zip_size"] / r["fbk_size"]) if r["fbk_size"] else 0
return (
f" {r['label']}: "
f"FBK {r['fbk_size']/1024/1024:.1f} MB → "
f"ZIP {r['zip_size']/1024/1024:.1f} MB "
f"({ratio:.0f}% komprese, "
f"gbak {r['t_gbak']:.0f}s, zip {r['t_zip']:.0f}s)"
)
# ============================================================
# MAIN
# ============================================================
def main():
BACKUP_DIR.mkdir(parents=True, exist_ok=True)
now = datetime.now()
ts = now.strftime("%Y-%m-%d_%H-%M-%S")
backed_up = []
errors = []
# ----------------------------------------------------------
# 1) Hlavní DB MEDICUS.FDB
# ----------------------------------------------------------
fbk = BACKUP_DIR / f"MEDICUS_{ts}.fbk"
zipf = BACKUP_DIR / f"MEDICUS_{ts}.zip"
log = BACKUP_DIR / f"MEDICUS_{ts}.log"
try:
r = gbak_and_zip("MEDICUS", MAIN_DB, fbk, zipf, log)
backed_up.append(r)
except Exception:
errors.append({"label": "MEDICUS", "error": traceback.format_exc()})
for f in (fbk, log):
if f.exists():
f.unlink()
# ----------------------------------------------------------
# 2) Externí DB MEDICUS_FILES_*.fdb
# ----------------------------------------------------------
fdb_all = sorted(
set(EXT_DIR.glob("MEDICUS_FILES_*.fdb")) | set(EXT_DIR.glob("MEDICUS_FILES_*.FDB")),
key=lambda p: p.name.lower(),
)
for fdb in fdb_all:
name = fdb.stem
fbk = BACKUP_DIR / f"{name}_{ts}.fbk"
zipf = BACKUP_DIR / f"{name}_{ts}.zip"
log = BACKUP_DIR / f"{name}_{ts}.log"
db_conn = f"localhost/{FB_PORT}:{fdb}"
try:
r = gbak_and_zip(name, db_conn, fbk, zipf, log)
backed_up.append(r)
except Exception:
errors.append({"label": name, "error": traceback.format_exc()})
for f in (fbk, log):
if f.exists():
f.unlink()
# ----------------------------------------------------------
# Report
# ----------------------------------------------------------
total = 1 + len(fdb_all)
report = [
f"Backup Medicus {now.strftime('%d.%m.%Y %H:%M')}",
f"Celkem DB: {total} | OK: {len(backed_up)} | Chyby: {len(errors)}",
f"Výstupní adresář: {BACKUP_DIR}",
"",
]
if backed_up:
report.append("--- Zálohováno ---")
total_zip = sum(r["zip_size"] for r in backed_up)
for r in backed_up:
report.append(format_result(r))
report.append(f" Celková velikost ZIP: {total_zip/1024/1024:.1f} MB")
report.append("")
if errors:
report.append("--- CHYBY ---")
for e in errors:
report.append(f" {e['label']}:\n{e['error']}")
report.append("")
has_errors = bool(errors)
subject = (
f"{'X' if has_errors else 'OK'} MEDICUS backup "
f"{len(backed_up)}/{total}"
+ (f" {len(errors)} chyb" if has_errors else "")
)
send_mail(MAIL_TO, subject, "\n".join(report))
print("\n" + "\n".join(report))
if errors:
raise RuntimeError(f"{len(errors)} backup(s) failed")
if __name__ == "__main__":
main()
+235
View File
@@ -0,0 +1,235 @@
import subprocess
import os
from pathlib import Path
from datetime import datetime
import zipfile
import time
import traceback
from EmailMessagingGraph import send_mail
# ============================================================
# CONFIG
# ============================================================
GBAK = r"C:\Program Files\Firebird\Firebird_2_5_CGM\bin\gbak.exe"
FB_USER = "SYSDBA"
FB_PASS = "masterkey"
FB_PORT = "3050"
MAIN_DB = r"localhost/3050:C:\medicus 3\data\MEDICUS.FDB"
EXT_DIR = Path(r"c:\medicusext")
BACKUP_DIR = Path(r"U:\medicusbackup")
MAIL_TO = "vladimir.buzalka@buzalka.cz"
CHUNK = 8 * 1024 * 1024 # 8 MB
# ============================================================
# HELPERS
# ============================================================
def run_gbak(label: str, db_conn: str, fbk: Path, log: Path) -> dict:
"""Run gbak, return result dict (without zip info)."""
result = {
"label": label,
"ok": False,
"fbk": fbk,
"fbk_size": 0,
"zip_size": 0,
"t_gbak": 0,
"t_zip": 0,
"error": None,
}
print(f"GBAK: {label} ... ", end="", flush=True)
t0 = time.time()
cmd = [GBAK, "-b", "-user", FB_USER, "-pas", FB_PASS, db_conn, str(fbk), "-v"]
with open(log, "w", encoding="utf-8") as f:
subprocess.run(cmd, stdout=f, stderr=subprocess.STDOUT, check=True)
result["t_gbak"] = time.time() - t0
result["fbk_size"] = fbk.stat().st_size
print(f"OK ({result['t_gbak']:.0f}s, {result['fbk_size']/1024/1024:.1f} MB)")
result["ok"] = True
return result
def zip_single(label: str, fbk: Path, zipf: Path) -> tuple[int, float]:
"""ZIP one FBK into its own ZIP. Returns (zip_size, t_zip)."""
t1 = time.time()
processed = 0
fbk_size = fbk.stat().st_size
with zipfile.ZipFile(zipf, "w", compression=zipfile.ZIP_DEFLATED, compresslevel=9) as zf:
zi = zipfile.ZipInfo(fbk.name)
zi.compress_type = zipfile.ZIP_DEFLATED
with zf.open(zi, "w", force_zip64=True) as z:
with open(fbk, "rb") as src:
while buf := src.read(CHUNK):
z.write(buf)
processed += len(buf)
pct = processed * 100 / fbk_size
print(f"\r ZIP {label}: {pct:6.2f}%", end="", flush=True)
print()
return zipf.stat().st_size, time.time() - t1
def zip_multiple(fbk_results: list[dict], zipf: Path) -> tuple[int, float]:
"""ZIP multiple FBK files into one ZIP. Returns (zip_size, t_zip)."""
t1 = time.time()
total_fbk_size = sum(r["fbk_size"] for r in fbk_results)
total_processed = 0
with zipfile.ZipFile(zipf, "w", compression=zipfile.ZIP_DEFLATED, compresslevel=9) as zf:
for r in fbk_results:
fbk = r["fbk"]
zi = zipfile.ZipInfo(fbk.name)
zi.compress_type = zipfile.ZIP_DEFLATED
with zf.open(zi, "w", force_zip64=True) as z:
with open(fbk, "rb") as src:
while buf := src.read(CHUNK):
z.write(buf)
total_processed += len(buf)
pct = total_processed * 100 / total_fbk_size
print(f"\r ZIP {fbk.name}: {pct:6.2f}%", end="", flush=True)
print()
return zipf.stat().st_size, time.time() - t1
def format_result(r: dict) -> str:
ratio = 100 * (1 - r["zip_size"] / r["fbk_size"]) if r["fbk_size"] else 0
return (
f" {r['label']}: "
f"FBK {r['fbk_size']/1024/1024:.1f} MB → "
f"ZIP {r['zip_size']/1024/1024:.1f} MB "
f"({ratio:.0f}% komprese, "
f"gbak {r['t_gbak']:.0f}s, zip {r['t_zip']:.0f}s)"
)
# ============================================================
# MAIN
# ============================================================
def main():
BACKUP_DIR.mkdir(parents=True, exist_ok=True)
now = datetime.now()
ts = now.strftime("%Y-%m-%d_%H-%M-%S")
backed_up = []
errors = []
# ----------------------------------------------------------
# 1) Hlavní DB MEDICUS.FDB → vlastní ZIP
# ----------------------------------------------------------
fbk = BACKUP_DIR / f"MEDICUS_{ts}.fbk"
zipf = BACKUP_DIR / f"MEDICUS_{ts}.zip"
log = BACKUP_DIR / f"MEDICUS_{ts}.log"
try:
r = run_gbak("MEDICUS", MAIN_DB, fbk, log)
log.unlink()
zip_size, t_zip = zip_single("MEDICUS", fbk, zipf)
fbk.unlink()
r["zip_size"] = zip_size
r["t_zip"] = t_zip
backed_up.append(r)
except Exception:
errors.append({"label": "MEDICUS", "fbk_size": 0, "zip_size": 0, "t_gbak": 0, "t_zip": 0, "error": traceback.format_exc()})
for f in (fbk, log):
if f.exists():
f.unlink()
# ----------------------------------------------------------
# 2) Externí DB MEDICUS_FILES_*.fdb → všechny do jednoho ZIP
# ----------------------------------------------------------
fdb_all = sorted(
set(EXT_DIR.glob("MEDICUS_FILES_*.fdb")) | set(EXT_DIR.glob("MEDICUS_FILES_*.FDB")),
key=lambda p: p.name.lower(),
)
ext_results = []
for fdb in fdb_all:
name = fdb.stem
fbk = BACKUP_DIR / f"{name}_{ts}.fbk"
log = BACKUP_DIR / f"{name}_{ts}.log"
db_conn = f"localhost/{FB_PORT}:{fdb}"
try:
r = run_gbak(name, db_conn, fbk, log)
log.unlink()
ext_results.append(r)
except Exception:
errors.append({"label": name, "fbk_size": 0, "zip_size": 0, "t_gbak": 0, "t_zip": 0, "error": traceback.format_exc()})
for f in (fbk, log):
if f.exists():
f.unlink()
# ZIP všechny externí FBK do jednoho souboru
if ext_results:
ext_zip = BACKUP_DIR / f"MEDICUS_FILES_{ts}.zip"
print(f"\nZIP externích DB → {ext_zip.name}")
try:
zip_size, t_zip = zip_multiple(ext_results, ext_zip)
for r in ext_results:
r["zip_size"] = zip_size # sdílená velikost výsledného ZIPu
r["t_zip"] = t_zip
r["fbk"].unlink()
backed_up.append(r)
except Exception:
errors.append({"label": "MEDICUS_FILES (zip)", "fbk_size": 0, "zip_size": 0, "t_gbak": 0, "t_zip": 0, "error": traceback.format_exc()})
for r in ext_results:
if r["fbk"].exists():
r["fbk"].unlink()
# ----------------------------------------------------------
# Report
# ----------------------------------------------------------
total = 1 + len(fdb_all)
report = [
f"Backup Medicus {now.strftime('%d.%m.%Y %H:%M')}",
f"Celkem DB: {total} | OK: {len(backed_up)} | Chyby: {len(errors)}",
f"Výstupní adresář: {BACKUP_DIR}",
"",
]
if backed_up:
report.append("--- Zálohováno ---")
# Hlavní DB
main_results = [r for r in backed_up if r["label"] == "MEDICUS"]
ext_backed = [r for r in backed_up if r["label"] != "MEDICUS"]
for r in main_results:
report.append(format_result(r))
if ext_backed:
total_ext_fbk = sum(r["fbk_size"] for r in ext_backed)
ext_zip_size = ext_backed[0]["zip_size"] if ext_backed else 0
ratio = 100 * (1 - ext_zip_size / total_ext_fbk) if total_ext_fbk else 0
report.append(f" Externí DB ({len(ext_backed)} souborů):")
for r in ext_backed:
report.append(f" {r['label']}: FBK {r['fbk_size']/1024/1024:.1f} MB (gbak {r['t_gbak']:.0f}s)")
report.append(
f" → společný ZIP: {ext_zip_size/1024/1024:.1f} MB "
f"({ratio:.0f}% komprese, zip {ext_backed[0]['t_zip']:.0f}s)"
)
total_zip = sum(r["zip_size"] for r in main_results) + (ext_backed[0]["zip_size"] if ext_backed else 0)
report.append(f" Celková velikost ZIP: {total_zip/1024/1024:.1f} MB")
report.append("")
if errors:
report.append("--- CHYBY ---")
for e in errors:
report.append(f" {e['label']}:\n{e['error']}")
report.append("")
has_errors = bool(errors)
subject = (
f"{'X' if has_errors else 'OK'} MEDICUS backup "
f"{len(backed_up)}/{total}"
+ (f" {len(errors)} chyb" if has_errors else "")
)
send_mail(MAIL_TO, subject, "\n".join(report))
print("\n" + "\n".join(report))
if errors:
raise RuntimeError(f"{len(errors)} backup(s) failed")
if __name__ == "__main__":
main()
+2 -2
View File
@@ -18,8 +18,8 @@ FB_USER = "SYSDBA"
FB_PASS = "masterkey"
FB_PORT = "3050"
SRC_DIR = Path(r"U:\externi")
BACKUP_DIR = Path(r"U:\externabackup")
SRC_DIR = Path(r"c:\medicusext")
BACKUP_DIR = Path(r"U:\medicusbackup")
MAIL_TO = "vladimir.buzalka@buzalka.cz"
+172
View File
@@ -0,0 +1,172 @@
import subprocess
import json
import os
from pathlib import Path
from datetime import datetime
import zipfile
import time
import traceback
from EmailMessagingGraph import send_mail
# ============================================================
# CONFIG
# ============================================================
GBAK = r"C:\Program Files\Firebird\Firebird_2_5_CGM\bin\gbak.exe"
FB_USER = "SYSDBA"
FB_PASS = "masterkey"
FB_PORT = "3050"
SRC_DIR = Path(r"c:\medicusext")
BACKUP_DIR = Path(r"U:\medicusbackup")
MAIL_TO = "vladimir.buzalka@buzalka.cz"
CHUNK = 8 * 1024 * 1024 # 8 MB
# ============================================================
# MAIN
# ============================================================
def main():
BACKUP_DIR.mkdir(parents=True, exist_ok=True)
now = datetime.now()
ts = now.strftime("%Y-%m-%d_%H-%M-%S")
# Find all FDB files (case-insensitive)
fdb_files = sorted(SRC_DIR.glob("MEDICUS_FILES_*.fdb"))
fdb_upper = sorted(SRC_DIR.glob("MEDICUS_FILES_*.FDB"))
fdb_all = sorted(
set(fdb_files + fdb_upper),
key=lambda p: p.name.lower(),
)
backed_up = []
errors = []
for fdb in fdb_all:
name = fdb.stem
fbk = BACKUP_DIR / f"{name}_{ts}.fbk"
zipf = BACKUP_DIR / f"{name}_{ts}.zip"
log = BACKUP_DIR / f"{name}_{ts}.log"
result = {
"file": fdb.name,
"ok": False,
"fbk_size": 0,
"zip_size": 0,
"t_gbak": 0,
"t_zip": 0,
"error": None,
}
try:
# 1) GBAK
print(f"GBAK: {fdb.name} ... ", end="", flush=True)
t0 = time.time()
db_conn = f"localhost/{FB_PORT}:{fdb}"
cmd = [
GBAK, "-b",
"-user", FB_USER,
"-pas", FB_PASS,
db_conn, str(fbk),
"-v",
]
with open(log, "w", encoding="utf-8") as f:
subprocess.run(
cmd, stdout=f, stderr=subprocess.STDOUT, check=True,
)
result["t_gbak"] = time.time() - t0
result["fbk_size"] = fbk.stat().st_size
print(f"OK ({result['t_gbak']:.0f}s)")
# 2) ZIP
t1 = time.time()
processed = 0
fbk_size = result["fbk_size"]
with zipfile.ZipFile(
zipf, "w",
compression=zipfile.ZIP_DEFLATED,
compresslevel=9,
) as zf:
zi = zipfile.ZipInfo(fbk.name)
zi.compress_type = zipfile.ZIP_DEFLATED
with zf.open(zi, "w", force_zip64=True) as z:
with open(fbk, "rb") as src:
while buf := src.read(CHUNK):
z.write(buf)
processed += len(buf)
pct = processed * 100 / fbk_size
print(
f"\r ZIP {name}: {pct:6.2f}%",
end="", flush=True,
)
print()
result["t_zip"] = time.time() - t1
result["zip_size"] = zipf.stat().st_size
# 3) DELETE FBK + LOG
fbk.unlink()
log.unlink()
result["ok"] = True
backed_up.append(result)
except Exception:
result["error"] = traceback.format_exc()
errors.append(result)
for f in (fbk, log):
if f.exists():
f.unlink()
# Build report
report = []
report.append(f"Backup externi DB - {now.strftime('%d.%m.%Y %H:%M')}")
report.append(f"Celkem souboru: {len(fdb_all)}")
report.append(f"Zalohovano: {len(backed_up)}")
report.append(f"Chyby: {len(errors)}")
report.append("")
if backed_up:
report.append("--- Backed up ---")
total_zip = 0
for r in backed_up:
total_zip += r["zip_size"]
report.append(
f" {r['file']}: "
f"FBK {r['fbk_size']/1024/1024:.1f} MB -> "
f"ZIP {r['zip_size']/1024/1024:.1f} MB "
f"(gbak {r['t_gbak']:.0f}s, zip {r['t_zip']:.0f}s)"
)
report.append(f" Total ZIP: {total_zip / 1024 / 1024:.1f} MB")
report.append("")
if errors:
report.append("--- ERRORS ---")
for r in errors:
report.append(f" {r['file']}: {r['error']}")
report.append("")
# Send email
has_errors = len(errors) > 0
subject = (
f"{'X' if has_errors else 'OK'} "
f"MEDICUS externi DB - "
f"backup {len(backed_up)}/{len(fdb_all)}"
f"{f', {len(errors)} errors' if has_errors else ''}"
)
send_mail(MAIL_TO, subject, "\n".join(report))
print("\n" + "\n".join(report))
if errors:
raise RuntimeError(f"{len(errors)} backup(s) failed")
if __name__ == "__main__":
main()
+189
View File
@@ -0,0 +1,189 @@
import subprocess
import json
import os
from pathlib import Path
from datetime import datetime
import zipfile
import time
import traceback
from EmailMessagingGraph import send_mail
# ============================================================
# CONFIG
# ============================================================
GBAK = r"C:\Program Files\Firebird\Firebird_2_5_CGM\bin\gbak.exe"
FB_USER = "SYSDBA"
FB_PASS = "masterkey"
FB_PORT = "3050"
SRC_DIR = Path(r"c:\medicusext")
BACKUP_DIR = Path(r"U:\medicusbackup")
MAIL_TO = "vladimir.buzalka@buzalka.cz"
CHUNK = 8 * 1024 * 1024 # 8 MB
# ============================================================
# MAIN
# ============================================================
def main():
BACKUP_DIR.mkdir(parents=True, exist_ok=True)
now = datetime.now()
ts = now.strftime("%Y-%m-%d_%H-%M-%S")
# Find all FDB files (case-insensitive)
fdb_files = sorted(SRC_DIR.glob("MEDICUS_FILES_*.fdb"))
fdb_upper = sorted(SRC_DIR.glob("MEDICUS_FILES_*.FDB"))
fdb_all = sorted(
set(fdb_files + fdb_upper),
key=lambda p: p.name.lower(),
)
backed_up = []
errors = []
fbk_paths = [] # FBK files to be zipped together
# --------------------------------------------------------
# 1) GBAK all databases
# --------------------------------------------------------
for fdb in fdb_all:
name = fdb.stem
fbk = BACKUP_DIR / f"{name}_{ts}.fbk"
log = BACKUP_DIR / f"{name}_{ts}.log"
result = {
"file": fdb.name,
"ok": False,
"fbk_size": 0,
"zip_size": 0,
"t_gbak": 0,
"t_zip": 0,
"error": None,
}
try:
print(f"GBAK: {fdb.name} ... ", end="", flush=True)
t0 = time.time()
db_conn = f"localhost/{FB_PORT}:{fdb}"
cmd = [
GBAK, "-b",
"-user", FB_USER,
"-pas", FB_PASS,
db_conn, str(fbk),
"-v",
]
with open(log, "w", encoding="utf-8") as f:
subprocess.run(
cmd, stdout=f, stderr=subprocess.STDOUT, check=True,
)
result["t_gbak"] = time.time() - t0
result["fbk_size"] = fbk.stat().st_size
print(f"OK ({result['t_gbak']:.0f}s)")
# Delete log, keep FBK for zipping
log.unlink()
result["ok"] = True
fbk_paths.append((fbk, result))
backed_up.append(result)
except Exception:
result["error"] = traceback.format_exc()
errors.append(result)
for f in (fbk, log):
if f.exists():
f.unlink()
# --------------------------------------------------------
# 2) ZIP all FBK files into one archive
# --------------------------------------------------------
total_zip_size = 0
if fbk_paths:
zip_path = BACKUP_DIR / f"MEDICUS_FILES_{ts}.zip"
print(f"\nZIP: {zip_path.name}")
t_zip_start = time.time()
# Calculate total size for progress
total_fbk_size = sum(fbk.stat().st_size for fbk, _ in fbk_paths)
total_processed = 0
with zipfile.ZipFile(
zip_path, "w",
compression=zipfile.ZIP_DEFLATED,
compresslevel=9,
) as zf:
for fbk, result in fbk_paths:
zi = zipfile.ZipInfo(fbk.name)
zi.compress_type = zipfile.ZIP_DEFLATED
with zf.open(zi, "w", force_zip64=True) as z:
with open(fbk, "rb") as src:
while buf := src.read(CHUNK):
z.write(buf)
total_processed += len(buf)
pct = total_processed * 100 / total_fbk_size
print(
f"\r {fbk.name}: {pct:6.2f}%",
end="", flush=True,
)
print()
t_zip_total = time.time() - t_zip_start
total_zip_size = zip_path.stat().st_size
print(f"ZIP OK ({t_zip_total:.0f}s, {total_zip_size/1024/1024:.1f} MB)")
# Fill zip_size into each result and delete FBK files
for fbk, result in fbk_paths:
result["zip_size"] = total_zip_size
fbk.unlink()
# --------------------------------------------------------
# Build report
# --------------------------------------------------------
report = []
report.append(f"Backup externi DB - {now.strftime('%d.%m.%Y %H:%M')}")
report.append(f"Celkem souboru: {len(fdb_all)}")
report.append(f"Zalohovano: {len(backed_up)}")
report.append(f"Chyby: {len(errors)}")
report.append("")
if backed_up:
report.append("--- Backed up ---")
total_fbk_mb = sum(r["fbk_size"] for r in backed_up) / 1024 / 1024
for r in backed_up:
report.append(
f" {r['file']}: "
f"FBK {r['fbk_size']/1024/1024:.1f} MB "
f"(gbak {r['t_gbak']:.0f}s)"
)
report.append(f" Total FBK: {total_fbk_mb:.1f} MB -> ZIP: {total_zip_size/1024/1024:.1f} MB")
report.append("")
if errors:
report.append("--- ERRORS ---")
for r in errors:
report.append(f" {r['file']}: {r['error']}")
report.append("")
# Send email
has_errors = len(errors) > 0
subject = (
f"{'X' if has_errors else 'OK'} "
f"MEDICUS externi DB - "
f"backup {len(backed_up)}/{len(fdb_all)}"
f"{f', {len(errors)} errors' if has_errors else ''}"
)
send_mail(MAIL_TO, subject, "\n".join(report))
print("\n" + "\n".join(report))
if errors:
raise RuntimeError(f"{len(errors)} backup(s) failed")
if __name__ == "__main__":
main()
+103
View File
@@ -361,3 +361,106 @@ Správný RTF formát klikacího odkazu:
- Napsat `rtf_to_text()` pro extrakci čistého textu z dekurzů
- Prozkoumat tabulky: LECH/LECD (léky?), POU (poukazy?), AMBULEKY (výkony?)
- První report domluvit s uživatelem co chce vidět
---
## KAR.POZNAMKA automatické tagy (2026-04-09)
Pole `POZNAMKA` v tabulce KAR je **BLOB SUB_TYPE 1** (text blob, win1250)
Medicus ho renderuje jako RTF stejně jako DEKURS. Lze psát i prostý text.
### Konvence automatických tagů
Automaticky zapisované informace se ukládají na **začátek** POZNAMKA ve formátu:
```
[[klic:hodnota1, hodnota2]] #zapsáno DD-MM-YYYY HH:MM#
```
- Oddělovače `[[` a `]]` se v lékařských poznámkách přirozeně nevyskytují
- Tag lze programově najít a nahradit regexem
- Ruční text lékaře/sestry pod tagem zůstává nedotčen
### Implementovaný tag: prev_prohlidka
```
[[prev_prohlidka:15-05-2024 01022, 15-04-2026]] #zapsáno 09-04-2026 14:30#
```
- `15-05-2024` = datum poslední PP
- `01022` = kód výkonu (01022 = dospělí, 01021 = starší varianta)
- `15-04-2026` = nejdřívější možný termín příští PP (23 měsíců od poslední)
- Skript: `MedicusWithClaudeTest/prev_prohlidka.py`
---
## Tabulka VZPARC výkonové dávky pojišťovnám (zjištěno 2026-04-09)
Zachyceno přes Firebird trace log při exportu dávek z Medicusu.
### Struktura
- **1954 záznamů**, data od 2007 (s DAVKA blobem od 2013)
- Klíčové sloupce: `ID`, `ROK`, `DRUH`, `DATUMOD`, `DATUMDO`, `DAVKA` (blob), `ICP`, `ODB`, `IDICZ`
- Vazba na ICZ přes `IDICZ`
### Typy dávek (DRUH)
| DRUH | Popis |
|---|---|
| `98` | Výkonová dávka obsahuje výkony vykázané pojišťovně (DP98 formát) |
| `05` | Doplňková dávka (DP05) |
| `80` | Registrační dávka přihlášení/odhlášení pacientů (DP80) |
| `36` | Jiný typ |
### DAVKA blob formát (CP852)
Stejný formát jako KDAVKA v PORTAL, dekódování identické:
```python
text = raw.encode('cp1250', errors='replace').decode('cp852', errors='replace')
```
Struktura DP98 dávky:
```
DP98... hlavička (IČP, rok, měsíc, disk, počet případů, částka)
A ... ambulantní případ: RC pacienta na pozici 34, délka 10
V ... výkon: V + DDMM + YYYY + KOD(5) + ...
Z ... ZULP
L ... lék
```
### Parsování RC z A řádku
```python
aktualni_rc = line[34:44].strip() # pevná pozice, délka 10
```
### Parsování výkonu z V řádku
```python
dd = int(line[1:3])
mm = int(line[3:5])
yyyy = int(line[5:9])
kod = line[9:14].strip() # 5znakový kód výkonu
```
### Poznámka: PORTAL vs VZPARC
- **PORTAL** starší tabulka, KDAVKA blob, data jen do 2015
- **VZPARC** správná tabulka pro výkonové dávky, data od 2013 do dnes
- Pro hledání výkonů per pacient vždy používat **VZPARC**
---
## Skript prev_prohlidka.py (MedicusWithClaudeTest/)
Účel: najde datum poslední preventivní prohlídky (PP) pro každého registrovaného
pacienta a zapíše/aktualizuje tag v KAR.POZNAMKA.
### Co dělá
1. Načte registrované pacienty (přesný dotaz přes REGISTR + ICP, IČP=09305001)
2. Projde všechny VZPARC DRUH=98 dávky, hledá výkony `01022` nebo `01021`
3. Pro každého pacienta zachová jen **nejnovější** datum PP
4. Zapíše/přepíše tag `[[prev_prohlidka:...]]` na začátek KAR.POZNAMKA
### Výsledek posledního spuštění (2026-04-09)
- Registrovaných pacientů: 1621
- Dávek VZPARC DRUH=98: 778
- Pacientů s nalezenou PP: 1289
- Zapsáno do KAR: 1120
- Nenalezeno (odregistrovaní nebo bez PP): 169
### Závislosti
```
pip install python-dateutil
```
(fdb již musí být nainstalováno)
+2 -2
View File
@@ -8,8 +8,8 @@ conn = fdb.connect(
password="masterkey",
charset="win1250")
cesta = r"u:\NextcloudOrdinace\Dokumentace_ke_zpracování"
cestazpracovana = r"u:\NextcloudOrdinace\Dokumentace_zpracovaná"
cesta = r"u:\testimport"
cestazpracovana = r"u:\testimportzpracovana"
# Konstanty pro detekci sekce Vložené přílohy (RTF kódování win1250)
PRILOHY_HEADER = r"Vlo\'9een\'e9 p\'f8\'edlohy:"
+397
View File
@@ -0,0 +1,397 @@
import os, shutil, fdb, time, threading
import re, datetime, funkce, funkce_ext
# Connect to the Firebird database
conn = fdb.connect(
dsn=r'localhost:c:\medicus 3\data\medicus.fdb',
user='SYSDBA',
password="masterkey",
charset="win1250")
cesta = r"u:\testimport"
cestazpracovana = r"u:\testimportzpracovana"
# Konstanty pro detekci sekce Vložené přílohy (RTF kódování win1250)
PRILOHY_HEADER = r"Vlo\'9een\'e9 p\'f8\'edlohy:"
PRILOHY_CLOSING = r'\pard\s10\plain\cs15\f0\fs20 \par'
# ─── Helper funkce ────────────────────────────────────────────────────────────
def restore_files_for_import(retezec):
drop = r"u:\Dropbox\!!!Days\Downloads Z230\Dokumentace"
next = r"u:\NextcloudOrdinace\Dokumentace_ke_zpracování"
if not os.path.exists(drop):
print(f"The directory '{drop}' does not exist.")
return
for item in os.listdir(drop):
item_path = os.path.join(drop, item)
if os.path.isfile(item_path) or os.path.islink(item_path):
os.unlink(item_path)
print(f"Deleted file: {item_path}")
elif os.path.isdir(item_path):
shutil.rmtree(item_path)
print(f"Deleted directory: {item_path}")
for item in os.listdir(next):
item_path = os.path.join(next, item)
if os.path.isfile(item_path) and item_path.endswith(".pdf") and retezec in item_path:
shutil.copy(item_path, os.path.join(drop, item))
print(f"Copied file: {item_path}")
def kontrola_rc(rc, connection):
cur = connection.cursor()
cur.execute("select count(*),idpac from kar where rodcis=? group by idpac", (rc,))
row = cur.fetchone()
if row:
return row[1]
else:
return False
def kontrola_struktury(souborname, connection):
if souborname.endswith('.pdf'):
pattern = re.compile(r'(^\d{9,10}) (\d{4}-\d{2}-\d{2}) (\w+, \w.+?) \[(.+?)\] \[(.*?)\]')
match = pattern.search(souborname)
vpohode = True
if match and len(match.groups()) == 5:
datum = match.group(2)
try:
datetime.datetime.strptime(datum, "%Y-%m-%d").date()
except:
vpohode = False
return vpohode
cur = connection.cursor()
cur.execute("select count(*) from kar where rodcis=?", (match.group(1),))
row = cur.fetchone()[0]
if row != 1:
vpohode = False
return vpohode
else:
vpohode = False
return vpohode
else:
vpohode = False
return vpohode
return vpohode
def vrat_info_o_souboru(souborname, connection):
pattern = re.compile(r'(^\d{9,10}) (\d{4}-\d{2}-\d{2}) (\w+, \w.+?) \[(.+?)\] \[(.*?)\]')
match = pattern.search(souborname)
rc = match.group(1)
datum = datetime.datetime.strptime(match.group(2), "%Y-%m-%d").date()
jmeno = match.group(3)
prvnizavorka = match.group(4)
druhazavorka = match.group(5)
cur = connection.cursor()
cur.execute("select idpac from kar where rodcis=?", (rc,))
idpac = cur.fetchone()[0]
datumsouboru = datetime.datetime.fromtimestamp(os.path.getctime(os.path.join(cesta, souborname)))
return (rc, idpac, datum, jmeno, prvnizavorka, druhazavorka, souborname, datumsouboru)
def prejmenuj_chybny_soubor(souborname, cesta):
if souborname[0] != "":
soubornovy = "" + souborname
os.rename(os.path.join(cesta, souborname), os.path.join(cesta, soubornovy))
def _pokus_o_zamek(dekurs_id, vysledek):
"""Běží ve vlákně: pokusí se zamknout dekurz přes separátní spojení.
Výsledek zapíše do slovníku vysledek: {'ok': True} nebo {'chyba': str}.
Pokud vlákno stále běží po uplynutí timeoutu → záznam je zamčený.
"""
conn_t = None
try:
conn_t = fdb.connect(
dsn=r'localhost:c:\medicus 3\data\medicus.fdb',
user='SYSDBA', password='masterkey', charset='win1250'
)
cur_t = conn_t.cursor()
cur_t.execute(
"SELECT ID FROM DEKURS WHERE ID = ? FOR UPDATE WITH LOCK",
(dekurs_id,)
)
cur_t.fetchone()
conn_t.rollback() # Uvolni zámek sloužil jen k ověření
vysledek['ok'] = True
except Exception as e:
vysledek['chyba'] = str(e)
finally:
if conn_t:
try:
conn_t.close()
except Exception:
pass
def zkus_zamknout_dnesni_dekurs(conn, idpac, datum_vlozeni, timeout_sec=2):
"""Zjistí zda existuje dnešní dekurz a ověří že není zamčený.
Vrátí:
(id, rtf) dnešní dekurz existuje a není zamčený
None žádný dnešní dekurz (bude se dělat INSERT, zámek není potřeba)
Vyhodí RuntimeError pokud je záznam zamčený jiným uživatelem (Medicus ho má otevřený).
Poznámka: NOWAIT transakci fdb neumí spolehlivě nastavit, proto spustíme
pokus o zámek ve vlákně s timeoutem. Pokud vlákno do timeout_sec sekund
neskončí, záznam je zamčený a přeskočíme celou skupinu.
"""
cur = conn.cursor()
# Krok 1: přečti ID, datum a obsah posledního dekurzu (běžný SELECT)
cur.execute("""
SELECT FIRST 1 ID, DATUM, DEKURS FROM DEKURS
WHERE IDPAC = ?
ORDER BY ID DESC
""", (idpac,))
row = cur.fetchone()
if row is None:
print(f" Žádný dekurz pro pacienta IDPAC={idpac}")
return None
dekurs_id, dekurs_datum, dekurs_rtf = row
print(f" Poslední dekurs: ID={dekurs_id}, datum={dekurs_datum}")
if dekurs_datum != datum_vlozeni:
print(f" → jiný den ({dekurs_datum}{datum_vlozeni}), vytvoříme nový (INSERT)")
return None
# Krok 2: ověř přes vlákno s timeoutem zda záznam není zamčený
print(f" → dnešní den ({datum_vlozeni}) ✓ ověřuji zámek (timeout {timeout_sec}s)...")
vysledek = {}
t = threading.Thread(target=_pokus_o_zamek, args=(dekurs_id, vysledek), daemon=True)
t.start()
t.join(timeout=timeout_sec)
if t.is_alive():
# Vlákno stále čeká na zámek = záznam drží Medicus
raise RuntimeError(f"DEKURZ ID={dekurs_id} je zamčený (Medicus má záznam otevřený)")
if 'chyba' in vysledek:
raise fdb.DatabaseError(vysledek['chyba'])
print(f" → záznam volný, pokračuji se zápisem")
return (dekurs_id, dekurs_rtf)
def ma_sekci_prilohy(rtf):
return PRILOHY_HEADER in rtf
def pridat_do_sekce_prilohy(rtf, bookmark_list, filenameforbookmark_list):
"""Přidá více souborů do EXISTUJÍCÍ sekce 'Vložené přílohy'.
Postup:
1. Spočítá počet Files: odkazů = N → nové indexy začínají od N
2. Vloží nové \\pard řádky před uzavírací prázdný řádek sekce
3. Přidá nové bookmarky na konec {\\info{\\bookmarks ...}}
"""
# 1. Počet existujících Files: odkazů
bkm_match = re.search(r'\{\\info\{\\bookmarks ([^}]*)\}\}', rtf)
if bkm_match:
bkm_entries = [e for e in bkm_match.group(1).split(';') if e.strip()]
n_files = sum(1 for e in bkm_entries if '"Files:' in e)
else:
n_files = 0
print(f" Počet existujících Files odkazů: {n_files}, přidávám {len(bookmark_list)} nových")
# 2. Vložit nové \pard řádky před PRILOHY_CLOSING
prilohy_pos = rtf.find(PRILOHY_HEADER)
closing_pos = rtf.find(PRILOHY_CLOSING, prilohy_pos)
if closing_pos == -1:
raise RuntimeError("Nenalezen uzavírací řádek sekce Vložené přílohy!")
new_pards = ''
for i, fname in enumerate(filenameforbookmark_list):
idx = n_files + i
new_pards += (r'\pard\s10{\*\bkmkstart ' + str(idx) + r'}'
r'\plain\cs32\f0\ul\fs20\cf1 ' + fname
+ r'{\*\bkmkend ' + str(idx) + r'}\par' + '\n')
rtf = rtf[:closing_pos] + new_pards + rtf[closing_pos:]
# 3. Přidat nové bookmarky na konec {\info{\bookmarks ...}}
def append_bookmarks(m):
entries = [e for e in m.group(1).split(';') if e.strip()]
entries.extend(bookmark_list)
return '{\\info{\\bookmarks ' + ';'.join(entries) + '}}'
rtf = re.sub(r'\{\\info\{\\bookmarks ([^}]*)\}\}', append_bookmarks, rtf)
return rtf
def merge_rtf_prepend(existing_rtf, new_bkm_list, new_body_pards, n_new):
"""Vloží novou sekci příloh na ZAČÁTEK stávajícího dekurzu (sekce tam ještě není)."""
rtf = existing_rtf
rtf = re.sub(r'\\bkmkstart (\d+)',
lambda m: '\\bkmkstart ' + str(int(m.group(1)) + n_new), rtf)
rtf = re.sub(r'\\bkmkend (\d+)',
lambda m: '\\bkmkend ' + str(int(m.group(1)) + n_new), rtf)
new_bkm_str = ';'.join(new_bkm_list)
def merge_bkm(m):
existing = m.group(1).strip()
combined = new_bkm_str + (';' + existing if existing else '')
return '{\\info{\\bookmarks ' + combined + '}}'
if re.search(r'\{\\info\{\\bookmarks', rtf):
rtf = re.sub(r'\{\\info\{\\bookmarks ([^}]*)\}\}', merge_bkm, rtf)
else:
rtf = re.sub(r'(\\deflang\d+)',
r'\1{\\info{\\bookmarks ' + new_bkm_str + '}}', rtf, count=1)
match = re.search(r'\\uc1\\pard', rtf)
if match:
pos = match.start()
rtf = rtf[:pos] + new_body_pards + '\n' + rtf[pos:]
return rtf
# Šablona RTF pro nový dekurs
RTF_TEMPLATE = r"""{\rtf1\ansi\ansicpg1250\uc1\deff0\deflang1029{\info{\bookmarks BOOKMARKNAMES}}{\fonttbl{\f0\fnil\fcharset238 Arial;}{\f5\fnil\fcharset238 Symbol;}}
{\colortbl ;\red0\green0\blue255;\red0\green128\blue0;\red0\green0\blue0;}
{\stylesheet{\s10\fi0\li0\ql\ri0\sb0\sa0 Vlevo;}{\*\cs15\f0\fs20 Norm\'e1ln\'ed;}{\*\cs20\f0\i\fs20 Z\'e1hlav\'ed;}{\*\cs32\f0\ul\fs20\cf1 Odkaz;}}
BOOKMARKSTEXT
\pard\s10\plain\cs15\f0\fs20 \par
}"""
# ─── Hlavní tělo skriptu ──────────────────────────────────────────────────────
info = []
for soubor in os.listdir(cesta):
if os.path.isfile(os.path.join(cesta, soubor)):
print(soubor)
if kontrola_struktury(soubor, conn):
info.append(vrat_info_o_souboru(soubor, conn))
else:
prejmenuj_chybny_soubor(soubor, cesta)
info = sorted(info, key=lambda x: (x[0], x[1]))
print(info)
skupiny = {}
for row in info:
skupiny[row[0]] = []
for row in info:
skupiny[row[0]].append(row)
for key in skupiny.keys():
print(f"\n{'='*60}")
print(f"RC: {key}, souborů: {len(skupiny[key])}")
idpac = skupiny[key][0][1]
datumzapisu = datetime.datetime.now().date()
caszapisu = datetime.datetime.now().time()
# ── PRE-CHECK: zkus zamknout dnešní dekurz PŘED zpracováním souborů ──────
print(f"\n>>> Kontrola zámku dekurzu pro IDPAC={idpac}...")
try:
existujici = zkus_zamknout_dnesni_dekurs(conn, idpac, datumzapisu)
except RuntimeError as e:
# Vlákno nepřišlo do timeoutu = záznam drží Medicus
print(f"\n!!! DEKURZ ZAMČEN soubory skupiny RC={key} přeskočeny.")
print(" Spusťte skript znovu až bude záznam volný.")
continue
except fdb.DatabaseError as e:
chyba = str(e).lower()
if 'deadlock' in chyba or 'lock conflict' in chyba or 'update conflict' in chyba:
print(f"\n!!! DEKURZ ZAMČEN (DB konflikt) soubory skupiny RC={key} přeskočeny.")
print(" Spusťte skript znovu až bude záznam volný.")
continue
raise # jiná DB chyba propaguj dál
cislo = 9
poradi = 0
bookmark_list = []
filenameforbookmark_list = []
bookmarks_body = ''
# ── Krok 1: vložit každý soubor do ext DB + přesunout do zpracovaných ────
for row in skupiny[key]:
fileid = funkce_ext.zapis_file_ext(
vstupconnection=conn, idpac=row[1],
cesta=cesta, souborname=row[6], prvnizavorka=row[4],
soubordate=row[2], souborfiledate=row[7], poznamka=row[5])
print(f" → FILES.ID = {fileid} ({row[6]})")
# Přesun souboru do zpracovaných
for attempt in range(3):
try:
dest = os.path.join(cestazpracovana, row[6])
if not os.path.exists(dest):
shutil.move(os.path.join(cesta, row[6]), dest)
else:
ts = datetime.datetime.now().strftime("%Y-%m-%d %H-%M-%S")
shutil.move(os.path.join(cesta, row[6]),
os.path.join(cestazpracovana, row[6][:-4] + " " + ts + ".pdf"))
print(" Přesun OK!")
break
except Exception as e:
print(f" Attempt {attempt + 1} failed: {e}")
if attempt < 2:
print(" Retrying in 5 seconds...")
time.sleep(5)
else:
print(" Max retries reached. Command failed.")
filenameforbookmark = row[2].strftime('%Y-%m-%d') + ' ' + row[4] + ': ' + row[5]
bookmark_list.append('"' + filenameforbookmark + '","Files:' + str(fileid) + '",' + str(cislo))
filenameforbookmark_list.append(filenameforbookmark)
cislo += 7
bookmarks_body += (r'\pard\s10{\*\bkmkstart ' + str(poradi) + r'}'
r'\plain\cs32\f0\ul\fs20\cf1 ' + filenameforbookmark
+ r'{\*\bkmkend ' + str(poradi) + r'}\par')
poradi += 1
# ── Krok 2: sestavit tělo nové sekce příloh ───────────────────────────────
new_body = (r'\uc1\pard\s10\plain\cs20\f0\i\fs20 Vlo\'9een\'e9 p\'f8\'edlohy:\par' + '\n'
+ bookmarks_body + '\n'
+ r'\pard\s10\plain\cs15\f0\fs20 \par')
# ── Krok 3: rozhodovací logika (3 případy) ────────────────────────────────
cur = conn.cursor()
if existujici:
dekurs_id, existing_rtf = existujici
if ma_sekci_prilohy(existing_rtf):
# Případ 1: dnešní dekurz má sekci příloh → přidáme soubory dovnitř
print(f"\n>>> Sekce 'Vložené přílohy' nalezena v DEKURS ID={dekurs_id}")
print(">>> Přidávám soubory DO existující sekce...")
merged_rtf = pridat_do_sekce_prilohy(existing_rtf, bookmark_list, filenameforbookmark_list)
else:
# Případ 2: dnešní dekurz existuje, ale sekci příloh nemá → prepend
print(f"\n>>> DEKURS ID={dekurs_id} nemá sekci příloh → vkládám sekci na začátek...")
merged_rtf = merge_rtf_prepend(existing_rtf, bookmark_list, new_body, len(skupiny[key]))
print("\n=== Výsledný RTF ===")
print(merged_rtf)
cur.execute("UPDATE DEKURS SET DEKURS = ? WHERE ID = ?", (merged_rtf, dekurs_id))
conn.commit()
print(f"\n>>> UPDATE DEKURS ID={dekurs_id} hotovo!")
else:
# Případ 3: žádný dnešní dekurz → vytvoříme nový
print(f"\n>>> Žádný dekurs pro dnešek → vytvářím nový...")
bookmark_str = ';'.join(bookmark_list)
rtf = RTF_TEMPLATE.replace('BOOKMARKNAMES', bookmark_str)
rtf = rtf.replace('BOOKMARKSTEXT', new_body)
print("\n=== Výsledný RTF ===")
print(rtf)
dekursid = funkce.get_dekurs_id(conn)
cur.execute(
"INSERT INTO DEKURS (id, iduzi, idprac, idodd, idpac, datum, cas, dekurs)"
" VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(dekursid, 6, 2, 2, idpac, datumzapisu, caszapisu, rtf)
)
conn.commit()
print(f"\n>>> Nový DEKURS ID={dekursid}")
print("\n=== HOTOVO ===")
conn.close()
+397
View File
@@ -0,0 +1,397 @@
import os, shutil, fdb, time, threading
import re, datetime, funkce, funkce_ext
# Connect to the Firebird database
conn = fdb.connect(
dsn=r'localhost:c:\medicus 3\data\medicus.fdb',
user='SYSDBA',
password="masterkey",
charset="win1250")
cesta = r"u:\testimport"
cestazpracovana = r"u:\testimportzpracovana"
# Konstanty pro detekci sekce Vložené přílohy (RTF kódování win1250)
PRILOHY_HEADER = r"Vlo\'9een\'e9 p\'f8\'edlohy:"
PRILOHY_CLOSING = r'\pard\s10\plain\cs15\f0\fs20 \par'
# ─── Helper funkce ────────────────────────────────────────────────────────────
def restore_files_for_import(retezec):
drop = r"u:\Dropbox\!!!Days\Downloads Z230\Dokumentace"
next = r"u:\NextcloudOrdinace\Dokumentace_ke_zpracování"
if not os.path.exists(drop):
print(f"The directory '{drop}' does not exist.")
return
for item in os.listdir(drop):
item_path = os.path.join(drop, item)
if os.path.isfile(item_path) or os.path.islink(item_path):
os.unlink(item_path)
print(f"Deleted file: {item_path}")
elif os.path.isdir(item_path):
shutil.rmtree(item_path)
print(f"Deleted directory: {item_path}")
for item in os.listdir(next):
item_path = os.path.join(next, item)
if os.path.isfile(item_path) and item_path.endswith(".pdf") and retezec in item_path:
shutil.copy(item_path, os.path.join(drop, item))
print(f"Copied file: {item_path}")
def kontrola_rc(rc, connection):
cur = connection.cursor()
cur.execute("select count(*),idpac from kar where rodcis=? group by idpac", (rc,))
row = cur.fetchone()
if row:
return row[1]
else:
return False
def kontrola_struktury(souborname, connection):
if souborname.endswith('.pdf'):
pattern = re.compile(r'(^\d{9,10}) (\d{4}-\d{2}-\d{2}) (\w+, \w.+?) \[(.+?)\] \[(.*?)\]')
match = pattern.search(souborname)
vpohode = True
if match and len(match.groups()) == 5:
datum = match.group(2)
try:
datetime.datetime.strptime(datum, "%Y-%m-%d").date()
except:
vpohode = False
return vpohode
cur = connection.cursor()
cur.execute("select count(*) from kar where rodcis=?", (match.group(1),))
row = cur.fetchone()[0]
if row != 1:
vpohode = False
return vpohode
else:
vpohode = False
return vpohode
else:
vpohode = False
return vpohode
return vpohode
def vrat_info_o_souboru(souborname, connection):
pattern = re.compile(r'(^\d{9,10}) (\d{4}-\d{2}-\d{2}) (\w+, \w.+?) \[(.+?)\] \[(.*?)\]')
match = pattern.search(souborname)
rc = match.group(1)
datum = datetime.datetime.strptime(match.group(2), "%Y-%m-%d").date()
jmeno = match.group(3)
prvnizavorka = match.group(4)
druhazavorka = match.group(5)
cur = connection.cursor()
cur.execute("select idpac from kar where rodcis=?", (rc,))
idpac = cur.fetchone()[0]
datumsouboru = datetime.datetime.fromtimestamp(os.path.getctime(os.path.join(cesta, souborname)))
return (rc, idpac, datum, jmeno, prvnizavorka, druhazavorka, souborname, datumsouboru)
def prejmenuj_chybny_soubor(souborname, cesta):
if souborname[0] != "":
soubornovy = "" + souborname
os.rename(os.path.join(cesta, souborname), os.path.join(cesta, soubornovy))
def _pokus_o_zamek(dekurs_id, vysledek):
"""Běží ve vlákně: pokusí se zamknout dekurz přes separátní spojení.
Výsledek zapíše do slovníku vysledek: {'ok': True} nebo {'chyba': str}.
Pokud vlákno stále běží po uplynutí timeoutu → záznam je zamčený.
"""
conn_t = None
try:
conn_t = fdb.connect(
dsn=r'localhost:c:\medicus 3\data\medicus.fdb',
user='SYSDBA', password='masterkey', charset='win1250'
)
cur_t = conn_t.cursor()
cur_t.execute(
"SELECT ID FROM DEKURS WHERE ID = ? FOR UPDATE WITH LOCK",
(dekurs_id,)
)
cur_t.fetchone()
conn_t.rollback() # Uvolni zámek sloužil jen k ověření
vysledek['ok'] = True
except Exception as e:
vysledek['chyba'] = str(e)
finally:
if conn_t:
try:
conn_t.close()
except Exception:
pass
def zkus_zamknout_dnesni_dekurs(conn, idpac, datum_vlozeni, timeout_sec=2):
"""Zjistí zda existuje dnešní dekurz a ověří že není zamčený.
Vrátí:
(id, rtf) dnešní dekurz existuje a není zamčený
None žádný dnešní dekurz (bude se dělat INSERT, zámek není potřeba)
Vyhodí RuntimeError pokud je záznam zamčený jiným uživatelem (Medicus ho má otevřený).
Poznámka: NOWAIT transakci fdb neumí spolehlivě nastavit, proto spustíme
pokus o zámek ve vlákně s timeoutem. Pokud vlákno do timeout_sec sekund
neskončí, záznam je zamčený a přeskočíme celou skupinu.
"""
cur = conn.cursor()
# Krok 1: přečti ID, datum a obsah posledního dekurzu (běžný SELECT)
cur.execute("""
SELECT FIRST 1 ID, DATUM, DEKURS FROM DEKURS
WHERE IDPAC = ?
ORDER BY ID DESC
""", (idpac,))
row = cur.fetchone()
if row is None:
print(f" Žádný dekurz pro pacienta IDPAC={idpac}")
return None
dekurs_id, dekurs_datum, dekurs_rtf = row
print(f" Poslední dekurs: ID={dekurs_id}, datum={dekurs_datum}")
if dekurs_datum != datum_vlozeni:
print(f" → jiný den ({dekurs_datum}{datum_vlozeni}), vytvoříme nový (INSERT)")
return None
# Krok 2: ověř přes vlákno s timeoutem zda záznam není zamčený
print(f" → dnešní den ({datum_vlozeni}) ✓ ověřuji zámek (timeout {timeout_sec}s)...")
vysledek = {}
t = threading.Thread(target=_pokus_o_zamek, args=(dekurs_id, vysledek), daemon=True)
t.start()
t.join(timeout=timeout_sec)
if t.is_alive():
# Vlákno stále čeká na zámek = záznam drží Medicus
raise RuntimeError(f"DEKURZ ID={dekurs_id} je zamčený (Medicus má záznam otevřený)")
if 'chyba' in vysledek:
raise fdb.DatabaseError(vysledek['chyba'])
print(f" → záznam volný, pokračuji se zápisem")
return (dekurs_id, dekurs_rtf)
def ma_sekci_prilohy(rtf):
return PRILOHY_HEADER in rtf
def pridat_do_sekce_prilohy(rtf, bookmark_list, filenameforbookmark_list):
"""Přidá více souborů do EXISTUJÍCÍ sekce 'Vložené přílohy'.
Postup:
1. Spočítá počet Files: odkazů = N → nové indexy začínají od N
2. Vloží nové \\pard řádky před uzavírací prázdný řádek sekce
3. Přidá nové bookmarky na konec {\\info{\\bookmarks ...}}
"""
# 1. Počet existujících Files: odkazů
bkm_match = re.search(r'\{\\info\{\\bookmarks ([^}]*)\}\}', rtf)
if bkm_match:
bkm_entries = [e for e in bkm_match.group(1).split(';') if e.strip()]
n_files = sum(1 for e in bkm_entries if '"Files:' in e)
else:
n_files = 0
print(f" Počet existujících Files odkazů: {n_files}, přidávám {len(bookmark_list)} nových")
# 2. Vložit nové \pard řádky před PRILOHY_CLOSING
prilohy_pos = rtf.find(PRILOHY_HEADER)
closing_pos = rtf.find(PRILOHY_CLOSING, prilohy_pos)
if closing_pos == -1:
raise RuntimeError("Nenalezen uzavírací řádek sekce Vložené přílohy!")
new_pards = ''
for i, fname in enumerate(filenameforbookmark_list):
idx = n_files + i
new_pards += (r'\pard\s10{\*\bkmkstart ' + str(idx) + r'}'
r'\plain\cs32\f0\ul\fs20\cf1 ' + fname
+ r'{\*\bkmkend ' + str(idx) + r'}\par' + '\n')
rtf = rtf[:closing_pos] + new_pards + rtf[closing_pos:]
# 3. Přidat nové bookmarky na konec {\info{\bookmarks ...}}
def append_bookmarks(m):
entries = [e for e in m.group(1).split(';') if e.strip()]
entries.extend(bookmark_list)
return '{\\info{\\bookmarks ' + ';'.join(entries) + '}}'
rtf = re.sub(r'\{\\info\{\\bookmarks ([^}]*)\}\}', append_bookmarks, rtf)
return rtf
def merge_rtf_prepend(existing_rtf, new_bkm_list, new_body_pards, n_new):
"""Vloží novou sekci příloh na ZAČÁTEK stávajícího dekurzu (sekce tam ještě není)."""
rtf = existing_rtf
rtf = re.sub(r'\\bkmkstart (\d+)',
lambda m: '\\bkmkstart ' + str(int(m.group(1)) + n_new), rtf)
rtf = re.sub(r'\\bkmkend (\d+)',
lambda m: '\\bkmkend ' + str(int(m.group(1)) + n_new), rtf)
new_bkm_str = ';'.join(new_bkm_list)
def merge_bkm(m):
existing = m.group(1).strip()
combined = new_bkm_str + (';' + existing if existing else '')
return '{\\info{\\bookmarks ' + combined + '}}'
if re.search(r'\{\\info\{\\bookmarks', rtf):
rtf = re.sub(r'\{\\info\{\\bookmarks ([^}]*)\}\}', merge_bkm, rtf)
else:
rtf = re.sub(r'(\\deflang\d+)',
r'\1{\\info{\\bookmarks ' + new_bkm_str + '}}', rtf, count=1)
match = re.search(r'\\uc1\\pard', rtf)
if match:
pos = match.start()
rtf = rtf[:pos] + new_body_pards + '\n' + rtf[pos:]
return rtf
# Šablona RTF pro nový dekurs
RTF_TEMPLATE = r"""{\rtf1\ansi\ansicpg1250\uc1\deff0\deflang1029{\info{\bookmarks BOOKMARKNAMES}}{\fonttbl{\f0\fnil\fcharset238 Arial;}{\f5\fnil\fcharset238 Symbol;}}
{\colortbl ;\red0\green0\blue255;\red0\green128\blue0;\red0\green0\blue0;}
{\stylesheet{\s10\fi0\li0\ql\ri0\sb0\sa0 Vlevo;}{\*\cs15\f0\fs20 Norm\'e1ln\'ed;}{\*\cs20\f0\i\fs20 Z\'e1hlav\'ed;}{\*\cs32\f0\ul\fs20\cf1 Odkaz;}}
BOOKMARKSTEXT
\pard\s10\plain\cs15\f0\fs20 \par
}"""
# ─── Hlavní tělo skriptu ──────────────────────────────────────────────────────
info = []
for soubor in os.listdir(cesta):
if os.path.isfile(os.path.join(cesta, soubor)):
print(soubor)
if kontrola_struktury(soubor, conn):
info.append(vrat_info_o_souboru(soubor, conn))
else:
prejmenuj_chybny_soubor(soubor, cesta)
info = sorted(info, key=lambda x: (x[0], x[1]))
print(info)
skupiny = {}
for row in info:
skupiny[row[0]] = []
for row in info:
skupiny[row[0]].append(row)
for key in skupiny.keys():
print(f"\n{'='*60}")
print(f"RC: {key}, souborů: {len(skupiny[key])}")
idpac = skupiny[key][0][1]
datumzapisu = datetime.datetime.now().date()
caszapisu = datetime.datetime.now().time()
# ── PRE-CHECK: zkus zamknout dnešní dekurz PŘED zpracováním souborů ──────
print(f"\n>>> Kontrola zámku dekurzu pro IDPAC={idpac}...")
try:
existujici = zkus_zamknout_dnesni_dekurs(conn, idpac, datumzapisu)
except RuntimeError as e:
# Vlákno nepřišlo do timeoutu = záznam drží Medicus
print(f"\n!!! DEKURZ ZAMČEN soubory skupiny RC={key} přeskočeny.")
print(" Spusťte skript znovu až bude záznam volný.")
continue
except fdb.DatabaseError as e:
chyba = str(e).lower()
if 'deadlock' in chyba or 'lock conflict' in chyba or 'update conflict' in chyba:
print(f"\n!!! DEKURZ ZAMČEN (DB konflikt) soubory skupiny RC={key} přeskočeny.")
print(" Spusťte skript znovu až bude záznam volný.")
continue
raise # jiná DB chyba propaguj dál
cislo = 9
poradi = 0
bookmark_list = []
filenameforbookmark_list = []
bookmarks_body = ''
# ── Krok 1: vložit každý soubor do ext DB + přesunout do zpracovaných ────
for row in skupiny[key]:
fileid = funkce_ext.zapis_file_ext(
vstupconnection=conn, idpac=row[1],
cesta=cesta, souborname=row[6], prvnizavorka=row[4],
soubordate=row[2], souborfiledate=row[7], poznamka=row[5])
print(f" → FILES.ID = {fileid} ({row[6]})")
# Přesun souboru do zpracovaných
for attempt in range(3):
try:
dest = os.path.join(cestazpracovana, row[6])
if not os.path.exists(dest):
shutil.move(os.path.join(cesta, row[6]), dest)
else:
ts = datetime.datetime.now().strftime("%Y-%m-%d %H-%M-%S")
shutil.move(os.path.join(cesta, row[6]),
os.path.join(cestazpracovana, row[6][:-4] + " " + ts + ".pdf"))
print(" Přesun OK!")
break
except Exception as e:
print(f" Attempt {attempt + 1} failed: {e}")
if attempt < 2:
print(" Retrying in 5 seconds...")
time.sleep(5)
else:
print(" Max retries reached. Command failed.")
filenameforbookmark = row[2].strftime('%Y-%m-%d') + ' ' + row[4] + ': ' + row[5]
bookmark_list.append('"' + filenameforbookmark + '","Files:' + str(fileid) + '",' + str(cislo))
filenameforbookmark_list.append(filenameforbookmark)
cislo += 7
bookmarks_body += (r'\pard\s10{\*\bkmkstart ' + str(poradi) + r'}'
r'\plain\cs32\f0\ul\fs20\cf1 ' + filenameforbookmark
+ r'{\*\bkmkend ' + str(poradi) + r'}\par')
poradi += 1
# ── Krok 2: sestavit tělo nové sekce příloh ───────────────────────────────
new_body = (r'\uc1\pard\s10\plain\cs20\f0\i\fs20 Vlo\'9een\'e9 p\'f8\'edlohy:\par' + '\n'
+ bookmarks_body + '\n'
+ r'\pard\s10\plain\cs15\f0\fs20 \par')
# ── Krok 3: rozhodovací logika (3 případy) ────────────────────────────────
cur = conn.cursor()
if existujici:
dekurs_id, existing_rtf = existujici
if ma_sekci_prilohy(existing_rtf):
# Případ 1: dnešní dekurz má sekci příloh → přidáme soubory dovnitř
print(f"\n>>> Sekce 'Vložené přílohy' nalezena v DEKURS ID={dekurs_id}")
print(">>> Přidávám soubory DO existující sekce...")
merged_rtf = pridat_do_sekce_prilohy(existing_rtf, bookmark_list, filenameforbookmark_list)
else:
# Případ 2: dnešní dekurz existuje, ale sekci příloh nemá → prepend
print(f"\n>>> DEKURS ID={dekurs_id} nemá sekci příloh → vkládám sekci na začátek...")
merged_rtf = merge_rtf_prepend(existing_rtf, bookmark_list, new_body, len(skupiny[key]))
print("\n=== Výsledný RTF ===")
print(merged_rtf)
cur.execute("UPDATE DEKURS SET DEKURS = ? WHERE ID = ?", (merged_rtf, dekurs_id))
conn.commit()
print(f"\n>>> UPDATE DEKURS ID={dekurs_id} hotovo!")
else:
# Případ 3: žádný dnešní dekurz → vytvoříme nový
print(f"\n>>> Žádný dekurs pro dnešek → vytvářím nový...")
bookmark_str = ';'.join(bookmark_list)
rtf = RTF_TEMPLATE.replace('BOOKMARKNAMES', bookmark_str)
rtf = rtf.replace('BOOKMARKSTEXT', new_body)
print("\n=== Výsledný RTF ===")
print(rtf)
dekursid = funkce.get_dekurs_id(conn)
cur.execute(
"INSERT INTO DEKURS (id, iduzi, idprac, idodd, idpac, datum, cas, dekurs)"
" VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(dekursid, 6, 2, 2, idpac, datumzapisu, caszapisu, rtf)
)
conn.commit()
print(f"\n>>> Nový DEKURS ID={dekursid}")
print("\n=== HOTOVO ===")
conn.close()
@@ -0,0 +1,226 @@
# s03soubory_01_FINAL.py dokumentace
**Finální verze importního skriptu pro vkládání PDF dokumentů do dekurzů Medicusu.**
Datum finalizace: 2026-04-04
Autor: Vladimír Buzalka + Claude (Anthropic)
---
## Co skript dělá
Zpracuje PDF soubory v určené složce (`cesta`) a pro každý soubor:
1. Ověří správnost názvu souboru (formát, RC, datum)
2. Zkontroluje, zda cílový dekurz není zamčený v Medicusu
3. Zapíše soubor do externí databáze souborů (tabulka FILES)
4. Přesune soubor do složky zpracovaných
5. Vloží odkaz (bookmark) do dekurzu pacienta jako RTF záznam
---
## Adresáře
| Proměnná | Cesta (testovací) | Popis |
|---|---|---|
| `cesta` | `u:\testimport` | Vstupní složka sem patří soubory ke zpracování |
| `cestazpracovana` | `u:\testimportzpracovana` | Cílová složka sem se přesouvají zpracované soubory |
> V produkci tyto cesty nahradit skutečnými složkami (Nextcloud/Dropbox).
---
## Formát názvu souboru
Každý PDF soubor musí mít název ve tvaru:
```
RC YYYY-MM-DD Příjmení, Jméno [typ dokumentu] [poznámka].pdf
```
**Příklad:**
```
7309208104 2020-10-16 Buzalka, Vladimír [LZ ortopedie] [VAS LS páteře, obstřik].pdf
```
| Část | Popis |
|---|---|
| `RC` | Rodné číslo pacienta (9 nebo 10 číslic) musí existovat v tabulce KAR |
| `YYYY-MM-DD` | Datum dokumentu |
| `Příjmení, Jméno` | Jméno pacienta (jen pro čitelnost, nepoužívá se k vyhledání) |
| `[typ dokumentu]` | První závorka druh nálezu (LZ ortopedie, EKG, Lab. nález…) |
| `[poznámka]` | Druhá závorka krátký popis obsahu (může být prázdná `[]`) |
**Chybný soubor** (špatný název, RC nenalezeno v DB) je přejmenován přidáním prefixu `♥`:
```
♥chybny soubor.pdf
```
Skript ho přeskočí a nechá v složce pro ruční opravu.
---
## Databázové tabulky
| Tabulka | DB | Popis |
|---|---|---|
| `KAR` | Medicus (`medicus.fdb`) | Kartotéka pacientů lookup RC → IDPAC |
| `DEKURS` | Medicus (`medicus.fdb`) | Dekurzy čtení a zápis RTF záznamu |
| `FILES` | Externí DB (`MEDICUS_FILES_YYYYMM.fdb`) | Binární uložení PDF souborů |
---
## Klíčová novinka oproti s03soubory.py ochrana před zamčeným dekurzem
### Problém
Medicus drží **exkluzivní zámek** (Firebird row lock) na záznamu tabulky DEKURS po celou dobu, kdy má lékařka pacienta otevřeného. Kdyby skript provedl `UPDATE DEKURS` do zamčeného záznamu, přepsal by lékařčiny neuložené změny.
### Řešení detekce zámku pomocí vlákna s timeoutem
Firebird neumí NOWAIT nastavit per-statement v SQL (syntaxe `FOR UPDATE WITH LOCK NOWAIT` není platná). Nastavení NOWAIT je vlastnost transakce, nikoliv dotazu. Knihovna `fdb` navíc toto nastavení spolehlivě nepodporuje.
**Zvolené řešení:** spuštění pokusu o zámek ve vedlejším vlákně s timeoutem 2 sekundy.
```
hlavní vlákno vedlejší vlákno (_pokus_o_zamek)
───────────────── ─────────────────────────────────
t.start() ──────► fdb.connect() [nové spojení]
t.join(timeout=2s) SELECT ... FOR UPDATE WITH LOCK
├── záznam volný → fetchone() → rollback() → konec
└── záznam zamčený → čeká (blokuje)...
─────────────────
po 2 sekundách:
t.is_alive()?
ANO → záznam zamčený → RuntimeError → přeskoč skupinu
NE → záznam volný → pokračuj se zápisem
```
### Důležitý detail pořadí operací
**Kontrola zámku probíhá PŘED zápisem do FILES a PŘED přesunem souboru.**
Kdyby se pořadí obrátilo, mohlo by dojít k situaci:
- soubor zapsán do FILES ✓
- soubor přesunut do zpracovaných ✓
- dekurz zamčen → UPDATE selže
- soubor je pryč ze vstupní složky, ale odkaz v dekurzu chybí
Správné pořadí:
```
1. Zkontroluj zámek dekurzu (NOWAIT)
└── zamčeno → přeskoč (soubory zůstanou v cesta)
2. Zapiš soubory do ext. DB (FILES)
3. Přesuň soubory do zpracovaných
4. Sestav RTF
5. UPDATE nebo INSERT do DEKURS
```
---
## Logika vkládání do dekurzu 3 případy
Po úspěšné kontrole zámku skript rozhodne, co s dekurzem udělat:
```
Existuje dnešní dekurz pro pacienta?
├── ANO → obsahuje sekci "Vložené přílohy"?
│ ├── ANO → Případ 1: přidá nové soubory DOVNITŘ sekce
│ └── NE → Případ 2: vloží celou sekci na ZAČÁTEK dekurzu (prepend)
└── NE → Případ 3: vytvoří NOVÝ dekurz ze šablony RTF_TEMPLATE
```
### Případ 1 `pridat_do_sekce_prilohy()`
Dnešní dekurz **existuje a má** sekci „Vložené přílohy".
- Spočítá kolik odkazů (Files:) už sekce obsahuje → nové indexy bookmarků navazují
- Nové `\pard` řádky vloží **před** uzavírací prázdný řádek sekce (`PRILOHY_CLOSING`)
- Nové bookmarky přidá na **konec** `{\info{\bookmarks ...}}`
- Provede `UPDATE DEKURS SET DEKURS = ? WHERE ID = ?`
### Případ 2 `merge_rtf_prepend()`
Dnešní dekurz **existuje, ale nemá** sekci příloh (lékařka do něj napsala text).
- Přečísluje existující bookmarky (posunutí o počet nových souborů)
- Novou sekci „Vložené přílohy" vloží **na začátek** těla RTF (před `\uc1\pard`)
- Nové bookmarky předřadí před existující v `{\info{\bookmarks ...}}`
- Lékařčin text zůstane zachován, jen se posune níž
- Provede `UPDATE DEKURS SET DEKURS = ? WHERE ID = ?`
### Případ 3 nový INSERT
Pro dnešní datum **neexistuje žádný dekurz**.
- Vyplní `RTF_TEMPLATE` (bookmarky + tělo sekce příloh)
- Provede `INSERT INTO DEKURS (id, iduzi, idprac, idodd, idpac, datum, cas, dekurs)`
- `iduzi=6` (Vladimír Buzalka), `idprac=2`, `idodd=2`
---
## RTF formát dekurzu
### Struktura bookmarku
Každý přiložený soubor je reprezentován jako:
1. **Bookmark entry** v `{\info{\bookmarks ...}}`:
```
"2020-10-16 LZ ortopedie: VAS LS páteře, obstřik","Files:21923",9
```
- `"popis"` zobrazený text odkazu
- `"Files:ID"` odkaz na záznam v tabulce FILES (slouží Medicusu k načtení souboru)
- `9` číslo fontu/stylu (od 9, každý další +7)
2. **Vizuální řádek** v těle RTF:
```rtf
\pard\s10{\*\bkmkstart 0}\plain\cs32\f0\ul\fs20\cf1 2020-10-16 LZ ortopedie: VAS LS páteře, obstřik{\*\bkmkend 0}\par
```
- `\bkmkstart N` / `\bkmkend N` index bookmarku (0, 1, 2…)
- `\cs32\ul\cf1` styl „Odkaz" (modrý podtržený text)
### Konstanty pro detekci sekce příloh (win1250 RTF escape)
```python
PRILOHY_HEADER = r"Vlo\'9een\'e9 p\'f8\'edlohy:" # "Vložené přílohy:"
PRILOHY_CLOSING = r'\pard\s10\plain\cs15\f0\fs20 \par' # uzavírací prázdný řádek
```
---
## Funkce přehled
| Funkce | Popis |
|---|---|
| `restore_files_for_import(retezec)` | Debug utilita vrátí soubory z Nextcloudu zpět do Dropboxu. Nepoužívá se v produkci. |
| `kontrola_rc(rc, connection)` | Ověří zda RC existuje v KAR, vrátí IDPAC nebo False. |
| `kontrola_struktury(souborname, connection)` | Ověří formát názvu souboru a existenci RC v DB. |
| `vrat_info_o_souboru(souborname, connection)` | Parsuje název souboru, dohledá IDPAC, vrátí tuple s metadaty. |
| `prejmenuj_chybny_soubor(souborname, cesta)` | Přidá prefix `` k chybnému souboru. |
| `_pokus_o_zamek(dekurs_id, vysledek)` | Interní běží ve vlákně, zkouší zamknout dekurz. |
| `zkus_zamknout_dnesni_dekurs(conn, idpac, datum_vlozeni, timeout_sec=2)` | Zjistí zda dnešní dekurz existuje a není zamčený. Vyhodí RuntimeError pokud je zamčený. |
| `ma_sekci_prilohy(rtf)` | Vrátí True pokud RTF obsahuje sekci „Vložené přílohy". |
| `pridat_do_sekce_prilohy(rtf, bookmark_list, filenameforbookmark_list)` | Případ 1 přidá soubory do existující sekce příloh. |
| `merge_rtf_prepend(existing_rtf, new_bkm_list, new_body_pards, n_new)` | Případ 2 vloží sekci příloh na začátek existujícího dekurzu. |
---
## Ošetření chyb
| Situace | Chování |
|---|---|
| Soubor má chybný název | Přejmenován na `♥soubor.pdf`, přeskočen |
| RC nenalezeno v KAR | Přejmenován na `♥soubor.pdf`, přeskočen |
| Dekurz zamčený (timeout vlákna) | Skupina přeskočena, soubory zůstanou v `cesta` |
| DB konflikt při zamykání (-913 deadlock) | Skupina přeskočena, soubory zůstanou v `cesta` |
| Přesun souboru selže | 3 pokusy s 5s pauzou, poté varování |
| Jiná DB chyba | Výjimka se propaguje, skript havaruje |
---
## Vývoj a testování
| Verze | Soubor | Co přibilo |
|---|---|---|
| Prototyp | `test_import_FINAL.py` | Ruční zadání IDPAC a DATUM, ověření RTF logiky |
| v1 | `s03soubory.py` | Automatický parsing RC z názvu, dávkování po skupinách |
| **v1 FINAL** | `s03soubory_01_FINAL.py` | Ochrana před zamčeným dekurzem (threading + timeout) |
### Jak byl objeven problém se zámky
Experimentem bylo ověřeno, že Medicus drží Firebird row lock na záznamu DEKURS po celou dobu, kdy má lékařka pacienta otevřeného (`SELECT FIRST 1 ... FOR UPDATE WITH LOCK` z Pythonu čekalo dokud lékařka neuložila). NOWAIT nelze nastavit přes SQL syntaxi ani spolehlivě přes fdb TPB bajty, proto bylo zvoleno řešení přes vlákno s timeoutem.
@@ -0,0 +1,156 @@
# test_import_FINAL.py detailní dokumentace
## Co skript dělá
Importuje PDF soubory (lékařské zprávy) do Medicus DB. Konkrétně:
1. Uloží fyzický soubor do **externí Firebird DB** (tabulka FILES)
2. Vloží nebo aktualizuje **dekurs pacienta** (tabulka DEKURS) s klikacím RTF odkazem na soubor
---
## Vstupní data (konfigurace nahoře)
```python
CESTA = r'u:\\' # adresář se zdrojovými PDF soubory
IDPAC = 9742 # ID pacienta v DB
DATUM = datetime.date(2026, 3, 18) # datum zprávy (ne dnešek!)
SOUBORY = [
{
'souborname': 'název souboru.pdf',
'prvnizavorka': 'typ zprávy', # např. "vyšetření"
'druhazavorka': 'poznámka', # volný text
'datum': DATUM,
},
...
]
```
Pozor: `DATUM` je datum zprávy (ne dnešek). Podle tohoto data se hledá existující dekurs.
---
## Rozhodovací logika 3 scénáře
```
Poslední dekurs pacienta
├─ z JINÉHO dne / neexistuje
│ └─→ SCÉNÁŘ 3: vytvoří nový dekurs
└─ z DNEŠNÍHO dne (= DATUM)
├─ MÁ sekci "Vložené přílohy"
│ └─→ SCÉNÁŘ 1: přidá odkaz DO existující sekce
└─ NEMÁ sekci "Vložené přílohy"
└─→ SCÉNÁŘ 2: prepend nové sekce na začátek
```
Klíčová funkce pro detekci: `ma_sekci_prilohy(rtf)` hledá RTF string `Vlo\'9een\'e9 p\'f8\'edlohy:` (= „Vložené přílohy:" zakódováno win1250).
---
## Krok 1 uložení souboru do ext DB
Volá `funkce_ext.zapis_file_ext(...)` pro každý soubor. Vrátí `fileid` (ID záznamu v tabulce FILES).
Z každého souboru se postaví:
- **bookmark entry** pro `{\info{\bookmarks ...}}` blok RTF:
`"2026-03-18 vyšetření: poznámka","Files:1234",9`
- **RTF pard** (klikací odkaz) pro tělo dekurzu:
`\pard\s10{\*\bkmkstart 0}\plain\cs32\f0\ul\fs20\cf1 2026-03-18 vyšetření: poznámka{\*\bkmkend 0}\par`
Číslo `cislo` začíná na 9 a roste po 7 (interní Medicus konvence). Index `poradi` (bkmkstart) začíná na 0 a roste po 1.
---
## Krok 2 práce s dekurzem
### Scénář 1: přidání DO existující sekce (`pridat_do_sekce_prilohy`)
Situace: dnešní dekurs již má blok „Vložené přílohy" s nějakými odkazy.
Postup:
1. Spočítá počet existujících `Files:` odkazů v `{\info{\bookmarks}}` → to je index nového (`new_idx`)
2. Posune všechny `\bkmkstart N` / `\bkmkend N` kde `N >= new_idx` o +1 (uvolní místo)
3. Vloží nový `\pard` řádek **před** uzavírací `\pard\s10\plain\cs15\f0\fs20 \par` sekce
4. Vloží nový bookmark na pozici `new_idx` v `{\info{\bookmarks}}`
Výsledek: soubor se přidá na konec existujícího seznamu příloh, indexy zůstanou konzistentní.
### Scénář 2: prepend nové sekce (`merge_rtf_prepend`)
Situace: dnešní dekurs existuje, ale ještě nemá blok příloh.
Postup:
1. Posune všechny existující `\bkmkstart N` / `\bkmkend N` o +n_new (počet nových souborů)
2. Přidá nové bookmarky **na začátek** `{\info{\bookmarks}}` bloku
- Pokud `{\info{\bookmarks}}` neexistuje, vloží ho za `\deflang1029`
3. Vloží nové tělo (záhlaví „Vložené přílohy:" + řádky s odkazy) **před** první `\uc1\pard` těla stávajícího dekurzu
Výsledek: sekce příloh je viditelně nahoře, stávající text dekurzu zůstane pod ní.
### Scénář 3: nový dekurs
Situace: žádný dnešní dekurs neexistuje.
Sestaví RTF šablonu s:
- `{\info{\bookmarks ...}}` všechny bookmarky
- záhlaví „Vložené přílohy:" + klikací řádky
- uzavírací prázdný řádek
Vloží jako nový řádek do tabulky DEKURS s `iduzi=6, idprac=2, idodd=2` (Vladimír Buzalka, ordinace).
---
## RTF formát dekurzu
```rtf
{\rtf1\ansi\ansicpg1250\uc1\deff0\deflang1029
{\info{\bookmarks "2026-03-18 vyšetření: poznámka","Files:1234",9}}
{\fonttbl{\f0\fnil\fcharset238 Arial;} ...}
{\colortbl ;\red0\green0\blue255; ...}
{\stylesheet ... {\*\cs32\f0\ul\fs20\cf1 Odkaz;}}
\uc1\pard\s10\plain\cs20\f0\i\fs20 Vložené přílohy:\par
\pard\s10{\*\bkmkstart 0}\plain\cs32\f0\ul\fs20\cf1 2026-03-18 vyšetření: poznámka{\*\bkmkend 0}\par
\pard\s10\plain\cs15\f0\fs20 \par
}
```
- **cs20** = kurzíva (záhlaví sekce)
- **cs32** = podtržený modrý text (klikací odkaz)
- **cs15** = normální text
- `\cf1` = modrá barva (první v colortbl)
---
## Závislosti
| Import | Odkud | Co dělá |
|--------|-------|---------|
| `funkce_ext.zapis_file_ext` | `funkce_ext.py` | Uloží soubor do ext DB (tabulka FILES), vrátí fileid |
| `funkce.get_dekurs_id` | `funkce.py` | Vrátí nové ID pro INSERT do tabulky DEKURS |
| `fdb` | pip | Připojení k Firebird DB |
---
## Tabulky v DB
| Tabulka | DB | Popis |
|---------|----|-------|
| `DEKURS` | hlavní (`medicus.fdb`) | Záznamy dekurzu, pole `DEKURS` obsahuje RTF text |
| `FILES` | ext DB (`MEDICUS_FILES_*.fdb`) | Binární obsah souborů |
---
## Jak spustit
Skript se spouští jednorázově na Windows stroji s přístupem k Firebird DB. Před spuštěním:
1. Upravit `SOUBORY` seznam PDF souborů ke zpracování
2. Zkontrolovat `IDPAC`, `DATUM`, `CESTA`
3. Ověřit, že PDF soubory fyzicky existují na `CESTA`
Po spuštění ověřit v Medicus: karta pacienta → záložka Dekurzy → kliknout na odkaz.
+140
View File
@@ -0,0 +1,140 @@
# Dekurzy report dokumentace
## Co report dělá
Generuje Excel soubor s přehledem všech dekurzů z ordinace MUDr. Buzalkové za zadané období.
Hlavní list **Dekurz** zobrazuje každý dekurz jako jeden řádek. Čísla v sloupcích jsou klikatelné hyperlinkové zkratky, které přeskočí na příslušný detailní list (Recepty, Výkony, Soubory apod.).
---
## Spuštění
```
python dekurz_report.py
```
**Vstupní parametry** (nastavit přímo v souboru):
| Proměnná | Výchozí hodnota | Popis |
|---|---|---|
| `DATUM_OD` | `2025-01-01` | Začátek období |
| `DATUM_DO` | dnešní datum | Konec období (automaticky) |
| `VYSTUPNI_ADRESAR` | `u:\Dropbox\Ordinace\Reporty` | Kam se ukládá |
| `NAZEV_REPORTU` | `Dekurzy` | Část názvu souboru |
**Výstupní soubor:** `YYYY-MM-DD HH-MM-SS Dekurzy.xlsx`
Starý soubor se stejným názvem je automaticky smazán.
---
## Zdroj dat
Databáze Firebird: `localhost:c:\medicus 3\data\medicus.fdb`
Připojení: SYSDBA / masterkey, charset win1250
### Hlavní dotaz
```sql
SELECT d.DATUM, d.CAS, u.ZKRATKA, k.PRIJMENI, k.JMENO, k.RODCIS, k.POJ, d.DEKURS
FROM DEKURS d
JOIN KAR k ON k.IDPAC = d.IDPAC
LEFT JOIN UZIVATEL u ON u.IDUZI = d.IDUZI
WHERE d.DATUM >= '2025-01-01' AND d.DATUM <= dnes
ORDER BY d.DATUM DESC, d.CAS DESC, k.PRIJMENI, k.JMENO
```
Řazení: nejnovější záznamy nahoře.
---
## Jak funguje parsování RTF bookmarků
Každý dekurz je uložen jako RTF blob ve sloupci `DEKURS.DEKURS`.
Medicus do RTF hlavičky zapisuje **bookmarky** hypertextové odkazy na propojené záznamy:
```
{\info{\bookmarks "ATORIS","Rec:322528",17;"01543","VykA:189603",8}}
```
Formát: `"název","TYP:ID",číslo_stylu`
Skript parsuje regex `"([^"]+)","([A-Za-z]+):(\d+)"` a extrahuje typ a ID záznamu.
### Typy bookmarků a jejich tabulky
| Bookmark | List v Excelu | Tabulka v DB | PK | Zobrazované sloupce |
|---|---|---|---|---|
| `Rec` | Recepty | `RECEPT` | `ID` | LEK, DSIG (lék, dávkování) |
| `VykA` | Výkony | `DOKLADD` | `ID` | KOD, DDGN (kód výkonu, diagnóza) |
| `Files` | Soubory | `FILES` | `ID` | FILENAME, DATUM |
| `MEDLAB` | MedLab | `HISTDOC` | `ID` | DATUM, TYP (žádanka do laboratoře) |
| `Lab` | Lab | `LABVH` | `IDVH` | DATUM, CISLO (výsledky laboratoře) |
| `Ock` | Očkování | `OCKZAZ` | `ID` | DATUM, LATKA (vakcína) |
| `Nes` | Neschop. | `NES` | `ID` | ZACNES, KONNES (od do) |
| `Lec` | Léčiva | `LECD` | `ID` | KOD, DATOSE (léčivo podané v ordinaci) |
| `SpecVys` | SpecVys | `SPECVYS` | `IDSPECVYS` | TYP, DATUM (Tonotrack, holter…) |
| `PlaPac` | Platby | `PLA` | `IDPLA` | DATUM, CENA, DOKLAD |
| ostatní | Ostatní | | | TYP, ID, Název (formuláře, poukazy…) |
**Ostatní typy** (méně časté):
`ORTOPE` ePoukaz na ortopedickou pomůcku
`ZDRINF` Žádost o předání zdravotních informací
`PROHLAS` Prohlášení
`POTDPN` Potvrzení DPN
`MOTORVO` Posudek motorového vozidla
`LAZPEC` Lázně
`VypZdrD` Výpis ze zdravotní dokumentace
`VYMLIST` Výměnný list
`PouRTG` Poukaz RTG
`ZPUPRN` Způsobilost k práci/řízení
`EPOSMRO` ePosudek MRO
`ZNESUP` Potvrzení neschopnosti uchazeče o zaměstnání
> Všechny výše uvedené jdou do listu **Ostatní** s uvedením typu, ID a názvu.
---
## Struktura Excel souboru
### List Dekurz (hlavní)
| Sloupec | Zdroj | Popis |
|---|---|---|
| Datum | `DEKURS.DATUM` | Datum dekurzu |
| Čas | `DEKURS.CAS` | Čas (HH:MM) |
| Lékař | `UZIVATEL.ZKRATKA` | MBU / VBU / ISE |
| Jméno | `KAR.PRIJMENI + JMENO` | Formát: `Příjmení, I.` |
| Rodné číslo | `KAR.RODCIS` | |
| Pojišťovna | `KAR.POJ` | Kód pojišťovny (111, 201…) |
| Rec … PlaPac | RTF bookmark | Počet záznamů **klikací hyperlink** na detailní list |
| Ostatní | RTF bookmark | Počet ostatních typů hyperlink na list Ostatní |
### Detailní listy
Každý list má:
- Záhlaví s vlastní barevnou kombinací
- Sloupce: Datum, Jméno + specifické sloupce dle typu
- Střídání bílých a barevných řádků
- Tenké šedé ohraničení všech buněk
- Zmrazený první řádek
- Autofiltr
---
## Technické poznámky
- Firebird limit `IN (...)` je 1500 hodnot dotazy na detaily se automaticky dělí do dávek po 1000
- RTF blob je čten přes `blob.read()` nebo přímo jako string
- Jméno pacienta: `Příjmení, I.` (iniciála prvního písmene jména)
- Chybová hláška `BlobReader.close: invalid BLOB handle` je neškodná GC uzavírá handlery po odpojení DB
---
## Scheduled Task
Spouštěcí příkaz:
```
python "C:\Users\vlado\PycharmProjects\Medicus\MedicusWithClaudeDekurz\dekurz_report.py"
```
Doporučené spouštění: každý den ráno (např. 6:00), aby byl vždy čerstvý soubor v Dropboxu.
@@ -0,0 +1,352 @@
import sys, io, re, os, glob
from datetime import date, datetime, timedelta
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
import fdb
import openpyxl
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
VYSTUPNI_ADRESAR = r'u:\Dropbox\Ordinace\Reporty'
NAZEV_REPORTU = 'Dekurzy_TEST'
DATUM_OD = (date.today() - timedelta(days=100)).strftime('%Y-%m-%d')
DATUM_DO = date.today().strftime('%Y-%m-%d')
conn = fdb.connect(
dsn=r'localhost:c:\medicus 3\data\medicus.fdb',
user='SYSDBA', password='masterkey', charset='win1250'
)
cur = conn.cursor()
cur.execute(f"""
SELECT d.DATUM, d.CAS, u.ZKRATKA, k.PRIJMENI, k.JMENO, k.RODCIS, k.POJ, d.DEKURS
FROM DEKURS d
JOIN KAR k ON k.IDPAC = d.IDPAC
LEFT JOIN UZIVATEL u ON u.IDUZI = d.IDUZI
WHERE d.DATUM >= '{DATUM_OD}' AND d.DATUM <= '{DATUM_DO}'
ORDER BY d.DATUM DESC, d.CAS DESC, k.PRIJMENI, k.JMENO
""")
raw_rows = cur.fetchall()
TOP_TYPY = ['VykA', 'Rec', 'Files', 'MEDLAB', 'Lab', 'Ock', 'Nes', 'Lec', 'SpecVys', 'PlaPac']
def rtf_na_text(rtf):
"""Jednoduchý převod RTF na čistý text."""
# Odstraň info blok (bookmarky apod.)
text = re.sub(r'\{\\info.*?\}', '', rtf, flags=re.DOTALL)
# Odstraň ostatní skupiny v {} rekurzivně (fonty, barvy apod.)
for _ in range(6):
text = re.sub(r'\{[^{}]*\}', '', text)
# Nový řádek za \par \line
text = re.sub(r'\\par\b\s*', '\n', text)
text = re.sub(r'\\line\b\s*', '\n', text)
# Dekóduj RTF hex escape sekvence (\'xx) jako cp1250 → správná čeština
def decode_hex(m):
try:
return bytes.fromhex(m.group(1)).decode('cp1250')
except Exception:
return ''
text = re.sub(r"\\'([0-9a-fA-F]{2})", decode_hex, text)
# Odstraň ostatní RTF příkazy
text = re.sub(r'\\[a-zA-Z]+\-?[0-9]*\s?', '', text)
text = re.sub(r'[{}\\]', '', text)
# Vyčisti prázdné řádky a whitespace
lines = [l.strip() for l in text.splitlines()]
lines = [l for l in lines if l]
return '\n'.join(lines)
# Parse dekurzů
rows = []
for datum, cas, zkratka, prijmeni, jmeno, rodcis, poj, dekurs_blob in raw_rows:
rtf = dekurs_blob.read() if hasattr(dekurs_blob, 'read') else (dekurs_blob or '')
pocty = {}
ids_by_typ = {t: [] for t in TOP_TYPY}
ids_ostatni = []
for nazev, typ, rid in re.findall(r'"([^"]+)","([A-Za-z]+):(\d+)"', rtf):
pocty[typ] = pocty.get(typ, 0) + 1
if typ in ids_by_typ:
ids_by_typ[typ].append(int(rid))
else:
ids_ostatni.append((typ, int(rid), nazev))
top = [pocty.get(t, 0) for t in TOP_TYPY]
ostatni = sum(v for k, v in pocty.items() if k not in TOP_TYPY)
iniciala = jmeno[0] + '.' if jmeno and jmeno.strip() else ''
jmeno_cel = f"{prijmeni.strip()}, {iniciala}" if prijmeni else iniciala
text_dekurzu = rtf_na_text(rtf)
rows.append((datum, cas, zkratka, jmeno_cel, rodcis, poj, top, ostatni, ids_by_typ, ids_ostatni, text_dekurzu))
# ── Načtení detailů z DB ────────────────────────────────────────────────────
def fetch_details(cur, table, pk, id_col, fields, ids):
if not ids:
return {}
result = {}
batch_size = 1000
for i in range(0, len(ids), batch_size):
batch = ids[i:i+batch_size]
ph = ','.join('?' * len(batch))
cur.execute(f"SELECT {pk}, {','.join(fields)} FROM {table} WHERE {id_col} IN ({ph})", batch)
for row in cur.fetchall():
result[row[0]] = row[1:]
return result
def get_ids(rows, typ):
return list({rid for *_, ids_by_typ, _, _text in rows for rid in ids_by_typ[typ]})
rec_det = fetch_details(cur, 'RECEPT', 'ID', 'ID', ['LEK','DSIG'], get_ids(rows,'Rec'))
vyka_det = fetch_details(cur, 'DOKLADD', 'ID', 'ID', ['KOD','DDGN','BODY'], get_ids(rows,'VykA'))
files_det = fetch_details(cur, 'FILES', 'ID', 'ID', ['FILENAME','DATUM'], get_ids(rows,'Files'))
medlab_det = fetch_details(cur, 'HISTDOC', 'ID', 'ID', ['DATUM','TYP'], get_ids(rows,'MEDLAB'))
lab_det = fetch_details(cur, 'LABVH', 'IDVH', 'IDVH', ['DATUM','CISLO'], get_ids(rows,'Lab'))
ock_det = fetch_details(cur, 'OCKZAZ', 'ID', 'ID', ['DATUM','LATKA'], get_ids(rows,'Ock'))
nes_det = fetch_details(cur, 'NES', 'ID', 'ID', ['ZACNES','KONNES'], get_ids(rows,'Nes'))
lec_det = fetch_details(cur, 'LECD', 'ID', 'ID', ['KOD','DATOSE'], get_ids(rows,'Lec'))
spec_det = fetch_details(cur, 'SPECVYS', 'IDSPECVYS','IDSPECVYS',['TYP','DATUM'], get_ids(rows,'SpecVys'))
pla_det = fetch_details(cur, 'PLA', 'IDPLA', 'IDPLA', ['DATUM','CENA','DOKLAD'], get_ids(rows,'PlaPac'))
conn.close()
print(f"Načteno {len(rows)} dekurzů (období: {DATUM_OD} {DATUM_DO})")
# ── Styly ──────────────────────────────────────────────────────────────────
tenka_cara = Side(style='thin', color='AAAAAA')
ohraniceni = Border(left=tenka_cara, right=tenka_cara, top=tenka_cara, bottom=tenka_cara)
hl_font = Font(bold=True, color="FFFFFF")
hl_fill = PatternFill("solid", fgColor="2E75B6")
r_fill = [PatternFill("solid", fgColor="FFFFFF"), PatternFill("solid", fgColor="DCE6F1")]
BARVY_LISTU = {
'Recepty': ('1F6B33', 'E2EFDA'),
'Výkony': ('2E4057', 'D6E4F0'),
'Soubory': ('7B3F00', 'FAE5D3'),
'Žádanky': ('4A235A', 'F5EEF8'),
'Lab výsl.': ('145A32', 'D5F5E3'),
'Očkování': ('7E5109', 'FDEBD0'),
'Neschop.': ('922B21', 'FADBD8'),
'Léčiva': ('1A5276', 'D6EAF8'),
'SpecVys': ('0B5345', 'D1F2EB'),
'Platby': ('4D5656', 'EAECEE'),
'Ostatní': ('2C3E50', 'EBF5FB'),
}
def zapis_hlavicku(ws, sloupce, sirky, barva_hex):
hl_fill_l = PatternFill("solid", fgColor=barva_hex)
for col, (nazev, sirka) in enumerate(zip(sloupce, sirky), start=1):
cell = ws.cell(row=1, column=col, value=nazev)
cell.font = hl_font
cell.fill = hl_fill_l
cell.alignment = Alignment(horizontal='center')
cell.border = ohraniceni
ws.column_dimensions[cell.column_letter].width = sirka
def zapis_radek(ws, row_i, hodnoty, zarovnani, barva_hex):
fill = PatternFill("solid", fgColor="FFFFFF") if row_i % 2 == 0 \
else PatternFill("solid", fgColor=barva_hex)
for col_i, (val, align) in enumerate(zip(hodnoty, zarovnani), start=1):
cell = ws.cell(row=row_i, column=col_i, value=val)
cell.fill = fill
cell.border = ohraniceni
cell.alignment = Alignment(horizontal=align)
if col_i == 1 and isinstance(val, __import__('datetime').date):
cell.number_format = 'DD.MM.YYYY'
def hyperlink_cell(ws, row_i, col_i, cil_list, cil_radek, text, barva_hex):
fill = PatternFill("solid", fgColor="FFFFFF") if row_i % 2 == 0 \
else PatternFill("solid", fgColor=barva_hex)
cell = ws.cell(row=row_i, column=col_i)
cell.value = str(text)
# Název listu s mezerou musí být v apostrofech
sheet_ref = f"'{cil_list}'" if ' ' in cil_list else cil_list
cell.hyperlink = f'#{sheet_ref}!A{cil_radek}'
cell.font = Font(color="0000FF", underline='single')
cell.fill = fill
cell.border = ohraniceni
cell.alignment = Alignment(horizontal='center')
# ── Workbook ───────────────────────────────────────────────────────────────
wb = openpyxl.Workbook()
LISTY = [
('Recepty', 'Rec', rec_det, ['Datum','Jméno','Recept','Dávkování'], [12,25,25,12], None),
('Výkony', 'VykA', vyka_det, ['Datum','Jméno','Kód výkonu','Diagnóza','Body'], [12,25,14,10,8], None),
('Soubory', 'Files', files_det, ['Datum','Jméno','Soubor','Datum souboru'], [12,25,35,14], None),
('Žádanky', 'MEDLAB', medlab_det, ['Datum','Jméno','Typ'], [12,25,15], None),
('Lab výsl.', 'Lab', lab_det, ['Datum','Jméno','Číslo'], [12,25,20], None),
('Očkování', 'Ock', ock_det, ['Datum','Jméno','Datum očkování','Vakcína'], [12,25,14,30], None),
('Neschop.', 'Nes', nes_det, ['Datum','Jméno','Od','Do'], [12,25,12,12], None),
('Léčiva', 'Lec', lec_det, ['Datum','Jméno','Kód','Datum výkonu'], [12,25,12,14], None),
('SpecVys', 'SpecVys', spec_det, ['Datum','Jméno','Typ vyšetření','Datum vyšetření'], [12,25,25,14], None),
('Platby', 'PlaPac', pla_det, ['Datum','Jméno','Datum platby','Částka','Doklad'], [12,25,14,12,15], None),
('Ostatní', None, None, ['Datum','Jméno','Typ','ID','Název'], [12,25,12,10,30], None),
]
ws_d = wb.active
ws_d.title = "Dekurz"
# List "Text dekurzu" hned za Dekurzem
ws_text = wb.create_sheet("Text dekurzu")
ws_listy = {}
for nazev, *_ in LISTY:
ws_listy[nazev] = wb.create_sheet(nazev)
for nazev, typ, det, sloupce, sirky, _ in LISTY:
barva_hl, _ = BARVY_LISTU[nazev]
zapis_hlavicku(ws_listy[nazev], sloupce, sirky, barva_hl)
ws_listy[nazev].freeze_panes = 'A2'
# Záhlaví listu "Text dekurzu"
zapis_hlavicku(ws_text,
['Datum', 'Čas', 'Lékař', 'Jméno', 'Rodné číslo', 'Text dekurzu'],
[12, 8, 8, 25, 14, 100],
'2E75B6')
ws_text.freeze_panes = 'A2'
row_ptr_text = 2
# Zobrazované názvy sloupců pro TOP_TYPY (stejné pořadí)
NAZVY_TYPY = ['VykA', 'Rec', 'Files', 'Žádanky', 'Lab výsl.', 'Ock', 'Nes', 'Lec', 'SpecVys', 'PlaPac']
# Záhlaví Dekurz sloupce AR (bez textu)
nazvy_d = ['Datum', 'Čas', 'Lékař', 'Jméno', 'Rodné číslo', 'Pojišťovna'] + ['HodVyk'] + NAZVY_TYPY + ['Ostatní']
sirky_d = [12, 8, 8, 25, 14, 12 ] + [10] + [38, 8, 8, 8, 8, 8, 8, 8, 8, 8] + [8]
zapis_hlavicku(ws_d, nazvy_d, sirky_d, '2E75B6')
ws_d.freeze_panes = 'A2'
ws_d.auto_filter.ref = f"A1:R{len(rows)+1}"
row_ptr = {nazev: 2 for nazev, *_ in LISTY}
# ── Plnění dat ─────────────────────────────────────────────────────────────
def get_det_hodnoty(typ, rid, datum, jmeno_cel):
if typ == 'Rec':
d = rec_det.get(rid, ('', ''))
return [datum, jmeno_cel, d[0] or '', d[1] or '']
elif typ == 'VykA':
d = vyka_det.get(rid, ('', '', 0))
return [datum, jmeno_cel, d[0] or '', (d[1] or '').strip(), d[2] or 0]
elif typ == 'Files':
d = files_det.get(rid, ('', ''))
return [datum, jmeno_cel, d[0] or '', d[1]]
elif typ == 'MEDLAB':
d = medlab_det.get(rid, ('', ''))
return [datum, jmeno_cel, d[1] or '']
elif typ == 'Lab':
d = lab_det.get(rid, ('', ''))
return [datum, jmeno_cel, d[1] or '']
elif typ == 'Ock':
d = ock_det.get(rid, ('', ''))
return [datum, jmeno_cel, d[0], d[1] or '']
elif typ == 'Nes':
d = nes_det.get(rid, ('', ''))
return [datum, jmeno_cel, d[0], d[1]]
elif typ == 'Lec':
d = lec_det.get(rid, ('', ''))
return [datum, jmeno_cel, d[0] or '', d[1]]
elif typ == 'SpecVys':
d = spec_det.get(rid, ('', ''))
return [datum, jmeno_cel, d[0] or '', d[1]]
elif typ == 'PlaPac':
d = pla_det.get(rid, ('', '', ''))
return [datum, jmeno_cel, d[0], d[1], d[2] or '']
return []
ZAROVNANI = {
'Recepty': ['left','left','left','center'],
'Výkony': ['left','left','center','center','right'],
'Soubory': ['left','left','left','left'],
'MedLab': ['left','left','center'],
'Lab': ['left','left','center'],
'Očkování': ['left','left','left','left'],
'Neschop.': ['left','left','left','left'],
'Léčiva': ['left','left','center','left'],
'SpecVys': ['left','left','left','left'],
'Platby': ['left','left','left','right','center'],
'Ostatní': ['left','left','center','center','left'],
}
for row_i, (datum, cas, zkratka, jmeno_cel, rodcis, poj, top, ostatni, ids_by_typ, ids_ostatni, text_dekurzu) in enumerate(rows, start=2):
fill_d = r_fill[row_i % 2]
for col_i in range(1, len(nazvy_d) + 1):
ws_d.cell(row=row_i, column=col_i).fill = fill_d
ws_d.cell(row=row_i, column=col_i).border = ohraniceni
ws_d.cell(row=row_i, column=1, value=datum).number_format = 'DD.MM.YYYY'
ws_d.cell(row=row_i, column=2, value=str(cas)[:5] if cas else '').alignment = Alignment(horizontal='center')
ws_d.cell(row=row_i, column=3, value=zkratka or '').alignment = Alignment(horizontal='center')
ws_d.cell(row=row_i, column=4, value=jmeno_cel)
# Sloupec 5 Rodné číslo jako hyperlink na list "Text dekurzu"
hyperlink_cell(ws_d, row_i, 5, 'Text dekurzu', row_ptr_text, rodcis or '', 'DCE6F1')
ws_d.cell(row=row_i, column=6, value=poj or '').alignment = Alignment(horizontal='center')
# Sloupec G HodVyk (součet bodů všech výkonů dekurzu)
hodvyk = sum((vyka_det.get(rid, ('', '', 0))[2] or 0) for rid in ids_by_typ['VykA'])
cell_hv = ws_d.cell(row=row_i, column=7, value=hodvyk if hodvyk else '')
cell_hv.alignment = Alignment(horizontal='center')
for col_off, (typ, pocet) in enumerate(zip(TOP_TYPY, top)):
col_i = 8 + col_off
if pocet == 0:
continue
nazev_listu = next((n for n, t, *_ in LISTY if t == typ), None)
if nazev_listu and ids_by_typ[typ]:
_, barva_ll = BARVY_LISTU[nazev_listu]
# Pro výkony (VykA) přidej kódy do závorky
if typ == 'VykA':
kody = [str(vyka_det.get(rid, ('',))[0] or '').strip() for rid in ids_by_typ[typ]]
kody_str = ', '.join(k for k in kody if k)
display_text = f"{pocet} ({kody_str})" if kody_str else pocet
else:
display_text = pocet
hyperlink_cell(ws_d, row_i, col_i, nazev_listu, row_ptr[nazev_listu], display_text, barva_ll[1:] if len(barva_ll) > 6 else 'DCE6F1')
ws_det = ws_listy[nazev_listu]
barva_hl, barva_r = BARVY_LISTU[nazev_listu]
for rid in ids_by_typ[typ]:
hodnoty = get_det_hodnoty(typ, rid, datum, jmeno_cel)
zarovnani_l = ZAROVNANI.get(nazev_listu, ['left']*10)
zapis_radek(ws_det, row_ptr[nazev_listu], hodnoty, zarovnani_l, barva_r)
row_ptr[nazev_listu] += 1
else:
ws_d.cell(row=row_i, column=col_i, value=pocet).alignment = Alignment(horizontal='center')
if ostatni:
ws_det = ws_listy['Ostatní']
barva_hl, barva_r = BARVY_LISTU['Ostatní']
hyperlink_cell(ws_d, row_i, 18, 'Ostatní', row_ptr['Ostatní'], ostatni, barva_r)
for typ, rid, nazev in ids_ostatni:
zapis_radek(ws_det, row_ptr['Ostatní'],
[datum, jmeno_cel, typ, rid, nazev],
ZAROVNANI['Ostatní'], barva_r)
row_ptr['Ostatní'] += 1
# Zápis do listu "Text dekurzu"
fill_t = r_fill[row_ptr_text % 2]
for col_i in range(1, 7):
ws_text.cell(row=row_ptr_text, column=col_i).fill = fill_t
ws_text.cell(row=row_ptr_text, column=col_i).border = ohraniceni
c1 = ws_text.cell(row=row_ptr_text, column=1, value=datum)
c1.number_format = 'DD.MM.YYYY'
c1.alignment = Alignment(horizontal='left', vertical='center')
ws_text.cell(row=row_ptr_text, column=2, value=str(cas)[:5] if cas else '').alignment = Alignment(horizontal='center', vertical='center')
ws_text.cell(row=row_ptr_text, column=3, value=zkratka or '').alignment = Alignment(horizontal='center', vertical='center')
ws_text.cell(row=row_ptr_text, column=4, value=jmeno_cel).alignment = Alignment(horizontal='left', vertical='center')
ws_text.cell(row=row_ptr_text, column=5, value=rodcis or '').alignment = Alignment(horizontal='left', vertical='center')
cell_txt = ws_text.cell(row=row_ptr_text, column=6, value=text_dekurzu)
cell_txt.alignment = Alignment(horizontal='left', vertical='top', wrap_text=True)
row_ptr_text += 1
# Autofiltr na detailních listech
for nazev, *_ in LISTY:
ws = ws_listy[nazev]
max_col = ws.max_column
max_row = ws.max_row
if max_row > 1:
ws.auto_filter.ref = f"A1:{ws.cell(row=1, column=max_col).column_letter}{max_row}"
# Smazat starý TEST report
for stary in glob.glob(os.path.join(VYSTUPNI_ADRESAR, f'* {NAZEV_REPORTU}.xlsx')):
os.remove(stary)
print(f"Smazán: {stary}")
# Uložit nový
os.makedirs(VYSTUPNI_ADRESAR, exist_ok=True)
casova_znacka = datetime.now().strftime('%Y-%m-%d %H-%M-%S')
vystup = os.path.join(VYSTUPNI_ADRESAR, f'{casova_znacka} {NAZEV_REPORTU}.xlsx')
wb.save(vystup)
print(f"Uloženo: {vystup}")
for nazev, *_ in LISTY:
print(f" {nazev}: {row_ptr[nazev]-2} řádků")
+352
View File
@@ -0,0 +1,352 @@
import sys, io, re, os, glob
from datetime import date, datetime
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
import fdb
import openpyxl
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
VYSTUPNI_ADRESAR = r'u:\Dropbox\Ordinace\Reporty'
NAZEV_REPORTU = 'Dekurzy'
DATUM_OD = '2025-01-01'
DATUM_DO = date.today().strftime('%Y-%m-%d')
conn = fdb.connect(
dsn=r'localhost:c:\medicus 3\data\medicus.fdb',
user='SYSDBA', password='masterkey', charset='win1250'
)
cur = conn.cursor()
cur.execute(f"""
SELECT d.DATUM, d.CAS, u.ZKRATKA, k.PRIJMENI, k.JMENO, k.RODCIS, k.POJ, d.DEKURS
FROM DEKURS d
JOIN KAR k ON k.IDPAC = d.IDPAC
LEFT JOIN UZIVATEL u ON u.IDUZI = d.IDUZI
WHERE d.DATUM >= '{DATUM_OD}' AND d.DATUM <= '{DATUM_DO}'
ORDER BY d.DATUM DESC, d.CAS DESC, k.PRIJMENI, k.JMENO
""")
raw_rows = cur.fetchall()
TOP_TYPY = ['VykA', 'Rec', 'Files', 'MEDLAB', 'Lab', 'Ock', 'Nes', 'Lec', 'SpecVys', 'PlaPac']
def rtf_na_text(rtf):
"""Jednoduchý převod RTF na čistý text."""
# Odstraň info blok (bookmarky apod.)
text = re.sub(r'\{\\info.*?\}', '', rtf, flags=re.DOTALL)
# Odstraň ostatní skupiny v {} rekurzivně (fonty, barvy apod.)
for _ in range(6):
text = re.sub(r'\{[^{}]*\}', '', text)
# Nový řádek za \par \line
text = re.sub(r'\\par\b\s*', '\n', text)
text = re.sub(r'\\line\b\s*', '\n', text)
# Dekóduj RTF hex escape sekvence (\'xx) jako cp1250 → správná čeština
def decode_hex(m):
try:
return bytes.fromhex(m.group(1)).decode('cp1250')
except Exception:
return ''
text = re.sub(r"\\'([0-9a-fA-F]{2})", decode_hex, text)
# Odstraň ostatní RTF příkazy
text = re.sub(r'\\[a-zA-Z]+\-?[0-9]*\s?', '', text)
text = re.sub(r'[{}\\]', '', text)
# Vyčisti prázdné řádky a whitespace
lines = [l.strip() for l in text.splitlines()]
lines = [l for l in lines if l]
return '\n'.join(lines)
# Parse dekurzů
rows = []
for datum, cas, zkratka, prijmeni, jmeno, rodcis, poj, dekurs_blob in raw_rows:
rtf = dekurs_blob.read() if hasattr(dekurs_blob, 'read') else (dekurs_blob or '')
pocty = {}
ids_by_typ = {t: [] for t in TOP_TYPY}
ids_ostatni = []
for nazev, typ, rid in re.findall(r'"([^"]+)","([A-Za-z]+):(\d+)"', rtf):
pocty[typ] = pocty.get(typ, 0) + 1
if typ in ids_by_typ:
ids_by_typ[typ].append(int(rid))
else:
ids_ostatni.append((typ, int(rid), nazev))
top = [pocty.get(t, 0) for t in TOP_TYPY]
ostatni = sum(v for k, v in pocty.items() if k not in TOP_TYPY)
iniciala = jmeno[0] + '.' if jmeno and jmeno.strip() else ''
jmeno_cel = f"{prijmeni.strip()}, {iniciala}" if prijmeni else iniciala
text_dekurzu = rtf_na_text(rtf)
rows.append((datum, cas, zkratka, jmeno_cel, rodcis, poj, top, ostatni, ids_by_typ, ids_ostatni, text_dekurzu))
# ── Načtení detailů z DB ────────────────────────────────────────────────────
def fetch_details(cur, table, pk, id_col, fields, ids):
if not ids:
return {}
result = {}
batch_size = 1000
for i in range(0, len(ids), batch_size):
batch = ids[i:i+batch_size]
ph = ','.join('?' * len(batch))
cur.execute(f"SELECT {pk}, {','.join(fields)} FROM {table} WHERE {id_col} IN ({ph})", batch)
for row in cur.fetchall():
result[row[0]] = row[1:]
return result
def get_ids(rows, typ):
return list({rid for *_, ids_by_typ, _, _text in rows for rid in ids_by_typ[typ]})
rec_det = fetch_details(cur, 'RECEPT', 'ID', 'ID', ['LEK','DSIG'], get_ids(rows,'Rec'))
vyka_det = fetch_details(cur, 'DOKLADD', 'ID', 'ID', ['KOD','DDGN','BODY'], get_ids(rows,'VykA'))
files_det = fetch_details(cur, 'FILES', 'ID', 'ID', ['FILENAME','DATUM'], get_ids(rows,'Files'))
medlab_det = fetch_details(cur, 'HISTDOC', 'ID', 'ID', ['DATUM','TYP'], get_ids(rows,'MEDLAB'))
lab_det = fetch_details(cur, 'LABVH', 'IDVH', 'IDVH', ['DATUM','CISLO'], get_ids(rows,'Lab'))
ock_det = fetch_details(cur, 'OCKZAZ', 'ID', 'ID', ['DATUM','LATKA'], get_ids(rows,'Ock'))
nes_det = fetch_details(cur, 'NES', 'ID', 'ID', ['ZACNES','KONNES'], get_ids(rows,'Nes'))
lec_det = fetch_details(cur, 'LECD', 'ID', 'ID', ['KOD','DATOSE'], get_ids(rows,'Lec'))
spec_det = fetch_details(cur, 'SPECVYS', 'IDSPECVYS','IDSPECVYS',['TYP','DATUM'], get_ids(rows,'SpecVys'))
pla_det = fetch_details(cur, 'PLA', 'IDPLA', 'IDPLA', ['DATUM','CENA','DOKLAD'], get_ids(rows,'PlaPac'))
conn.close()
print(f"Načteno {len(rows)} dekurzů")
# ── Styly ──────────────────────────────────────────────────────────────────
tenka_cara = Side(style='thin', color='AAAAAA')
ohraniceni = Border(left=tenka_cara, right=tenka_cara, top=tenka_cara, bottom=tenka_cara)
hl_font = Font(bold=True, color="FFFFFF")
hl_fill = PatternFill("solid", fgColor="2E75B6")
r_fill = [PatternFill("solid", fgColor="FFFFFF"), PatternFill("solid", fgColor="DCE6F1")]
BARVY_LISTU = {
'Recepty': ('1F6B33', 'E2EFDA'),
'Výkony': ('2E4057', 'D6E4F0'),
'Soubory': ('7B3F00', 'FAE5D3'),
'Žádanky': ('4A235A', 'F5EEF8'),
'Lab výsl.': ('145A32', 'D5F5E3'),
'Očkování': ('7E5109', 'FDEBD0'),
'Neschop.': ('922B21', 'FADBD8'),
'Léčiva': ('1A5276', 'D6EAF8'),
'SpecVys': ('0B5345', 'D1F2EB'),
'Platby': ('4D5656', 'EAECEE'),
'Ostatní': ('2C3E50', 'EBF5FB'),
}
def zapis_hlavicku(ws, sloupce, sirky, barva_hex):
hl_fill_l = PatternFill("solid", fgColor=barva_hex)
for col, (nazev, sirka) in enumerate(zip(sloupce, sirky), start=1):
cell = ws.cell(row=1, column=col, value=nazev)
cell.font = hl_font
cell.fill = hl_fill_l
cell.alignment = Alignment(horizontal='center')
cell.border = ohraniceni
ws.column_dimensions[cell.column_letter].width = sirka
def zapis_radek(ws, row_i, hodnoty, zarovnani, barva_hex):
fill = PatternFill("solid", fgColor="FFFFFF") if row_i % 2 == 0 \
else PatternFill("solid", fgColor=barva_hex)
for col_i, (val, align) in enumerate(zip(hodnoty, zarovnani), start=1):
cell = ws.cell(row=row_i, column=col_i, value=val)
cell.fill = fill
cell.border = ohraniceni
cell.alignment = Alignment(horizontal=align)
if col_i == 1 and isinstance(val, __import__('datetime').date):
cell.number_format = 'DD.MM.YYYY'
def hyperlink_cell(ws, row_i, col_i, cil_list, cil_radek, text, barva_hex):
fill = PatternFill("solid", fgColor="FFFFFF") if row_i % 2 == 0 \
else PatternFill("solid", fgColor=barva_hex)
cell = ws.cell(row=row_i, column=col_i)
cell.value = str(text)
# Název listu s mezerou musí být v apostrofech
sheet_ref = f"'{cil_list}'" if ' ' in cil_list else cil_list
cell.hyperlink = f'#{sheet_ref}!A{cil_radek}'
cell.font = Font(color="0000FF", underline='single')
cell.fill = fill
cell.border = ohraniceni
cell.alignment = Alignment(horizontal='center')
# ── Workbook ───────────────────────────────────────────────────────────────
wb = openpyxl.Workbook()
LISTY = [
('Recepty', 'Rec', rec_det, ['Datum','Jméno','Recept','Dávkování'], [12,25,25,12], None),
('Výkony', 'VykA', vyka_det, ['Datum','Jméno','Kód výkonu','Diagnóza','Body'], [12,25,14,10,8], None),
('Soubory', 'Files', files_det, ['Datum','Jméno','Soubor','Datum souboru'], [12,25,35,14], None),
('Žádanky', 'MEDLAB', medlab_det, ['Datum','Jméno','Typ'], [12,25,15], None),
('Lab výsl.', 'Lab', lab_det, ['Datum','Jméno','Číslo'], [12,25,20], None),
('Očkování', 'Ock', ock_det, ['Datum','Jméno','Datum očkování','Vakcína'], [12,25,14,30], None),
('Neschop.', 'Nes', nes_det, ['Datum','Jméno','Od','Do'], [12,25,12,12], None),
('Léčiva', 'Lec', lec_det, ['Datum','Jméno','Kód','Datum výkonu'], [12,25,12,14], None),
('SpecVys', 'SpecVys', spec_det, ['Datum','Jméno','Typ vyšetření','Datum vyšetření'], [12,25,25,14], None),
('Platby', 'PlaPac', pla_det, ['Datum','Jméno','Datum platby','Částka','Doklad'], [12,25,14,12,15], None),
('Ostatní', None, None, ['Datum','Jméno','Typ','ID','Název'], [12,25,12,10,30], None),
]
ws_d = wb.active
ws_d.title = "Dekurz"
# List "Text dekurzu" hned za Dekurzem
ws_text = wb.create_sheet("Text dekurzu")
ws_listy = {}
for nazev, *_ in LISTY:
ws_listy[nazev] = wb.create_sheet(nazev)
for nazev, typ, det, sloupce, sirky, _ in LISTY:
barva_hl, _ = BARVY_LISTU[nazev]
zapis_hlavicku(ws_listy[nazev], sloupce, sirky, barva_hl)
ws_listy[nazev].freeze_panes = 'A2'
# Záhlaví listu "Text dekurzu"
zapis_hlavicku(ws_text,
['Datum', 'Čas', 'Lékař', 'Jméno', 'Rodné číslo', 'Text dekurzu'],
[12, 8, 8, 25, 14, 100],
'2E75B6')
ws_text.freeze_panes = 'A2'
row_ptr_text = 2
# Zobrazované názvy sloupců pro TOP_TYPY (stejné pořadí)
NAZVY_TYPY = ['VykA', 'Rec', 'Files', 'Žádanky', 'Lab výsl.', 'Ock', 'Nes', 'Lec', 'SpecVys', 'PlaPac']
# Záhlaví Dekurz sloupce AR
nazvy_d = ['Datum', 'Čas', 'Lékař', 'Jméno', 'Rodné číslo', 'Pojišťovna'] + ['HodVyk'] + NAZVY_TYPY + ['Ostatní']
sirky_d = [12, 8, 8, 25, 14, 12 ] + [10] + [38, 8, 8, 8, 8, 8, 8, 8, 8, 8] + [8]
zapis_hlavicku(ws_d, nazvy_d, sirky_d, '2E75B6')
ws_d.freeze_panes = 'A2'
ws_d.auto_filter.ref = f"A1:R{len(rows)+1}"
row_ptr = {nazev: 2 for nazev, *_ in LISTY}
# ── Plnění dat ─────────────────────────────────────────────────────────────
def get_det_hodnoty(typ, rid, datum, jmeno_cel):
if typ == 'Rec':
d = rec_det.get(rid, ('', ''))
return [datum, jmeno_cel, d[0] or '', d[1] or '']
elif typ == 'VykA':
d = vyka_det.get(rid, ('', '', 0))
return [datum, jmeno_cel, d[0] or '', (d[1] or '').strip(), d[2] or 0]
elif typ == 'Files':
d = files_det.get(rid, ('', ''))
return [datum, jmeno_cel, d[0] or '', d[1]]
elif typ == 'MEDLAB':
d = medlab_det.get(rid, ('', ''))
return [datum, jmeno_cel, d[1] or '']
elif typ == 'Lab':
d = lab_det.get(rid, ('', ''))
return [datum, jmeno_cel, d[1] or '']
elif typ == 'Ock':
d = ock_det.get(rid, ('', ''))
return [datum, jmeno_cel, d[0], d[1] or '']
elif typ == 'Nes':
d = nes_det.get(rid, ('', ''))
return [datum, jmeno_cel, d[0], d[1]]
elif typ == 'Lec':
d = lec_det.get(rid, ('', ''))
return [datum, jmeno_cel, d[0] or '', d[1]]
elif typ == 'SpecVys':
d = spec_det.get(rid, ('', ''))
return [datum, jmeno_cel, d[0] or '', d[1]]
elif typ == 'PlaPac':
d = pla_det.get(rid, ('', '', ''))
return [datum, jmeno_cel, d[0], d[1], d[2] or '']
return []
ZAROVNANI = {
'Recepty': ['left','left','left','center'],
'Výkony': ['left','left','center','center','right'],
'Soubory': ['left','left','left','left'],
'Žádanky': ['left','left','center'],
'Lab výsl.': ['left','left','center'],
'Očkování': ['left','left','left','left'],
'Neschop.': ['left','left','left','left'],
'Léčiva': ['left','left','center','left'],
'SpecVys': ['left','left','left','left'],
'Platby': ['left','left','left','right','center'],
'Ostatní': ['left','left','center','center','left'],
}
for row_i, (datum, cas, zkratka, jmeno_cel, rodcis, poj, top, ostatni, ids_by_typ, ids_ostatni, text_dekurzu) in enumerate(rows, start=2):
fill_d = r_fill[row_i % 2]
for col_i in range(1, len(nazvy_d) + 1):
ws_d.cell(row=row_i, column=col_i).fill = fill_d
ws_d.cell(row=row_i, column=col_i).border = ohraniceni
ws_d.cell(row=row_i, column=1, value=datum).number_format = 'DD.MM.YYYY'
ws_d.cell(row=row_i, column=2, value=str(cas)[:5] if cas else '').alignment = Alignment(horizontal='center')
ws_d.cell(row=row_i, column=3, value=zkratka or '').alignment = Alignment(horizontal='center')
ws_d.cell(row=row_i, column=4, value=jmeno_cel)
# Sloupec 5 Rodné číslo jako hyperlink na list "Text dekurzu"
hyperlink_cell(ws_d, row_i, 5, 'Text dekurzu', row_ptr_text, rodcis or '', 'DCE6F1')
ws_d.cell(row=row_i, column=6, value=poj or '').alignment = Alignment(horizontal='center')
# Sloupec G HodVyk (součet bodů všech výkonů dekurzu)
hodvyk = sum((vyka_det.get(rid, ('', '', 0))[2] or 0) for rid in ids_by_typ['VykA'])
cell_hv = ws_d.cell(row=row_i, column=7, value=hodvyk if hodvyk else '')
cell_hv.alignment = Alignment(horizontal='center')
for col_off, (typ, pocet) in enumerate(zip(TOP_TYPY, top)):
col_i = 8 + col_off
if pocet == 0:
continue
nazev_listu = next((n for n, t, *_ in LISTY if t == typ), None)
if nazev_listu and ids_by_typ[typ]:
_, barva_ll = BARVY_LISTU[nazev_listu]
# Pro výkony (VykA) přidej kódy do závorky
if typ == 'VykA':
kody = [str(vyka_det.get(rid, ('',))[0] or '').strip() for rid in ids_by_typ[typ]]
kody_str = ', '.join(k for k in kody if k)
display_text = f"{pocet} ({kody_str})" if kody_str else pocet
else:
display_text = pocet
hyperlink_cell(ws_d, row_i, col_i, nazev_listu, row_ptr[nazev_listu], display_text, barva_ll[1:] if len(barva_ll) > 6 else 'DCE6F1')
ws_det = ws_listy[nazev_listu]
barva_hl, barva_r = BARVY_LISTU[nazev_listu]
for rid in ids_by_typ[typ]:
hodnoty = get_det_hodnoty(typ, rid, datum, jmeno_cel)
zarovnani_l = ZAROVNANI.get(nazev_listu, ['left']*10)
zapis_radek(ws_det, row_ptr[nazev_listu], hodnoty, zarovnani_l, barva_r)
row_ptr[nazev_listu] += 1
else:
ws_d.cell(row=row_i, column=col_i, value=pocet).alignment = Alignment(horizontal='center')
if ostatni:
ws_det = ws_listy['Ostatní']
barva_hl, barva_r = BARVY_LISTU['Ostatní']
hyperlink_cell(ws_d, row_i, 18, 'Ostatní', row_ptr['Ostatní'], ostatni, barva_r)
for typ, rid, nazev in ids_ostatni:
zapis_radek(ws_det, row_ptr['Ostatní'],
[datum, jmeno_cel, typ, rid, nazev],
ZAROVNANI['Ostatní'], barva_r)
row_ptr['Ostatní'] += 1
# Zápis do listu "Text dekurzu"
fill_t = r_fill[row_ptr_text % 2]
for col_i in range(1, 7):
ws_text.cell(row=row_ptr_text, column=col_i).fill = fill_t
ws_text.cell(row=row_ptr_text, column=col_i).border = ohraniceni
c1 = ws_text.cell(row=row_ptr_text, column=1, value=datum)
c1.number_format = 'DD.MM.YYYY'
c1.alignment = Alignment(horizontal='left', vertical='center')
ws_text.cell(row=row_ptr_text, column=2, value=str(cas)[:5] if cas else '').alignment = Alignment(horizontal='center', vertical='center')
ws_text.cell(row=row_ptr_text, column=3, value=zkratka or '').alignment = Alignment(horizontal='center', vertical='center')
ws_text.cell(row=row_ptr_text, column=4, value=jmeno_cel).alignment = Alignment(horizontal='left', vertical='center')
ws_text.cell(row=row_ptr_text, column=5, value=rodcis or '').alignment = Alignment(horizontal='left', vertical='center')
cell_txt = ws_text.cell(row=row_ptr_text, column=6, value=text_dekurzu)
cell_txt.alignment = Alignment(horizontal='left', vertical='top', wrap_text=True)
row_ptr_text += 1
# Autofiltr na detailních listech
for nazev, *_ in LISTY:
ws = ws_listy[nazev]
max_col = ws.max_column
max_row = ws.max_row
if max_row > 1:
ws.auto_filter.ref = f"A1:{ws.cell(row=1, column=max_col).column_letter}{max_row}"
# Smazat starý report
for stary in glob.glob(os.path.join(VYSTUPNI_ADRESAR, f'* {NAZEV_REPORTU}.xlsx')):
os.remove(stary)
print(f"Smazán: {stary}")
# Uložit nový
os.makedirs(VYSTUPNI_ADRESAR, exist_ok=True)
casova_znacka = datetime.now().strftime('%Y-%m-%d %H-%M-%S')
vystup = os.path.join(VYSTUPNI_ADRESAR, f'{casova_znacka} {NAZEV_REPORTU}.xlsx')
wb.save(vystup)
print(f"Uloženo: {vystup}")
for nazev, *_ in LISTY:
print(f" {nazev}: {row_ptr[nazev]-2} řádků")
@@ -0,0 +1,281 @@
import os,shutil,fdb,time, socket
import re,datetime,funkce
def get_medicus_connection():
"""
Connect to Firebird 'medicus.fdb' depending on computer name.
Returns fdb.Connection or raises RuntimeError if unknown or connection fails.
"""
computer_name = socket.gethostname().upper()
try:
if computer_name == "Z230":
print("Computer name is Z230")
cesta = r"u:\dropboxtest\Ordinace\Dokumentace_ke_zpracování"
cestazpracovana = r"u:\Dropboxtest\Ordinace\Dokumentace_zpracovaná"
return fdb.connect(dsn=r"localhost:c:\medicus 3\data\medicus.fdb", user="SYSDBA", password="masterkey", charset="win1250"),cesta,cestazpracovana
elif computer_name == "LEKAR":
print("Computer name is LEKAR")
cesta = r"z:\dropbox\Ordinace\Dokumentace_ke_zpracování"
cestazpracovana = r"z:\Dropbox\Ordinace\Dokumentace_zpracovaná"
return fdb.connect(dsn=r"localhost:m:\medicus\data\medicus.fdb", user="SYSDBA", password="masterkey", charset="win1250"),cesta,cestazpracovana
elif computer_name in ("SESTRA", "POHODA"):
print("Computer name is SESTRA or POHODA")
cesta = r"z:\dropbox\Ordinace\Dokumentace_ke_zpracování"
cestazpracovana = r"z:\Dropbox\Ordinace\Dokumentace_zpracovaná"
return fdb.connect(dsn=r"192.168.1.10:m:\medicus\data\medicus.fdb", user="SYSDBA", password="masterkey", charset="win1250"),cesta,cestazpracovana
else:
raise RuntimeError(f"❌ Unknown computer name: {computer_name}")
except Exception as e:
print(f"⚠️ Error connecting to Medicus on {computer_name}: {e}")
raise
#toto volání současně nadefinuje cesty do dropboxu
conn,cesta,cestazpracovana=get_medicus_connection()
def is_encodable_win1250(text: str) -> bool:
try:
text.encode("cp1250")
return True
except UnicodeEncodeError:
return False
def make_win1250_safe(text: str) -> str:
return text.encode("cp1250", errors="replace").decode("cp1250").replace("?", "_")
def restore_files_for_import(retezec):
drop=r"u:\Dropbox\!!!Days\Downloads Z230\Dokumentace"
next=r"u:\NextcloudOrdinace\Dokumentace_ke_zpracování"
# Check if the directory exists
if not os.path.exists(drop):
print(f"The directory '{drop}' does not exist.")
return
# Iterate over all files and subdirectories in the directory
for item in os.listdir(drop):
item_path = os.path.join(drop, item)
# If it's a file or a symbolic link, delete it
if os.path.isfile(item_path) or os.path.islink(item_path):
os.unlink(item_path)
print(f"Deleted file: {item_path}")
# If it's a directory, delete it recursively
elif os.path.isdir(item_path):
shutil.rmtree(item_path)
print(f"Deleted directory: {item_path}")
for item in os.listdir(next):
item_path = os.path.join(next, item)
# If it's a file finished with PDF, copy it
if os.path.isfile(item_path) and item_path.endswith(".pdf") and retezec in item_path:
shutil.copy(item_path,os.path.join(drop,item))
print(f"Copied file: {item_path}")
def kontrola_rc(rc,connection):
cur = connection.cursor()
cur.execute("select count(*),idpac from kar where rodcis=? group by idpac",(rc,))
row = cur.fetchone()
if row:
return row[1]
else:
return False
def kontrola_struktury(souborname,connection):
if souborname.endswith('.pdf'):
#kontrola struktury
pattern=re.compile(r'(^\d{9,10}) (\d{4}-\d{2}-\d{2}) (\w+, \w.+?) \[(.+?)\] \[(.*?)\]')
match=pattern.search(souborname)
# print(souborname)
vpohode=True
if match and len(match.groups())==5:
datum=match.group(2)
try:
datum_object = datetime.datetime.strptime(datum,"%Y-%m-%d").date()
# print(datum_object)
except:
vpohode=False
return vpohode
cur = connection.cursor()
cur.execute("select count(*) from kar where rodcis=?", (match.group(1),))
row = cur.fetchone()[0]
if row!=1:
vpohode = False
return vpohode
else:
vpohode=False
return vpohode
else:
vpohode=False
return vpohode
return vpohode
def vrat_info_o_souboru(souborname, connection):
pattern = re.compile(r'(^\d{9,10}) (\d{4}-\d{2}-\d{2}) (\w+, \w.+?) \[(.+?)\] \[(.*?)\]')
match = pattern.search(souborname)
rc = match.group(1)
datum = datetime.datetime.strptime(match.group(2), "%Y-%m-%d").date()
jmeno = match.group(3)
prvnizavorka = match.group(4)
druhazavorka = match.group(5)
cur=connection.cursor()
cur.execute("select idpac from kar where rodcis=?",(rc,))
idpac = cur.fetchone()[0]
datumsouboru = datetime.datetime.fromtimestamp(os.path.getctime(os.path.join(cesta,souborname)))
return (rc,idpac,datum,jmeno,prvnizavorka,druhazavorka,souborname,datumsouboru)
def prejmenuj_chybny_soubor(souborname,cesta):
if souborname[0]!="":
soubornovy = "" + souborname
os.rename(os.path.join(cesta,souborname),os.path.join(cesta,soubornovy))
# print(kontrola_struktury(ss))
# info=vrat_info_o_souboru(ss)
# print(kontrola_rc(info[0],conn))
# restore_files_for_import("")
# restore_files_for_import("346204097")
info=[]
for soubor in os.listdir(cesta):
plna_cesta = os.path.join(cesta, soubor)
if not os.path.isfile(plna_cesta):
continue # skip folders or anything thats not a regular fil
if not is_encodable_win1250(soubor):
safe_name = make_win1250_safe(soubor)
novy_plna_cesta = os.path.join(cesta, safe_name)
print(f"⚠️ Renaming invalid filename:\n {soubor}{safe_name}")
os.rename(plna_cesta, novy_plna_cesta)
# Update variable for later processing
soubor = safe_name
plna_cesta = novy_plna_cesta
print(soubor)
if kontrola_struktury(soubor,conn):
info.append(vrat_info_o_souboru(soubor,conn))
# os.remove(os.path.join(cesta,soubor))
else:
prejmenuj_chybny_soubor(soubor,cesta)
info = sorted(info, key=lambda x: (x[0], x[1]))
print(info)
skupiny={}
for row in info:
skupiny[row[0]]=[]
for row in info:
skupiny[row[0]].append(row)
# print(skupiny)
# rtf = r"""{\rtf1\ansi\ansicpg1250\uc1\deff0\deflang1029{\info{\bookmarks BOOKMARKNAMES }}{\fonttbl{\f0\fnil\fcharset238 Arial;}{\f5\fnil\fcharset238 Symbol;}}
# {\colortbl ;\red0\green0\blue255;\red0\green128\blue0;\red0\green0\blue0;}
# {\stylesheet{\s10\fi0\li0\ql\ri0\sb0\sa0 Vlevo;}{\*\cs15\f0\fs20 Norm\'e1ln\'ed;}{\*\cs20\f0\i\fs20 Z\'e1hlav\'ed;}{\*\cs22\f0\ul\fs20\cf1 Odkaz;}}
# \uc1\pard\s10\plain\cs20\f0\i\fs20 P\'f8\'edlohy:\par
# \pard\s10{\*\bkmkstart 0}\plain\cs22\f0\ul\fs20\cf1 BOOKMARKNAMESTEXT{\*\bkmkend 0}\par
# \pard\s10\plain\cs15\f0\fs20 \par
# }"""
rtf = r"""{\rtf1\ansi\ansicpg1250\uc1\deff0\deflang1029{\info{\bookmarks BOOKMARKNAMES }}{\fonttbl{\f0\fnil\fcharset238 Arial;}{\f5\fnil\fcharset238 Symbol;}}
{\colortbl ;\red0\green0\blue255;\red0\green128\blue0;\red0\green0\blue0;}
{\stylesheet{\s10\fi0\li0\ql\ri0\sb0\sa0 Vlevo;}{\*\cs15\f0\fs20 Norm\'e1ln\'ed;}{\*\cs20\f0\i\fs20 Z\'e1hlav\'ed;}{\*\cs22\f0\ul\fs20\cf1 Odkaz;}}
\uc1\pard\s10\plain\cs20\f0\i\fs20 P\'f8\'edlohy:\par
BOOKMARKSTEXT
\pard\s10\plain\cs15\f0\fs20 \par
}"""
for key in skupiny.keys():
rtf = r"""{\rtf1\ansi\ansicpg1250\uc1\deff0\deflang1029{\info{\bookmarks BOOKMARKNAMES }}{\fonttbl{\f0\fnil\fcharset238 Arial;}{\f5\fnil\fcharset238 Symbol;}}
{\colortbl ;\red0\green0\blue255;\red0\green128\blue0;\red0\green0\blue0;}
{\stylesheet{\s10\fi0\li0\ql\ri0\sb0\sa0 Vlevo;}{\*\cs15\f0\fs20 Norm\'e1ln\'ed;}{\*\cs20\f0\i\fs20 Z\'e1hlav\'ed;}{\*\cs22\f0\ul\fs20\cf1 Odkaz;}}
\uc1\pard\s10\plain\cs20\f0\i\fs20 Vlo\'9eena skenovan\'e1 dokumentace:\par
BOOKMARKSTEXT
\pard\s10\plain\cs15\f0\fs20\par
}"""
# if key=="8257300425": #346204097
if True:
prvnibookmark=True
print(key,len(skupiny[key]))
cislo=9
poradi=0
bookmark=""
bookmarks=""
for row in skupiny[key]:
# print(row)
pacid=row[1]
filename=row[6]
fileid=funkce.zapis_file(vstupconnection=conn, idpac=row[1],
cesta=cesta, souborname=row[6], prvnizavorka=row[4],
soubordate=row[2], souborfiledate=row[7], poznamka=row[5])
for attempt in range(3):
try:
# Replace this with the command that might raise an error
if not os.path.exists(os.path.join(cestazpracovana,row[6])):
shutil.move(os.path.join(cesta,row[6]), os.path.join(cestazpracovana,row[6]))
print("Command succeeded!")
break # Exit the loop if the command succeeds
else:
now = datetime.datetime.now()
datetime_string = now.strftime("%Y-%m-%d %H-%M-%S")
print(os.path.join(cestazpracovana,row[6][:-4]+" "+datetime_string+".pdf"))
shutil.move(os.path.join(cesta,row[6]),os.path.join(cestazpracovana,row[6][:-4]+" "+datetime_string+".pdf"))
print("Command succeeded!")
break # Exit the loop if the command succeeds
except Exception as e:
print(f"Attempt {attempt + 1} failed: {e}")
if attempt < 3 - 1:
print(f"Retrying in {5} seconds...")
time.sleep(5)
else:
print("Max retries reached. Command failed.")
filename=funkce.convert_to1250(filename)
print("Encodedfilename", filename)
filenameforbookmark=row[2].strftime('%Y-%m-%d')+" "+row[4]+": "+row[5]
bookmark=bookmark+'"'+filenameforbookmark+'","Files:'+str(fileid)+'",'+str(cislo)+";"
cislo+=7
# print(bookmark)
if prvnibookmark:
bookmarks=bookmarks+r'\pard\s10{\*\bkmkstart '+str(poradi)+r"}\plain\cs22\f0\ul\fs20\cf1 "+filenameforbookmark+r"{\*\bkmkend "+str(poradi)+r"}\par"
prvnibookmark=False
else:
bookmarks=bookmarks+r'\pard\s10{\*\bkmkstart '+str(poradi)+r"}" + filenameforbookmark + r"{\*\bkmkend " + str(poradi) + r"}\par"
bookmark=bookmark[:-1]
# bookmarks=bookmarks[:-2]
print(bookmark)
print(bookmarks)
rtf = rtf.replace("BOOKMARKNAMES", bookmark)
rtf=rtf.replace("BOOKMARKSTEXT",bookmarks)
print(rtf)
dekursid = funkce.get_dekurs_id(conn)
datumzapisu = datetime.datetime.now().date()
caszapisu = datetime.datetime.now().time()
cur=conn.cursor()
cur.execute("insert into dekurs (id,iduzi,idprac,idodd,idpac,datum,cas,dekurs)"
" values(?,?,?,?,?,?,?,?)",
(dekursid,6,2,2, row[1],datumzapisu,caszapisu, rtf))
conn.commit()
# rtf = rtf.replace("FILEID", str(idfile))
#Zde zapisujeme soubor
# fileid=funkce.zapis_file(conn,row[1],cesta,row[6],row[4],row[2],row[7],row[5])
# zapis_dekurs(vstupconnection, idpac, idodd, iduzi, idprac, idfile, filename, text, datumzpravy,datumsouboru)
# return (rc, idpac, datum, jmeno, prvnizavorka, druhazavorka, souborname, datumsouboru)
# Zde zapisujeme dekurs
# text=row[2].strftime("%Y-%m-%d")+" "+row[4].strip()+": "+row[5].strip()
# funkce.zapis_dekurs(conn, row[1], 2, 6, 2, fileid, text, text, row[7], row[2])
# os.remove(os.path.join(cesta, soubor))
@@ -0,0 +1,379 @@
import os
import fdb
import csv,time,pandas as pd
import openpyxl
PathToSaveCSV=r"z:\Dropbox\Ordinace\Reporty"
timestr = time.strftime("%Y-%m-%d %H-%M-%S ")
CSVname="Pacienti.xlsx"
# ================= DELETE OLD REPORTS (KEEP TODAY) ==================
from datetime import datetime
today = datetime.now().strftime("%Y-%m-%d")
for fname in os.listdir(PathToSaveCSV):
if fname.endswith("Pacienti.xlsx"):
file_date = fname[:10] # first 10 chars = YYYY-MM-DD
if file_date != today: # delete only older files
try:
os.remove(os.path.join(PathToSaveCSV, fname))
print(f"🗑️ Deleted old report: {fname}")
except Exception as e:
print(f"⚠️ Could not delete {fname}: {e}")
con = fdb.connect(
host='192.168.1.10', database=r'm:\MEDICUS\data\medicus.FDB',
user='sysdba', password='masterkey',charset='WIN1250')
#Server=192.168.1.10
#Path=M:\Medicus\Data\Medicus.fdb
# Create a Cursor object that operates in the context of Connection con:
cur = con.cursor()
# import openpyxl module
import openpyxl
import xlwings as xw
wb = openpyxl.Workbook()
sheet = wb.active
# wb.save("sample.xlsx")
#Načtení očkování registrovaných pacientů
cur.execute("select rodcis,prijmeni,jmeno,ockzaz.datum,kodmz,ockzaz.poznamka,latka,nazev,expire from registr join kar on registr.idpac=kar.idpac join ockzaz on registr.idpac=ockzaz.idpac where datum_zruseni is null and kar.vyrazen!='A' and kar.rodcis is not null and idicp!=0 order by ockzaz.datum desc")
nacteno=cur.fetchall()
print(len(nacteno))
sheet.title="Očkování"
sheet.append(["Rodne cislo","Prijmeni","Jmeno","Datum ockovani","Kod MZ","Sarze","Latka","Nazev","Expirace"])
#nacteno jsou ockovani
for row in nacteno:
sheet.append(row)
wb.save(os.path.join(PathToSaveCSV ,timestr+CSVname))
#Načtení registrovaných pacientů
cur.execute("select rodcis,prijmeni,jmeno,datum_registrace,registr.idpac,poj from registr join kar on registr.idpac=kar.idpac where kar.vyrazen!='A' and kar.rodcis is not null and idicp!=0 and datum_zruseni is null")
nacteno=cur.fetchall()
print(len(nacteno))
wb.create_sheet('Registrovani',0)
sheet=wb['Registrovani']
sheet.append(["Rodne cislo","Prijmeni","Jmeno","Datum registrace","ID pacienta","Pojistovna"])
#nacteno jsou registrovani
for row in nacteno:
sheet.append(row)
wb.save(os.path.join(PathToSaveCSV ,timestr+CSVname))
#Načtení receptů
cur.execute("""select
kar.rodcis,
TRIM(kar.prijmeni) ||' '|| substring(kar.jmeno from 1 for 1) ||'.' as jmeno,
recept.datum,
TRIM(recept.lek) ||' '|| trim(recept.dop) as lek,
recept.expori AS Poc,
CASE
WHEN recept.opakovani is null THEN 1
ELSE recept.opakovani
END AS OP,
recept.uhrada,
recept.dsig,
recept.NOTIFIKACE_KONTAKT as notifikace,
recept_epodani.erp,
recept_epodani.vystavitel_jmeno,
recept.atc,
recept.CENAPOJ,
recept.cenapac
from recept LEFT Join RECEPT_EPODANI on recept.id_epodani=recept_epodani.id
LEFT join kar on recept.idpac=kar.idpac
order by datum desc,erp desc"""
)
nacteno=cur.fetchall()
print(len(nacteno))
wb.create_sheet('Recepty',0)
sheet=wb['Recepty']
sheet.title="Recepty"
sheet.append(["Rodné číslo","Jméno","Datum vystavení","Název leku","Poč.","Op.","Úhr.","Da signa","Notifikace","eRECEPT","Vystavil","ATC","Cena pojišťovna","Cena pacient"])
#nacteno jsou ockovani
for row in nacteno:
try:
sheet.append(row)
except:
continue
wb.save(os.path.join(PathToSaveCSV ,timestr+CSVname))
#Načtení vykony vsech
cur.execute("select dokladd.rodcis,TRIM(prijmeni) ||', '|| TRIM(jmeno),dokladd.datose,dokladd.kod,dokladd.pocvyk,dokladd.ddgn,dokladd.body,vykony.naz "
"from kar join dokladd on kar.rodcis=dokladd.rodcis join vykony on dokladd.kod=vykony.kod where (datose>=vykony.platiod and datose<=vykony.platido) OR (datose>=vykony.platiod and vykony.platido is null) order by dokladd.datose desc,dokladd.rodcis")
wb.create_sheet('Vykony',0)
sheet=wb['Vykony']
nacteno=cur.fetchall()
print(len(nacteno))
sheet.append(["Rodne cislo","Jmeno","Datum vykonu","Kod","Pocet","Dg.","Body","Nazev"])
#nacteno jsou ockovani
for row in nacteno:
sheet.append(row)
wb.save(os.path.join(PathToSaveCSV ,timestr+CSVname))
#Načtení neschopenek
import datetime
def pocet_dni(zacnes,konnes,pracne):
dnes=datetime.date.today()
if pracne=='A':
return (dnes-zacnes).days
if pracne=='N' and zacnes is not None and konnes is not None and zacnes<=konnes:
return (konnes-zacnes).days
else:
return "NA"
cur.execute("select nes.idpac, "
"kar.rodcis, "
"TRIM(prijmeni) ||', '|| TRIM(jmeno), "
"nes.datnes, "
"nes.ecn, "
"nes.zacnes, "
"nes.pracne, "
"nes.konnes, "
"nes.diagno, "
"nes.kondia, "
"nes.updated "
"from nes "
"left join kar on nes.idpac=kar.idpac where nes.datnes<=current_date "
"order by datnes desc")
tmpnacteno_vse=[]
nacteno_vse=cur.fetchall()
cur.execute("select nes.idpac, "
"kar.rodcis, "
"TRIM(prijmeni) ||', '|| TRIM(jmeno), "
"nes.datnes, "
"nes.ecn, "
"nes.zacnes, "
"nes.pracne, "
"nes.konnes, "
"nes.diagno, "
"nes.kondia, "
"nes.updated "
"from nes "
"left join kar on nes.idpac=kar.idpac where nes.datnes<=current_date and pracne='A'"
"order by datnes desc")
tmpnacteno_aktivni=[]
nacteno_aktivni=cur.fetchall()
for row in nacteno_vse:
tmpnacteno_vse.append((row[0],row[1],row[2],row[3],row[4],row[5],row[6],row[7],pocet_dni(row[5],row[7],row[6]),row[8],row[9],row[10]))
for row in nacteno_aktivni:
(tmpnacteno_aktivni.append((row[0],row[1],row[2],row[3],row[4],row[5],row[6],row[7],pocet_dni(row[5],row[7],row[6]),row[8],row[9],row[10])))
wb.create_sheet('Neschopenky všechny',0)
sheet=wb["Neschopenky všechny"]
sheet.append(["ID pac","Rodne cislo","Jmeno","Datum neschopenky","Číslo neschopenky","Zacatek","Aktivní?","Konec","Pocet dni","Diagnoza zacatel","Diagnoza konec","Aktualizovano"])
for row in tmpnacteno_vse:
sheet.append(row)
wb.create_sheet('Neschopenky aktivní',0)
sheet=wb["Neschopenky aktivní"]
sheet.append(["ID pac","Rodne cislo","Jmeno","Datum neschopenky","Číslo neschopenky","Zacatek","Aktivní?","Konec","Pocet dni","Diagnoza zacatel","Diagnoza konec","Aktualizovano"])
for row in tmpnacteno_aktivni:
sheet.append(row)
#Načtení preventivni prohlidky
cur.execute("select all dokladd.rodcis,TRIM(prijmeni) ||', '|| TRIM(jmeno),dokladd.datose,dokladd.kod,vykony.naz,dokladd.ddgn,dokladd.body "
"from dokladd left join kar on dokladd.rodcis=kar.rodcis join vykony on dokladd.kod=vykony.kod where "
"((datose>=vykony.platiod and datose<=vykony.platido) OR (datose>=vykony.platiod and vykony.platido is null)) and (dokladd.kod=1022 or dokladd.kod=1021) "
"order by datose desc")
wb.create_sheet('Preventivni prohlidky',0)
sheet=wb['Preventivni prohlidky']
nacteno=cur.fetchall()
print(len(nacteno))
sheet.append(["Rodne cislo","Jmeno","Datum","Kod","Název","Dg.","Body"])
for row in nacteno:
sheet.append(row)
wb.save(os.path.join(PathToSaveCSV ,timestr+CSVname))
#Nacteni INR
cur.execute("select all dokladd.rodcis,TRIM(prijmeni) ||', '|| TRIM(jmeno),dokladd.datose,dokladd.kod,vykony.naz,dokladd.ddgn,dokladd.body "
"from dokladd left join kar on dokladd.rodcis=kar.rodcis join vykony on dokladd.kod=vykony.kod where "
"((datose>=vykony.platiod and datose<=vykony.platido) OR (datose>=vykony.platiod and vykony.platido is null)) and (dokladd.kod=01443) "
"order by datose desc")
wb.create_sheet('INR',0)
sheet=wb['INR']
nacteno=cur.fetchall()
print(len(nacteno))
sheet.append(["Rodne cislo","Jmeno","Datum","Kod","Název","Dg.","Body"])
for row in nacteno:
sheet.append(row)
wb.save(os.path.join(PathToSaveCSV ,timestr+CSVname))
#Nacteni CRP
cur.execute("select all dokladd.rodcis,TRIM(prijmeni) ||', '|| TRIM(jmeno),dokladd.datose,dokladd.kod,vykony.naz,dokladd.ddgn,dokladd.body "
"from dokladd left join kar on dokladd.rodcis=kar.rodcis join vykony on dokladd.kod=vykony.kod where "
"((datose>=vykony.platiod and datose<=vykony.platido) OR (datose>=vykony.platiod and vykony.platido is null)) and (dokladd.kod=02230 or dokladd.kod=09111) "
"order by datose desc,dokladd.rodcis,dokladd.kod")
wb.create_sheet('CRP',0)
sheet=wb['CRP']
nacteno=cur.fetchall()
print(len(nacteno))
sheet.append(["Rodne cislo","Jmeno","Datum","Kod","Název","Dg.","Body"])
for row in nacteno:
sheet.append(row)
wb.save(os.path.join(PathToSaveCSV ,timestr+CSVname))
#Nacteni Holter
cur.execute("select all dokladd.rodcis,TRIM(prijmeni) ||', '|| TRIM(jmeno),dokladd.datose,dokladd.kod,vykony.naz,dokladd.ddgn,dokladd.body "
"from dokladd left join kar on dokladd.rodcis=kar.rodcis join vykony on dokladd.kod=vykony.kod where "
"((datose>=vykony.platiod and datose<=vykony.platido) OR (datose>=vykony.platiod and vykony.platido is null)) and (dokladd.kod=17129) "
"order by datose desc,dokladd.rodcis,dokladd.kod")
wb.create_sheet('Holter',0)
sheet=wb['Holter']
nacteno=cur.fetchall()
print(len(nacteno))
sheet.append(["Rodne cislo","Jmeno","Datum","Kod","Název","Dg.","Body"])
for row in nacteno:
sheet.append(row)
wb.save(os.path.join(PathToSaveCSV ,timestr+CSVname))
#Nacteni prostata
cur.execute("select all dokladd.rodcis,TRIM(prijmeni) ||', '|| TRIM(jmeno),dokladd.datose,dokladd.kod,vykony.naz,dokladd.ddgn,dokladd.body "
"from dokladd left join kar on dokladd.rodcis=kar.rodcis join vykony on dokladd.kod=vykony.kod where "
"((datose>=vykony.platiod and datose<=vykony.platido) OR (datose>=vykony.platiod and vykony.platido is null)) and (dokladd.kod=01130 or dokladd.kod=01131 or dokladd.kod=01132 or dokladd.kod=01133 or dokladd.kod=01134) "
"order by datose desc,dokladd.rodcis,dokladd.kod")
wb.create_sheet('Prostata',0)
sheet=wb['Prostata']
nacteno=cur.fetchall()
print(len(nacteno))
sheet.append(["Rodne cislo","Jmeno","Datum","Kod","Název","Dg.","Body"])
for row in nacteno:
sheet.append(row)
wb.save(os.path.join(PathToSaveCSV ,timestr+CSVname))
#Nacteni TOKS
cur.execute("select all dokladd.rodcis,TRIM(prijmeni) ||', '|| TRIM(jmeno),dokladd.datose,dokladd.kod,vykony.naz,dokladd.ddgn,dokladd.body "
"from dokladd left join kar on dokladd.rodcis=kar.rodcis join vykony on dokladd.kod=vykony.kod where "
"((datose>=vykony.platiod and datose<=vykony.platido) OR (datose>=vykony.platiod and vykony.platido is null)) and "
"(dokladd.kod=15118 or dokladd.kod=15119 or dokladd.kod=15120 or dokladd.kod=15121) "
"order by datose desc,dokladd.rodcis,dokladd.kod")
wb.create_sheet('TOKS',0)
sheet=wb['TOKS']
nacteno=cur.fetchall()
print(len(nacteno))
sheet.append(["Rodne cislo","Jmeno","Datum","Kod","Název","Dg.","Body"])
for row in nacteno:
sheet.append(row)
wb.save(os.path.join(PathToSaveCSV ,timestr+CSVname))
#Nacteni COVID
cur.execute("select all dokladd.rodcis,TRIM(prijmeni) ||', '|| TRIM(jmeno),dokladd.datose,dokladd.kod,vykony.naz,dokladd.ddgn,dokladd.body "
"from dokladd left join kar on dokladd.rodcis=kar.rodcis join vykony on dokladd.kod=vykony.kod where "
"((datose>=vykony.platiod and datose<=vykony.platido) OR (datose>=vykony.platiod and vykony.platido is null)) and "
"(dokladd.kod=01306) "
"order by datose desc,dokladd.rodcis,dokladd.kod")
wb.create_sheet('COVID',0)
sheet=wb['COVID']
nacteno=cur.fetchall()
print(len(nacteno))
sheet.append(["Rodne cislo","Jmeno","Datum","Kod","Název","Dg.","Body"])
for row in nacteno:
sheet.append(row)
wb.save(os.path.join(PathToSaveCSV ,timestr+CSVname))
#Nacteni Streptest
cur.execute("select all dokladd.rodcis,TRIM(prijmeni) ||', '|| TRIM(jmeno),dokladd.datose,dokladd.kod,vykony.naz,dokladd.ddgn,dokladd.body "
"from dokladd left join kar on dokladd.rodcis=kar.rodcis join vykony on dokladd.kod=vykony.kod where "
"((datose>=vykony.platiod and datose<=vykony.platido) OR (datose>=vykony.platiod and vykony.platido is null)) and "
"(dokladd.kod=02220) "
"order by datose desc,dokladd.rodcis,dokladd.kod")
wb.create_sheet('Streptest',0)
sheet=wb['Streptest']
nacteno=cur.fetchall()
print(len(nacteno))
sheet.append(["Rodne cislo","Jmeno","Datum","Kod","Název","Dg.","Body"])
for row in nacteno:
sheet.append(row)
# autofilter
for ws in wb.worksheets:
# Get the maximum number of rows and columns
max_row = ws.max_row
max_column = ws.max_column
ws.auto_filter.ref = f"A1:{openpyxl.utils.get_column_letter(max_column)}{max_row}"
# ws.auto_filter.ref = ws.dimensions
wb.save(os.path.join(PathToSaveCSV ,timestr+CSVname))
# Tento modul je pouze na autofit jednotlivych sloupcu na vsech listech workbooku
file = os.path.join(PathToSaveCSV ,timestr+CSVname)
with xw.App(visible=False) as app:
wb = xw.Book(file)
for sheet in range(len(wb.sheets)):
ws = wb.sheets[sheet]
ws.autofit()
# centrování receptů
sheet = wb.sheets['Recepty']
for sloupec in ["C:C", "E:E", "F:F", "G:G", "I:I", "M:M", "N:N"]:
sheet.range(sloupec).api.HorizontalAlignment = 3 # 3 = Center
wb.save()
wb.close()
@@ -0,0 +1,188 @@
# MedicusWithClaudeKomplexniReport CLAUDE_NOTES
## Co skript dělá
`komplexni_report.py` generuje komplexní Excel přehled ordinace.
Soubor se ukládá do `u:\Dropbox\!!!Days\Downloads Z230\YYYY-MM-DD_HH-MM-SS_Pacienti.xlsx`.
Předchozí verze (`*Pacienti.xlsx`) se před zápisem automaticky smažou.
## Spuštění
```
C:\Python\python.exe komplexni_report.py
```
Trvá cca **10 minut** (kvůli xlwings autofit přes celý Excel).
Spouští se automaticky v noci → nevadí.
## Připojení k DB
```python
fdb.connect(
host='localhost',
database=r'c:\MEDICUS 3\data\medicus.FDB',
user='sysdba', password='masterkey', charset='WIN1250'
)
```
## Závislosti
```
pip install fdb openpyxl xlwings extract-msg beautifulsoup4 python-dateutil
```
---
## Listy v Excelu (pořadí)
| List | Zdroj | Popis |
|---|---|---|
| `Registrovani` | registr + kar | Aktivní registrovaní pacienti |
| `Očkování` | ockzaz + registr + kar | Záznamy o očkování registrovaných |
| `Recepty` | recept + recept_epodani + kar | Všechny recepty, eRECEPT čísla, ceny |
| `Vykony` | dokladd + kar + vykony | Všechny výkony (s platným číselníkem) |
| `Neschopenky všechny` | nes + kar | Všechny neschopenky |
| `Neschopenky aktivní` | nes + kar | Pouze aktivní (pracne='A') |
| `Preventivni prohlidky` | dokladd + vykony | Kódy 1021, 1022 |
| `INR` | dokladd + vykony | Kód 1443 |
| `CRP` | dokladd + vykony | Kódy 2230, 9111 |
| `Holter` | dokladd + vykony | Kód 17129 |
| `Prostata` | dokladd + vykony | Kódy 11301134 |
| `TOKS` | dokladd + vykony | Kódy 1511815121 |
| `COVID` | dokladd + vykony | Kód 1306 |
| `Streptest` | dokladd + vykony | Kód 2220 |
| `Posudky řidičák` | HISTDOC (TYP=MOTORVO) + KAR | Ruční posudky k řízení MV |
| `ePosudky registr` | HISTDOC (TYP=EPOSMRO) + HISTDOC_EPOSUDEK + KAR | Elektronická podání do centrálního registru |
---
## Pomocné funkce
### `sanitize(val)`
Opraví znaky neplatné pro Excel:
- `µ``u`
- řídící znaky (ord < 32, kromě tab/LF/CR) → `_`
- náhradní znaky Unicode (0xFFFE, 0xFFFF, surrogáty) → `_`
Použito ve všech listech kde hrozí problematická data z DB.
### `fmt(val)`
Vrátí `''` pro None, jinak zavolá `sanitize()`.
### `add_vykony_sheet(sheet_name, kody)`
Helper pro listy s výkony. Přijme název listu a seznam kódů výkonů.
SQL: `dokladd JOIN kar JOIN vykony WHERE kod IN (...) AND platnost kódu platí`.
Řazení: datum DESC, rodcis, kod.
### `pocet_dni(zacnes, konnes, pracne)`
Výpočet délky neschopenky:
- `pracne='A'` (aktivní) → dny od začátku do dnes
- `pracne='N'` → dny od začátku do konce
- jinak → `"NA"`
### `parse_data(data_str)`
Parsuje `key=value` text z pole `HISTDOC.DATA` do slovníku.
Každý řádek = jeden klíč/hodnota oddělené `=`.
### `parse_date(val)`
Převede formát `D:DD.MM.YYYY` (jak ho ukládá Medicus) na `datetime.date`.
### `style_header(ws)` / `autofit_ws(ws)`
Styl záhlaví (modrý fill, bílý tučný text, centrování) a šířky sloupců (max 50 znaků).
Používají se jen na listech s posudky (ostatní listy řeší xlwings).
---
## Listy s posudky detail
### `Posudky řidičák` (MOTORVO)
Data jsou uložena v `HISTDOC.DATA` jako `key=value` text.
Parsovaná pole:
| Sloupec | Zdroj v DATA |
|---|---|
| PorCislo | `PorCislo` nebo `HISTDOC.PORCISLO` |
| DatumVyd | `DatumVyd` (formát `D:DD.MM.YYYY`) |
| DatKonec | `DatKonec` (formát `D:DD.MM.YYYY`) |
| DruhProh | `DruhProh` |
| Posouzeni | odvozeno z `Posouzeni`, `Posouzeni2`, `ZpusobPodminka` |
| ZpusobPodminka | `ZpusobPodminka` |
| SkupinaPodminka | `SkupinaPodminka` |
| Skupiny | `ZpusobJe` |
**Logika Posouzeni:**
- `Posouzeni2 = T``nezpůsobilý`
- `ZpusobPodminka = B:1``způsobilý s podmínkou`
- `Posouzeni = T``způsobilý`
**Sloupec ePosudek:**
`ANO` pokud existuje záznam v HISTDOC s `TYP='EPOSMRO'` pro stejného pacienta (IDPACI) a stejné datum.
Párování: `(IDPACI, DATUM)` přímá FK vazba mezi MOTORVO a EPOSMRO neexistuje.
Buňka s ANO je zelená (fill + font).
**Zebra pruhování:** liché řádky bílé, sudé světle modré (`DCE6F1`).
### `ePosudky registr` (EPOSMRO)
Elektronická podání do centrálního registru způsobilosti.
Stát tuto funkci zavedl přibližně od aktualizace Medicusu (03/2026).
Data parsovaná z `HISTDOC.DATA`:
| Sloupec | Zdroj v DATA |
|---|---|
| DatumVyd | `DatumVystaveni` |
| DatKonec | `PlatnostDo` |
| DruhProhlidky | `DruhProhlidkyNazev` |
| DruhPosudku | `DruhPosudkuNazev` |
| Vysledek | `VysledekNazev` |
| StavPosudku | `StavPosudkuNazev` |
| TypAkce | `TypAkceNazev` |
Stavová pole z `HISTDOC_EPOSUDEK`:
- `ID_PODANI` ID podání do registru
- `ODESLANO` timestamp odeslání
- `STATUS_ODESL` stav odpovědi z registru (`O` = odesláno)
**Zneplatnění:** `StavPosudku = zneplatneny` = lékař aktivně odvolal způsobilost
(např. pacient prodělal mrtvici, epileptický záchvat atp.).
Zneplatnění je samostatný EPOSMRO záznam, ne modifikace původního.
---
## xlwings závěrečný krok
Po `wb.save()` se soubor otevře přes xlwings (vyžaduje plný Excel):
1. `sheet.autofit()` na všech listech správné šířky sloupců
2. Na listu `Recepty`: centrování sloupců C, E, F, G, I, M, N
3. `wb_xw.save()` + zavření
xlwings je nutný pro spolehlivý autofit (openpyxl ho neumí přesně).
Trvá ~10 minut, spouští se v noci.
---
## Pořadí zpracování (pro debugování)
```
DB connect
→ smazání starých souborů
→ SQL dotazy (Registrovani, Očkování, Recepty, Výkony, Neschopenky)
→ add_vykony_sheet × 8
→ MOTORVO + EPOSMRO listy (s parsováním DATA)
→ autofilter na všech listech
→ con.close() + wb.save()
→ xlwings autofit + centrování
→ Hotovo.
```
Print výstup v konzoli ukazuje počty řádků každého listu užitečné pro kontrolu.
---
## Rozšíření v budoucnu
- Přidat další typy posudků (pracovní, vstupní, sportovní...) ze `VS_POSUDKY`
- Případně sledovat stav podání EPOSMRO v čase (datum odeslání vs. datum posudku)
- Automatické spouštění přes Windows Task Scheduler (jako `faktury_report.py`)
@@ -0,0 +1,410 @@
import os
import time
import fdb
import openpyxl
import xlwings as xw
from datetime import datetime, date
from openpyxl.utils import get_column_letter
from openpyxl.styles import Font, PatternFill, Alignment
# --- Konfigurace ---
PathToSaveCSV = r"u:\Dropbox\!!!Days\Downloads Z230"
timestr = time.strftime("%Y-%m-%d_%H-%M-%S_")
output_path = os.path.join(PathToSaveCSV, timestr + "Pacienti.xlsx")
# --- Smazání předchozích verzí ---
for fname in os.listdir(PathToSaveCSV):
if fname.endswith("Pacienti.xlsx"):
try:
os.remove(os.path.join(PathToSaveCSV, fname))
except Exception as e:
print(f"Nelze smazat {fname}: {e}")
# --- Připojení k DB ---
con = fdb.connect(
host='localhost', database=r'c:\MEDICUS 3\data\medicus.FDB',
user='sysdba', password='masterkey', charset='WIN1250'
)
cur = con.cursor()
wb = openpyxl.Workbook()
# =====================
# Pomocné funkce
# =====================
# Styly pro posudky
HEADER_FILL = PatternFill('solid', fgColor='2F5496')
HEADER_FONT = Font(bold=True, color='FFFFFF')
ZEBRA_FILL = PatternFill('solid', fgColor='DCE6F1')
GREEN_FILL = PatternFill('solid', fgColor='C6EFCE')
GREEN_FONT = Font(bold=True, color='276221')
def style_header(ws):
for cell in ws[1]:
cell.fill = HEADER_FILL
cell.font = HEADER_FONT
cell.alignment = Alignment(horizontal='center')
def autofit_ws(ws):
for col in ws.columns:
max_len = max((len(str(cell.value)) if cell.value is not None else 0) for cell in col)
ws.column_dimensions[get_column_letter(col[0].column)].width = min(max_len + 2, 50)
def sanitize(val):
"""Nahradí znaky neplatné pro Excel: µ → u, ostatní → _"""
if not isinstance(val, str):
return val
result = []
for ch in val:
if ch == 'µ':
result.append('u')
elif ord(ch) < 32 and ch not in '\t\n\r':
result.append('_')
elif ord(ch) in (0xFFFE, 0xFFFF) or 0xD800 <= ord(ch) <= 0xDFFF:
result.append('_')
else:
result.append(ch)
return ''.join(result)
def fmt(val):
return '' if val is None else sanitize(val)
def parse_data(data_str):
"""Parsuje key=value text z HISTDOC.DATA do slovníku."""
result = {}
if not data_str:
return result
for line in data_str.splitlines():
if '=' in line:
key, _, val = line.partition('=')
result[key.strip()] = val.strip()
return result
def parse_date(val):
"""Převede 'D:DD.MM.YYYY' na datetime.date."""
if val and val.startswith('D:'):
try:
return datetime.strptime(val[2:], '%d.%m.%Y').date()
except ValueError:
return val
return val
VYKONY_CONDITION = """
(datose >= vykony.platiod AND datose <= vykony.platido)
OR (datose >= vykony.platiod AND vykony.platido IS NULL)
"""
VYKONY_HEADERS = ["Rodne cislo", "Jmeno", "Datum vykonu", "Kod", "Název", "Dg.", "Body"]
def add_vykony_sheet(sheet_name, kody):
"""Přidá list s výkony filtrovanými podle seznamu kódů."""
kod_list = ", ".join(str(k) for k in kody)
cur.execute(f"""
SELECT dokladd.rodcis,
TRIM(prijmeni) || ', ' || TRIM(jmeno),
dokladd.datose, dokladd.kod, vykony.naz, dokladd.ddgn, dokladd.body
FROM dokladd
LEFT JOIN kar ON dokladd.rodcis = kar.rodcis
JOIN vykony ON dokladd.kod = vykony.kod
WHERE ({VYKONY_CONDITION})
AND dokladd.kod IN ({kod_list})
ORDER BY datose DESC, dokladd.rodcis, dokladd.kod
""")
rows = cur.fetchall()
print(f"{sheet_name}: {len(rows)}")
ws = wb.create_sheet(sheet_name)
ws.append(VYKONY_HEADERS)
for row in rows:
ws.append(list(row))
# =====================
# List: Registrovaní
# =====================
cur.execute("""
SELECT rodcis, prijmeni, jmeno, datum_registrace, registr.idpac, poj
FROM registr
JOIN kar ON registr.idpac = kar.idpac
WHERE kar.vyrazen != 'A'
AND kar.rodcis IS NOT NULL
AND idicp != 0
AND datum_zruseni IS NULL
""")
rows = cur.fetchall()
print(f"Registrovaní: {len(rows)}")
ws = wb.active
ws.title = 'Registrovani'
ws.append(["Rodne cislo", "Prijmeni", "Jmeno", "Datum registrace", "ID pacienta", "Pojistovna"])
for row in rows:
ws.append(list(row))
# =====================
# List: Očkování
# =====================
cur.execute("""
SELECT rodcis, prijmeni, jmeno, ockzaz.datum, kodmz, ockzaz.poznamka, latka, nazev, expire
FROM registr
JOIN kar ON registr.idpac = kar.idpac
JOIN ockzaz ON registr.idpac = ockzaz.idpac
WHERE datum_zruseni IS NULL
AND kar.vyrazen != 'A'
AND kar.rodcis IS NOT NULL
AND idicp != 0
ORDER BY ockzaz.datum DESC
""")
rows = cur.fetchall()
print(f"Očkování: {len(rows)}")
ws = wb.create_sheet("Očkování")
ws.append(["Rodne cislo", "Prijmeni", "Jmeno", "Datum ockovani", "Kod MZ", "Sarze", "Latka", "Nazev", "Expirace"])
for row in rows:
ws.append(list(row))
# =====================
# List: Recepty
# =====================
cur.execute("""
SELECT kar.rodcis,
TRIM(kar.prijmeni) || ' ' || SUBSTRING(kar.jmeno FROM 1 FOR 1) || '.' AS jmeno,
recept.datum,
TRIM(recept.lek) || ' ' || TRIM(recept.dop) AS lek,
recept.expori AS Poc,
CASE WHEN recept.opakovani IS NULL THEN 1 ELSE recept.opakovani END AS OP,
recept.uhrada,
recept.dsig,
recept.NOTIFIKACE_KONTAKT AS notifikace,
recept_epodani.erp,
recept_epodani.vystavitel_jmeno,
recept.atc,
recept.CENAPOJ,
recept.cenapac
FROM recept
LEFT JOIN RECEPT_EPODANI ON recept.id_epodani = recept_epodani.id
LEFT JOIN kar ON recept.idpac = kar.idpac
ORDER BY datum DESC, erp DESC
""")
rows = cur.fetchall()
print(f"Recepty: {len(rows)}")
ws = wb.create_sheet("Recepty")
ws.append(["Rodné číslo", "Jméno", "Datum vystavení", "Název leku", "Poč.", "Op.", "Úhr.",
"Da signa", "Notifikace", "eRECEPT", "Vystavil", "ATC", "Cena pojišťovna", "Cena pacient"])
for row in rows:
ws.append([sanitize(v) if isinstance(v, str) else v for v in row])
# =====================
# List: Výkony všechny
# =====================
cur.execute(f"""
SELECT dokladd.rodcis,
TRIM(prijmeni) || ', ' || TRIM(jmeno),
dokladd.datose, dokladd.kod, dokladd.pocvyk, dokladd.ddgn, dokladd.body, vykony.naz
FROM kar
JOIN dokladd ON kar.rodcis = dokladd.rodcis
JOIN vykony ON dokladd.kod = vykony.kod
WHERE {VYKONY_CONDITION}
ORDER BY dokladd.datose DESC, dokladd.rodcis
""")
rows = cur.fetchall()
print(f"Výkony: {len(rows)}")
ws = wb.create_sheet("Vykony")
ws.append(["Rodne cislo", "Jmeno", "Datum vykonu", "Kod", "Pocet", "Dg.", "Body", "Nazev"])
for row in rows:
ws.append(list(row))
# =====================
# Listy: Neschopenky
# =====================
def pocet_dni(zacnes, konnes, pracne):
dnes = date.today()
if pracne == 'A':
return (dnes - zacnes).days if zacnes else "NA"
if pracne == 'N' and zacnes and konnes and zacnes <= konnes:
return (konnes - zacnes).days
return "NA"
def nes_row(r):
return (r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7],
pocet_dni(r[5], r[7], r[6]), r[8], r[9], r[10])
NES_HEADERS = ["ID pac", "Rodne cislo", "Jmeno", "Datum neschopenky", "Číslo neschopenky",
"Zacatek", "Aktivní?", "Konec", "Pocet dni", "Diagnoza zacatel", "Diagnoza konec", "Aktualizovano"]
cur.execute("""
SELECT nes.idpac, kar.rodcis,
TRIM(prijmeni) || ', ' || TRIM(jmeno),
nes.datnes, nes.ecn, nes.zacnes, nes.pracne, nes.konnes,
nes.diagno, nes.kondia, nes.updated
FROM nes
LEFT JOIN kar ON nes.idpac = kar.idpac
WHERE nes.datnes <= CURRENT_DATE
ORDER BY datnes DESC
""")
vse = cur.fetchall()
aktivni = [r for r in vse if r[6] == 'A']
print(f"Neschopenky: {len(vse)} celkem, {len(aktivni)} aktivních")
ws = wb.create_sheet("Neschopenky všechny")
ws.append(NES_HEADERS)
for r in vse:
ws.append(list(nes_row(r)))
ws = wb.create_sheet("Neschopenky aktivní")
ws.append(NES_HEADERS)
for r in aktivni:
ws.append(list(nes_row(r)))
# =====================
# Výkonové listy jednotlivé typy výkonů
# =====================
add_vykony_sheet('Preventivni prohlidky', [1022, 1021])
add_vykony_sheet('INR', [1443])
add_vykony_sheet('CRP', [2230, 9111])
add_vykony_sheet('Holter', [17129])
add_vykony_sheet('Prostata', [1130, 1131, 1132, 1133, 1134])
add_vykony_sheet('TOKS', [15118, 15119, 15120, 15121])
add_vykony_sheet('COVID', [1306])
add_vykony_sheet('Streptest', [2220])
# =====================
# List: Posudky řidičák MOTORVO (ruční)
# =====================
cur.execute("SELECT IDPACI, DATUM FROM HISTDOC WHERE TYP = 'EPOSMRO'")
eposmro_keys = set((r[0], r[1]) for r in cur.fetchall())
cur.execute("""
SELECT h.ID, h.DATUM, h.IDPACI,
k.PRIJMENI, k.JMENO, k.RODCIS,
h.DATA, h.PORCISLO, h.STAV, h.PRINTED, h.IDUZIV, h.CREATED
FROM HISTDOC h
JOIN KAR k ON k.IDPAC = h.IDPACI
WHERE h.TYP = 'MOTORVO'
ORDER BY h.ID DESC
""")
motorvo_rows = cur.fetchall()
print(f"MOTORVO: {len(motorvo_rows)}")
motorvo_headers = [
'ID', 'DATUM', 'IDPACI', 'PRIJMENI', 'JMENO', 'RODCIS',
'PorCislo', 'DatumVyd', 'DatKonec', 'DruhProh',
'Posouzeni', 'ZpusobPodminka', 'SkupinaPodminka', 'Skupiny',
'ePosudek', 'STAV', 'PRINTED', 'IDUZIV', 'CREATED'
]
ws = wb.create_sheet("Posudky řidičák")
ws.append(motorvo_headers)
epos_col_idx = motorvo_headers.index('ePosudek') + 1
for i, row in enumerate(motorvo_rows, start=2):
(hid, datum, idpac, prijmeni, jmeno, rodcis,
data_blob, porcislo, stav, printed, iduziv, created) = row
data = parse_data(data_blob)
if data.get('Posouzeni2') == 'T':
posouzeni = 'nezpůsobilý'
elif data.get('ZpusobPodminka') == 'B:1':
posouzeni = 'způsobilý s podmínkou'
elif data.get('Posouzeni') == 'T':
posouzeni = 'způsobilý'
else:
posouzeni = ''
ws.append([
hid, fmt(datum), idpac, fmt(prijmeni), fmt(jmeno), fmt(rodcis),
fmt(porcislo or data.get('PorCislo', '')),
parse_date(data.get('DatumVyd', '')),
parse_date(data.get('DatKonec', '')),
fmt(data.get('DruhProh', '')),
posouzeni,
fmt(data.get('ZpusobPodminka', '')),
fmt(data.get('SkupinaPodminka', '')),
fmt(data.get('ZpusobJe', '')),
'ANO' if (idpac, datum) in eposmro_keys else 'NE',
fmt(stav), fmt(printed), fmt(iduziv), fmt(created),
])
if i % 2 == 0:
for cell in ws[i]:
cell.fill = ZEBRA_FILL
cell = ws.cell(row=i, column=epos_col_idx)
if cell.value == 'ANO':
cell.fill = GREEN_FILL
cell.font = GREEN_FONT
style_header(ws)
ws.freeze_panes = 'A2'
autofit_ws(ws)
# =====================
# List: Posudky řidičák EPOSMRO (elektronická podání)
# =====================
cur.execute("""
SELECT h.ID, h.DATUM, h.IDPACI,
k.PRIJMENI, k.JMENO, k.RODCIS,
h.DATA, h.STAV, h.CREATED,
e.ID_PODANI, e.ODESLANO, e.STATUS
FROM HISTDOC h
JOIN KAR k ON k.IDPAC = h.IDPACI
LEFT JOIN HISTDOC_EPOSUDEK e ON e.ID_HISTDOC = h.ID
WHERE h.TYP = 'EPOSMRO'
ORDER BY h.ID DESC
""")
epos_rows = cur.fetchall()
print(f"EPOSMRO: {len(epos_rows)}")
ws = wb.create_sheet("ePosudky registr")
ws.append([
'ID', 'DATUM', 'IDPACI', 'PRIJMENI', 'JMENO', 'RODCIS',
'DatumVyd', 'DatKonec', 'DruhProhlidky', 'DruhPosudku',
'Vysledek', 'StavPosudku', 'TypAkce',
'STAV', 'CREATED', 'ID_PODANI', 'ODESLANO', 'STATUS_ODESL'
])
for i, row in enumerate(epos_rows, start=2):
(hid, datum, idpac, prijmeni, jmeno, rodcis,
data_blob, stav, created, id_podani, odeslano, status_odesl) = row
data = parse_data(data_blob)
ws.append([
hid, fmt(datum), idpac, fmt(prijmeni), fmt(jmeno), fmt(rodcis),
parse_date(data.get('DatumVystaveni', '')),
parse_date(data.get('PlatnostDo', '')),
fmt(data.get('DruhProhlidkyNazev', '')),
fmt(data.get('DruhPosudkuNazev', '')),
fmt(data.get('VysledekNazev', '')),
fmt(data.get('StavPosudkuNazev', '')),
fmt(data.get('TypAkceNazev', '')),
fmt(stav), fmt(created), fmt(id_podani), fmt(odeslano), fmt(status_odesl),
])
if i % 2 == 0:
for cell in ws[i]:
cell.fill = ZEBRA_FILL
style_header(ws)
ws.freeze_panes = 'A2'
autofit_ws(ws)
# =====================
# Autofilter na všech listech
# =====================
for ws in wb.worksheets:
ws.auto_filter.ref = f"A1:{get_column_letter(ws.max_column)}{ws.max_row}"
# =====================
# Uložení
# =====================
con.close()
wb.save(output_path)
print(f"Uloženo: {output_path}")
# =====================
# xlwings: autofit + centrování Recepty
# =====================
with xw.App(visible=False) as app:
wb_xw = xw.Book(output_path)
for sheet in wb_xw.sheets:
sheet.autofit()
for sloupec in ["C:C", "E:E", "F:F", "G:G", "I:I", "M:M", "N:N"]:
wb_xw.sheets['Recepty'].range(sloupec).api.HorizontalAlignment = 3
wb_xw.save()
wb_xw.close()
print("Hotovo.")
+321
View File
@@ -0,0 +1,321 @@
# MedicusWithClaudePN Pracovní neschopnosti
## Účel
Report aktivních pracovních neschopností pro MUDr. Buzalkovou Michaelu.
Generuje PDF a odesílá na výchozí tiskárnu.
## Spuštění
```bash
# Test otevře PDF v prohlížeči, netiskne:
python pn_report.py --no-print
# Ostrý provoz vytiskne rovnou na výchozí tiskárnu:
python pn_report.py
```
## Požadavky
```bash
pip install reportlab pywin32
```
## SQL dotaz aktivní PN
Zachycen přes Firebird trace přímo z Medicusu (přesná kopie logiky aplikace),
doplněn o podotaz na poslední 14denní potvrzení z tabulky HPN.
```sql
SELECT
nes.id,
nes.idpac,
TRIM(kar.prijmeni) || ', ' || TRIM(kar.jmeno) AS jmeno,
kar.rodcis,
nes.zacnes,
nes.konnes,
nes.diagno,
COALESCE(nes.ecn, nes.cisnes) AS cisnes,
(SELECT MAX(h.datum) FROM hpn h
WHERE h.idnes = nes.id AND h.typ = '2' AND h.storno = 'F') AS posl_potvrzeni
FROM nes, kar
WHERE nes.zacnes <= current_date
AND nes.konnes IS NULL
AND nes.idpac = kar.idpac
AND nes.pracne = 'A'
AND nes.storno <> 'T'
AND (
NOT EXISTS (SELECT id FROM nesd WHERE nesd.idnes = nes.id)
OR (SELECT FIRST 1 kam FROM nesd WHERE nesd.idnes = nes.id
ORDER BY nesd.datum DESC, nesd.id DESC) = 'N'
)
ORDER BY kar.prijmeni ASC, kar.jmeno ASC
```
### Klíčové podmínky
| Podmínka | Význam |
|---|---|
| `pracne = 'A'` | Pouze pracovní neschopnosti (ne jiné typy) |
| `storno <> 'T'` | Vyřazení stornovaných záznamů |
| `zacnes <= current_date` | PN již začala |
| `konnes IS NULL` | PN dosud neskončila datum konce nemůže být v budoucnosti (pravidlo ČSSZ), aktivní PN má vždy `konnes = NULL` |
| `nesd` subquery | PN nebyla předána dál poslední záznam `Kam = 'N'` = stále u pacienta |
| `COALESCE(ecn, cisnes)` | Použije ECN (elektronické), jinak starší CISNES |
## Sloupce reportu
| Sloupec | Zdroj | Poznámka |
|---|---|---|
| # | | Pořadové číslo |
| Příjmení a jméno | KAR | |
| Rod. číslo | KAR | |
| Začátek PN | NES.ZACNES | |
| Dnů | výpočet | Počet dní od začátku PN do dnes |
| Diagnóza | NES.DIAGNO | |
| Posl. potvrzení | HPN (TYP='2') | Datum posledního 14denního potvrzení |
| Dní od potvr. | výpočet | Červeně pokud > 14 dní |
## Tabulka HPN typy podání
| TYP | Význam |
|---|---|
| `H` | Hlášení neschopnosti (vznik PN) |
| `1` | První zpráva |
| `P` | **Průběžná zpráva = 14denní potvrzení trvání PN** |
| `2` | Neznámý typ (2033 záznamů v DB, ale ne pro průběžná potvrzení) |
| `C`, `Y`, `Z` | Vzácné typy (jednotky záznamů) |
Vazba: `HPN.IDNES → NES.ID`
---
## Jak Medicus zobrazuje PN daného pacienta (zachyceno z trace)
### 1. Seznam všech PN pacienta
```sql
SELECT
ID, IDPAC, DATNES, CISNES, PODNIK, ADRESA, PROFES, ZACNES,
KONNES, PRACNE, PRICINA, DIAGNO, KONDIA, PREDAN, IDUZI,
VYSTAVIL, DATUKONNES, IDODD, IDPRAC, STORNO,
DATVYCHOD, DATVYCHDO, VYCH1OD, VYCH1DO, VYCH2OD, VYCH2DO, VYCH3OD, VYCH3DO,
DATOSETRENI, DATPRINAV, OMLUVENKA, RODCISNES,
DatNastUstPece, DatUkonUstPece, ICPE, ECN, EPODANI,
ADR_OBEC, ADR_CP, ADR_CO, ADR_DOD, ADR_PSC, ADR_STAT,
ZAM_ADRESA, DATUKON_OSSZ, ZAMDRUH, STATDPNKOD,
SOUHLAS_SSZKOD, SOUHLAS_SSZNAZ, SOUHLAS_DATUM,
DGZMENA, CIZI, OSSZ, DUVOD_UKONCENI, UKON_OSSZ, PORUS_REZIMU,
UKON_OSSZNAZ, ADR_ZMENA, coalesce(ECN, CISNES) as CISLO,
ADR_ZMENA_DO, POTVRZENI_VYDANO, DATNAR, SPRAVCE_POJ,
ZAM_OBEC, ZAM_CO, ZAM_CP, ZAM_DOD, ZAM_PSC, ZAM_STAT, ZAM_CCSZ_ID, ZAM_CSSZ_VARSYM,
VYCHINDIVIDUAL, VERZE_DPN, LEKAR_VYSTAVIL, LEKAR_VYSTAVIL_ICPE,
USEDATNAR, KONTAKT_TEL, KONTAKT_EMAIL,
case when KONTAKT_TEL is not NULL then 'S'
when KONTAKT_EMAIL is not null then 'E'
else NULL end as NOTIFIKACE,
coalesce(KONTAKT_TEL, KONTAKT_EMAIL) as NOTIFIKACE_KONTAKT
FROM NES
WHERE IDPAC = ?
ORDER BY DATNES ASC, ID ASC
```
### 2. Formuláře eNeschopenky pro vybranou PN (záložka "Formuláře eNeschopenky")
Zobrazuje pouze TYP `H`, `1`, `2` (ne `P` = propuštění).
Pouze záznamy s vazbou na HISTDOC (`IDHISTDOC IS NOT NULL`).
```sql
SELECT
HD.ID AS HISTDOCID,
H.TYP AS HISTDOCTYP, -- H=hlášení, 1=první zpráva, 2=průběžná/potvrzení
HD.DATUM AS DATZAD, -- Datum vystavení (z HISTDOC)
H.DATUM AS DATPOD, -- Datum podání (z HPN)
H.STAV,
H.ODBAVENO,
HD.TYP
FROM HPN H
JOIN HISTDOC HD ON H.IDHISTDOC = HD.ID
WHERE H.IDNES = ?
ORDER BY HD.DATUM ASC
```
### 3. Rychlý přehled formulářů (bez HISTDOC)
Používá se pro zjištění stavu vrací všechny záznamy TYP `H`, `1`, `2`:
```sql
SELECT * FROM HPN
WHERE IDNES = ?
AND STORNO = 'F'
AND TYP IN ('1', '2', 'H')
ORDER BY DATUM DESC, CAS DESC, ID DESC
```
### 4. TFHpnHistorie formulář "Historie HPN" (acHpnHistorie)
Kompletní přehled všech HPN záznamů pro vybranou PN. Spouští se akcí `acHpnHistorie`.
```sql
select h.ID, h.IDNES, h.IDPODANI, h.TYP, h.DATA, h.DATUM, h.CAS, h.ODPOVED,
h.IDPRAC, h.IDUZI, h.UPRAVENO, h.OPRAVA_ID, h.STAV, h.STORNO,
h.POR_CISLO, h.ID_CHYBY, h.OSSZ,
n.CisNes, n.Ecn, coalesce(n.CisNes, n.Ecn) as Cislo, n.IdPac,
k.Prijmeni, k.Jmeno, k.Titul, coalesce(n.RODCISNES, k.RODCIS) as RODCIS,
u.Zkratka, hp.Odeslano, hp.CorelationId,
(select first 1 h2.ID from HPN h2 where h2.OPRAVA_ID = h.ID) as IdOpravy,
h.verze_dpn,
n.SPRAVCE_POJ,
n.ADRESA as ULICE, n.ADR_CP, n.ADR_CO, n.ADR_DOD, n.ADR_OBEC, n.ADR_PSC, n.ADR_STAT,
n.PODNIK, n.PROFES, n.ZAM_ADRESA, n.ZAM_CP, n.ZAM_CO, n.ZAM_OBEC, n.ZAM_PSC, n.ZAM_STAT,
n.ZACNES, n.DIAGNO, n.DATNES, n.PRICINA,
n.KONNES, n.KONDIA, n.DATUKONNES,
n.DATVYCHOD, n.VYCH1OD, n.VYCH1DO, n.VYCH2OD, n.VYCH2DO,
n.PRICINA, n.ICPE, n.VYCH3OD, n.VYCH3DO, n.KONTAKT_TEL,
h.IDHISTDOC, h.ODBAVENO,
IIF((H.IDHISTDOC is not null),
(select HD.TYP from HISTDOC HD where HD.ID = H.IDHISTDOC), null) as HISTDOCTYP
from HPN h
left join NES n on (n.id = h.idnes)
left join KAR k on (k.IdPac = n.IdPac)
left join UZIVATEL u on (u.IdUzi = h.IdUzi)
left join HPN_PODANI hp on (hp.ID = h.IdPodani)
where h.IdNes = ?
ORDER BY h.Datum ASC, h.Cas ASC, h.Id ASC
```
### 5. Kontrola čekajících podání (PN s neodeslanými HPN záznamy)
```sql
select nes.id from nes
where nes.idpac = ?
and nes.storno = 'F'
and nes.epodani = 'T'
and nes.icpe = ?
and coalesce(nes.verze_dpn, '') not in ('', 'p', 'o')
and exists (
select 1 from hpn
where hpn.idnes = nes.id
and hpn.storno = 'F'
and hpn.typ in ('1', '2', 'H')
and hpn.idpodani is null -- dosud neodesláno
and hpn.stav <> 99
and hpn.stav <> 10
)
```
### 6. Kontrola potvrzení vydaného tento měsíc (POTVRZENI_VYDANO)
Medicus kontroluje zda pro daného pacienta existuje PN aktivní alespoň 10 dní,
u které ještě nebylo vydáno potvrzení v aktuálním měsíci:
```sql
select first 1 ZACNES, CISNES, ID, POTVRZENI_VYDANO
from NES
where (IDPAC = ?)
and (? >= ZACNES + 10) -- PN trvá alespoň 10 dní
and ((KONNES is NULL) or (KONNES > ?))
and (STORNO = 'F')
and (
extract(month from POTVRZENI_VYDANO) || extract(year from POTVRZENI_VYDANO)
=
extract(month from cast(? as date)) || extract(year from cast(? as date))
)
```
### 7. Vyhledání HPN záznamu podle ICPE v XML datech
Číslo `11031812` (ICPE lékaře) se hledá přímo v obsahu XML blobu `HPN.DATA`.
Medicus takto identifikuje konkrétní HPN záznam při opravě nebo ověření stavu:
```sql
-- Neodeslané nebo čekající záznamy:
select h.id from HPN h
left join HPN h2 on (h2.OPRAVA_ID = h.ID)
where (h.storno = 'F')
and ((h.stav in (0,1)) or (h.stav is NULL))
and (h2.OPRAVA_ID is null)
and (H.DATA containing '11031812') -- hledá ICPE v XML obsahu
and (h.IDNES = ?)
-- Úspěšně odeslané záznamy (stav=1):
select h.id from HPN h
left join HPN h2 on (h2.OPRAVA_ID = h.ID)
where (h.storno = 'F')
and (h.stav = 1)
and h2.OPRAVA_ID is null
and (H.DATA containing '11031812')
and h.IDNES = ?
```
### 8. Předání/Převzetí (záložka "Předání/Převzetí")
```sql
SELECT ID, IDNES, KAMODKUD, DATUM, KAM, ICZ, ICPE, ICO, JMENO_LEKARE
FROM NESD
WHERE IDNES = ?
ORDER BY DATUM ASC, ID ASC
```
### Poznámky
- **HISTDOC** každé odeslání formuláře vytváří záznam v HISTDOC; HPN bez IDHISTDOC se v UI nezobrazí
- **HPN.STAV** stav podání (1 = odesláno/přijato)
- **HPN.ODBAVENO** příznak zpracování (`'F'` = ne, `'T'` = ano)
- HPN záznamy TYP='2' (průběžná potvrzení) **nemají IDHISTDOC** JOIN s HISTDOC by je odfiltroval. Pro datum posledního potvrzení v reportu proto používáme prostý MAX bez JOINu. HISTDOC mají pouze TYP='P' (ukončení PN).
---
## Vazby tabulky NES (zjištěno z DB)
### Formální FK constrainty
| Směr | Vazba | Popis |
|---|---|---|
| NES → KAR | `NES.IDPAC → KAR.IDPAC` | Každá neschopenka patří pacientovi v kartotéce |
| HPN → NES | `HPN.IDNES → NES.ID` | Formuláře/hlášení HPN odkazují na konkrétní neschopenku |
### Logické vazby (bez FK constraintu)
| Tabulka | Pole | Poznámka |
|---|---|---|
| NESD | `NESD.IDNES → NES.ID` | Předání/převzetí PN vazba jen kódem, ne constraintem |
| HISTDOC | `HPN.IDHISTDOC → HISTDOC.ID` | Dokumenty k formulářům vazba přes HPN |
### Indexy na NES
| Index | Unique | Pole |
|---|---|---|
| PK_NES | Ano | ID |
| FK_NES_KAR | Ne | IDPAC |
| NES_POTVRZENI_VYDANO | Ne | POTVRZENI_VYDANO |
### Poznámka
Medicus obecně používá minimum DB constraintů většina vazeb je řešena aplikačním kódem.
`NESD` a `HISTDOC` nemají formální FK na `NES`, přesto jsou klíčové pro zobrazení PN v UI.
---
## Červené zvýraznění
Sloupce "Posl. potvrzení" a "Dní od potvr." jsou červeně zvýrazněny pokud:
- Od posledního potvrzení uplynulo více než 14 dní, nebo
- PN nemá žádné potvrzení a trvá déle než 14 dní (zobrazí se s `(!)`)
## Tisk
Skript používá `win32api.ShellExecute` s příkazem `'print'` odešle PDF
na výchozí tiskárnu Windows.
## Automatizace
Plánované spouštění každé pondělí a pátek ráno přes Windows Task Scheduler
zatím nenastaveno, připravit až bude skript stabilní.
## Soubory
| Soubor | Obsah |
|---|---|
| `pn_report.py` | Hlavní skript DB dotaz, generování PDF, tisk |
| `PN.md` | Tento soubor dokumentace |
+320
View File
@@ -0,0 +1,320 @@
"""
pn_report.py Report aktivních pracovních neschopností
Generuje PDF a posílá na výchozí tiskárnu.
Spuštění:
python pn_report.py # vytvoří PDF a vytiskne
python pn_report.py --no-print # jen vytvoří PDF (pro testování)
Požadavky:
pip install reportlab pywin32
"""
import fdb
import sys
import os
import tempfile
from datetime import date, datetime
from reportlab.lib.pagesizes import A4
from reportlab.lib import colors
from reportlab.lib.units import cm
from reportlab.lib.styles import getSampleStyleSheet
from reportlab.platypus import SimpleDocTemplate, Table, TableStyle, Paragraph, Spacer
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont
# ---------------------------------------------------------------------------
# Konfigurace
# ---------------------------------------------------------------------------
DB_DSN = r'localhost:c:\medicus 3\data\medicus.fdb'
DB_USER = 'SYSDBA'
DB_PASS = 'masterkey'
DB_CHARSET = 'win1250'
# Pokud chcete soubor uložit trvale, nastavte výstupní adresář:
OUTPUT_DIR = None # None = dočasný soubor, smaže se po tisku
#OUTPUT_DIR = r'u:\Dropbox\!!!Days\Downloads Z230'
# ---------------------------------------------------------------------------
# Dotaz aktivní PN (konnes IS NULL nebo v budoucnosti, storno='F')
# ---------------------------------------------------------------------------
SQL = """
SELECT
nes.id AS idnes,
nes.idpac,
TRIM(kar.prijmeni) || ', ' || TRIM(kar.jmeno) AS jmeno,
kar.rodcis,
nes.zacnes,
nes.konnes,
nes.diagno,
COALESCE(nes.ecn, nes.cisnes) AS cisnes,
(SELECT MAX(h.datum) FROM hpn h
WHERE h.idnes = nes.id AND h.typ = 'P' AND h.storno = 'F') AS posl_potvrzeni
FROM nes, kar
WHERE nes.zacnes <= current_date
AND nes.konnes IS NULL
AND nes.idpac = kar.idpac
AND nes.pracne = 'A'
AND nes.storno <> 'T'
AND (
NOT EXISTS (SELECT id FROM nesd WHERE nesd.idnes = nes.id)
OR (SELECT FIRST 1 kam FROM nesd WHERE nesd.idnes = nes.id
ORDER BY nesd.datum DESC, nesd.id DESC) = 'N'
)
ORDER BY kar.prijmeni ASC, kar.jmeno ASC
"""
# ---------------------------------------------------------------------------
# Pomocné funkce
# ---------------------------------------------------------------------------
def fmt_date(val):
"""Datum → DD.MM.YYYY nebo prázdný řetězec."""
if val is None:
return ''
if isinstance(val, (date, datetime)):
return val.strftime('%d.%m.%Y')
return str(val)
def fmt_str(val):
if val is None:
return ''
return str(val).strip()
def delka_pn(zacnes, konnes):
"""Počet dnů PN (od začátku do dnes / do konce)."""
if zacnes is None:
return ''
end = konnes if konnes else date.today()
if isinstance(zacnes, datetime):
zacnes = zacnes.date()
if isinstance(end, datetime):
end = end.date()
if isinstance(zacnes, date) and isinstance(end, date):
days = (end - zacnes).days + 1
return str(days)
return ''
# ---------------------------------------------------------------------------
# Načtení fontu s českou diakritikou
# ---------------------------------------------------------------------------
def register_font():
"""
Zkusí zaregistrovat DejaVuSans (umí win1250 znaky).
Fallback: Helvetica (bez diakritiky nouzové řešení).
"""
font_paths = [
r'C:\Windows\Fonts\DejaVuSans.ttf',
r'C:\Windows\Fonts\arial.ttf',
r'C:\Windows\Fonts\segoeui.ttf',
]
for path in font_paths:
if os.path.exists(path):
name = os.path.splitext(os.path.basename(path))[0]
try:
pdfmetrics.registerFont(TTFont(name, path))
pdfmetrics.registerFont(TTFont(name + '-Bold',
path.replace('.ttf', 'bd.ttf') if 'arial' in path.lower()
else path.replace('.ttf', '-Bold.ttf')
if os.path.exists(path.replace('.ttf', '-Bold.ttf'))
else path
))
return name, name + '-Bold'
except Exception:
continue
return 'Helvetica', 'Helvetica-Bold'
# ---------------------------------------------------------------------------
# Generování PDF
# ---------------------------------------------------------------------------
def build_pdf(rows, output_path, font_name, font_bold):
today_str = date.today().strftime('%d.%m.%Y')
weekday_cs = ['pondělí', 'úterý', 'středa', 'čtvrtek', 'pátek', 'sobota', 'neděle']
weekday = weekday_cs[date.today().weekday()]
doc = SimpleDocTemplate(
output_path,
pagesize=A4,
topMargin=1.5*cm,
bottomMargin=1.5*cm,
leftMargin=1.5*cm,
rightMargin=1.5*cm,
)
styles = getSampleStyleSheet()
def para(text, size=10, bold=False, align='LEFT', color=colors.black):
fn = font_bold if bold else font_name
al = {'LEFT': 0, 'CENTER': 1, 'RIGHT': 2}.get(align, 0)
from reportlab.platypus import Paragraph as P
from reportlab.lib.styles import ParagraphStyle
st = ParagraphStyle('x', fontName=fn, fontSize=size,
textColor=color, alignment=al, leading=size*1.3)
return P(text, st)
story = []
# Záhlaví
story.append(para('MUDr. Buzalková Michaela ordinace praktického lékaře',
size=9, color=colors.grey))
story.append(para(f'Aktivní pracovní neschopnosti',
size=16, bold=True))
story.append(para(f'Vytištěno: {weekday} {today_str} | Počet záznamů: {len(rows)}',
size=9, color=colors.grey))
story.append(Spacer(1, 0.4*cm))
if not rows:
story.append(para('Žádné aktivní pracovní neschopnosti.', size=12))
doc.build(story)
return
# Záhlaví tabulky
headers = ['#', 'Příjmení a jméno', 'Rod. číslo', 'Začátek PN',
'Dnů', 'Diagnóza', 'Posl. potvrzení', 'Dní od potvr.']
col_widths = [0.7*cm, 5.5*cm, 2.8*cm, 2.4*cm, 1.4*cm, 2.2*cm, 3.0*cm, 2.2*cm]
table_data = [headers]
overdue_rows = [] # indexy řádků kde je potvrzení po splatnosti
for idx, row in enumerate(rows, start=1):
idnes, idpac, jmeno, rodcis, zacnes, konnes, diagno, cisnes, posl_potvrzeni = row
# Počet dní od posledního potvrzení
if posl_potvrzeni is not None:
pp = posl_potvrzeni.date() if isinstance(posl_potvrzeni, datetime) else posl_potvrzeni
dni_od = (date.today() - pp).days
dni_od_str = str(dni_od)
if dni_od > 14:
overdue_rows.append(idx + 1) # +1 kvůli záhlaví
else:
# Žádné potvrzení počítáme od začátku PN
zac = zacnes.date() if isinstance(zacnes, datetime) else zacnes
if zac:
dni_od = (date.today() - zac).days
dni_od_str = str(dni_od) + ' (!)'
if dni_od > 14:
overdue_rows.append(idx + 1)
else:
dni_od_str = ''
table_data.append([
str(idx),
fmt_str(jmeno),
fmt_str(rodcis),
fmt_date(zacnes),
delka_pn(zacnes, konnes),
fmt_str(diagno),
fmt_date(posl_potvrzeni) if posl_potvrzeni else '',
dni_od_str,
])
tbl = Table(table_data, colWidths=col_widths, repeatRows=1)
style = TableStyle([
# Záhlaví
('BACKGROUND', (0, 0), (-1, 0), colors.HexColor('#2F5496')),
('TEXTCOLOR', (0, 0), (-1, 0), colors.white),
('FONTNAME', (0, 0), (-1, 0), font_bold),
('FONTSIZE', (0, 0), (-1, 0), 8),
('ALIGN', (0, 0), (-1, 0), 'CENTER'),
('BOTTOMPADDING',(0, 0), (-1, 0), 5),
('TOPPADDING', (0, 0), (-1, 0), 5),
# Data
('FONTNAME', (0, 1), (-1, -1), font_name),
('FONTSIZE', (0, 1), (-1, -1), 8),
('ALIGN', (0, 1), (0, -1), 'CENTER'), # #
('ALIGN', (5, 1), (5, -1), 'RIGHT'), # Dnů
('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
('TOPPADDING', (0, 1), (-1, -1), 3),
('BOTTOMPADDING',(0, 1), (-1, -1), 3),
# Mřížka
('GRID', (0, 0), (-1, -1), 0.3, colors.HexColor('#AAAAAA')),
('LINEBELOW', (0, 0), (-1, 0), 1, colors.HexColor('#2F5496')),
# Zebra
*[('BACKGROUND', (0, i), (-1, i), colors.HexColor('#DCE6F1'))
for i in range(2, len(table_data), 2)],
# Červené zvýraznění potvrzení po splatnosti (> 14 dní)
*[('BACKGROUND', (6, i), (7, i), colors.HexColor('#F4CCCC'))
for i in overdue_rows],
*[('TEXTCOLOR', (6, i), (7, i), colors.HexColor('#CC0000'))
for i in overdue_rows],
*[('FONTNAME', (6, i), (7, i), font_bold)
for i in overdue_rows],
])
tbl.setStyle(style)
story.append(tbl)
# Patička
story.append(Spacer(1, 0.5*cm))
story.append(para(f'--- konec reportu ({len(rows)} záznamů) ---',
size=8, color=colors.grey, align='CENTER'))
doc.build(story)
# ---------------------------------------------------------------------------
# Tisk
# ---------------------------------------------------------------------------
def print_pdf(path):
"""Pošle PDF na výchozí tiskárnu přes Windows ShellExecute."""
try:
import win32api
win32api.ShellExecute(0, 'print', path, None, '.', 0)
print(f'Odesláno na tiskárnu: {path}')
except ImportError:
# Fallback otevře soubor v PDF prohlížeči (ruční tisk)
print('pywin32 není nainstalován, otevírám PDF...')
os.startfile(path)
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
no_print = '--no-print' in sys.argv
# Připojení k DB
print('Připojuji se k DB...')
conn = fdb.connect(dsn=DB_DSN, user=DB_USER, password=DB_PASS, charset=DB_CHARSET)
cur = conn.cursor()
print('Načítám aktivní PN...')
cur.execute(SQL)
rows = cur.fetchall()
conn.close()
print(f'Nalezeno {len(rows)} aktivních PN.')
# Font
font_name, font_bold = register_font()
print(f'Font: {font_name}')
# Výstupní soubor
if OUTPUT_DIR:
os.makedirs(OUTPUT_DIR, exist_ok=True)
out_path = os.path.join(OUTPUT_DIR,
date.today().strftime('%Y-%m-%d') + '_pn_report.pdf')
else:
fd, out_path = tempfile.mkstemp(suffix='_pn_report.pdf')
os.close(fd)
print(f'Generuji PDF: {out_path}')
build_pdf(rows, out_path, font_name, font_bold)
print('PDF hotovo.')
if no_print:
print('(tisk přeskočen --no-print)')
# Otevřeme pro náhled
os.startfile(out_path)
else:
print_pdf(out_path)
# Dočasný soubor necháme tiskárna ho potřebuje přečíst
# Windows ho smaže sám po zpracování tisku (temp adresář)
if __name__ == '__main__':
main()
+28
View File
@@ -0,0 +1,28 @@
import fdb, sys
sys.stdout.reconfigure(encoding='utf-8')
conn = fdb.connect(dsn=r'localhost:c:\medicus 3\data\medicus.fdb', user='SYSDBA', password='masterkey', charset='win1250')
cur = conn.cursor()
sql = """
SELECT nes.id, TRIM(kar.prijmeni) || ', ' || TRIM(kar.jmeno) AS jmeno,
nes.zacnes,
(SELECT MAX(h.datum) FROM hpn h
WHERE h.idnes = nes.id AND h.typ = 'P' AND h.storno = 'F') AS posl_potvrzeni
FROM nes, kar
WHERE nes.zacnes <= current_date
AND nes.konnes IS NULL
AND nes.idpac = kar.idpac
AND nes.pracne = 'A'
AND nes.storno <> 'T'
AND (
NOT EXISTS (SELECT id FROM nesd WHERE nesd.idnes = nes.id)
OR (SELECT FIRST 1 kam FROM nesd WHERE nesd.idnes = nes.id
ORDER BY nesd.datum DESC, nesd.id DESC) = 'N'
)
ORDER BY kar.prijmeni ASC
"""
cur.execute(sql)
for row in cur.fetchall():
print(row)
conn.close()
Binary file not shown.
+117
View File
@@ -0,0 +1,117 @@
import sys, io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
import fdb
import openpyxl
from openpyxl.styles import Font, PatternFill, Alignment
from datetime import date, timedelta
import os
conn = fdb.connect(
dsn=r'localhost:c:\medicus 3\data\medicus.fdb',
user='SYSDBA', password='masterkey', charset='win1250'
)
cur = conn.cursor()
zacatek = date(2025, 1, 1)
konec = date.today()
dny = []
d = zacatek
while d <= konec:
dny.append(d)
d += timedelta(days=1)
print(f"Počítám {len(dny)} dní ({zacatek} {konec})...")
vysledky = []
for i, den in enumerate(dny):
# Počet registrovaných
cur.execute(f"""
SELECT COUNT(*) FROM KAR
WHERE vyrazen = 'N'
AND EXISTS (
SELECT id FROM registr r
JOIN icp i ON r.idicp = i.idicp
WHERE r.idpac = kar.idpac
AND r.datum <= '{den}'
AND (r.datum_zruseni IS NULL OR r.datum_zruseni >= '{den}')
AND r.priznak IN ('V','D','A')
AND i.icp = '09305001'
AND i.odb = '001'
)
""")
pocet = cur.fetchone()[0]
# Zaregistrovaní tento den
cur.execute(f"""
SELECT k.RODCIS, k.PRIJMENI, k.JMENO
FROM REGISTR r JOIN KAR k ON k.IDPAC = r.IDPAC
WHERE r.datum = '{den}'
AND r.priznak IN ('V','D','A')
ORDER BY k.PRIJMENI, k.JMENO
""")
zaregistrovani = [f"{row[0]} {row[1].strip()} {row[2]}" for row in cur.fetchall()]
# Odregistrovaní tento den
cur.execute(f"""
SELECT k.RODCIS, k.PRIJMENI, k.JMENO
FROM REGISTR r JOIN KAR k ON k.IDPAC = r.IDPAC
WHERE r.datum_zruseni = '{den}'
ORDER BY k.PRIJMENI, k.JMENO
""")
odregistrovani = [f"{row[0]} {row[1].strip()} {row[2]}" for row in cur.fetchall()]
vysledky.append((den, pocet, zaregistrovani, odregistrovani))
if (i + 1) % 30 == 0:
print(f" {i+1}/{len(dny)}: {den}{pocet}")
conn.close()
# Excel
wb = openpyxl.Workbook()
ws = wb.active
ws.title = "Registrace"
hlavicka_font = Font(bold=True, color="FFFFFF")
hlavicka_fill = PatternFill("solid", fgColor="2E75B6")
ws.column_dimensions['A'].width = 14
ws.column_dimensions['B'].width = 14
ws.column_dimensions['C'].width = 10
ws.column_dimensions['D'].width = 45
ws.column_dimensions['E'].width = 45
for col, nazev in enumerate(['Datum', 'Registrovaných', 'Změna', 'Zaregistrováno', 'Odregistrováno'], start=1):
cell = ws.cell(row=1, column=col, value=nazev)
cell.font = hlavicka_font
cell.fill = hlavicka_fill
cell.alignment = Alignment(horizontal='center')
predchozi = None
for row_i, (den, pocet, zaregistrovani, odregistrovani) in enumerate(vysledky, start=2):
ws.cell(row=row_i, column=1, value=den).number_format = 'DD.MM.YYYY'
ws.cell(row=row_i, column=2, value=pocet).alignment = Alignment(horizontal='center')
if predchozi is not None:
zmena = pocet - predchozi
cell = ws.cell(row=row_i, column=3, value=zmena)
cell.alignment = Alignment(horizontal='center')
if zmena > 0:
cell.font = Font(color="00AA00", bold=True)
elif zmena < 0:
cell.font = Font(color="CC0000", bold=True)
predchozi = pocet
if zaregistrovani:
cell = ws.cell(row=row_i, column=4, value="\n".join(zaregistrovani))
cell.alignment = Alignment(wrap_text=True, vertical='top')
cell.font = Font(color="00AA00")
if odregistrovani:
cell = ws.cell(row=row_i, column=5, value="\n".join(odregistrovani))
cell.alignment = Alignment(wrap_text=True, vertical='top')
cell.font = Font(color="CC0000")
ws.freeze_panes = 'A2'
vystup = os.path.join(os.path.dirname(__file__), 'registrace_2025_dnes.xlsx')
wb.save(vystup)
print(f"\nUloženo: {vystup}")
+143
View File
@@ -0,0 +1,143 @@
# MedicusWithClaudePosudek poznámky pro Clauda
## O co jde
Lékařské posudky vystavované MUDr. Buzalkovou. Prozatím řešíme posudky k řízení motorových vozidel.
Nový zákon ukládá povinnost odesílat posudky k řízení do **centrálního registru** tuto funkci Medicus přidal v aktualizaci z konce března 2026.
---
## Tabulky
### HISTDOC hlavní tabulka pro všechny posudky
Všechny posudky jsou záznamy v `HISTDOC`, lišící se hodnotou sloupce `TYP`.
Klíčové sloupce:
| Sloupec | Popis |
|---|---|
| `ID` | primární klíč |
| `TYP` | typ dokumentu (viz níže) |
| `DATUM` | datum vystavení posudku |
| `IDPACI` | FK → KAR.IDPAC (pacient) |
| `DATA` | obsah posudku text ve formátu key=value (viz níže) |
| `PORCISLO` | pořadové číslo posudku (= PorCislo v DATA) |
| `STAV` | stav záznamu (Z = zavřeno) |
| `PRINTED` | T/F byl vytištěn |
| `IDUZIV` | FK → UZIVATEL.IDUZI kdo vystavil (4 = MUDr. Buzalková) |
| `CREATED` | timestamp vytvoření záznamu |
**Vazba:** žádná přímá vazba na jiné tabulky (vyšetření, dekurz apod.) posudek je svébytný dokument.
### TYP hodnoty relevantní pro posudky řidičů
| TYP | Popis | Počet (k 2026-03-31) |
|---|---|---|
| `MOTORVO` | ruční posudek k řízení motorových vozidel | 1530 |
| `EPOSMRO` | elektronické podání posudku do centrálního registru | 2 |
Ostatní typy posudků v HISTDOC (pro referenci):
- `ZBROJPR`, `ZBROJP2` zbrojní průkaz
- `ZPUPRN` způsobilost pro práci
- `ZDRSTA3``ZDRSTA5`, `ZDRSTAV`, `ZDRINF` zdravotní stav (různé varianty)
- ... (celkem desítky typů)
### HISTDOC_EPOSUDEK evidence odeslání do registru
Doplňková tabulka k EPOSMRO záznamům v HISTDOC.
| Sloupec | Popis |
|---|---|
| `ID_HISTDOC` | FK → HISTDOC.ID (záznam EPOSMRO) |
| `ID_PODANI` | UUID přidělené centrálním registrem |
| `ODESLANO` | timestamp odeslání |
| `STATUS` | O = odesláno |
| `VERZE` | verze záznamu (base64 interní hodnota) |
### VS_POSUDKY prázdná, zatím nepoužívaná
Sloupce: ID, IDPAC, DATA (BLOB), DATUM, POSTYPE. Pravděpodobně připravena pro budoucí využití.
---
## Workflow: ruční posudek → elektronické podání
1. Lékař v Medicusu vyplní posudek → vznikne `HISTDOC` TYP=`MOTORVO`
2. Medicus automaticky odešle do centrálního registru → vznikne `HISTDOC` TYP=`EPOSMRO` + záznam v `HISTDOC_EPOSUDEK`
3. Oba záznamy mají stejné `IDPACI` + `DATUM` → podle toho je párujeme
Příklad (pacient Vráček, 30.3.2026):
- HISTDOC ID=34743, TYP=MOTORVO, CREATED=13:12
- HISTDOC ID=34746, TYP=EPOSMRO, CREATED=13:21
- HISTDOC_EPOSUDEK: STATUS=O, ODESLANO=13:21
---
## Formát DATA (key=value) MOTORVO
```
JmenoPac=Radomil Vráček
DatNar=D:27.03.1956
Prukaz=207069669 ← číslo řidičského průkazu
DatKonec=D:30.03.2028 ← platnost posudku do
DatumVyd=D:30.03.2026 ← datum vydání
Bydliste=K Šafránce 507/16, 19000 Praha 9-Střížkov
DruhProh=periodická ← druh prohlídky
Posouzeni=T ← T = způsobilý (F = nezpůsobilý?)
Posouzeni2=F ← T = nezpůsobilý (druhá volba)
ZpusobJe=B:0 ← skupiny bez podmínky
ZpusobPodminka=B:1 ← B:1 = má podmínku
SkupinaPodminka=sk. B brýle
PorCislo=2600037
KonecDleZakona=D
DatumPrevzeti=D:30.03.2026
```
**Výsledek posouzení** (kombinace Posouzeni + Posouzeni2 + ZpusobPodminka):
- `Posouzeni=T` + `Posouzeni2=F` + `ZpusobPodminka=B:0` → způsobilý
- `Posouzeni=T` + `Posouzeni2=F` + `ZpusobPodminka=B:1` → způsobilý s podmínkou
- `Posouzeni=T` + `Posouzeni2=T` → nezpůsobilý
## Formát DATA (key=value) EPOSMRO
```
Lekar=MUDr. Michaela Buzalková
KRZPID=130153584 ← ID lékaře v registru
ICO=68366370
ICP=09305001
Pacient=Radomil Vráček
RID=8705636888 ← číslo řidičáku
DatumNarozeni=D:27.03.1956
StavPosudkuKodVerze=zneplatneny|1.0.0
StavPosudkuNazev=Zneplatněný ← stav posudku v registru
TypAkceNazev=vytvoření
TypAkceKodVerze=akce_ro_1|1.0.0
DruhProhlidkyNazev=pravidelná
DruhProhlidkyKodVerze=Pravidelna|1.0.0
DruhPosudkuNazev=řidičské oprávnění pro seniory
DruhPosudkuKodVerze=SenioriRo|1.0.0
SkupinaZadatelRidicNazev=skupina 1
SkupinyRidicskehoOpravneniSeznam=B
HarmonizovaneNarodniKody=$:~HNK1:011:01.01 Brýle5:01.012:HK1:B0: ← kódy omezení (brýle)
VysledekKodVerze=ZpusobilySPodminkou|1.0.0
VysledekNazev=způsobilý s podmínkou
DatumVystaveni=D:30.03.2026
PlatnostDo=D:30.03.2028
```
**StavPosudku = "Zneplatněný"** neznamená chybu jde o akci, kdy lékař odvolá způsobilost pacienta (např. po mrtvici, epileptickém záchvatu apod.). Medicus pak odešle do registru zneplatnění existujícího posudku.
---
## Soubory v projektu
- `posudky_report.py` generuje Excel s listy MOTORVO a EPOSMRO
- `CLAUDE_NOTES.md` tento soubor
## Report (posudky_report.py)
- Výstup: `u:\Dropbox\!!!Days\Downloads Z230\YYYY-MM-DD_HH-MM-SS_Přehled posudků řidičák.xlsx`
- Maže předchozí verzi před zápisem nové
- List MOTORVO: 1530 záznamů, sloupec `ePosudek` = ANO (zeleně) / NE podle toho, zda byl odeslán ePosudek (párování IDPACI + DATUM)
- List EPOSMRO: 2 záznamy, detail elektronického podání
+237
View File
@@ -0,0 +1,237 @@
import fdb
import openpyxl
from openpyxl.styles import Font, PatternFill, Alignment
from openpyxl.utils import get_column_letter
from datetime import datetime
import os
import sys
# --- Připojení ---
conn = fdb.connect(
dsn=r'localhost:c:\medicus 3\data\medicus.fdb',
user='SYSDBA', password='masterkey', charset='win1250'
)
cur = conn.cursor()
# --- Výstupní soubor ---
output_dir = r'u:\Dropbox\!!!Days\Downloads Z230'
now = datetime.now()
filename = now.strftime('%Y-%m-%d_%H-%M-%S') + '_Přehled posudků řidičák.xlsx'
output_path = os.path.join(output_dir, filename)
# --- Smazání předchozích verzí ---
for f in os.listdir(output_dir):
if f.endswith('_Přehled posudků řidičák.xlsx'):
os.remove(os.path.join(output_dir, f))
wb = openpyxl.Workbook()
# =====================
# Pomocné funkce
# =====================
HEADER_FILL = PatternFill('solid', fgColor='2F5496')
HEADER_FONT = Font(bold=True, color='FFFFFF')
ZEBRA_FILL = PatternFill('solid', fgColor='DCE6F1')
GREEN_FILL = PatternFill('solid', fgColor='C6EFCE')
GREEN_FONT = Font(bold=True, color='276221')
def style_header(ws):
for cell in ws[1]:
cell.fill = HEADER_FILL
cell.font = HEADER_FONT
cell.alignment = Alignment(horizontal='center')
def autofit(ws):
for col in ws.columns:
max_len = max((len(str(cell.value)) if cell.value is not None else 0) for cell in col)
ws.column_dimensions[get_column_letter(col[0].column)].width = min(max_len + 2, 50)
def fmt(val):
if val is None:
return ''
return val
def parse_data(data_str):
"""Parsuje key=value text z HISTDOC.DATA do slovníku."""
result = {}
if not data_str:
return result
for line in data_str.splitlines():
if '=' in line:
key, _, val = line.partition('=')
result[key.strip()] = val.strip()
return result
def parse_date(val):
"""Převede 'D:DD.MM.YYYY' na datetime.date, nebo vrátí původní hodnotu."""
if val and val.startswith('D:'):
try:
return datetime.strptime(val[2:], '%d.%m.%Y').date()
except ValueError:
return val
return val
# =====================
# List 1 MOTORVO (ruční posudky k řízení)
# =====================
ws1 = wb.active
ws1.title = 'MOTORVO'
# Množina (IDPACI, DATUM) kde existuje EPOSMRO
cur.execute("""
SELECT IDPACI, DATUM FROM HISTDOC WHERE TYP = 'EPOSMRO'
""")
eposmro_keys = set((r[0], r[1]) for r in cur.fetchall())
cur.execute("""
SELECT h.ID, h.DATUM, h.IDPACI,
k.PRIJMENI, k.JMENO, k.RODCIS,
h.DATA, h.PORCISLO, h.STAV, h.PRINTED, h.IDUZIV, h.CREATED
FROM HISTDOC h
JOIN KAR k ON k.IDPAC = h.IDPACI
WHERE h.TYP = 'MOTORVO'
ORDER BY h.ID DESC
""")
raw_rows = cur.fetchall()
headers = [
'ID', 'DATUM', 'IDPACI', 'PRIJMENI', 'JMENO', 'RODCIS',
'PorCislo', 'DatumVyd', 'DatKonec', 'DruhProh',
'Posouzeni', 'ZpusobPodminka', 'SkupinaPodminka',
'Skupiny',
'ePosudek',
'STAV', 'PRINTED', 'IDUZIV', 'CREATED'
]
ws1.append(headers)
for i, row in enumerate(raw_rows, start=2):
(hid, datum, idpac, prijmeni, jmeno, rodcis,
data_blob, porcislo, stav, printed, iduziv, created) = row
data = parse_data(data_blob)
posouzeni = ''
if data.get('Posouzeni') == 'T':
if data.get('Posouzeni2') == 'T':
posouzeni = 'nezpůsobilý'
elif data.get('ZpusobPodminka') == 'B:1':
posouzeni = 'způsobilý s podmínkou'
else:
posouzeni = 'způsobilý'
skupiny = data.get('SkupinyRidicskehoOpravneniSeznam', '')
if not skupiny:
# MOTORVO nemá SkupinyRidicskehoOpravneniSeznam, zkusíme ZpusobJe
skupiny = data.get('ZpusobJe', '')
ws1.append([
hid,
fmt(datum),
idpac,
fmt(prijmeni),
fmt(jmeno),
fmt(rodcis),
fmt(porcislo or data.get('PorCislo', '')),
parse_date(data.get('DatumVyd', '')),
parse_date(data.get('DatKonec', '')),
fmt(data.get('DruhProh', '')),
posouzeni,
fmt(data.get('ZpusobPodminka', '')),
fmt(data.get('SkupinaPodminka', '')),
fmt(skupiny),
'ANO' if (idpac, datum) in eposmro_keys else 'NE',
fmt(stav),
fmt(printed),
fmt(iduziv),
fmt(created),
])
if i % 2 == 0:
for cell in ws1[i]:
cell.fill = ZEBRA_FILL
# Sloupec ePosudek zvýraznit ANO zeleně
epos_col = headers.index('ePosudek') + 1
cell = ws1.cell(row=i, column=epos_col)
if cell.value == 'ANO':
cell.fill = GREEN_FILL
cell.font = GREEN_FONT
style_header(ws1)
ws1.freeze_panes = 'A2'
autofit(ws1)
# =====================
# List 2 EPOSMRO (elektronická podání do registru)
# =====================
ws2 = wb.create_sheet('EPOSMRO')
cur.execute("""
SELECT h.ID, h.DATUM, h.IDPACI,
k.PRIJMENI, k.JMENO, k.RODCIS,
h.DATA, h.STAV, h.CREATED,
e.ID_PODANI, e.ODESLANO, e.STATUS
FROM HISTDOC h
JOIN KAR k ON k.IDPAC = h.IDPACI
LEFT JOIN HISTDOC_EPOSUDEK e ON e.ID_HISTDOC = h.ID
WHERE h.TYP = 'EPOSMRO'
ORDER BY h.ID DESC
""")
epos_rows = cur.fetchall()
headers2 = [
'ID', 'DATUM', 'IDPACI', 'PRIJMENI', 'JMENO', 'RODCIS',
'DatumVyd', 'DatKonec', 'DruhProhlidky', 'DruhPosudku',
'Vysledek', 'StavPosudku', 'TypAkce',
'STAV', 'CREATED',
'ID_PODANI', 'ODESLANO', 'STATUS_ODESL'
]
ws2.append(headers2)
for i, row in enumerate(epos_rows, start=2):
(hid, datum, idpac, prijmeni, jmeno, rodcis,
data_blob, stav, created,
id_podani, odeslano, status_odesl) = row
data = parse_data(data_blob)
ws2.append([
hid,
fmt(datum),
idpac,
fmt(prijmeni),
fmt(jmeno),
fmt(rodcis),
parse_date(data.get('DatumVystaveni', '')),
parse_date(data.get('PlatnostDo', '')),
fmt(data.get('DruhProhlidkyNazev', '')),
fmt(data.get('DruhPosudkuNazev', '')),
fmt(data.get('VysledekNazev', '')),
fmt(data.get('StavPosudkuNazev', '')),
fmt(data.get('TypAkceNazev', '')),
fmt(stav),
fmt(created),
fmt(id_podani),
fmt(odeslano),
fmt(status_odesl),
])
if i % 2 == 0:
for cell in ws2[i]:
cell.fill = ZEBRA_FILL
style_header(ws2)
ws2.freeze_panes = 'A2'
autofit(ws2)
# =====================
# Uložení
# =====================
conn.close()
wb.save(output_path)
sys.stdout.buffer.write(f'Ulozeno: {output_path}\n'.encode('utf-8'))
sys.stdout.buffer.write(f'MOTORVO: {len(raw_rows)} radku, EPOSMRO: {len(epos_rows)} radku\n'.encode('utf-8'))
+99
View File
@@ -215,3 +215,102 @@ UNION SELECT
UNION SELECT
first 1 cast('Pojistovna' as varchar(11)) as ID, cast(NULL as VARCHAR(254)) as VAR1, cast(NULL as VARCHAR(70)) as VAR2, cast(NULL as DATE) as DATE1, cast(NULL as DATE) as DATE2, cast(NULL as TIMESTAMP) as TIME1, cast(P.IDICP as INTEGER) as INT1, NULL as TEXT1, cast(NULL as NUMERIC(15,2)) as NUM1, cast(NULL as NUMERIC(15,2)) as NUM2 from ICP P join ICZ Z on (Z.IDICZ = P.IDICZ) where Z.POJ = '207' and P.ODB = '001'
```
---
## Ošetřující lékař pacienta
### Kde je uložen
Ošetřující lékař není přímo v tabulce `KAR`. Je uložen v tabulce **`KARUZIV`** a čte se
přes stored procedure **`KARUZIV_SEL`**.
### Tabulka KARUZIV
Vazba pacient → lékař. Jeden pacient může mít více záznamů (více lékařů/pracovišť).
| Sloupec | Popis |
|------------|-------|
| `IDPAC` | FK na KAR pacient |
| `IDLEKAR` | FK na LEKARI externí lékař (specialista, cizí ordinace) |
| `IDUZI` | FK na UZIVATEL interní uživatel Medicusu (vlastní lékař) |
| `IDPRAC` | FK na PRACOVISTE pracoviště |
| `IDODD` | FK na ODDEL oddělení |
| `AUTOMAT` | `'F'` = ručně přiřazen, `'T'` = automaticky |
Pokud je vyplněn `IDLEKAR` → jde se do tabulky `LEKARI` (cizí lékaři).
Pokud je vyplněn `IDUZI` → jde se do tabulky `UZIVATEL` (lékaři v Medicusu).
### Tabulka REGISTR
Druhý zdroj registrace pacienta u lékaře/pojišťovny.
| Sloupec | Popis |
|------------------|-------|
| `IDPAC` | FK na KAR |
| `IDICP` | FK na ICP identifikace pracoviště/pojišťovny |
| `IDUZI` | FK na UZIVATEL lékař (nepovinný, dohledává se přes ICP) |
| `DATUM` | Datum začátku registrace |
| `DATUM_ZRUSENI` | Datum zrušení (NULL = stále platná) |
| `PRIZNAK` | `'V'`/`'D'`/`'A'` = aktivní; `'Z'`/`'N'` = zrušená/neaktivní |
### Stored procedure KARUZIV_SEL(IIDPAC, INCL_AUTOMAT)
Parametry:
- `IIDPAC` IDPAC pacienta
- `INCL_AUTOMAT` `'T'` = vrátit i automaticky přiřazené, `'F'` = jen ruční
Vrací sloupce: `ID, IDPAC, IDODD, ODD, IDUZI, IDPRAC, IDLEKAR, AUTOMAT, TITUL, PRIJMENI, JMENO, TITUL2, ODBORN`
**Logika (3 průchody):**
1. `KARUZIV` kde `AUTOMAT = 'F'` ručně přiřazení lékaři
2. `KARUZIV` kde `AUTOMAT = 'T'` automaticky přiřazení (jen pokud `INCL_AUTOMAT = 'T'`)
3. `REGISTR` aktivní registrace (datum platný, `PRIZNAK``'Z'`/`'N'`, nezrušená)
- přes `IDICP``ICP``PRACOVISTE``PRACUZIV``UZIVATEL`
### Použití v panelu pacienta (UNION dotaz)
```sql
-- Ošetřující lékař praktický (odbornost 001 nebo 002)
SELECT ... FROM KARUZIV_SEL(:IDPAC, 'T') WHERE ODBORN in ('001', '002')
-- → UNION část ID = 'OseLekPrak'
-- Všichni lékaři přiřazení ke kartě
SELECT ... FROM KARUZIV_SEL(:IDPAC, 'T')
-- → UNION část ID = 'OseLek'
```
### Zapojené tabulky (přehled)
```
KAR
└── KARUZIV ──► LEKARI (externí lékaři, specialisté)
└► UZIVATEL (interní lékaři v Medicusu)
└► PRACOVISTE (pracoviště / odbornost)
└► ODDEL (oddělení)
└── REGISTR ──► ICP (identifikace pracoviště)
└► PRACOVISTE ──► PRACUZIV ──► UZIVATEL
```
### Barevné rozlišení v GUI Medicusu
- **Černá** = záznam pochází z `KARUZIV` (explicitně přiřazený ošetřující lékař, `IDUZI` vyplněno)
- **Červená** = záznam pochází z `REGISTR` (registrující lékař SP vrací `ID = 0 - REGISTR.ID`)
- **Červená (ext.)** = záznam z `KARUZIV` kde je vyplněno `IDLEKAR` (externí lékař z tabulky `LEKARI`)
### Duplikát ošetřujícího lékaře known issue
`KARUZIV_SEL` prochází **vždy oba zdroje** (KARUZIV i REGISTR) bez ohledu na to, zda už byl lékař nalezen. Pokud má pacient záznam v KARUZIV (černá) i v REGISTR (červená) se stejným lékařem, zobrazí se **dvakrát**.
Příčina: SP neobsahuje podmínku „přeskoč REGISTR, pokud KARUZIV již vrátil výsledky".
**Stav ordinace Buzalková (duben 2026):**
- Všech 1620 registrovaných pacientů má v `KARUZIV` záznam IDUZI=4 (Michaela, černá)
- 1537 pacientů má v `REGISTR` IDUZI=4 (Michaela, červená) → duplikát
- Chování je konzistentní, ale GUI zobrazuje oba řádky čeká se na vyjádření supportu Medicusu
**Možná řešení (zatím neaplikováno):**
- A) Smazat KARUZIV záznamy → zůstane jen červená z REGISTR (jeden řádek)
- B) Nastavit REGISTR.IDUZI zpět na NULL → REGISTR path hledá přes PRACOVISTE (najde Michalu jako první NOSVYK='A') → duplikát stále, ale přes jiný lookup
- C) Řešení přes support Medicusu
+155
View File
@@ -0,0 +1,155 @@
"""
Najde datum poslední preventivní prohlídky (výkon 01022 nebo 01021)
pro každého registrovaného pacienta z VZPARC DRUH=98 (výkonové dávky pojišťovnám).
Výsledek zapíše do KAR.POZNAMKA ve formátu:
[[prev_prohlidka:YYYY-MM-DD 01022, YYYY-MM-DD]]
kde první datum je datum PP a druhé je nejdřívější možný termín příští (23 měsíců).
"""
import fdb
import re
import sys
from datetime import date
from dateutil.relativedelta import relativedelta
sys.stdout = open(sys.stdout.fileno(), mode='w', encoding='utf-8', buffering=1)
KODY_PP = {'01022', '01021'}
conn = fdb.connect(
dsn=r'localhost:c:\medicus 3\data\medicus.fdb',
user='SYSDBA', password='masterkey', charset='win1250'
)
cur = conn.cursor()
# --- 1. Načti registrované pacienty (RC → IDPAC, aktuální POZNAMKA) ---
dnes = date.today().strftime('%Y-%m-%d')
cur.execute(f"""
SELECT KAR.IDPAC, KAR.RODCIS, KAR.PRIJMENI, KAR.JMENO, KAR.POZNAMKA
FROM KAR
WHERE KAR.VYRAZEN = 'N'
AND EXISTS (
SELECT r.ID FROM REGISTR r
JOIN ICP i ON r.IDICP = i.IDICP
WHERE r.IDPAC = KAR.IDPAC
AND r.DATUM <= '{dnes}'
AND (r.DATUM_ZRUSENI IS NULL OR r.DATUM_ZRUSENI >= '{dnes}')
AND r.PRIZNAK IN ('V','D','A')
AND i.ICP = '09305001'
AND i.ODB = '001'
)
""")
pacienti = {} # RODCIS -> {'idpac', 'prijmeni', 'jmeno', 'poznamka'}
for row in cur.fetchall():
idpac, rodcis, prijmeni, jmeno, poznamka = row
if rodcis:
pacienti[rodcis.strip()] = {
'idpac': idpac,
'prijmeni': prijmeni,
'jmeno': jmeno,
'poznamka': poznamka or '',
}
print(f"Registrovaných pacientů: {len(pacienti)}")
# --- 2. Projdi všechny VZPARC DRUH=98 dávky a hledej výkony 01022/01021 ---
# Výsledek: RC -> (nejnovejsi_datum, kod)
vysledky = {} # RC -> (nejnovejsi_datum, kod)
cur.execute("""
SELECT DAVKA FROM VZPARC
WHERE DRUH = '98' AND DAVKA IS NOT NULL
""")
davky = cur.fetchall()
print(f"VZPARC DRUH=98 blobů celkem: {len(davky)}")
for (kdavka_raw,) in davky:
if not kdavka_raw:
continue
# Dekódování CP852
try:
text = kdavka_raw.encode('cp1250', errors='replace').decode('cp852', errors='replace')
except Exception:
continue
if not text.startswith('DP98'):
continue
aktualni_rc = None
for line in text.splitlines():
if not line:
continue
typ = line[0]
if typ == 'A':
# RC je na pevné pozici 34, délka 10
if len(line) >= 44:
aktualni_rc = line[34:44].strip()
else:
aktualni_rc = None
elif typ == 'V' and aktualni_rc:
# V + DDMMYYYY + KOD(5)
if len(line) < 14:
continue
try:
dd = int(line[1:3])
mm = int(line[3:5])
yyyy = int(line[5:9])
kod = line[9:14].strip()
except (ValueError, IndexError):
continue
if kod not in KODY_PP:
continue
try:
datum = date(yyyy, mm, dd)
except ValueError:
continue
# Ulož jen nejnovější datum
if aktualni_rc not in vysledky or datum > vysledky[aktualni_rc][0]:
vysledky[aktualni_rc] = (datum, kod)
print(f"Pacientů s nalezenou PP (01022/01021): {len(vysledky)}")
# --- 3. Zapiš do KAR.POZNAMKA ---
TAG_RE = re.compile(r'\[\[prev_prohlidka:[^\]]*\]\]\s*#zaps[^#]*#')
TAG_FORMAT = '[[prev_prohlidka:{pp_datum} {kod}, {pristi_datum}]] #zapsáno {zapsano}#'
zapsano = 0
preskoceno = 0
nenalezeno = 0
from datetime import datetime
ted = datetime.now().strftime('%d-%m-%Y %H:%M')
for rc, (datum_pp, kod) in sorted(vysledky.items()):
if rc not in pacienti:
nenalezeno += 1
continue
pac = pacienti[rc]
pristi = datum_pp + relativedelta(months=23)
tag = TAG_FORMAT.format(
pp_datum=datum_pp.strftime('%d-%m-%Y'),
kod=kod,
pristi_datum=pristi.strftime('%d-%m-%Y'),
zapsano=ted,
)
stara = pac['poznamka']
# Odstraň starý tag a vlož nový na začátek
nova = TAG_RE.sub('', stara).lstrip('\n')
nova = tag + ('\n' + nova if nova else '')
cur.execute(
"UPDATE KAR SET POZNAMKA=? WHERE IDPAC=?",
(nova, pac['idpac'])
)
zapsano += 1
conn.commit()
print(f"Zapsáno: {zapsano}, nenalezeno v KAR: {nenalezeno}, přeskočeno: {preskoceno}")
conn.close()
+9
View File
@@ -0,0 +1,9 @@
# Medicus TO DO LIST
## Otevřené úkoly
1. **Změna Jourová → Buzalková v ošetřujících lékařích**
Zjistit od supportu Medicusu jak správně přiřadit ošetřujícího lékaře tak,
aby se v GUI zobrazoval pouze jeden řádek (černý). Aktuálně se zobrazuje
duplikát černá (z KARUZIV) + červená (z REGISTR). Viz poznámky v
`MedicusWithClaudeSelects/SELECTS.md` sekce "Duplikát ošetřujícího lékaře".