323 lines
11 KiB
Python
323 lines
11 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
|
||
import os
import re
import time
import unicodedata
from pathlib import Path

import pymysql
from playwright.sync_api import sync_playwright, TimeoutError as PWTimeout, Page
from pymysql.cursors import DictCursor
|
||
|
||
# ================== CONFIG ==================
# File handed to Playwright as the browser context's storage_state
# (presumably a saved, logged-in Medevio session - confirm).
STATE_FILE = Path("medevio_storage.json")

# Keyword arguments passed verbatim to pymysql.connect().
# Every connection setting can be overridden through an environment variable
# so that credentials do not have to live in the source tree. The literals
# below are kept only as backward-compatible fallbacks.
# SECURITY(review): a root password is committed here in plain text. Rotate
# it, supply it via MEDEVIO_DB_PASSWORD, and then remove the fallback literal.
MYSQL_CFG = dict(
    host=os.environ.get("MEDEVIO_DB_HOST", "192.168.1.76"),
    port=int(os.environ.get("MEDEVIO_DB_PORT", "3307")),
    user=os.environ.get("MEDEVIO_DB_USER", "root"),
    password=os.environ.get("MEDEVIO_DB_PASSWORD", "Vlado9674+"),
    database=os.environ.get("MEDEVIO_DB_NAME", "medevio"),
    cursorclass=DictCursor,  # rows come back as dicts keyed by column name
    autocommit=False,  # the helpers commit explicitly
)
|
||
|
||
# Name of the column whose value is substituted into the Medevio patient URL.
# Point this at a different column if the Medevio patient UUID is stored elsewhere.
UUID_COLUMN = "rid"

# Column set to 1 once a patient has been handled
# (request created, or a flu request was already present).
FLAG_COLUMN = "pozchripkavytvoren"

# Column that receives the timestamp of that update.
FLAG_TS_COL = "pozchripka_vytv_at"

# Put a single RID here (e.g. "fcb2414b-067b-4ca2-91b2-6c36a86d4cbb") to run
# against one patient card only; leave it as None for batch mode.
TEST_RID = None

# Message typed into the reply box of the newly created request.
MESSAGE_TEXT = (
    "Dobrý den, vakcína proti chřipce je k dispozici. "
    "Vy nemáte účet v Medeviu a tedy si nemůžete vybrat termín, "
    "takže to zkusíme udělat manuálně. "
    "Hlavní očkovací dny jsou úterý 07/10 a úterý 14/10, "
    "kdy očkujeme i COVID, kdo chce. "
    "Chřipku samostatně možno i kdykoliv jindy. "
    "Tak dejte vědět, jaký termín se Vám hodí a já si to poznamenám."
)

# URL of a patient card; {uuid} is filled with the value of UUID_COLUMN.
PATIENT_URL_TMPL = "https://my.medevio.cz/mudr-buzalkova/klinika/pacienti?pacient={uuid}"

# Maximum number of patients fetched for one run.
BATCH_LIMIT = 50

# Seconds to sleep after each patient (UI courtesy).
PAUSE_BETWEEN = 1.0
# ===========================================

# Matches runs of non-digit characters.
RC_DIGITS = re.compile(r"\D+")
|
||
|
||
def mark_flag_skipped(conn, rid: str):
    """Flag a patient whose flu request already exists in Medevio.

    Sets the flag column to 1 and stores the current time in the timestamp
    column for the row with the given ``rid``, then commits.

    The database write is exactly the one performed after a successful
    creation, so it is delegated to ``mark_flag_success`` instead of keeping
    a second copy of the same UPDATE statement. This function stays as a
    separate entry point so the two outcomes can diverge later without
    touching callers.

    Args:
        conn: Open PyMySQL connection.
        rid: Value of the ``rid`` column identifying the patient row.
    """
    mark_flag_success(conn, rid)
|
||
|
||
# ---------- DB helpers ----------
|
||
def ensure_flag_columns(conn):
    """Add the flag and timestamp columns to ``patients_extracted`` if absent.

    For each required column, INFORMATION_SCHEMA.COLUMNS is queried for the
    current database; a column that is not listed is added with ALTER TABLE.
    The connection is committed once after all columns were checked.

    Args:
        conn: Open PyMySQL connection whose cursors return dict rows.
    """
    column_specs = {
        FLAG_COLUMN: "TINYINT(1) NULL",
        FLAG_TS_COL: "DATETIME NULL",
    }
    exists_sql = """
        SELECT COUNT(*) AS cnt
        FROM INFORMATION_SCHEMA.COLUMNS
        WHERE TABLE_SCHEMA = DATABASE()
          AND TABLE_NAME = 'patients_extracted'
          AND COLUMN_NAME = %s
    """
    with conn.cursor() as cursor:
        for column_name, definition in column_specs.items():
            cursor.execute(exists_sql, (column_name,))
            already_present = cursor.fetchone()["cnt"] != 0
            if already_present:
                continue
            # Identifiers cannot be bound as parameters; both parts come from
            # module constants, not from user input.
            cursor.execute(
                f"ALTER TABLE `patients_extracted` ADD COLUMN `{column_name}` {definition}"
            )
    conn.commit()
|
||
|
||
|
||
def fetch_batch(conn):
    """Return the patient rows to process in this run.

    Test mode (``TEST_RID`` is set): returns at most the one row whose
    ``rid`` equals ``TEST_RID``. No other filter is applied, so a row that
    is already flagged is returned as well.

    Batch mode (``TEST_RID`` is falsy): returns up to ``BATCH_LIMIT`` rows,
    ordered by ``prijmeni, jmeno``, that satisfy all of:
      - ``flu_reply = 'ano'``
      - ``mamedevioucet = 0``
      - the flag column is NULL (not handled yet)
      - the UUID column is neither NULL nor an empty string

    Args:
        conn: Open PyMySQL connection.

    Returns:
        Whatever ``cursor.fetchall()`` yields; each row exposes ``rid``,
        ``jmeno``, ``prijmeni``, ``rc`` and ``uuid``.
    """
    # Shared SELECT prefix; column names come from module constants, values
    # are always bound as parameters.
    select_sql = (
        "SELECT rid, jmeno, prijmeni, rc, "
        f"`{UUID_COLUMN}` AS uuid "
        "FROM patients_extracted "
    )

    if TEST_RID:
        sql = select_sql + "WHERE rid = %s LIMIT 1"
        params = (TEST_RID,)
    else:
        sql = select_sql + (
            "WHERE flu_reply = 'ano' "
            "AND mamedevioucet = 0 "
            f"AND `{FLAG_COLUMN}` IS NULL "
            f"AND `{UUID_COLUMN}` IS NOT NULL "
            f"AND `{UUID_COLUMN}` <> '' "
            "ORDER BY prijmeni, jmeno "
            "LIMIT %s"
        )
        params = (BATCH_LIMIT,)

    with conn.cursor() as cur:
        cur.execute(sql, params)
        return cur.fetchall()
|
||
|
||
|
||
def mark_flag_success(conn, rid: str):
    """Record that the patient identified by ``rid`` has been handled.

    Sets the flag column to 1 and the timestamp column to the database's
    NOW() for the matching row of ``patients_extracted``, then commits.

    Args:
        conn: Open PyMySQL connection.
        rid: Value of the ``rid`` column identifying the patient row.
    """
    # Column names are module constants; only the rid value is bound.
    update_sql = (
        f"UPDATE patients_extracted SET {FLAG_COLUMN}=1, {FLAG_TS_COL}=NOW() WHERE rid=%s"
    )
    with conn.cursor() as cursor:
        cursor.execute(update_sql, (rid,))
    conn.commit()
|
||
|
||
|
||
# ---------- UI helpers ----------
|
||
def _strip_diacritics(s: str) -> str:
    """Return ``s`` with combining marks removed (e.g. 'chřipka' -> 'chripka').

    The text is NFKD-normalised so that accented letters split into a base
    character plus combining marks, and the combining marks are dropped.
    """
    decomposed = unicodedata.normalize("NFKD", s)
    base_chars = [ch for ch in decomposed if unicodedata.combining(ch) == 0]
    return "".join(base_chars)
|
||
|
||
|
||
def has_existing_chripka_request(page: Page, timeout_ms: int = 15000) -> bool:
    """Report whether the patient card already lists a "Chřipka" request.

    Titles are read from the card view (``patient-request-item`` ``h4``) and
    from the row view (``patient-request-row`` ``strong``). If neither yields
    any text, the full text of the items/rows is used instead. Matching is
    case-insensitive and ignores diacritics.

    Args:
        page: Playwright page showing the patient card.
        timeout_ms: How long to wait for the "Historie požadavků" header.

    Returns:
        True if any collected text contains the whole word "chripka".
    """

    def texts_of(selector: str) -> list:
        # Best-effort read: any locator failure counts as "nothing found".
        try:
            return page.locator(selector).all_text_contents()
        except Exception:
            return []

    history_header = page.get_by_text("Historie požadavků")
    try:
        history_header.wait_for(timeout=timeout_ms)
    except PWTimeout:
        # Some layouts may render without this exact header - carry on.
        pass

    # Give the request list a moment to render.
    page.wait_for_timeout(600)

    item_sel = "[data-testid='patient-request-item']"
    row_sel = "[data-testid='patient-request-row']"

    collected = texts_of(f"{item_sel} h4") + texts_of(f"{row_sel} strong")
    if not collected:
        # No headings captured: fall back to the whole item/row text.
        collected = texts_of(item_sel) + texts_of(row_sel)

    needle = re.compile(r"\bchripka\b", re.IGNORECASE)
    return any(needle.search(_strip_diacritics(text)) for text in collected)
|
||
|
||
|
||
def _open_chripka_request_tile(page: Page) -> bool:
    """Click the request tile mentioning Chřipka, else the first tile; return success."""
    tiles = page.locator("div[data-testid='patient-request-item']")
    try:
        tiles.first.wait_for(timeout=10_000)
        chripka_tile = tiles.filter(
            has=page.locator("h4", has_text=re.compile(r"(?i)ch[řr]ipka"))
        ).first
        target = chripka_tile if chripka_tile.count() else tiles.first
        target.click(timeout=5_000)
        return True
    except Exception:
        # Deliberate best effort: fall through to the plain first-tile click.
        pass
    try:
        tiles.first.click(timeout=5_000)
        return True
    except Exception:
        return False


def _send_reply(page: Page, text: str) -> bool:
    """Fill the reply box with ``text`` and click a send button; return success."""
    try:
        reply_box = page.get_by_placeholder("Napište odpověď")
        reply_box.wait_for(timeout=10_000)
        reply_box.fill(text)
        page.wait_for_timeout(1_200)
    except Exception:
        return False

    # Button captions tried in order; the first one that can be clicked wins.
    send_selectors = (
        "button:has-text('Odeslat')",
        "button:has-text('Odeslat zprávu')",
        "button:has-text('Odeslat SMS')",
        "button:has-text('Odeslat do aplikace')",
    )
    for selector in send_selectors:
        try:
            page.click(selector, timeout=4000)
            return True
        except Exception:
            continue
    return False


def create_flu_request_for_uuid(uuid: str) -> str:
    """Automate the Medevio UI for one patient.

    Steps:
      - open the patient card for ``uuid``
      - if a Chřipka request already exists, stop and return ``'skipped'``
      - otherwise create an 'Očkování - Chřipka' request, open it and send
        ``MESSAGE_TEXT``

    The browser is closed on every exit path, including when a step raises.
    Waits use ``page.wait_for_timeout`` rather than ``time.sleep``, as
    Playwright recommends for its sync API.

    NOTE(review): when the request is created but opening it or sending the
    message fails, ``'failed'`` is returned. A later run will then find the
    existing request and report ``'skipped'``, so the message is never sent
    for that patient.

    Args:
        uuid: Patient identifier substituted into ``PATIENT_URL_TMPL``.

    Returns:
        ``'skipped'``, ``'created'`` or ``'failed'``.

    Raises:
        Exceptions raised by Playwright while loading the card or creating
        the request (e.g. a timeout on a missing button) propagate to the
        caller.
    """
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=False, slow_mo=200)
        try:
            context = browser.new_context(storage_state=str(STATE_FILE))
            ptcard = context.new_page()
            ptcard.goto(PATIENT_URL_TMPL.format(uuid=uuid), wait_until="networkidle")

            # Ensure the card loaded (best-effort).
            try:
                ptcard.get_by_text("Historie požadavků").wait_for(timeout=7_000)
            except PWTimeout:
                pass

            # ----- pre-check for an existing Chřipka request -----
            if has_existing_chripka_request(ptcard):
                return "skipped"

            # ----- create the new request -----
            ptcard.get_by_role("button", name="Nový požadavek").click()
            ptcard.wait_for_timeout(300)
            ptcard.keyboard.type("očkování - chřipka")
            ptcard.locator("[role='option']", has_text="Očkování - Chřipka").first.click()
            ptcard.get_by_role("button", name="Vytvořit požadavek").click()
            ptcard.wait_for_timeout(2_000)

            # Wait until the card and its request list are visible again.
            try:
                ptcard.get_by_text("Historie požadavků").wait_for(timeout=10_000)
            except PWTimeout:
                pass
            ptcard.wait_for_timeout(1_000)

            # ----- open the new request and send the message -----
            if not _open_chripka_request_tile(ptcard):
                return "failed"
            if _send_reply(ptcard, MESSAGE_TEXT):
                return "created"
            return "failed"
        finally:
            browser.close()
|
||
|
||
|
||
# ---------- main ----------
|
||
def main():
    """Process the selected patients and print a per-patient and final summary.

    Connects to MySQL, makes sure the flag columns exist, fetches the batch
    and runs the UI automation for each row. The flag is written when the
    request was created or was already present. Any exception for a patient
    is counted as a failure, rolls back the connection and does not stop the
    run. The connection is always closed.
    """
    conn = pymysql.connect(**MYSQL_CFG)
    try:
        ensure_flag_columns(conn)

        rows = fetch_batch(conn)
        if not rows:
            print("Nenalezen žádný pacient pro zpracování.")
            return

        print(f"Zpracujeme {len(rows)} pacientů…")

        ok = fail = skipped = 0
        for processed, r in enumerate(rows, start=1):
            rid, uuid = r["rid"], r["uuid"]
            name = f"{r.get('prijmeni','')}, {r.get('jmeno','')}"
            rc = r.get("rc", "")

            print(f"[{processed:>3}] {name} | RC {rc} | UUID {uuid}")

            try:
                result = create_flu_request_for_uuid(uuid)
                if result not in ("created", "skipped"):
                    fail += 1
                    print("   ✗ nepodařilo se odeslat zprávu (tlačítko 'Odeslat' nenalezeno?)")
                elif result == "created":
                    mark_flag_success(conn, rid)
                    ok += 1
                    print("   ✓ vytvořeno + odesláno, DB flag nastaven")
                else:
                    mark_flag_skipped(conn, rid)
                    skipped += 1
                    print("   ↷ již existuje požadavek na chřipku – přeskočeno")
            except Exception as e:
                fail += 1
                conn.rollback()
                print(f"   ✗ chyba: {type(e).__name__}: {e}")

            # Pause between patients.
            time.sleep(PAUSE_BETWEEN)

        print(f"Hotovo. processed={processed}, ok={ok}, skipped={skipped}, fail={fail}")
    finally:
        conn.close()


# Run the batch only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|