This commit is contained in:
2026-04-28 09:59:40 +02:00
parent 90bd0ecdf5
commit f31b271e93
6 changed files with 131 additions and 11 deletions
@@ -0,0 +1,577 @@
"""
Agent pro extrakci a pojmenování naskenovaných PDF lékařských zpráv.
- Claude Vision API — bez OCR, správná čeština s diakritikou
- Ověření pacienta proti Medicus (KAR), fuzzy matching RČ
- Interaktivní schválení / oprava názvu
- Few-shot learning z uložených korekcí
"""
import base64
import gc
import io
import json
import os
import re
import shutil
import subprocess
import sys
import time
from pathlib import Path
# Windows: nastav stdout/stderr na UTF-8
if sys.platform == "win32":
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8", errors="replace")
import anthropic
from pdf2image import convert_from_path
sys.path.insert(0, str(Path(__file__).parent.parent))
from Knihovny.najdi_dropbox import get_dropbox_root
from Knihovny.najdi_medicus import get_medicus_config
POPPLER_PATH = r"C:/Poppler/Library/bin"
CORRECTIONS_FILE = Path(__file__).parent / "corrections.json"
_DROPBOX = Path(get_dropbox_root())
TO_PROCESS = _DROPBOX / r"Ordinace\Dokumentace_ke_zpracování\Ricoh Fi-8040\KeZpracování"
PROCESSED = _DROPBOX / r"Ordinace\Dokumentace_ke_zpracování\Ricoh Fi-8040\Zpracováno"
DOKUMENTACE = _DROPBOX / r"Ordinace\Dokumentace_zpracovaná"
# ─── Konfigurace ──────────────────────────────────────────────────────────────
def _load_env():
env_path = Path(__file__).parent.parent / ".env"
if env_path.exists():
for line in env_path.read_text(encoding="utf-8").splitlines():
line = line.strip()
if "=" in line and not line.startswith("#"):
k, v = line.split("=", 1)
os.environ[k.strip()] = v.strip()
_load_env()
# ─── Korekce (few-shot příklady) ──────────────────────────────────────────────
def load_corrections() -> list[dict]:
if CORRECTIONS_FILE.exists():
return json.loads(CORRECTIONS_FILE.read_text(encoding="utf-8"))
return []
def save_correction(original: str, corrected: str):
corrections = load_corrections()
for c in corrections:
if c["original"] == original and c["corrected"] == corrected:
return
corrections.append({"original": original, "corrected": corrected})
CORRECTIONS_FILE.write_text(
json.dumps(corrections, ensure_ascii=False, indent=2), encoding="utf-8"
)
print(f" ✓ Korekce uložena ({len(corrections)} celkem)")
def build_corrections_prompt() -> str:
corrections = load_corrections()
if not corrections:
return ""
lines = ["Příklady korekcí z minulých běhů (uč se z nich):"]
for c in corrections[-10:]:
lines.append(f' - špatně: "{c["original"]}"')
lines.append(f' správně: "{c["corrected"]}"')
return "\n".join(lines) + "\n\n"
# ─── Kontrola duplicit ───────────────────────────────────────────────────────
def check_duplicates(rc: str, datum: str) -> list[str]:
"""
Hledá v Dokumentace_zpracovaná soubory se stejným RČ a datem.
Vrátí seznam názvů nalezených souborů.
"""
if not DOKUMENTACE.exists():
return []
prefix = f"{rc} {datum}"
return [f.name for f in DOKUMENTACE.iterdir() if f.name.startswith(prefix)]
# ─── Medicus ověření ──────────────────────────────────────────────────────────
def _medicus_connect():
try:
import fdb
cfg = get_medicus_config()
return fdb.connect(
dsn=cfg.dsn,
user="SYSDBA", password="masterkey", charset="win1250"
)
except Exception as e:
print(f" [Medicus] Nepřipojeno: {e}")
return None
def _lookup_by_rc(cur, rc_digits: str) -> dict | None:
"""Přesné vyhledání podle RČ (bez lomítka)."""
cur.execute(
"SELECT IDPAC, PRIJMENI, JMENO, RODCIS FROM KAR "
"WHERE REPLACE(RODCIS, '/', '') = ?",
(rc_digits,)
)
row = cur.fetchone()
if row:
return {"idpac": row[0], "prijmeni": row[1].strip(), "jmeno": row[2].strip(), "rodcis": row[3].strip()}
return None
def _rc_candidates(rc: str) -> list[str]:
"""
Generuje kandidáty RČ pro fuzzy matching:
- vynechání každé cifry (OCR přečetlo znak navíc)
- vložení nuly na každou pozici (OCR přehlédlo nulu v sekvenci 00)
- záměna podobně vypadajících číslic na každé pozici
Vrátí unikátní seznam kandidátů bez původního RČ.
"""
similar = {"0": "8", "8": "0", "1": "7", "7": "1", "5": "6", "6": "5", "3": "8"}
candidates = set()
# Vynechání jedné cifry (OCR přečetlo znak navíc)
for i in range(len(rc)):
candidates.add(rc[:i] + rc[i+1:])
# Vložení nuly na každou pozici (nejčastější chyba: sekvence 00 přečtena jako 0)
for i in range(len(rc) + 1):
candidates.add(rc[:i] + "0" + rc[i:])
# Záměna podobné cifry na každé pozici
for i, ch in enumerate(rc):
if ch in similar:
candidates.add(rc[:i] + similar[ch] + rc[i+1:])
candidates.discard(rc)
candidates = {c for c in candidates if len(c) in (9, 10)}
return sorted(candidates)
def _rc_checksum_ok(rc: str) -> bool:
"""Ověří dělitelnost 11 pro 10místná RČ (platí pro narozené po 1.1.1954)."""
digits = re.sub(r"\D", "", rc)
if len(digits) == 10:
return int(digits) % 11 == 0
return True # 9místná RČ nemají checksum
def verify_patient(rc_raw: str) -> dict:
"""
Ověří pacienta v Medicus.
Vrací:
status: "ok" | "fuzzy" | "not_found" | "offline"
patient: dict nebo None
rc_corrected: opravené RČ (pokud fuzzy) nebo None
"""
rc = re.sub(r"\D", "", rc_raw or "")
if not rc:
return {"status": "not_found", "patient": None, "rc_corrected": None}
con = _medicus_connect()
if con is None:
return {"status": "offline", "patient": None, "rc_corrected": None}
try:
cur = con.cursor()
# 1. Přesná shoda
patient = _lookup_by_rc(cur, rc)
if patient:
return {"status": "ok", "patient": patient, "rc_corrected": None}
# 2. Fuzzy matching — zkus kandidáty, preferuj ty s platným checksumem
candidates = _rc_candidates(rc)
matches = []
for cand in candidates:
p = _lookup_by_rc(cur, cand)
if p:
matches.append((cand, p))
if not matches:
return {"status": "not_found", "patient": None, "rc_corrected": None}
# Seřaď: platný checksum na prvním místě
matches.sort(key=lambda x: (0 if _rc_checksum_ok(x[0]) else 1))
best_rc, best_patient = matches[0]
return {"status": "fuzzy", "patient": best_patient, "rc_corrected": best_rc, "all_matches": matches}
finally:
con.close()
# ─── PDF → obrázek ────────────────────────────────────────────────────────────
def pdf_to_images(pdf_path: str) -> list:
return convert_from_path(pdf_path, poppler_path=POPPLER_PATH, dpi=300)
def image_to_base64(image) -> str:
buf = io.BytesIO()
image.save(buf, format="JPEG", quality=95)
return base64.standard_b64encode(buf.getvalue()).decode("utf-8")
# ─── Extrakce Claude Vision ───────────────────────────────────────────────────
def extract_patient_info(pdf_path: str) -> dict:
pdf_path = Path(pdf_path)
if not pdf_path.exists():
raise FileNotFoundError(f"Soubor nenalezen: {pdf_path}")
print(f"\nNačítám: {pdf_path.name}")
suffix = pdf_path.suffix.lower()
if suffix in (".jpg", ".jpeg", ".png"):
from PIL import Image
img = Image.open(pdf_path)
image_b64 = image_to_base64(img)
img.close()
else:
images = pdf_to_images(str(pdf_path))
image_b64 = image_to_base64(images[0])
del images
gc.collect()
prompt = (
build_corrections_prompt() +
"Toto je naskenovaná lékařská zpráva v češtině. "
"Vrať JSON s těmito poli:\n"
"- \"jmeno\": celé jméno pacienta (příjmení + jméno + případný titul)\n"
"- \"rodne_cislo\": rodné číslo pacienta BEZ lomítka (pouze číslice)\n"
"- \"datum_zpravy\": datum zprávy ve formátu YYYY-MM-DD\n"
"- \"typ_dokumentu\": typ dokumentu — "
"\"LZ {oddělení}\" = ambulantní/lékařská zpráva (např. \"LZ chirurgie\", \"LZ kardiologie\", \"LZ plicní\", \"LZ ORL\"); "
"\"PZ {oddělení}\" = propouštěcí zpráva z hospitalizace (např. \"PZ interna\", \"PZ neurologie\"). "
"Jiné typy: \"Laboratoř\", \"CT břicha\", \"MRI páteře\", \"kolonoskopie\", "
"\"operační protokol oční\", \"poukaz FT\", \"diagnostická mamografie\" atd.\n"
"- \"poznamka\": krátká klinická poznámka česky, max 80 znaků. "
"DŮLEŽITÉ: pokud zpráva obsahuje sekci \"Závěr:\" nebo \"Závěr vyšetření:\", "
"použij VÝHRADNĚ obsah této sekce — je nejdůležitější. "
"Teprve pokud závěr chybí, shrň obsah z celé zprávy.\n"
"- \"nazev_souboru\": název souboru ve formátu "
"\"{rodne_cislo} {datum_zpravy} {Příjmení}, {Jméno} [{typ_dokumentu}] [{poznamka}].pdf\" "
"(jméno bez titulu, RČ bez lomítka)\n"
"- \"rotace\": o kolik stupňů CCW je třeba otočit obrázek aby byl text čitelně na výšku nebo šířku "
"(hodnoty: 0, 90, 180, 270). Pokud je text již správně orientovaný, vrať 0.\n\n"
"Pokud pole nenajdeš, použij null. Nepiš nic jiného než JSON."
)
print(" Volám Claude Vision API...")
client = anthropic.Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))
response = client.messages.create(
model="claude-sonnet-4-6",
max_tokens=400,
messages=[{
"role": "user",
"content": [
{"type": "image", "source": {"type": "base64", "media_type": "image/jpeg", "data": image_b64}},
{"type": "text", "text": prompt},
],
}],
)
usage = response.usage
cost_input = usage.input_tokens * 3 / 1_000_000
cost_output = usage.output_tokens * 15 / 1_000_000
print(f" Tokeny: {usage.input_tokens} in + {usage.output_tokens} out = ${cost_input + cost_output:.4f}")
raw = response.content[0].text.strip()
if raw.startswith("```"):
raw = raw.split("```")[1]
if raw.startswith("json"):
raw = raw[4:]
try:
return json.loads(raw.strip())
except json.JSONDecodeError:
print(f" VAROVÁNÍ: nelze parsovat JSON: {raw!r}")
return {"nazev_souboru": None, "raw": raw}
# ─── Interaktivní schválení ───────────────────────────────────────────────────
def sanitize_filename(name: str) -> str:
return re.sub(r'[<>:"/\\|?*]', '', name)
def _open_preview(root, pdf_path: Path):
"""Otevře náhledové okno PDF/obrázku jako Toplevel. Pracuje s temp kopií — žádné zamykání originálu."""
import tkinter as tk
import tempfile
import shutil as _shutil
try:
from PIL import Image, ImageTk
import fitz
except ImportError:
return
# Temp kopie — prohlížeč nikdy nesahá na originál
tmp = Path(tempfile.mktemp(suffix=pdf_path.suffix))
_shutil.copy2(pdf_path, tmp)
suffix = pdf_path.suffix.lower()
if suffix in (".jpg", ".jpeg", ".png"):
pil_pages = [Image.open(tmp)]
doc = None
else:
try:
doc = fitz.open(str(tmp))
except Exception:
tmp.unlink(missing_ok=True)
return
pil_pages = []
def render(n) -> Image.Image:
if doc is not None:
page = doc[n]
zoom = min(700 / page.rect.width, (sh - 150) / page.rect.height)
pix = page.get_pixmap(matrix=fitz.Matrix(zoom, zoom))
return Image.frombytes("RGB", (pix.width, pix.height), pix.samples)
else:
img = pil_pages[0].copy()
img.thumbnail((700, sh - 150), Image.LANCZOS)
return img
def on_close():
try:
if doc:
doc.close()
except Exception:
pass
tmp.unlink(missing_ok=True)
win.destroy()
page_count = len(doc) if doc else 1
sh = root.winfo_screenheight()
current = [0]
photo_ref = [None]
win = tk.Toplevel(root)
win.title(pdf_path.name)
win.attributes("-topmost", True)
win.resizable(False, False)
win.protocol("WM_DELETE_WINDOW", on_close)
lbl_img = tk.Label(win)
lbl_img.pack()
frame_nav = tk.Frame(win)
frame_nav.pack(pady=4)
lbl_page = tk.Label(frame_nav, font=("Segoe UI", 9))
lbl_page.pack(side="left", padx=10)
def show(n):
current[0] = n
img = render(n)
photo_ref[0] = ImageTk.PhotoImage(img)
lbl_img.config(image=photo_ref[0])
lbl_page.config(text=f"Strana {n + 1} / {page_count}")
btn_prev.config(state="normal" if n > 0 else "disabled")
btn_next.config(state="normal" if n < page_count - 1 else "disabled")
btn_prev = tk.Button(frame_nav, text="◄ Předchozí",
command=lambda: show(current[0] - 1))
btn_prev.pack(side="left")
btn_next = tk.Button(frame_nav, text="Další ►",
command=lambda: show(current[0] + 1))
btn_next.pack(side="left")
show(0)
win.update_idletasks()
win.geometry(f"+0+0")
def _rename_dialog(nazev: str, info_lines: list[str]) -> str | None:
"""
Spustí rename_dialog.py jako subprocess — vyhneme se Tkinter konfliktům s PyCharm.
Vrátí finální název (s .pdf) nebo None = přeskočit.
"""
import tempfile
data = {"nazev": nazev, "info_lines": info_lines}
tmp = Path(tempfile.mktemp(suffix=".json"))
tmp.write_text(json.dumps(data, ensure_ascii=False), encoding="utf-8")
dialog_script = Path(__file__).parent / "rename_dialog.py"
try:
proc = subprocess.run(
[sys.executable, str(dialog_script), str(tmp)],
capture_output=True, text=True, encoding="utf-8",
)
output = proc.stdout.strip()
if output:
return json.loads(output).get("value")
return None
finally:
tmp.unlink(missing_ok=True)
def print_verification(verif: dict, rc_from_scan: str):
"""Vypíše výsledek ověření proti Medicus."""
status = verif["status"]
patient = verif.get("patient")
if status == "ok":
print(f" ✓ Medicus: {patient['prijmeni']} {patient['jmeno']} | RČ {patient['rodcis']}")
elif status == "fuzzy":
rc_corr = verif["rc_corrected"]
print(f" ⚠ Medicus: RČ ze skenu '{rc_from_scan}' nenalezeno")
print(f" → Nalezen podobný pacient: {patient['prijmeni']} {patient['jmeno']} | RČ {patient['rodcis']}")
print(f" → Pravděpodobná oprava RČ: {rc_from_scan}{rc_corr} (OCR chyba)")
if len(verif.get("all_matches", [])) > 1:
print(f" → Další shody: {[m[0] for m in verif['all_matches'][1:]]}")
elif status == "not_found":
print(f" ✗ Medicus: RČ '{rc_from_scan}' nenalezeno ani při fuzzy hledání")
elif status == "offline":
print(f" — Medicus: nedostupný (offline), ověření přeskočeno")
def interactive_rename(pdf_path: Path, info: dict, verif: dict) -> bool:
"""
Otevře tkinter dialog pro schválení / opravu názvu.
Schválený soubor přesune do Processed/ a smaže z ToProcess/.
"""
rc = re.sub(r"\D", "", verif["patient"]["rodcis"] if verif.get("patient") else info.get("rodne_cislo") or "")
datum = info.get("datum_zpravy") or ""
duplicity = check_duplicates(rc, datum)
# Oprava RČ při fuzzy matchi
nazev = info.get("nazev_souboru")
if verif["status"] == "fuzzy" and verif.get("rc_corrected") and nazev:
rc_scan = re.sub(r"\D", "", info.get("rodne_cislo") or "")
nazev = nazev.replace(rc_scan, verif["rc_corrected"], 1)
print(f" → Název aktualizován s opraveným RČ")
# Sestavení info řádků pro dialog
rc_from_scan = re.sub(r"\D", "", info.get("rodne_cislo") or "")
status = verif["status"]
patient = verif.get("patient")
info_lines = []
if status == "ok":
info_lines.append(f"✓ Medicus: {patient['prijmeni']} {patient['jmeno']} | RČ {patient['rodcis']}")
elif status == "fuzzy":
info_lines.append(f"⚠ RČ ze skenu '{rc_from_scan}' → opraveno na {verif['rc_corrected']}")
info_lines.append(f" Pacient: {patient['prijmeni']} {patient['jmeno']} | RČ {patient['rodcis']}")
elif status == "not_found":
info_lines.append(f"✗ RČ '{rc_from_scan}' nenalezeno v Medicus")
else:
info_lines.append("— Medicus nedostupný (offline)")
if duplicity:
info_lines.append(f"⚠ DUPLICITA: {', '.join(duplicity)}")
print()
print("" * 70)
if nazev:
print(f" Navržený název: {nazev}")
print(" Otevírám dialog...")
odpoved = _rename_dialog(nazev or "", info_lines)
if odpoved is None:
print(" Přeskočeno.")
return False
if not odpoved.endswith(".pdf"):
odpoved += ".pdf"
final_name = sanitize_filename(odpoved)
if nazev and nazev != final_name:
save_correction(nazev, final_name)
if not final_name or final_name == ".pdf":
print(" Název je prázdný, přeskakuji.")
return False
dest = PROCESSED / final_name
if dest.exists():
print(f" VAROVÁNÍ: '{final_name}' již existuje v Processed, přeskakuji.")
return False
if pdf_path.suffix.lower() in (".jpg", ".jpeg", ".png"):
from jpg_to_pdf import image_to_pdf
image_to_pdf(pdf_path, dest, rotate_ccw=info.get("rotace") or 0)
else:
shutil.copy2(pdf_path, dest)
pdf_path.unlink()
print(f" ✓ Uloženo: Processed/{final_name}")
return True
# ─── Hlavní logika ────────────────────────────────────────────────────────────
def _start_preview_process(pdf_path: Path):
"""
Otevře náhled PDF jako samostatný subprocess (žádné tkinter threading problémy).
Pracuje s temp kopií — originál zůstane volný.
Vrátí funkci close() pro ukončení procesu.
"""
import tempfile
import shutil as _shutil
tmp = Path(tempfile.mktemp(suffix=pdf_path.suffix))
_shutil.copy2(pdf_path, tmp)
viewer = Path(__file__).parent / "preview_viewer.py"
proc = subprocess.Popen(
[sys.executable, str(viewer), str(tmp), "--delete-on-close"],
)
def close():
try:
proc.terminate()
proc.wait(timeout=3)
except Exception:
pass
try:
tmp.unlink(missing_ok=True)
except Exception:
pass
return close
def process_file(pdf_path: Path):
close_preview = _start_preview_process(pdf_path)
try:
info = extract_patient_info(str(pdf_path))
rc_from_scan = re.sub(r"\D", "", info.get("rodne_cislo") or "")
print(f" Ověřuji v Medicus (RČ: {rc_from_scan})...")
verif = verify_patient(rc_from_scan)
print_verification(verif, rc_from_scan)
interactive_rename(pdf_path, info, verif)
finally:
close_preview()
def process_folder(folder: Path):
pdf_files = sorted(f for f in folder.iterdir()
if f.suffix.lower() in (".pdf", ".jpg", ".jpeg", ".png"))
if not pdf_files:
print(f"Žádná PDF nenalezena v: {folder}")
return
print(f"Nalezeno {len(pdf_files)} PDF soubor(ů).\n")
for pdf_file in pdf_files:
try:
process_file(pdf_file)
except Exception as e:
print(f" CHYBA: {e}")
print("\nHotovo.")
if __name__ == "__main__":
if len(sys.argv) > 1:
target = Path(sys.argv[1])
else:
target = TO_PROCESS
PROCESSED.mkdir(exist_ok=True)
TO_PROCESS.mkdir(exist_ok=True)
if target.is_file() and target.suffix.lower() in (".pdf", ".jpg", ".jpeg", ".png"):
process_file(target)
elif target.is_dir():
process_folder(target)
else:
print("Použití: python extract_patient_info.py [soubor.pdf nebo složka]")
sys.exit(1)