Files
medevio/60 ScansProcessing/extract_patient_info_novy.py
T
2026-04-27 11:00:40 +02:00

450 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Zpracování naskenovaných PDF — nová verze.
1. Preview originálu + Claude Vision API
2. Rename dialog
3. 5 variant komprese → uživatel vybere
4. Uložit do Processed, smazat originál
"""
import base64
import gc
import io
import json
import os
import re
import shutil
import subprocess
import sys
import tempfile
from pathlib import Path
if sys.platform == "win32":
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8", errors="replace")
import anthropic
from pdf2image import convert_from_path
sys.path.insert(0, str(Path(__file__).parent.parent))
from Knihovny.najdi_dropbox import get_dropbox_root
from Knihovny.najdi_medicus import get_medicus_config
def _load_env():
env_path = Path(__file__).parent.parent / ".env"
if env_path.exists():
for line in env_path.read_text(encoding="utf-8").splitlines():
line = line.strip()
if "=" in line and not line.startswith("#"):
k, v = line.split("=", 1)
os.environ[k.strip()] = v.strip()
_load_env()
POPPLER_PATH = r"C:/Poppler/Library/bin"
_DROPBOX = Path(get_dropbox_root())
TO_PROCESS = _DROPBOX / r"Ordinace\Dokumentace_ke_zpracování\Ricoh Fi-8040\KeZpracování"
PROCESSED = _DROPBOX / r"Ordinace\Dokumentace_ke_zpracování\Ricoh Fi-8040\Zpracováno"
CORRECTIONS_FILE = Path(__file__).parent / "corrections.json"
DOKUMENTACE = _DROPBOX / r"Ordinace\Dokumentace_zpracovaná"
import threading
_dokumentace_index: set[str] = set()
_dokumentace_ready = threading.Event()
def _load_dokumentace_index_bg():
if DOKUMENTACE.exists():
names = {f.name for f in DOKUMENTACE.iterdir() if f.is_file()}
else:
names = set()
global _dokumentace_index
_dokumentace_index = names
_dokumentace_ready.set()
print(f" Index dokumentace: {len(names)} souborů načteno.")
def start_dokumentace_index():
t = threading.Thread(target=_load_dokumentace_index_bg, daemon=True)
t.start()
VIEWER = Path(__file__).parent / "preview_viewer.py"
RENAME_DIALOG = Path(__file__).parent / "rename_dialog.py"
VARIANT_PICKER = Path(__file__).parent / "variant_picker.py"
# 5 kompresních variant
COMPRESS_VARIANTS = [
("300 DPI / q90", 300, 90),
("200 DPI / q85", 200, 85),
("150 DPI / q80", 150, 80),
("120 DPI / q75", 120, 75),
( "96 DPI / q70", 96, 70),
]
# ─── Komprese jedné varianty ──────────────────────────────────────────────────
def compress_to_temp(pdf_path: Path, dpi: int, quality: int) -> Path:
import fitz
src = fitz.open(str(pdf_path))
mat = fitz.Matrix(dpi / 72.0, dpi / 72.0)
out = fitz.open()
for page in src:
pix = page.get_pixmap(matrix=mat, colorspace=fitz.csRGB)
img_bytes = pix.tobytes("jpeg", jpg_quality=quality)
img_doc = fitz.open("pdf", fitz.open("jpeg", img_bytes).convert_to_pdf())
rect = page.rect
np = out.new_page(width=rect.width, height=rect.height)
np.show_pdf_page(np.rect, img_doc, 0)
src.close()
tmp = Path(tempfile.mktemp(suffix=".pdf"))
out.save(tmp, deflate=True, garbage=4)
out.close()
return tmp
# ─── Medicus ověření ─────────────────────────────────────────────────────────
def _medicus_connect():
try:
import fdb
cfg = get_medicus_config()
return fdb.connect(dsn=cfg.dsn, user="SYSDBA", password="masterkey", charset="win1250")
except Exception as e:
print(f" [Medicus] Nepřipojeno: {e}")
return None
def _lookup_by_rc(cur, rc_digits: str) -> dict | None:
cur.execute(
"SELECT IDPAC, PRIJMENI, JMENO, RODCIS FROM KAR "
"WHERE REPLACE(RODCIS, '/', '') = ?", (rc_digits,)
)
row = cur.fetchone()
if row:
return {"idpac": row[0], "prijmeni": row[1].strip(), "jmeno": row[2].strip(), "rodcis": row[3].strip()}
return None
def _rc_candidates(rc: str) -> list[str]:
similar = {"0": "8", "8": "0", "1": "7", "7": "1", "5": "6", "6": "5", "3": "8"}
candidates = set()
for i in range(len(rc)):
candidates.add(rc[:i] + rc[i+1:])
for i in range(len(rc) + 1):
candidates.add(rc[:i] + "0" + rc[i:])
for i, ch in enumerate(rc):
if ch in similar:
candidates.add(rc[:i] + similar[ch] + rc[i+1:])
candidates.discard(rc)
return sorted(c for c in candidates if len(c) in (9, 10))
def _rc_checksum_ok(rc: str) -> bool:
digits = re.sub(r"\D", "", rc)
if len(digits) == 10:
return int(digits) % 11 == 0
return True
def verify_patient(rc_raw: str) -> dict:
rc = re.sub(r"\D", "", rc_raw or "")
if not rc:
return {"status": "not_found", "patient": None, "rc_corrected": None}
con = _medicus_connect()
if con is None:
return {"status": "offline", "patient": None, "rc_corrected": None}
try:
cur = con.cursor()
patient = _lookup_by_rc(cur, rc)
if patient:
return {"status": "ok", "patient": patient, "rc_corrected": None}
candidates = _rc_candidates(rc)
matches = [(c, _lookup_by_rc(cur, c)) for c in candidates]
matches = [(c, p) for c, p in matches if p]
if not matches:
return {"status": "not_found", "patient": None, "rc_corrected": None}
matches.sort(key=lambda x: (0 if _rc_checksum_ok(x[0]) else 1))
best_rc, best_patient = matches[0]
return {"status": "fuzzy", "patient": best_patient, "rc_corrected": best_rc, "all_matches": matches}
finally:
con.close()
def check_duplicates(rc: str, datum: str) -> list[str]:
if not rc or not datum:
return []
# Počkej max 15s na dokončení indexu (typicky hotovo za dobu volání Claude)
_dokumentace_ready.wait(timeout=15)
prefix = f"{rc} {datum}"
return [name for name in _dokumentace_index if name.startswith(prefix)]
# ─── Korekce (few-shot příklady) ─────────────────────────────────────────────
def load_corrections() -> list[dict]:
if CORRECTIONS_FILE.exists():
return json.loads(CORRECTIONS_FILE.read_text(encoding="utf-8"))
return []
def save_correction(original: str, corrected: str):
corrections = load_corrections()
for c in corrections:
if c["original"] == original and c["corrected"] == corrected:
return
corrections.append({"original": original, "corrected": corrected})
CORRECTIONS_FILE.write_text(
json.dumps(corrections, ensure_ascii=False, indent=2), encoding="utf-8"
)
print(f" ✓ Korekce uložena ({len(corrections)} celkem)")
def build_corrections_prompt() -> str:
corrections = load_corrections()
if not corrections:
return ""
lines = ["Příklady korekcí z minulých běhů (uč se z nich):"]
for c in corrections[-10:]:
lines.append(f' - špatně: "{c["original"]}"')
lines.append(f' správně: "{c["corrected"]}"')
return "\n".join(lines) + "\n\n"
# ─── Claude Vision API ────────────────────────────────────────────────────────
def extract_info(pdf_path: Path) -> dict:
print(" Převádím na obrázek...")
suffix = pdf_path.suffix.lower()
if suffix in (".jpg", ".jpeg", ".png"):
from PIL import Image
img = Image.open(pdf_path)
buf = io.BytesIO()
img.save(buf, format="JPEG", quality=95)
img.close()
else:
images = convert_from_path(str(pdf_path), poppler_path=POPPLER_PATH, dpi=300)
buf = io.BytesIO()
images[0].save(buf, format="JPEG", quality=95)
del images
gc.collect()
image_b64 = base64.standard_b64encode(buf.getvalue()).decode("utf-8")
prompt = (
build_corrections_prompt() +
"Toto je naskenovaná lékařská zpráva v češtině. "
"Vrať JSON s těmito poli:\n"
"- \"jmeno\": celé jméno pacienta (příjmení + jméno + případný titul)\n"
"- \"rodne_cislo\": rodné číslo pacienta BEZ lomítka (pouze číslice)\n"
"- \"datum_zpravy\": datum zprávy ve formátu YYYY-MM-DD\n"
"- \"typ_dokumentu\": typ dokumentu — "
"\"LZ {oddělení}\" = ambulantní/lékařská zpráva (např. \"LZ chirurgie\", \"LZ kardiologie\", \"LZ plicní\", \"LZ ORL\"); "
"\"PZ {oddělení}\" = propouštěcí zpráva z hospitalizace (např. \"PZ interna\", \"PZ neurologie\"). "
"Jiné typy: \"Laboratoř\", \"CT břicha\", \"MRI páteře\", \"kolonoskopie\", "
"\"operační protokol oční\", \"poukaz FT\", \"diagnostická mamografie\" atd.\n"
"- \"poznamka\": krátká klinická poznámka česky, max 80 znaků. "
"DŮLEŽITÉ: pokud zpráva obsahuje sekci \"Závěr:\" nebo \"Závěr vyšetření:\", "
"použij VÝHRADNĚ obsah této sekce — je nejdůležitější. "
"Teprve pokud závěr chybí, shrň obsah z celé zprávy. "
"U laboratorních výsledků uváděj POUZE hodnoty mimo normu (patologické nálezy) — hodnoty v normě vynech. "
"Osmolalitu nikdy nezmiňuj ani jako patologický nález. "
"Pokud výsledky obsahují glomerulární filtraci (eGFR nebo C_CKD-EPI), přidej její klasifikaci velkými písmeny podle CKD-EPI: "
"eGFR ≥ 90 → CHRI G1, 6089 → CHRI G2, 4559 → CHRI G3a, 3044 → CHRI G3b, 1529 → CHRI G4, < 15 → CHRI G5.\n"
"- \"nazev_souboru\": název souboru ve formátu "
"\"{rodne_cislo} {datum_zpravy} {Příjmení}, {Jméno} [{typ_dokumentu}] [{poznamka}].pdf\" "
"(jméno bez titulu, RČ bez lomítka)\n"
"- \"rotace\": o kolik stupňů CCW je třeba otočit obrázek aby byl text čitelně na výšku nebo šířku "
"(hodnoty: 0, 90, 180, 270). Pokud je text již správně orientovaný, vrať 0.\n\n"
"Pokud pole nenajdeš, použij null. Nepiš nic jiného než JSON."
)
print(" Volám Claude Vision API...")
client = anthropic.Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))
response = client.messages.create(
model="claude-sonnet-4-6",
max_tokens=400,
messages=[{"role": "user", "content": [
{"type": "image", "source": {"type": "base64", "media_type": "image/jpeg", "data": image_b64}},
{"type": "text", "text": prompt},
]}],
)
usage = response.usage
print(f" Tokeny: {usage.input_tokens} in + {usage.output_tokens} out = ${usage.input_tokens*3/1e6 + usage.output_tokens*15/1e6:.4f}")
raw = response.content[0].text.strip()
if raw.startswith("```"):
raw = raw.split("```")[1]
if raw.startswith("json"):
raw = raw[4:]
try:
return json.loads(raw.strip())
except json.JSONDecodeError:
print(f" VAROVÁNÍ: nelze parsovat JSON: {raw!r}")
return {"nazev_souboru": None, "raw": raw}
# ─── Subprocess helpers ───────────────────────────────────────────────────────
def open_preview(pdf_path: Path) -> tuple[subprocess.Popen, Path]:
geom_file = Path(tempfile.mktemp(suffix=".json"))
proc = subprocess.Popen([sys.executable, str(VIEWER), str(pdf_path), f"--write-geometry={geom_file}"])
return proc, geom_file
def read_preview_bottom(geom_file: Path, timeout: float = 5.0) -> int:
import time
deadline = time.time() + timeout
while time.time() < deadline:
if geom_file.exists():
geom = json.loads(geom_file.read_text(encoding="utf-8"))
geom_file.unlink(missing_ok=True)
return geom["y"] + geom["h"] + 30 # +30 pro title bar
time.sleep(0.1)
geom_file.unlink(missing_ok=True)
return None
def run_rename_dialog(nazev: str, info_lines: list, below_y: int = None) -> str | None:
tmp = Path(tempfile.mktemp(suffix=".json"))
tmp.write_text(json.dumps({"nazev": nazev, "info_lines": info_lines}, ensure_ascii=False), encoding="utf-8")
args = [sys.executable, str(RENAME_DIALOG), str(tmp)]
if below_y is not None:
args.append(f"--below-y={below_y}")
proc = subprocess.run(args, capture_output=True, text=True, encoding="utf-8")
tmp.unlink(missing_ok=True)
out = proc.stdout.strip()
return json.loads(out).get("value") if out else None
def run_variant_picker(variants_data: list) -> str | None:
tmp = Path(tempfile.mktemp(suffix=".json"))
tmp.write_text(json.dumps(variants_data, ensure_ascii=False), encoding="utf-8")
proc = subprocess.run(
[sys.executable, str(VARIANT_PICKER), str(tmp)],
capture_output=True, text=True, encoding="utf-8",
)
tmp.unlink(missing_ok=True)
out = proc.stdout.strip()
return json.loads(out).get("chosen") if out else None
# ─── Hlavní flow ──────────────────────────────────────────────────────────────
def process_file(pdf_path: Path):
print(f"\nSoubor: {pdf_path.name}")
# Spusť načítání indexu dokumentace na pozadí — hotovo za dobu volání Claude
start_dokumentace_index()
# 1. Otevři preview originálu
preview, geom_file = open_preview(pdf_path)
below_y = read_preview_bottom(geom_file)
# 2. Claude Vision API
info = extract_info(pdf_path)
nazev = info.get("nazev_souboru") or pdf_path.name
# 3. Medicus ověření + fuzzy matching RČ
rc_from_scan = re.sub(r"\D", "", info.get("rodne_cislo") or "")
print(f" Ověřuji v Medicus (RČ: {rc_from_scan})...")
verif = verify_patient(rc_from_scan)
# Oprava RČ při fuzzy matchi
if verif["status"] == "fuzzy" and verif.get("rc_corrected") and nazev:
nazev = nazev.replace(rc_from_scan, verif["rc_corrected"], 1)
print(f" → RČ opraveno: {rc_from_scan}{verif['rc_corrected']}")
# Info řádky pro dialog
status = verif["status"]
patient = verif.get("patient")
info_lines = []
if status == "ok":
info_lines.append(f"✓ Medicus: {patient['prijmeni']} {patient['jmeno']} | RČ {patient['rodcis']}")
elif status == "fuzzy":
info_lines.append(f"⚠ RČ ze skenu '{rc_from_scan}' → opraveno na {verif['rc_corrected']}")
info_lines.append(f" Pacient: {patient['prijmeni']} {patient['jmeno']} | RČ {patient['rodcis']}")
elif status == "not_found":
info_lines.append(f"✗ RČ '{rc_from_scan}' nenalezeno v Medicus")
else:
info_lines.append("— Medicus nedostupný (offline)")
# Duplicity
rc_final = re.sub(r"\D", "", verif["patient"]["rodcis"] if patient else rc_from_scan)
duplicity = check_duplicates(rc_final, info.get("datum_zpravy") or "")
if duplicity:
info_lines.append(f"⚠ DUPLICITA: {', '.join(duplicity)}")
if not info_lines:
info_lines = ["[Claude nevrátil název — uprav ručně]"]
print(" Otevírám dialog pro schválení názvu...")
final_name = run_rename_dialog(nazev, info_lines, below_y=below_y)
preview.terminate()
if not final_name:
print(" Přeskočeno.")
return
if not final_name.endswith(".pdf"):
final_name += ".pdf"
final_name = re.sub(r'[<>:"/\\|?*]', '', final_name)
if nazev and final_name != nazev:
save_correction(nazev, final_name)
print(f" Schválený název: {final_name}")
# 4. Generuj kompresní varianty (originál + 5 variant)
print(" Generuji kompresní varianty...")
temp_files = []
orig_kb = round(pdf_path.stat().st_size / 1024)
variants_data = [{"path": str(pdf_path), "label": "Originál", "size_kb": orig_kb}]
for label, dpi, quality in COMPRESS_VARIANTS:
tmp = compress_to_temp(pdf_path, dpi, quality)
size_kb = round(tmp.stat().st_size / 1024)
temp_files.append(tmp)
variants_data.append({"path": str(tmp), "label": label, "size_kb": size_kb})
print(f" {label}: {size_kb} kB")
# 5. Vyber variantu
print(" Vyber variantu v okně...")
chosen = run_variant_picker(variants_data)
if not chosen:
print(" Žádná varianta nevybrána, přeskakuji.")
for t in temp_files:
t.unlink(missing_ok=True)
return
# 6. Ulož do Processed
PROCESSED.mkdir(exist_ok=True)
dest = PROCESSED / final_name
if dest.exists():
print(f" VAROVÁNÍ: '{final_name}' již existuje, přeskakuji.")
else:
shutil.copy2(chosen, dest)
pdf_path.unlink()
print(f" ✓ Uloženo: {dest.name}")
for t in temp_files:
t.unlink(missing_ok=True) # originál mezi temp_files není, je bezpečné
def process_folder(folder: Path):
files = sorted(f for f in folder.iterdir() if f.suffix.lower() in (".pdf", ".jpg", ".jpeg", ".png"))
if not files:
print(f"Žádné soubory v: {folder}")
return
print(f"Nalezeno {len(files)} soubor(ů).")
for f in files:
try:
process_file(f)
except Exception as e:
print(f" CHYBA: {e}")
print("\nHotovo.")
if __name__ == "__main__":
PROCESSED.mkdir(exist_ok=True)
TO_PROCESS.mkdir(exist_ok=True)
target = Path(sys.argv[1]) if len(sys.argv) > 1 else TO_PROCESS
if target.is_file():
process_file(target)
elif target.is_dir():
process_folder(target)
else:
print("Použití: python extract_patient_info_novy.py [soubor.pdf nebo složka]")
sys.exit(1)