#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Create 'Očkování - Chřipka' requests in the Medevio web UI for patients from MySQL.

Workflow (see ``main``):
  1. Make sure the bookkeeping columns exist in ``patients_extracted``.
  2. Select the patients to process (``fetch_batch``).
  3. For each patient open the Medevio patient card with Playwright, skip the
     patient if a Chřipka request is already listed, otherwise create the
     request and send ``MESSAGE_TEXT``.
  4. Record success/skip in the database so the patient is not selected again.
"""

from pathlib import Path
import os
import re
import time
import unicodedata

import pymysql
from pymysql.cursors import DictCursor
from playwright.sync_api import sync_playwright, TimeoutError as PWTimeout, Page

# ================== CONFIG ==================
# Playwright storage state (saved login session) used for every browser context.
STATE_FILE = Path("medevio_storage.json")

# NOTE(review): the root DB password is hard-coded in the source. It can now be
# overridden with the MEDEVIO_DB_PASSWORD environment variable; the literal is
# kept only as a backward-compatible fallback and should be removed/rotated.
MYSQL_CFG = dict(
    host="192.168.1.76",
    port=3307,
    user="root",
    password=os.environ.get("MEDEVIO_DB_PASSWORD", "Vlado9674+"),
    database="medevio",
    cursorclass=DictCursor,
    autocommit=False,
)

# Column that goes into the Medevio URL.
# If your Medevio patient UUID is stored in a different column, change this:
UUID_COLUMN = "rid"                  # Medevio UUID in your table
FLAG_COLUMN = "pozchripkavytvoren"   # set to 1 on success
FLAG_TS_COL = "pozchripka_vytv_at"   # timestamp when created

# Optional: set a single RID here to test on one card; None means batch mode.
# Example: TEST_RID = "fcb2414b-067b-4ca2-91b2-6c36a86d4cbb"
TEST_RID = None

MESSAGE_TEXT = (
    "Dobrý den, vakcína proti chřipce je k dispozici. "
    "Vy nemáte účet v Medeviu a tedy si nemůžete vybrat termín, takže to zkusíme udělat manuálně. "
    "Hlavní očkovací dny jsou úterý 07/10 a úterý 14/10, kdy očkujeme i COVID, kdo chce. Chřipku samostatně možno i kdykoliv jindy. Tak dejte vědět, jaký termín se Vám hodí a já si to poznamenám."
)

PATIENT_URL_TMPL = "https://my.medevio.cz/mudr-buzalkova/klinika/pacienti?pacient={uuid}"

BATCH_LIMIT = 50      # change if you want to limit how many to process in one run
PAUSE_BETWEEN = 1.0   # seconds between patients (UI courtesy)
# ===========================================

# Matches runs of NON-digit characters (despite the name). Not referenced
# anywhere in this file; kept because other code may import it.
RC_DIGITS = re.compile(r"\D+")

# Text shown on the patient card above the list of requests.
_HISTORY_HEADER = "Historie požadavků"

# Applied to diacritics-stripped text, hence the plain 'r'.
_CHRIPKA_PLAIN_RE = re.compile(r"\bchripka\b", re.IGNORECASE)

# Passed to Playwright as a `has_text` filter. The flag is given via
# re.IGNORECASE instead of an inline "(?i)" because the pattern source is
# evaluated by the browser's JavaScript regex engine, which does not accept
# a bare "(?i)" prefix.
_CHRIPKA_TILE_RE = re.compile(r"ch[řr]ipka", re.IGNORECASE)

_REQUEST_ITEM_SEL = "div[data-testid='patient-request-item']"

# Selectors for request titles (card view, then table/row view) ...
_TITLE_SELECTORS = (
    "[data-testid='patient-request-item'] h4",
    "[data-testid='patient-request-row'] strong",
)
# ... and for whole items/rows, used only when no title was captured.
_WHOLE_ITEM_SELECTORS = (
    "[data-testid='patient-request-item']",
    "[data-testid='patient-request-row']",
)

# Candidate "send" buttons, tried in this order.
_SEND_BUTTON_SELECTORS = (
    "button:has-text('Odeslat')",
    "button:has-text('Odeslat zprávu')",
    "button:has-text('Odeslat SMS')",
    "button:has-text('Odeslat do aplikace')",
)


# ---------- DB helpers ----------
def _mark_flag_done(conn: pymysql.connections.Connection, rid: str) -> None:
    """Set FLAG_COLUMN to 1 and FLAG_TS_COL to NOW() for *rid*, then commit."""
    with conn.cursor() as cur:
        cur.execute(
            f"UPDATE patients_extracted "
            f"SET {FLAG_COLUMN}=1, {FLAG_TS_COL}=NOW() "
            f"WHERE rid=%s",
            (rid,),
        )
    conn.commit()


def mark_flag_skipped(conn: pymysql.connections.Connection, rid: str) -> None:
    """Record that a flu request already existed for *rid*.

    Sets ``pozchripkavytvoren`` to 1 and writes the current time into
    ``pozchripka_vytv_at`` (same DB effect as ``mark_flag_success``).
    """
    _mark_flag_done(conn, rid)


def mark_flag_success(conn: pymysql.connections.Connection, rid: str) -> None:
    """Record that the flu request was created and the message sent for *rid*."""
    _mark_flag_done(conn, rid)


def ensure_flag_columns(conn: pymysql.connections.Connection) -> None:
    """Add FLAG_COLUMN / FLAG_TS_COL to ``patients_extracted`` if they are missing."""
    needed = {
        FLAG_COLUMN: "TINYINT(1) NULL",
        FLAG_TS_COL: "DATETIME NULL",
    }
    with conn.cursor() as cur:
        for col, coldef in needed.items():
            cur.execute(
                """
                SELECT COUNT(*) AS cnt
                FROM INFORMATION_SCHEMA.COLUMNS
                WHERE TABLE_SCHEMA = DATABASE()
                  AND TABLE_NAME = 'patients_extracted'
                  AND COLUMN_NAME = %s
                """,
                (col,),
            )
            if cur.fetchone()["cnt"] == 0:
                cur.execute(
                    f"ALTER TABLE `patients_extracted` ADD COLUMN `{col}` {coldef}"
                )
    conn.commit()


def fetch_batch(conn: pymysql.connections.Connection):
    """Return the patient rows to process.

    Test mode (``TEST_RID`` set): at most the one row whose ``rid`` equals
    ``TEST_RID``.

    Batch mode (``TEST_RID`` falsy): up to ``BATCH_LIMIT`` rows, ordered by
    surname and first name, where
      - ``flu_reply = 'ano'``
      - ``mamedevioucet = 0`` (the patient has no Medevio account)
      - ``FLAG_COLUMN`` is NULL (not processed yet)
      - the UUID column is neither NULL nor empty

    Each row carries ``rid``, ``jmeno``, ``prijmeni``, ``rc`` and ``uuid``.
    """
    if TEST_RID:
        sql = f"""
            SELECT rid, jmeno, prijmeni, rc, `{UUID_COLUMN}` AS uuid
            FROM patients_extracted
            WHERE rid = %s
            LIMIT 1
        """
        params = (TEST_RID,)
    else:
        sql = f"""
            SELECT rid, jmeno, prijmeni, rc, `{UUID_COLUMN}` AS uuid
            FROM patients_extracted
            WHERE flu_reply = 'ano'
              AND mamedevioucet = 0
              AND {FLAG_COLUMN} IS NULL
              AND `{UUID_COLUMN}` IS NOT NULL
              AND `{UUID_COLUMN}` <> ''
            ORDER BY prijmeni, jmeno
            LIMIT %s
        """
        params = (BATCH_LIMIT,)

    with conn.cursor() as cur:
        cur.execute(sql, params)
        return cur.fetchall()


# ---------- UI helpers ----------
def _strip_diacritics(s: str) -> str:
    """Return s without diacritics (e.g., 'chřipka' -> 'chripka')."""
    decomposed = unicodedata.normalize("NFKD", s)
    return "".join(c for c in decomposed if not unicodedata.combining(c))


def _wait_for_history_header(page: Page, timeout_ms: int) -> None:
    """Wait for the request-history header; a timeout is tolerated."""
    try:
        page.get_by_text(_HISTORY_HEADER).wait_for(timeout=timeout_ms)
    except PWTimeout:
        # Some layouts may render without this exact header – proceed anyway.
        pass


def _collect_texts(page: Page, selectors) -> list:
    """Return the text contents of all elements matching any of *selectors*."""
    texts = []
    for selector in selectors:
        try:
            texts += page.locator(selector).all_text_contents()
        except Exception:
            # Deliberately best-effort: one unreadable view must not prevent
            # the other view from being inspected.
            continue
    return texts


def has_existing_chripka_request(page: Page, timeout_ms: int = 15000) -> bool:
    """Detect an existing 'Očkování - Chřipka' request on the patient card.

    Checks both the card view (``patient-request-item`` headings) and the
    table/row view (``patient-request-row`` bold titles); if neither yields
    any title, the full text of the items/rows is inspected instead.
    Matching is case- and diacritics-insensitive.

    Returns True when any collected text contains the word 'chripka'.
    """
    _wait_for_history_header(page, timeout_ms)

    # Let the list render
    page.wait_for_timeout(600)

    titles = _collect_texts(page, _TITLE_SELECTORS)
    if not titles:
        # Fallback if no headings were captured: read whole items/rows
        titles = _collect_texts(page, _WHOLE_ITEM_SELECTORS)

    return any(_CHRIPKA_PLAIN_RE.search(_strip_diacritics(t)) for t in titles)


def _create_request(ptcard: Page) -> None:
    """Click through the 'new request' dialog and wait for the card to return."""
    ptcard.get_by_role("button", name="Nový požadavek").click()
    ptcard.wait_for_timeout(300)
    ptcard.keyboard.type("očkování - chřipka")
    ptcard.locator("[role='option']", has_text="Očkování - Chřipka").first.click()
    ptcard.get_by_role("button", name="Vytvořit požadavek").click()
    # page.wait_for_timeout() is used instead of time.sleep() so Playwright
    # keeps processing browser events while we wait.
    ptcard.wait_for_timeout(2_000)

    # Wait until back on card and the list is visible again
    _wait_for_history_header(ptcard, 10_000)
    ptcard.wait_for_timeout(1_000)


def _open_new_request(ptcard: Page) -> bool:
    """Open the freshly created request tile; return False if nothing could be clicked."""
    items = ptcard.locator(_REQUEST_ITEM_SEL)
    try:
        items.first.wait_for(timeout=10_000)
        # Prefer the tile whose heading mentions Chřipka.
        chripka_card = items.filter(
            has=ptcard.locator("h4", has_text=_CHRIPKA_TILE_RE)
        ).first
        target = chripka_card if chripka_card.count() else items.first
        target.click(timeout=5_000)
        return True
    except Exception:
        # fallback: try the first request item
        try:
            items.first.click(timeout=5_000)
            return True
        except Exception:
            return False


def _send_message(ptcard: Page) -> bool:
    """Fill MESSAGE_TEXT into the reply box and click a send button.

    Returns True as soon as one of the candidate buttons was clicked, False
    when the reply box or every button was unavailable.
    """
    try:
        reply_box = ptcard.get_by_placeholder("Napište odpověď")
        reply_box.wait_for(timeout=10_000)
        reply_box.fill(MESSAGE_TEXT)
        ptcard.wait_for_timeout(1_200)
    except Exception:
        return False

    for selector in _SEND_BUTTON_SELECTORS:
        try:
            ptcard.click(selector, timeout=4000)
            return True
        except Exception:
            continue
    return False


def create_flu_request_for_uuid(uuid: str) -> str:
    """Automate the Medevio UI for one patient.

    - Open the patient card for *uuid*.
    - If a Chřipka request already exists, return ``'skipped'``.
    - Otherwise create 'Očkování - Chřipka', send MESSAGE_TEXT and return
      ``'created'``.
    - Return ``'failed'`` when the new request could not be opened or the
      message could not be sent.

    Errors raised while opening the card or creating the request propagate to
    the caller; the browser is closed in every case.
    """
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=False, slow_mo=200)
        try:
            context = browser.new_context(storage_state=str(STATE_FILE))
            ptcard = context.new_page()
            ptcard.goto(PATIENT_URL_TMPL.format(uuid=uuid), wait_until="networkidle")

            # Ensure the card loaded (best-effort)
            _wait_for_history_header(ptcard, 7_000)

            # ----- pre-check for existing Chřipka request -----
            if has_existing_chripka_request(ptcard):
                return "skipped"

            # ----- Create new request -----
            _create_request(ptcard)

            # ----- Open it and send the message -----
            if not _open_new_request(ptcard):
                return "failed"
            return "created" if _send_message(ptcard) else "failed"
        finally:
            browser.close()


# ---------- main ----------
def main():
    """Process the selected patients and print a per-patient and final summary."""
    if not STATE_FILE.exists():
        # Without the saved session every single patient would fail, so stop
        # before touching the database.
        print(f"Soubor s uloženým přihlášením '{STATE_FILE}' nebyl nalezen.")
        return

    conn = pymysql.connect(**MYSQL_CFG)
    try:
        ensure_flag_columns(conn)
        rows = fetch_batch(conn)

        if not rows:
            print("Nenalezen žádný pacient pro zpracování.")
            return

        print(f"Zpracujeme {len(rows)} pacientů…")

        processed = ok = fail = skipped = 0
        for r in rows:
            processed += 1
            rid = r["rid"]
            uuid = r["uuid"]
            name = f"{r.get('prijmeni','')}, {r.get('jmeno','')}"
            rc = r.get("rc", "")

            print(f"[{processed:>3}] {name} | RC {rc} | UUID {uuid}")

            try:
                result = create_flu_request_for_uuid(uuid)
                if result == "created":
                    mark_flag_success(conn, rid)
                    ok += 1
                    print("   ✓ vytvořeno + odesláno, DB flag nastaven")
                elif result == "skipped":
                    mark_flag_skipped(conn, rid)
                    skipped += 1
                    print("   ↷ již existuje požadavek na chřipku – přeskočeno")
                else:
                    # NOTE(review): on 'failed' the request may already exist
                    # in Medevio without the message; a later run would then
                    # report it as 'skipped' – verify manually.
                    fail += 1
                    print("   ✗ nepodařilo se odeslat zprávu (tlačítko 'Odeslat' nenalezeno?)")
            except Exception as e:
                fail += 1
                conn.rollback()
                print(f"   ✗ chyba: {type(e).__name__}: {e}")

            time.sleep(PAUSE_BETWEEN)

        print(f"Hotovo. processed={processed}, ok={ok}, skipped={skipped}, fail={fail}")
    finally:
        conn.close()


if __name__ == "__main__":
    main()