Files
medevio/07VytvorPozadavekChripka03Checkexisting.py
2025-10-18 07:41:44 +02:00

323 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from pathlib import Path
import re
import time
import unicodedata
import pymysql
from pymysql.cursors import DictCursor
from playwright.sync_api import sync_playwright, TimeoutError as PWTimeout, Page
# ================== CONFIG ==================
# Path handed to Playwright as `storage_state` when the browser context is created.
STATE_FILE = Path("medevio_storage.json")

# Keyword arguments passed verbatim to pymysql.connect() in main().
# NOTE(review): the host and the root password are hard-coded in source; consider
# loading them from the environment or from a config file kept out of version control.
MYSQL_CFG = dict(
    host="192.168.1.76",
    port=3307,
    user="root",
    password="Vlado9674+",
    database="medevio",
    cursorclass=DictCursor,  # rows are returned as dicts; the code indexes them by column name
    autocommit=False,  # the DB helpers call conn.commit() explicitly
)

# Column that goes into the Medevio URL.
# If your Medevio patient UUID is stored in a different column, change this:
UUID_COLUMN = "rid"  # Medevio UUID in your table
FLAG_COLUMN = "pozchripkavytvoren"  # set to 1 on success (and when an existing request is found)
FLAG_TS_COL = "pozchripka_vytv_at"  # timestamp when created

# Optional: set your personal RID here to test on a single card; set to None for batch mode
TEST_RID = None  # e.g. "fcb2414b-067b-4ca2-91b2-6c36a86d4cbb"
# TEST_RID = "fcb2414b-067b-4ca2-91b2-6c36a86d4cbb"

# Text typed into the reply box of the newly created request (user-facing, Czech).
# NOTE(review): the text contains fixed vaccination dates (07/10, 14/10); update before reuse.
MESSAGE_TEXT = (
    "Dobrý den, vakcína proti chřipce je k dispozici. "
    "Vy nemáte účet v Medeviu a tedy si nemůžete vybrat termín, takže to zkusíme udělat manuálně. "
    "Hlavní očkovací dny jsou úterý 07/10 a úterý 14/10, kdy očkujeme i COVID, kdo chce. Chřipku samostatně možno i kdykoliv jindy. Tak dejte vědět, jaký termín se Vám hodí a já si to poznamenám."
)

# URL template of the patient card; {uuid} is filled with the value of UUID_COLUMN.
PATIENT_URL_TMPL = "https://my.medevio.cz/mudr-buzalkova/klinika/pacienti?pacient={uuid}"
BATCH_LIMIT = 50  # change if you want to limit how many to process in one run
PAUSE_BETWEEN = 1.0  # seconds between patients (UI courtesy)
# ===========================================

# Matches runs of non-digit characters.
# NOTE(review): not referenced anywhere in the visible code; presumably meant for
# normalising `rc` values. Verify before removing.
RC_DIGITS = re.compile(r"\D+")
def mark_flag_skipped(conn, rid: str):
    """
    Record that a flu request already exists for the given patient.

    For the row of patients_extracted whose rid matches:
      - sets the FLAG_COLUMN column to 1
      - writes the database's current time (NOW()) into FLAG_TS_COL
    and then commits the transaction.
    """
    statement = (
        f"UPDATE patients_extracted SET {FLAG_COLUMN}=1, {FLAG_TS_COL}=NOW() WHERE rid=%s"
    )
    with conn.cursor() as cursor:
        cursor.execute(statement, (rid,))
    conn.commit()
# ---------- DB helpers ----------
def ensure_flag_columns(conn):
    """Add the flag and timestamp columns to patients_extracted when absent.

    For every required column, INFORMATION_SCHEMA.COLUMNS is queried for the
    current database; a column that is not found is added with ALTER TABLE.
    A single commit is issued once all columns have been checked.
    """
    column_defs = {
        FLAG_COLUMN: "TINYINT(1) NULL",
        FLAG_TS_COL: "DATETIME NULL",
    }
    # Same text as a multi-line literal, spelled line by line so it can be indented.
    exists_sql = (
        "\n"
        "SELECT COUNT(*) AS cnt\n"
        "FROM INFORMATION_SCHEMA.COLUMNS\n"
        "WHERE TABLE_SCHEMA = DATABASE()\n"
        "AND TABLE_NAME = 'patients_extracted'\n"
        "AND COLUMN_NAME = %s\n"
    )
    with conn.cursor() as cursor:
        for name, definition in column_defs.items():
            cursor.execute(exists_sql, (name,))
            # Rows are dicts (DictCursor), hence the lookup by alias.
            if cursor.fetchone()["cnt"] != 0:
                continue
            cursor.execute(f"ALTER TABLE `patients_extracted` ADD COLUMN `{name}` {definition}")
    conn.commit()
def fetch_batch(conn):
    """
    Select the patients to process.

    Test mode (TEST_RID set): returns at most the single row whose rid
    equals TEST_RID, regardless of the other filters.

    Batch mode (TEST_RID falsy): returns up to BATCH_LIMIT rows where
      - flu_reply = 'ano'
      - mamedevioucet = 0
      - the FLAG_COLUMN column is NULL
      - the UUID_COLUMN column is neither NULL nor an empty string
    ordered by prijmeni, jmeno.

    Every row carries rid, jmeno, prijmeni, rc and the UUID_COLUMN value
    aliased as ``uuid``.
    """
    # Shared head of both queries.
    select_head = (
        "\n"
        "SELECT\n"
        "rid, jmeno, prijmeni, rc,\n"
        f"`{UUID_COLUMN}` AS uuid\n"
        "FROM patients_extracted\n"
    )
    if TEST_RID:
        query = select_head + (
            "WHERE rid = %s\n"
            "LIMIT 1\n"
        )
        params = (TEST_RID,)
    else:
        query = select_head + (
            "WHERE flu_reply = 'ano'\n"
            "AND mamedevioucet = 0\n"
            f"AND {FLAG_COLUMN} IS NULL\n"
            f"AND `{UUID_COLUMN}` IS NOT NULL\n"
            f"AND `{UUID_COLUMN}` <> ''\n"
            "ORDER BY prijmeni, jmeno\n"
            "LIMIT %s\n"
        )
        params = (BATCH_LIMIT,)

    with conn.cursor() as cursor:
        cursor.execute(query, params)
        return cursor.fetchall()
def mark_flag_success(conn, rid: str):
    """
    Record that the flu request was created and the message sent.

    Sets FLAG_COLUMN to 1 and FLAG_TS_COL to NOW() on the patients_extracted
    row whose rid matches, then commits.
    """
    update_sql = (
        "UPDATE patients_extracted "
        f"SET {FLAG_COLUMN}=1, {FLAG_TS_COL}=NOW() "
        "WHERE rid=%s"
    )
    params = (rid,)
    with conn.cursor() as cursor:
        cursor.execute(update_sql, params)
    conn.commit()
# ---------- UI helpers ----------
def _strip_diacritics(s: str) -> str:
"""Return s without diacritics (e.g., 'chřipka' -> 'chripka')."""
return ''.join(c for c in unicodedata.normalize('NFKD', s) if not unicodedata.combining(c))
def has_existing_chripka_request(page: Page, timeout_ms: int = 15000) -> bool:
    """
    Report whether the patient card already lists a flu ('Chřipka') request.

    Titles are read from the card view (h4 inside
    data-testid='patient-request-item') and from the row view (strong inside
    data-testid='patient-request-row'). When neither yields any text, the
    full text of those items/rows is used instead. Matching is done on the
    diacritics-stripped text, case-insensitively, for the whole word
    'chripka'.

    Args:
        page: Playwright page showing the patient card.
        timeout_ms: how long to wait for the 'Historie požadavků' header.

    Returns:
        True when any collected text matches, False otherwise (including
        when nothing could be collected).
    """

    def collect(selector: str) -> list:
        """Text of every element matching selector; empty list on any error."""
        try:
            return page.locator(selector).all_text_contents()
        except Exception:
            return []

    try:
        page.get_by_text("Historie požadavků").wait_for(timeout=timeout_ms)
    except PWTimeout:
        # Some layouts may render without this exact header; proceed anyway.
        pass

    # Give the request list a moment to render.
    page.wait_for_timeout(600)

    texts = []
    texts.extend(collect("[data-testid='patient-request-item'] h4"))
    texts.extend(collect("[data-testid='patient-request-row'] strong"))

    if not texts:
        # No headings captured: fall back to the whole item/row text.
        texts.extend(collect("[data-testid='patient-request-item']"))
        texts.extend(collect("[data-testid='patient-request-row']"))

    needle = re.compile(r"\bchripka\b", re.IGNORECASE)
    return any(needle.search(_strip_diacritics(text)) for text in texts)
def _open_chripka_request(ptcard: Page) -> bool:
    """Open the request tile that mentions Chřipka, else the first tile; False if none could be clicked."""
    items = ptcard.locator("div[data-testid='patient-request-item']")
    try:
        items.first.wait_for(timeout=10_000)
        chripka_card = items.filter(
            has=ptcard.locator("h4", has_text=re.compile(r"(?i)ch[řr]ipka"))
        ).first
        if chripka_card.count() == 0:
            # NOTE(review): the first tile is not guaranteed to be the request
            # that was just created; confirm this fallback is acceptable.
            items.first.click(timeout=5_000)
        else:
            chripka_card.click(timeout=5_000)
        return True
    except Exception:
        # Fallback: try the first request item once more.
        try:
            items.first.click(timeout=5_000)
            return True
        except Exception:
            return False


def _send_flu_message(ptcard: Page) -> bool:
    """Fill the reply box with MESSAGE_TEXT and click the first send button that works."""
    try:
        reply_box = ptcard.get_by_placeholder("Napište odpověď")
        reply_box.wait_for(timeout=10_000)
        reply_box.fill(MESSAGE_TEXT)
        ptcard.wait_for_timeout(1_200)
        for sel in (
            "button:has-text('Odeslat')",
            "button:has-text('Odeslat zprávu')",
            "button:has-text('Odeslat SMS')",
            "button:has-text('Odeslat do aplikace')",
        ):
            try:
                ptcard.click(sel, timeout=4000)
                return True
            except Exception:
                continue
    except Exception:
        # Best-effort: any failure here is reported to the caller as "not sent".
        pass
    return False


def create_flu_request_for_uuid(uuid: str) -> str:
    """
    Automate the Medevio UI for one patient.

    Steps:
      - Open the patient card (PATIENT_URL_TMPL filled with uuid).
      - If a Chřipka request already exists, return 'skipped'.
      - Otherwise create 'Očkování - Chřipka', open it and send MESSAGE_TEXT.

    Args:
        uuid: value substituted into the patient-card URL.

    Returns:
        'created' when a send button was clicked,
        'skipped' when a matching request was already present,
        'failed'  when the new request could not be opened or no send button
                  could be clicked.

    Raises:
        Errors from the unguarded steps (navigation, clicking 'Nový požadavek',
        choosing the option, 'Vytvořit požadavek') propagate to the caller.

    The browser is closed on every exit path, including exceptions. Pauses use
    page.wait_for_timeout() rather than time.sleep(), as the Playwright docs
    recommend for the sync API.
    """
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=False, slow_mo=200)
        try:
            context = browser.new_context(storage_state=str(STATE_FILE))
            ptcard = context.new_page()
            url = PATIENT_URL_TMPL.format(uuid=uuid)
            ptcard.goto(url, wait_until="networkidle")

            # Ensure the card loaded (best-effort).
            try:
                ptcard.get_by_text("Historie požadavků").wait_for(timeout=7_000)
            except PWTimeout:
                pass

            # ----- pre-check for existing Chřipka request -----
            if has_existing_chripka_request(ptcard):
                return "skipped"

            # ----- Create new request -----
            ptcard.get_by_role("button", name="Nový požadavek").click()
            ptcard.wait_for_timeout(300)
            ptcard.keyboard.type("očkování - chřipka")
            ptcard.locator("[role='option']", has_text="Očkování - Chřipka").first.click()
            ptcard.get_by_role("button", name="Vytvořit požadavek").click()
            ptcard.wait_for_timeout(2_000)

            # Wait until back on the card and the list is visible again.
            try:
                ptcard.get_by_text("Historie požadavků").wait_for(timeout=10_000)
            except PWTimeout:
                pass
            ptcard.wait_for_timeout(1_000)

            # ----- Open the new request, then send the message -----
            if not _open_chripka_request(ptcard):
                return "failed"
            # NOTE(review): if sending fails here the request already exists, so a
            # later run will report 'skipped' for this patient without a message.
            return "created" if _send_flu_message(ptcard) else "failed"
        finally:
            browser.close()
# ---------- main ----------
def main():
    """
    Process one batch of patients end to end.

    Connects to MySQL, makes sure the flag columns exist, fetches the batch
    and runs the UI automation for each row. A 'created' or 'skipped' result
    sets the DB flag; anything else (or an exception, after a rollback) is
    counted as a failure. Progress and a final summary are printed; the
    connection is always closed.
    """
    conn = pymysql.connect(**MYSQL_CFG)
    try:
        ensure_flag_columns(conn)
        rows = fetch_batch(conn)
        if not rows:
            print("Nenalezen žádný pacient pro zpracování.")
            return

        print(f"Zpracujeme {len(rows)} pacientů…")
        ok = fail = skipped = 0
        # Counting from 1 makes the loop variable double as the processed total.
        for processed, row in enumerate(rows, start=1):
            rid = row["rid"]
            uuid = row["uuid"]
            surname = row.get('prijmeni', '')
            given_name = row.get('jmeno', '')
            name = f"{surname}, {given_name}"
            rc = row.get("rc", "")
            print(f"[{processed:>3}] {name} | RC {rc} | UUID {uuid}")

            try:
                result = create_flu_request_for_uuid(uuid)
                if result == "skipped":
                    mark_flag_skipped(conn, rid)
                    skipped += 1
                    print(" ↷ již existuje požadavek na chřipku přeskočeno")
                elif result == "created":
                    mark_flag_success(conn, rid)
                    ok += 1
                    print(" ✓ vytvořeno + odesláno, DB flag nastaven")
                else:
                    fail += 1
                    print(" ✗ nepodařilo se odeslat zprávu (tlačítko 'Odeslat' nenalezeno?)")
            except Exception as e:
                # Any UI or DB error counts as a failure for this patient only.
                fail += 1
                conn.rollback()
                print(f" ✗ chyba: {type(e).__name__}: {e}")

            time.sleep(PAUSE_BETWEEN)

        print(f"Hotovo. processed={processed}, ok={ok}, skipped={skipped}, fail={fail}")
    finally:
        conn.close()


if __name__ == "__main__":
    main()