This commit is contained in:
2025-10-18 07:41:44 +02:00
parent 52e8853639
commit 7332ee0734
2 changed files with 328 additions and 7 deletions

View File

@@ -0,0 +1,322 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from pathlib import Path
import re
import time
import unicodedata
import pymysql
from pymysql.cursors import DictCursor
from playwright.sync_api import sync_playwright, TimeoutError as PWTimeout, Page
# ================== CONFIG ==================
STATE_FILE = Path("medevio_storage.json")
MYSQL_CFG = dict(
host="192.168.1.76",
port=3307,
user="root",
password="Vlado9674+",
database="medevio",
cursorclass=DictCursor,
autocommit=False,
)
# Column that goes into the Medevio URL.
# If your Medevio patient UUID is stored in a different column, change this:
UUID_COLUMN = "rid" # Medevio UUID in your table
FLAG_COLUMN = "pozchripkavytvoren" # set to 1 on success
FLAG_TS_COL = "pozchripka_vytv_at" # timestamp when created
# Optional: set your personal RID here to test on a single card; set to None for batch mode
TEST_RID = None # e.g. "fcb2414b-067b-4ca2-91b2-6c36a86d4cbb"
# TEST_RID = "fcb2414b-067b-4ca2-91b2-6c36a86d4cbb"
MESSAGE_TEXT = (
"Dobrý den, vakcína proti chřipce je k dispozici. "
"Vy nemáte účet v Medeviu a tedy si nemůžete vybrat termín, takže to zkusíme udělat manuálně. "
"Hlavní očkovací dny jsou úterý 07/10 a úterý 14/10, kdy očkujeme i COVID, kdo chce. Chřipku samostatně možno i kdykoliv jindy. Tak dejte vědět, jaký termín se Vám hodí a já si to poznamenám."
)
PATIENT_URL_TMPL = "https://my.medevio.cz/mudr-buzalkova/klinika/pacienti?pacient={uuid}"
BATCH_LIMIT = 50 # change if you want to limit how many to process in one run
PAUSE_BETWEEN = 1.0 # seconds between patients (UI courtesy)
# ===========================================
RC_DIGITS = re.compile(r"\D+")
def mark_flag_skipped(conn, rid: str):
"""
Pokud už požadavek na chřipku existuje:
- nastaví pozchripkavytvoren = 1
- zapíše aktuální čas do pozchripka_vytv_at
"""
with conn.cursor() as cur:
cur.execute(
f"UPDATE patients_extracted "
f"SET {FLAG_COLUMN}=1, {FLAG_TS_COL}=NOW() "
f"WHERE rid=%s",
(rid,)
)
conn.commit()
# ---------- DB helpers ----------
def ensure_flag_columns(conn):
"""Create required columns if missing (portable)."""
needed = {
FLAG_COLUMN: "TINYINT(1) NULL",
FLAG_TS_COL: "DATETIME NULL",
}
with conn.cursor() as cur:
for col, coldef in needed.items():
cur.execute("""
SELECT COUNT(*) AS cnt
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = DATABASE()
AND TABLE_NAME = 'patients_extracted'
AND COLUMN_NAME = %s
""", (col,))
if cur.fetchone()["cnt"] == 0:
cur.execute(f"ALTER TABLE `patients_extracted` ADD COLUMN `{col}` {coldef}")
conn.commit()
def fetch_batch(conn):
"""
Batch mode (TEST_RID is None):
- flu_reply = 'ano'
- mamedevioucet true-ish
- rc starts with '7' (after removing slash/spaces) <-- keep/adjust as you need
- pozchripkavytvoren is NULL
- uuid column present
Test mode (TEST_RID set): returns only that rid.
"""
if TEST_RID:
sql = f"""
SELECT
rid, jmeno, prijmeni, rc,
`{UUID_COLUMN}` AS uuid
FROM patients_extracted
WHERE rid = %s
LIMIT 1
"""
with conn.cursor() as cur:
cur.execute(sql, (TEST_RID,))
return cur.fetchall()
sql = f"""
SELECT
rid, jmeno, prijmeni, rc,
`{UUID_COLUMN}` AS uuid
FROM patients_extracted
WHERE flu_reply = 'ano'
AND mamedevioucet = 0
AND {FLAG_COLUMN} IS NULL
AND `{UUID_COLUMN}` IS NOT NULL
AND `{UUID_COLUMN}` <> ''
ORDER BY prijmeni, jmeno
LIMIT %s
"""
with conn.cursor() as cur:
cur.execute(sql, (BATCH_LIMIT,))
return cur.fetchall()
def mark_flag_success(conn, rid: str):
with conn.cursor() as cur:
cur.execute(
f"UPDATE patients_extracted "
f"SET {FLAG_COLUMN}=1, {FLAG_TS_COL}=NOW() "
f"WHERE rid=%s",
(rid,)
)
conn.commit()
# ---------- UI helpers ----------
def _strip_diacritics(s: str) -> str:
"""Return s without diacritics (e.g., 'chřipka' -> 'chripka')."""
return ''.join(c for c in unicodedata.normalize('NFKD', s) if not unicodedata.combining(c))
def has_existing_chripka_request(page: Page, timeout_ms: int = 15000) -> bool:
"""
Detect an existing 'Očkování - Chřipka' request on the patient card.
- Checks both card view (data-testid='patient-request-item' h4) and
table/row view (data-testid='patient-request-row' strong).
- Case/diacritics-insensitive.
"""
try:
page.get_by_text("Historie požadavků").wait_for(timeout=timeout_ms)
except PWTimeout:
# Some layouts may render without this exact header proceed anyway.
pass
# Let the list render
page.wait_for_timeout(600)
titles = []
try:
titles += page.locator("[data-testid='patient-request-item'] h4").all_text_contents()
except Exception:
pass
try:
titles += page.locator("[data-testid='patient-request-row'] strong").all_text_contents()
except Exception:
pass
# Fallback if no headings were captured: read whole items/rows
if not titles:
try:
titles += page.locator("[data-testid='patient-request-item']").all_text_contents()
except Exception:
pass
try:
titles += page.locator("[data-testid='patient-request-row']").all_text_contents()
except Exception:
pass
if not titles:
return False
pat = re.compile(r"\bchripka\b", re.IGNORECASE)
for t in titles:
if pat.search(_strip_diacritics(t)):
return True
return False
def create_flu_request_for_uuid(uuid: str) -> str:
"""
Automate Medevio UI for one patient:
- Open patient card
- If a Chřipka request already exists, return 'skipped'
- Else create 'Očkování - Chřipka' and send MESSAGE_TEXT -> return 'created'
- On failure to send, return 'failed'
"""
with sync_playwright() as p:
browser = p.chromium.launch(headless=False, slow_mo=200)
context = browser.new_context(storage_state=str(STATE_FILE))
ptcard = context.new_page()
url = PATIENT_URL_TMPL.format(uuid=uuid)
ptcard.goto(url, wait_until="networkidle")
# Ensure the card loaded (best-effort)
try:
ptcard.get_by_text("Historie požadavků").wait_for(timeout=7_000)
except PWTimeout:
pass
# ----- pre-check for existing Chřipka request -----
if has_existing_chripka_request(ptcard):
browser.close()
return "skipped"
# ----- Create new request -----
ptcard.get_by_role("button", name="Nový požadavek").click()
ptcard.wait_for_timeout(300)
ptcard.keyboard.type("očkování - chřipka")
ptcard.locator("[role='option']", has_text="Očkování - Chřipka").first.click()
ptcard.get_by_role("button", name="Vytvořit požadavek").click()
time.sleep(2)
# Wait until back on card and the list is visible again
try:
ptcard.get_by_text("Historie požadavků").wait_for(timeout=10_000)
except PWTimeout:
pass
time.sleep(1.0)
# Open the new request (prefer the tile that mentions Chřipka)
try:
ptcard.locator("div[data-testid='patient-request-item']").first.wait_for(timeout=10_000)
chripka_card = ptcard.locator("div[data-testid='patient-request-item']").filter(
has=ptcard.locator("h4", has_text=re.compile(r"(?i)ch[řr]ipka"))
).first
if chripka_card.count() == 0:
ptcard.locator("div[data-testid='patient-request-item']").first.click(timeout=5_000)
else:
chripka_card.click(timeout=5_000)
except Exception:
# fallback: try the first request item
try:
ptcard.locator("div[data-testid='patient-request-item']").first.click(timeout=5_000)
except Exception:
browser.close()
return "failed"
# ----- Send the message -----
try:
ptcard.get_by_placeholder("Napište odpověď").wait_for(timeout=10_000)
ptcard.get_by_placeholder("Napište odpověď").fill(MESSAGE_TEXT)
time.sleep(1.2)
for sel in [
"button:has-text('Odeslat')",
"button:has-text('Odeslat zprávu')",
"button:has-text('Odeslat SMS')",
"button:has-text('Odeslat do aplikace')",
]:
try:
ptcard.click(sel, timeout=4000)
browser.close()
return "created"
except Exception:
continue
except Exception:
pass
browser.close()
return "failed"
# ---------- main ----------
def main():
conn = pymysql.connect(**MYSQL_CFG)
try:
ensure_flag_columns(conn)
rows = fetch_batch(conn)
if not rows:
print("Nenalezen žádný pacient pro zpracování.")
return
print(f"Zpracujeme {len(rows)} pacientů…")
processed = ok = fail = skipped = 0
for r in rows:
processed += 1
rid = r["rid"]
uuid = r["uuid"]
name = f"{r.get('prijmeni','')}, {r.get('jmeno','')}"
rc = r.get("rc", "")
print(f"[{processed:>3}] {name} | RC {rc} | UUID {uuid}")
try:
result = create_flu_request_for_uuid(uuid)
if result == "created":
mark_flag_success(conn, rid)
ok += 1
print(" ✓ vytvořeno + odesláno, DB flag nastaven")
elif result == "skipped":
mark_flag_skipped(conn, rid)
skipped += 1
print(" ↷ již existuje požadavek na chřipku přeskočeno")
else:
fail += 1
print(" ✗ nepodařilo se odeslat zprávu (tlačítko 'Odeslat' nenalezeno?)")
except Exception as e:
fail += 1
conn.rollback()
print(f" ✗ chyba: {type(e).__name__}: {e}")
time.sleep(PAUSE_BETWEEN)
print(f"Hotovo. processed={processed}, ok={ok}, skipped={skipped}, fail={fail}")
finally:
conn.close()
if __name__ == "__main__":
main()