469 lines
18 KiB
Python
469 lines
18 KiB
Python
"""
|
|
Zpracování naskenovaných PDF — nová verze.
|
|
1. Preview originálu + Claude Vision API
|
|
2. Rename dialog
|
|
3. 5 variant komprese → uživatel vybere
|
|
4. Uložit do Processed, smazat originál
|
|
"""
|
|
import base64
|
|
import gc
|
|
import io
|
|
import json
|
|
import os
|
|
import re
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
from pathlib import Path
|
|
|
|
# On Windows, re-wrap stdout/stderr as UTF-8 text streams; characters that
# cannot be encoded are replaced instead of raising.
if sys.platform == "win32":
    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")
    sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8", errors="replace")
|
|
|
|
import anthropic
|
|
from pdf2image import convert_from_path
|
|
|
|
sys.path.insert(0, str(Path(__file__).parent.parent))
|
|
from Knihovny.najdi_dropbox import get_dropbox_root
|
|
from Knihovny.najdi_medicus import get_medicus_config
|
|
|
|
def _load_env():
|
|
env_path = Path(__file__).parent.parent / ".env"
|
|
if env_path.exists():
|
|
for line in env_path.read_text(encoding="utf-8").splitlines():
|
|
line = line.strip()
|
|
if "=" in line and not line.startswith("#"):
|
|
k, v = line.split("=", 1)
|
|
os.environ[k.strip()] = v.strip()
|
|
|
|
_load_env()
|
|
|
|
# Directory with the Poppler binaries; passed to pdf2image as ``poppler_path``.
POPPLER_PATH = r"C:/Poppler/Library/bin"
CORRECTIONS = True  # True = corrections.json is loaded and saved; False = ignore it

# Dropbox root as reported by the project helper.
_DROPBOX = Path(get_dropbox_root())
# Default input folder (used when no command-line argument is given).
TO_PROCESS = _DROPBOX / r"Ordinace\Dokumentace_ke_zpracování\Ricoh Fi-8040\KeZpracování"
# Destination folder for approved, renamed files.
PROCESSED = _DROPBOX / r"Ordinace\Dokumentace_ke_zpracování\Ricoh Fi-8040\Zpracováno"
# Stored rename corrections, kept next to this script.
CORRECTIONS_FILE = Path(__file__).parent / "corrections.json"
# Optional naming rules that are prepended to the model prompt.
NAMING_RULES_FILE = Path(__file__).parent / "naming_rules.md"
# Folder whose file names are indexed for duplicate detection.
DOKUMENTACE = _DROPBOX / r"Ordinace\Dokumentace_zpracovaná"
|
|
|
|
import threading
|
|
|
|
# File names found in DOKUMENTACE; rebound as a whole by the background loader.
_dokumentace_index: set[str] = set()
# Set by the background loader once the index has been built.
_dokumentace_ready = threading.Event()
|
|
|
|
def _load_dokumentace_index_bg():
    """Build the set of file names in ``DOKUMENTACE`` and publish it.

    Intended to run on a background thread. The module-level index is rebound
    to the new set and ``_dokumentace_ready`` is set in every case — including
    a failed directory scan — so that ``check_duplicates`` never has to wait
    out its full timeout because of an I/O error here. A missing or unreadable
    directory yields an empty index.
    """
    global _dokumentace_index
    names: set[str] = set()
    try:
        if DOKUMENTACE.exists():
            names = {f.name for f in DOKUMENTACE.iterdir() if f.is_file()}
    except OSError as e:
        # Best effort: duplicate detection is advisory, so report and continue
        # with an empty index rather than letting the thread die silently.
        print(f" Index dokumentace: nelze načíst ({e}).")
    finally:
        _dokumentace_index = names
        _dokumentace_ready.set()
    print(f" Index dokumentace: {len(names)} souborů načteno.")
|
|
|
|
def start_dokumentace_index():
    """Launch the documentation-index scan on a daemon thread and return at once."""
    threading.Thread(target=_load_dokumentace_index_bg, daemon=True).start()
|
|
|
|
# Helper scripts located next to this file; each is run as a separate
# Python process.
VIEWER = Path(__file__).parent / "preview_viewer.py"
RENAME_DIALOG = Path(__file__).parent / "rename_dialog.py"
VARIANT_PICKER = Path(__file__).parent / "variant_picker.py"
|
|
|
|
# The 5 compression variants, each as (label, DPI, JPEG quality).
COMPRESS_VARIANTS = [
    ("300 DPI / q90", 300, 90),
    ("200 DPI / q85", 200, 85),
    ("150 DPI / q80", 150, 80),
    ("120 DPI / q75", 120, 75),
    ( "96 DPI / q70", 96, 70),
]
|
|
|
|
|
|
# ─── Komprese jedné varianty ──────────────────────────────────────────────────
|
|
|
|
def compress_to_temp(pdf_path: Path, dpi: int, quality: int) -> Path:
    """Re-render every page as a JPEG and write the result to a temporary PDF.

    Args:
        pdf_path: Source document to rasterise.
        dpi: Render resolution; pages are scaled by ``dpi / 72``.
        quality: JPEG quality passed to the pixmap encoder.

    Returns:
        Path of the newly written temporary ``.pdf``. The caller owns the file
        and is responsible for deleting it.

    The output pages keep the source page dimensions. Both documents are
    closed even when rendering or saving fails, and a partially written
    temporary file is removed in that case.
    """
    import fitz

    zoom = dpi / 72.0
    mat = fitz.Matrix(zoom, zoom)
    src = fitz.open(str(pdf_path))
    out = fitz.open()
    try:
        for page in src:
            pix = page.get_pixmap(matrix=mat, colorspace=fitz.csRGB)
            img_bytes = pix.tobytes("jpeg", jpg_quality=quality)
            # Wrap the JPEG in a one-page PDF so it can be placed on the new page.
            img_doc = fitz.open("pdf", fitz.open("jpeg", img_bytes).convert_to_pdf())
            rect = page.rect
            new_page = out.new_page(width=rect.width, height=rect.height)
            new_page.show_pdf_page(new_page.rect, img_doc, 0)

        # mkstemp creates the file atomically (mktemp only returns a name and
        # is race-prone); the descriptor is closed so the save can reopen it.
        fd, tmp_name = tempfile.mkstemp(suffix=".pdf")
        os.close(fd)
        tmp = Path(tmp_name)
        try:
            out.save(tmp, deflate=True, garbage=4)
        except Exception:
            tmp.unlink(missing_ok=True)
            raise
    finally:
        out.close()
        src.close()
    return tmp
|
|
|
|
|
|
# ─── Medicus ověření ─────────────────────────────────────────────────────────
|
|
|
|
def _medicus_connect():
    """Open a connection to the Medicus database, or return ``None``.

    The DSN comes from ``get_medicus_config()``. User name and password can be
    overridden through the ``MEDICUS_USER`` / ``MEDICUS_PASSWORD`` environment
    variables; when they are unset the previous built-in values are used, so
    existing installations keep working.

    Returns:
        An ``fdb`` connection, or ``None`` when anything goes wrong (driver not
        installed, configuration lookup failed, server unreachable). The reason
        is printed; callers treat ``None`` as "offline".
    """
    try:
        import fdb

        cfg = get_medicus_config()
        return fdb.connect(
            dsn=cfg.dsn,
            user=os.environ.get("MEDICUS_USER", "SYSDBA"),
            password=os.environ.get("MEDICUS_PASSWORD", "masterkey"),
            charset="win1250",
        )
    except Exception as e:
        # Deliberately broad: verification is optional and must never abort
        # the processing of a scan.
        print(f" [Medicus] Nepřipojeno: {e}")
        return None
|
|
|
|
def _lookup_by_rc(cur, rc_digits: str) -> dict | None:
|
|
cur.execute(
|
|
"SELECT IDPAC, PRIJMENI, JMENO, RODCIS FROM KAR "
|
|
"WHERE REPLACE(RODCIS, '/', '') = ?", (rc_digits,)
|
|
)
|
|
row = cur.fetchone()
|
|
if row:
|
|
return {"idpac": row[0], "prijmeni": row[1].strip(), "jmeno": row[2].strip(), "rodcis": row[3].strip()}
|
|
return None
|
|
|
|
def _rc_candidates(rc: str) -> list[str]:
    """Generate near-miss variants of a misread birth number.

    Three kinds of single edits are applied to ``rc``:

    * one character removed,
    * a ``"0"`` inserted at any position,
    * one digit replaced by a look-alike digit.

    The look-alike table is symmetric: every confusable pair is tried in both
    directions (so ``8`` is tried as both ``0`` and ``3``).

    Returns:
        Sorted, de-duplicated variants of length 9 or 10, never including
        ``rc`` itself.
    """
    # Each digit maps to a string of all digits it may be confused with.
    similar = {"0": "8", "8": "03", "1": "7", "7": "1", "5": "6", "6": "5", "3": "8"}
    candidates = set()
    for i, ch in enumerate(rc):
        candidates.add(rc[:i] + rc[i + 1:])
        for alt in similar.get(ch, ""):
            candidates.add(rc[:i] + alt + rc[i + 1:])
    for i in range(len(rc) + 1):
        candidates.add(rc[:i] + "0" + rc[i:])
    candidates.discard(rc)
    return sorted(c for c in candidates if len(c) in (9, 10))
|
|
|
|
def _rc_checksum_ok(rc: str) -> bool:
    """Check the mod-11 check digit of a ten-digit birth number.

    Non-digits are ignored. A ten-digit number passes when the whole number is
    divisible by 11, or — the legacy exception — when the first nine digits
    leave remainder 10 and the check digit is 0. Numbers of any other length
    carry no check digit here and are accepted.
    """
    digits = re.sub(r"\D", "", rc)
    if len(digits) != 10:
        return True
    remainder = int(digits[:9]) % 11
    if remainder == 10:
        # No single digit can encode 10; such numbers use 0 as check digit.
        remainder = 0
    return remainder == int(digits[9])
|
|
|
|
def verify_patient(rc_raw: str) -> dict:
    """Verify a scanned birth number against Medicus, with fuzzy fallback.

    Returns a dict with ``status``, ``patient`` and ``rc_corrected``:

    * ``"not_found"`` – no digits given, or neither the number nor any
      near-miss variant exists in the database;
    * ``"offline"`` – no database connection;
    * ``"ok"`` – exact match;
    * ``"fuzzy"`` – a variant matched; ``rc_corrected`` holds the preferred
      variant (checksum-valid ones first) and ``all_matches`` lists every
      ``(variant, patient)`` hit.
    """
    digits = re.sub(r"\D", "", rc_raw or "")
    if not digits:
        return {"status": "not_found", "patient": None, "rc_corrected": None}

    con = _medicus_connect()
    if con is None:
        return {"status": "offline", "patient": None, "rc_corrected": None}

    try:
        cur = con.cursor()
        exact = _lookup_by_rc(cur, digits)
        if exact:
            return {"status": "ok", "patient": exact, "rc_corrected": None}

        matches = []
        for variant in _rc_candidates(digits):
            hit = _lookup_by_rc(cur, variant)
            if hit:
                matches.append((variant, hit))
        if not matches:
            return {"status": "not_found", "patient": None, "rc_corrected": None}

        # Stable sort: checksum-valid variants (key False) come first.
        matches.sort(key=lambda pair: not _rc_checksum_ok(pair[0]))
        best_rc, best_patient = matches[0]
        return {
            "status": "fuzzy",
            "patient": best_patient,
            "rc_corrected": best_rc,
            "all_matches": matches,
        }
    finally:
        con.close()
|
|
|
|
def check_duplicates(rc: str, datum: str) -> list[str]:
    """Return indexed file names that start with ``"{rc} {datum}"``.

    An empty ``rc`` or ``datum`` yields an empty list without touching the
    index. Otherwise the call waits up to 15 seconds for the background index
    to become ready and then scans whatever index is available.
    """
    if not (rc and datum):
        return []
    # Wait at most 15 s for the index (typically finished while Claude is called).
    _dokumentace_ready.wait(timeout=15)
    wanted = f"{rc} {datum}"
    hits = []
    for fname in _dokumentace_index:
        if fname.startswith(wanted):
            hits.append(fname)
    return hits
|
|
|
|
|
|
# ─── Korekce (few-shot příklady) ─────────────────────────────────────────────
|
|
|
|
def load_corrections() -> list[dict]:
    """Return the parsed content of ``CORRECTIONS_FILE``, or ``[]`` if it is absent."""
    if not CORRECTIONS_FILE.exists():
        return []
    raw = CORRECTIONS_FILE.read_text(encoding="utf-8")
    return json.loads(raw)
|
|
|
|
def save_correction(original: str, corrected: str):
    """Append an ``original → corrected`` pair to the corrections file.

    Does nothing when ``CORRECTIONS`` is disabled or when the identical pair is
    already stored. Otherwise the whole list is rewritten as indented UTF-8
    JSON and the new total is printed.
    """
    if not CORRECTIONS:
        return
    known = load_corrections()
    already_stored = any(
        entry["original"] == original and entry["corrected"] == corrected
        for entry in known
    )
    if already_stored:
        return
    known.append({"original": original, "corrected": corrected})
    payload = json.dumps(known, ensure_ascii=False, indent=2)
    CORRECTIONS_FILE.write_text(payload, encoding="utf-8")
    print(f" ✓ Korekce uložena ({len(known)} celkem)")
|
|
|
|
def load_naming_rules() -> str:
    """Return the naming-rules prompt section, or ``""`` if there are no rules.

    The section is the fixed heading followed by the stripped content of
    ``NAMING_RULES_FILE`` and a blank line; a missing or empty file gives an
    empty string.
    """
    if not NAMING_RULES_FILE.exists():
        return ""
    rules = NAMING_RULES_FILE.read_text(encoding="utf-8").strip()
    if not rules:
        return ""
    return f"Pravidla pro pojmenování souborů (dodržuj vždy):\n{rules}\n\n"
|
|
|
|
def build_corrections_prompt() -> str:
    """Format the ten most recent stored corrections as a prompt section.

    Returns ``""`` when corrections are disabled or none are stored; otherwise
    a heading followed by a wrong/right line pair per correction, terminated
    by a blank line.
    """
    if not CORRECTIONS:
        return ""
    corrections = load_corrections()
    if not corrections:
        return ""
    lines = ["Příklady korekcí z minulých běhů (uč se z nich):"]
    for entry in corrections[-10:]:
        lines += [
            f' - špatně: "{entry["original"]}"',
            f' správně: "{entry["corrected"]}"',
        ]
    return "\n".join(lines) + "\n\n"
|
|
|
|
|
|
# ─── Claude Vision API ────────────────────────────────────────────────────────
|
|
|
|
def _first_page_jpeg_b64(pdf_path: Path) -> str:
    """Return the image file, or page 1 of the PDF, as a base64-encoded JPEG."""
    buf = io.BytesIO()
    if pdf_path.suffix.lower() in (".jpg", ".jpeg", ".png"):
        from PIL import Image

        with Image.open(pdf_path) as img:
            # JPEG cannot store alpha/palette modes (typical for PNG input);
            # convert those to RGB instead of failing on save.
            if img.mode not in ("1", "L", "RGB", "RGBX", "CMYK", "YCbCr"):
                img = img.convert("RGB")
            img.save(buf, format="JPEG", quality=95)
    else:
        # Only the first page is sent to the model, so only that page is
        # rasterised instead of the whole document.
        images = convert_from_path(
            str(pdf_path), poppler_path=POPPLER_PATH, dpi=300,
            first_page=1, last_page=1,
        )
        images[0].save(buf, format="JPEG", quality=95)
        del images
        gc.collect()
    return base64.standard_b64encode(buf.getvalue()).decode("utf-8")


def _build_extraction_prompt() -> str:
    """Assemble naming rules, stored corrections and the fixed instructions."""
    return (
        load_naming_rules() +
        build_corrections_prompt() +
        "Toto je naskenovaná lékařská zpráva v češtině. "
        "Vrať JSON s těmito poli:\n"
        "- \"jmeno\": celé jméno pacienta (příjmení + jméno + případný titul)\n"
        "- \"rodne_cislo\": rodné číslo pacienta BEZ lomítka (pouze číslice)\n"
        "- \"datum_zpravy\": datum zprávy ve formátu YYYY-MM-DD\n"
        "- \"typ_dokumentu\": typ dokumentu — "
        "\"LZ {oddělení}\" = ambulantní/lékařská zpráva (např. \"LZ chirurgie\", \"LZ kardiologie\", \"LZ plicní\", \"LZ ORL\"); "
        "\"PZ {oddělení}\" = propouštěcí zpráva z hospitalizace (např. \"PZ interna\", \"PZ neurologie\"). "
        "Jiné typy: \"Laboratoř\", \"CT břicha\", \"MRI páteře\", \"kolonoskopie\", "
        "\"operační protokol oční\", \"poukaz FT\", \"diagnostická mamografie\" atd.\n"
        "- \"poznamka\": krátká klinická poznámka česky, max 80 znaků. "
        "DŮLEŽITÉ: pokud zpráva obsahuje sekci \"Závěr:\" nebo \"Závěr vyšetření:\", "
        "použij VÝHRADNĚ obsah této sekce — je nejdůležitější. "
        "Teprve pokud závěr chybí, shrň obsah z celé zprávy.\n"
        "- \"nazev_souboru\": název souboru ve formátu "
        "\"{rodne_cislo} {datum_zpravy} {Příjmení}, {Jméno} [{typ_dokumentu}] [{poznamka}].pdf\" "
        "(jméno bez titulu, RČ bez lomítka)\n"
        "- \"rotace\": o kolik stupňů CCW je třeba otočit obrázek aby byl text čitelně na výšku nebo šířku "
        "(hodnoty: 0, 90, 180, 270). Pokud je text již správně orientovaný, vrať 0.\n\n"
        "Pokud pole nenajdeš, použij null. Nepiš nic jiného než JSON."
    )


def _parse_model_json(raw: str) -> dict:
    """Parse the model reply (optionally wrapped in a ``` fence) into a dict.

    Anything that is not a JSON object is reported and returned as
    ``{"nazev_souboru": None, "raw": ...}`` so callers can always use ``.get``.
    """
    if raw.startswith("```"):
        raw = raw.split("```")[1]
        if raw.startswith("json"):
            raw = raw[4:]
    try:
        parsed = json.loads(raw.strip())
    except json.JSONDecodeError:
        parsed = None
    if not isinstance(parsed, dict):
        print(f" VAROVÁNÍ: nelze parsovat JSON: {raw!r}")
        return {"nazev_souboru": None, "raw": raw}
    return parsed


def extract_info(pdf_path: Path) -> dict:
    """Ask the Claude vision model to extract patient/document metadata.

    Args:
        pdf_path: A PDF (first page is used) or a ``.jpg``/``.jpeg``/``.png``
            image.

    Returns:
        The JSON object returned by the model (fields as requested in the
        prompt). If the reply cannot be parsed the result is
        ``{"nazev_souboru": None, "raw": <reply>}``; if the API call itself
        fails it is ``{"nazev_souboru": None}``.

    Raises:
        Whatever image conversion or prompt assembly raises; only the API call
        and reply handling are treated as best-effort.
    """
    print(" Převádím na obrázek...")
    image_b64 = _first_page_jpeg_b64(pdf_path)
    prompt = _build_extraction_prompt()

    print(" Volám Claude Vision API...")
    try:
        client = anthropic.Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))
        response = client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=400,
            messages=[{"role": "user", "content": [
                {"type": "image", "source": {"type": "base64", "media_type": "image/jpeg", "data": image_b64}},
                {"type": "text", "text": prompt},
            ]}],
        )
        usage = response.usage
        print(f" Tokeny: {usage.input_tokens} in + {usage.output_tokens} out = ${usage.input_tokens*3/1e6 + usage.output_tokens*15/1e6:.4f}")

        return _parse_model_json(response.content[0].text.strip())
    except Exception as e:
        # Deliberate catch-all: any API problem falls back to manual naming.
        print(f" VAROVÁNÍ: Claude API selhalo ({e}) — otevírám dialog pro ruční vyplnění.")
        return {"nazev_souboru": None}
|
|
|
|
|
|
# ─── Subprocess helpers ───────────────────────────────────────────────────────
|
|
|
|
def open_preview(pdf_path: Path) -> tuple[subprocess.Popen, Path]:
    """Start the preview viewer for ``pdf_path`` in a separate process.

    Returns:
        ``(process, geom_file)`` where ``geom_file`` is the path handed to the
        viewer via ``--write-geometry``. Only a name is reserved here; the file
        itself is not created by this function.
    """
    geom_file = Path(tempfile.mktemp(suffix=".json"))
    cmd = [
        sys.executable,
        str(VIEWER),
        str(pdf_path),
        f"--write-geometry={geom_file}",
    ]
    viewer_proc = subprocess.Popen(cmd)
    return viewer_proc, geom_file
|
|
|
|
|
|
def read_preview_bottom(geom_file: Path, timeout: float = 5.0) -> int:
|
|
import time
|
|
deadline = time.time() + timeout
|
|
while time.time() < deadline:
|
|
if geom_file.exists():
|
|
geom = json.loads(geom_file.read_text(encoding="utf-8"))
|
|
geom_file.unlink(missing_ok=True)
|
|
return geom["y"] + geom["h"] + 30 # +30 pro title bar
|
|
time.sleep(0.1)
|
|
geom_file.unlink(missing_ok=True)
|
|
return None
|
|
|
|
|
|
def run_rename_dialog(nazev: str, info_lines: list, below_y: int | None = None) -> str | None:
    """Show the rename dialog in a child process and return the approved name.

    Args:
        nazev: Proposed file name.
        info_lines: Status lines to display alongside the name.
        below_y: Optional screen coordinate forwarded as ``--below-y``.

    Returns:
        The ``"value"`` field of the JSON printed by the dialog, or ``None``
        when the dialog printed nothing.

    The input is handed over through a temporary JSON file that is removed
    even if launching the dialog fails.
    """
    # mkstemp creates the file securely; mktemp only reserves a guessable name.
    fd, tmp_name = tempfile.mkstemp(suffix=".json")
    tmp = Path(tmp_name)
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            fh.write(json.dumps({"nazev": nazev, "info_lines": info_lines}, ensure_ascii=False))
        args = [sys.executable, str(RENAME_DIALOG), str(tmp)]
        if below_y is not None:
            args.append(f"--below-y={below_y}")
        # Force UTF-8 in the child so its stdout matches the decoding used here.
        env = {**os.environ, "PYTHONIOENCODING": "utf-8", "PYTHONUTF8": "1"}
        proc = subprocess.run(args, capture_output=True, text=True, encoding="utf-8", env=env)
    finally:
        tmp.unlink(missing_ok=True)
    out = proc.stdout.strip()
    return json.loads(out).get("value") if out else None
|
|
|
|
|
|
def run_variant_picker(variants_data: list) -> str | None:
    """Show the variant picker in a child process and return the chosen path.

    Args:
        variants_data: JSON-serialisable list describing the variants.

    Returns:
        The ``"chosen"`` field of the JSON printed by the picker, or ``None``
        when the picker printed nothing. A non-zero exit code or empty output
        is reported together with the child's stderr.

    The input is handed over through a temporary JSON file that is removed
    even if launching the picker fails.
    """
    # mkstemp creates the file securely; mktemp only reserves a guessable name.
    fd, tmp_name = tempfile.mkstemp(suffix=".json")
    tmp = Path(tmp_name)
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            fh.write(json.dumps(variants_data, ensure_ascii=False))
        # Same UTF-8 environment as the rename dialog, so the child's stdout
        # matches the decoding used here.
        env = {**os.environ, "PYTHONIOENCODING": "utf-8", "PYTHONUTF8": "1"}
        proc = subprocess.run(
            [sys.executable, str(VARIANT_PICKER), str(tmp)],
            capture_output=True, text=True, encoding="utf-8", env=env,
        )
    finally:
        tmp.unlink(missing_ok=True)

    out = proc.stdout.strip()
    if proc.returncode != 0 or not out:
        print(f" [variant_picker] returncode={proc.returncode}")
        if proc.stderr.strip():
            print(f" [variant_picker] CHYBA:\n{proc.stderr.strip()}")
    return json.loads(out).get("chosen") if out else None
|
|
|
|
|
|
# ─── Hlavní flow ──────────────────────────────────────────────────────────────
|
|
|
|
def _build_info_lines(verif: dict, rc_from_scan: str) -> list[str]:
    """Turn a ``verify_patient`` result into status lines for the rename dialog."""
    status = verif["status"]
    patient = verif.get("patient")
    if status == "ok":
        return [f"✓ Medicus: {patient['prijmeni']} {patient['jmeno']} | RČ {patient['rodcis']}"]
    if status == "fuzzy":
        return [
            f"⚠ RČ ze skenu '{rc_from_scan}' → opraveno na {verif['rc_corrected']}",
            f" Pacient: {patient['prijmeni']} {patient['jmeno']} | RČ {patient['rodcis']}",
        ]
    if status == "not_found":
        return [f"✗ RČ '{rc_from_scan}' nenalezeno v Medicus"]
    return ["— Medicus nedostupný (offline)"]


def process_file(pdf_path: Path):
    """Run the full interactive workflow for one scanned file.

    Steps: start the duplicate index in the background, open the preview,
    ask the vision model for metadata, verify the birth number in Medicus
    (correcting it on a fuzzy match), let the user approve the file name,
    generate the compression variants, let the user pick one, copy it to
    ``PROCESSED`` and delete the original.

    The function returns early, leaving the original in place, when the user
    cancels the rename dialog or picks no variant. The preview process is
    terminated and all temporary variant files are removed even when a step
    raises.
    """
    print(f"\nSoubor: {pdf_path.name}")

    # Start loading the documentation index in the background — it is
    # normally finished by the time the Claude call returns.
    start_dokumentace_index()

    # 1. Open a preview of the original.
    preview, geom_file = open_preview(pdf_path)
    try:
        below_y = read_preview_bottom(geom_file)

        # 2. Claude Vision API.
        info = extract_info(pdf_path)
        nazev = info.get("nazev_souboru") or pdf_path.name

        # 3. Medicus verification + fuzzy matching of the birth number.
        # str(): the model may return the number as a JSON number.
        rc_from_scan = re.sub(r"\D", "", str(info.get("rodne_cislo") or ""))
        print(f" Ověřuji v Medicus (RČ: {rc_from_scan})...")
        verif = verify_patient(rc_from_scan)
        patient = verif.get("patient")

        # Fix the birth number in the proposed name after a fuzzy match.
        if verif["status"] == "fuzzy" and verif.get("rc_corrected") and nazev:
            nazev = nazev.replace(rc_from_scan, verif["rc_corrected"], 1)
            print(f" → RČ opraveno: {rc_from_scan} → {verif['rc_corrected']}")

        # Info lines for the dialog (always at least one line).
        info_lines = _build_info_lines(verif, rc_from_scan)

        # Duplicates.
        rc_final = re.sub(r"\D", "", patient["rodcis"] if patient else rc_from_scan)
        duplicity = check_duplicates(rc_final, info.get("datum_zpravy") or "")
        if duplicity:
            info_lines.append(f"⚠ DUPLICITA: {', '.join(duplicity)}")

        print(" Otevírám dialog pro schválení názvu...")
        final_name = run_rename_dialog(nazev, info_lines, below_y=below_y)
    finally:
        # Never leave the viewer window orphaned, whatever happened above.
        preview.terminate()

    if not final_name:
        print(" Přeskočeno.")
        return

    # Case-insensitive so that "X.PDF" does not become "X.PDF.pdf".
    if not final_name.lower().endswith(".pdf"):
        final_name += ".pdf"
    # Drop characters that are not allowed in Windows file names.
    final_name = re.sub(r'[<>:"/\\|?*]', '', final_name)

    if nazev and final_name != nazev:
        save_correction(nazev, final_name)

    print(f" Schválený název: {final_name}")

    # 4. Generate the compression variants (original + 5 variants).
    print(" Generuji kompresní varianty...")
    temp_files = []
    try:
        orig_kb = round(pdf_path.stat().st_size / 1024)
        variants_data = [{"path": str(pdf_path), "label": "Originál", "size_kb": orig_kb}]
        for label, dpi, quality in COMPRESS_VARIANTS:
            tmp = compress_to_temp(pdf_path, dpi, quality)
            temp_files.append(tmp)
            size_kb = round(tmp.stat().st_size / 1024)
            variants_data.append({"path": str(tmp), "label": label, "size_kb": size_kb})
            print(f" {label}: {size_kb} kB")

        # 5. Pick a variant.
        print(" Vyber variantu v okně...")
        chosen = run_variant_picker(variants_data)
        if not chosen:
            print(" Žádná varianta nevybrána, přeskakuji.")
            return

        # 6. Save to Processed.
        PROCESSED.mkdir(parents=True, exist_ok=True)
        dest = PROCESSED / final_name
        if dest.exists():
            print(f" Přepisuji existující: {dest.name}")
        shutil.copy2(chosen, dest)
        pdf_path.unlink()
        print(f" ✓ Uloženo: {dest.name}")
    finally:
        # The original is never in temp_files, so this cleanup is safe.
        for t in temp_files:
            t.unlink(missing_ok=True)
|
|
|
|
|
|
def process_folder(folder: Path):
    """Process every PDF/JPEG/PNG entry of ``folder`` in sorted order.

    An error while processing one file is printed and does not stop the
    remaining files. Prints a notice and returns when nothing matches.
    """
    wanted_suffixes = (".pdf", ".jpg", ".jpeg", ".png")
    files = [entry for entry in folder.iterdir() if entry.suffix.lower() in wanted_suffixes]
    files.sort()
    if not files:
        print(f"Žádné soubory v: {folder}")
        return

    print(f"Nalezeno {len(files)} soubor(ů).")
    for path in files:
        try:
            process_file(path)
        except Exception as e:
            print(f" CHYBA: {e}")
    print("\nHotovo.")
|
|
|
|
|
|
# Script entry point: process the file or folder given as the first
# command-line argument, or the default TO_PROCESS folder when none is given.
if __name__ == "__main__":
    # Make sure both working folders exist before anything is processed.
    PROCESSED.mkdir(exist_ok=True)
    TO_PROCESS.mkdir(exist_ok=True)

    target = Path(sys.argv[1]) if len(sys.argv) > 1 else TO_PROCESS

    if target.is_file():
        process_file(target)
    elif target.is_dir():
        process_folder(target)
    else:
        # Neither an existing file nor a directory: print usage and exit with status 1.
        print("Použití: python extract_patient_info_novy.py [soubor.pdf nebo složka]")
        sys.exit(1)
|