Files
Vladimir Buzalka 2bdac59676 notebookvb
2026-06-14 12:07:35 +02:00

112 lines
3.3 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
recept_pending.py — fronta "čekajících" žádostí o recept, u kterých si agent
NENÍ jistý identifikací pacienta a potřebuje potvrzení člověka přes Telegram.
E-mailový agent sem zapíše záznam (stav 'ceka') a NIC dalšího nedělá.
Resolver (recept_resolver.py) záznamy bere, pošle otázku do Telegramu a podle
odpovědi člověka založí požadavek správnému pacientovi.
Úložiště: JSON soubor _pending_recepty.json vedle tohoto modulu (atomický zápis).
Stavy záznamu: 'ceka''zalozeno' | 'preskoceno'.
"""
import json
import os
import tempfile
import uuid
from datetime import datetime
from pathlib import Path
STORE = Path(__file__).resolve().parent / "_pending_recepty.json"
def _load() -> list:
if not STORE.exists():
return []
try:
return json.loads(STORE.read_text(encoding="utf-8"))
except Exception:
return []
def _save(items: list) -> None:
tmp = tempfile.NamedTemporaryFile(
"w", encoding="utf-8", dir=str(STORE.parent), delete=False, suffix=".tmp"
)
try:
json.dump(items, tmp, ensure_ascii=False, indent=2)
tmp.flush()
os.fsync(tmp.fileno())
tmp.close()
os.replace(tmp.name, STORE)
except Exception:
try:
os.unlink(tmp.name)
except Exception:
pass
raise
def pridej(*, email_message_id: str, email_subject: str = "", sender: str = "",
leky_str: str = "", pozn_str: str = "", skore=None,
duvody=None, kandidati=None) -> dict:
"""Přidá nový čekající dotaz. leky_str/pozn_str se použijí při pozdějším
založení požadavku, kandidáti se ukážou člověku v otázce."""
items = _load()
rec = {
"id": uuid.uuid4().hex,
"vytvoreno": datetime.now().isoformat(timespec="seconds"),
"email_message_id": email_message_id,
"email_subject": email_subject,
"sender": sender,
"leky_str": leky_str,
"pozn_str": pozn_str,
"skore": skore,
"duvody": duvody or [],
"kandidati": kandidati or [],
"otazka_message_id": None, # vyplní resolver po odeslání otázky
"stav": "ceka", # ceka | zalozeno | preskoceno
"vysledek": None,
}
items.append(rec)
_save(items)
return rec
def cekajici() -> list:
return [r for r in _load() if r.get("stav") == "ceka"]
def cekajici_bez_otazky() -> list:
"""Záznamy, na které se ještě neposlala otázka do Telegramu."""
return [r for r in _load()
if r.get("stav") == "ceka" and not r.get("otazka_message_id")]
def je_mail_pending(email_message_id: str) -> bool:
return any(r.get("email_message_id") == email_message_id
and r.get("stav") == "ceka" for r in _load())
def najdi_dle_otazky(otazka_message_id) -> dict | None:
if otazka_message_id is None:
return None
for r in _load():
if r.get("otazka_message_id") == otazka_message_id:
return r
return None
def aktualizuj(rec_id: str, **fields) -> dict | None:
items = _load()
out = None
for r in items:
if r.get("id") == rec_id:
r.update(fields)
out = r
break
if out is not None:
_save(items)
return out