208 lines
7.1 KiB
Python
208 lines
7.1 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
from pathlib import Path
|
|
import re
|
|
import time
|
|
import pymysql
|
|
from pymysql.cursors import DictCursor
|
|
from playwright.sync_api import sync_playwright, TimeoutError as PWTimeout
|
|
|
|
# ================== CONFIG ==================
|
|
STATE_FILE = Path("medevio_storage.json")
|
|
|
|
MYSQL_CFG = dict(
|
|
host="192.168.1.76",
|
|
port=3307,
|
|
user="root",
|
|
password="Vlado9674+",
|
|
database="medevio",
|
|
cursorclass=DictCursor,
|
|
autocommit=False,
|
|
)
|
|
|
|
# Column that goes into the Medevio URL.
|
|
# If your Medevio patient UUID is stored in a different column, change this:
|
|
UUID_COLUMN = "rid" # Medevio UUID in your table
|
|
FLAG_COLUMN = "pozchripkavytvoren" # set to 1 on success
|
|
FLAG_TS_COL = "pozchripka_vytv_at" # timestamp when created
|
|
|
|
MESSAGE_TEXT = (
|
|
"Dobrý den, vakcína proti chřipce je k dispozici, "
|
|
"zítra (úterý 23.9) budeme očkovat od 13-17 hodin, "
|
|
"prosím, otevřete si tento požadavek a vyberte si termín. Můžete si samozřejmě vybrat i kterýkoliv jiný den, ale hromadně očkujeme další 4 úterky. Další 4 úterky najdete spoustu termínů."
|
|
)
|
|
|
|
PATIENT_URL_TMPL = "https://my.medevio.cz/mudr-buzalkova/klinika/pacienti?pacient={uuid}"
|
|
BATCH_LIMIT = 2 # change if you want to limit how many to process in one run
|
|
PAUSE_BETWEEN = 1.0 # seconds between patients (UI courtesy)
|
|
# ===========================================
|
|
|
|
RC_DIGITS = re.compile(r"\D+")
|
|
|
|
def ensure_flag_columns(conn):
|
|
"""Create required columns if missing (portable)."""
|
|
needed = {
|
|
FLAG_COLUMN: "TINYINT(1) NULL",
|
|
FLAG_TS_COL: "DATETIME NULL",
|
|
}
|
|
with conn.cursor() as cur:
|
|
for col, coldef in needed.items():
|
|
cur.execute("""
|
|
SELECT COUNT(*) AS cnt
|
|
FROM INFORMATION_SCHEMA.COLUMNS
|
|
WHERE TABLE_SCHEMA = DATABASE()
|
|
AND TABLE_NAME = 'patients_extracted'
|
|
AND COLUMN_NAME = %s
|
|
""", (col,))
|
|
if cur.fetchone()["cnt"] == 0:
|
|
cur.execute(f"ALTER TABLE `patients_extracted` ADD COLUMN `{col}` {coldef}")
|
|
conn.commit()
|
|
|
|
def fetch_batch(conn):
|
|
"""
|
|
Get patients where:
|
|
- flu_reply = 'ano'
|
|
- mamedevioucet is true-ish
|
|
- rc starts with '8' (after removing slash/spaces)
|
|
- pozchripkavytvoren is NULL
|
|
- uuid column is present
|
|
"""
|
|
sql = f"""
|
|
SELECT
|
|
rid, jmeno, prijmeni, rc,
|
|
`{UUID_COLUMN}` AS uuid
|
|
FROM patients_extracted
|
|
WHERE flu_reply = 'ano'
|
|
AND (mamedevioucet = 1 OR mamedevioucet = TRUE OR mamedevioucet = '1')
|
|
AND REPLACE(REPLACE(rc,'/',''),' ','') LIKE '0%%'
|
|
AND {FLAG_COLUMN} IS NULL
|
|
AND `{UUID_COLUMN}` IS NOT NULL
|
|
AND `{UUID_COLUMN}` <> ''
|
|
ORDER BY prijmeni, jmeno
|
|
LIMIT %s
|
|
"""
|
|
with conn.cursor() as cur:
|
|
cur.execute(sql, (BATCH_LIMIT,))
|
|
return cur.fetchall()
|
|
|
|
def mark_flag_success(conn, rid: str):
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
f"UPDATE patients_extracted "
|
|
f"SET {FLAG_COLUMN}=1, {FLAG_TS_COL}=NOW() "
|
|
f"WHERE rid=%s",
|
|
(rid,)
|
|
)
|
|
conn.commit()
|
|
|
|
def create_flu_request_for_uuid(uuid: str) -> bool:
|
|
"""Automate Medevio UI for one patient: create 'Očkování - Chřipka' and send MESSAGE_TEXT."""
|
|
with sync_playwright() as p:
|
|
browser = p.chromium.launch(headless=False, slow_mo=200)
|
|
context = browser.new_context(storage_state=str(STATE_FILE))
|
|
ptcard = context.new_page()
|
|
|
|
url = PATIENT_URL_TMPL.format(uuid=uuid)
|
|
ptcard.goto(url, wait_until="networkidle")
|
|
|
|
# ensure patient card loaded
|
|
ptcard.get_by_text("Historie požadavků").wait_for(timeout=15_000)
|
|
|
|
# create new request
|
|
ptcard.get_by_role("button", name="Nový požadavek").click()
|
|
ptcard.wait_for_timeout(300)
|
|
ptcard.keyboard.type("očkování - chřipka")
|
|
ptcard.locator("[role='option']", has_text="Očkování - Chřipka").first.click()
|
|
ptcard.get_by_role("button", name="Vytvořit požadavek").click()
|
|
time.sleep(2)
|
|
# # wait until back on card
|
|
# try:
|
|
# ptcard.get_by_text("Historie požadavků").wait_for(timeout=7_000)
|
|
# except PWTimeout:
|
|
# ptcard.goto(url, wait_until="networkidle")
|
|
# ptcard.get_by_text("Historie požadavků").wait_for(timeout=10_000)
|
|
|
|
# ptcard.reload(wait_until="networkidle")
|
|
ptcard.get_by_text("Historie požadavků").wait_for(timeout=10_000)
|
|
time.sleep(2)
|
|
|
|
# open the new request
|
|
try:
|
|
ptcard.locator("div[data-testid='patient-request-item']").first.wait_for(timeout=10_000)
|
|
chripka_card = ptcard.locator("div[data-testid='patient-request-item']").filter(
|
|
has=ptcard.locator("h4:has-text('Očkování - Chřipka')")
|
|
).first
|
|
chripka_card.click(timeout=5_000)
|
|
except Exception:
|
|
ptcard.locator("div[data-testid='patient-request-item']").first.click(timeout=5_000)
|
|
|
|
# send the message
|
|
# try:
|
|
# ptcard.wait_for_url("**/pozadavky?pozadavek=*", timeout=10_000)
|
|
# except PWTimeout:
|
|
# pass
|
|
ptcard.get_by_placeholder("Napište odpověď").wait_for(timeout=10_000)
|
|
ptcard.get_by_placeholder("Napište odpověď").fill(MESSAGE_TEXT)
|
|
time.sleep(2)
|
|
for sel in [
|
|
"button:has-text('Odeslat')",
|
|
"button:has-text('Odeslat zprávu')",
|
|
"button:has-text('Odeslat SMS')",
|
|
"button:has-text('Odeslat do aplikace')",
|
|
]:
|
|
try:
|
|
ptcard.click(sel, timeout=4000)
|
|
browser.close()
|
|
return True
|
|
except Exception:
|
|
continue
|
|
|
|
browser.close()
|
|
return False
|
|
|
|
def main():
|
|
conn = pymysql.connect(**MYSQL_CFG)
|
|
try:
|
|
ensure_flag_columns(conn)
|
|
|
|
rows = fetch_batch(conn)
|
|
if not rows:
|
|
print("Nenalezen žádný pacient pro zpracování.")
|
|
return
|
|
|
|
print(f"Zpracujeme {len(rows)} pacientů…")
|
|
|
|
processed = ok = fail = 0
|
|
for r in rows:
|
|
processed += 1
|
|
rid = r["rid"]
|
|
uuid = r["uuid"]
|
|
name = f"{r.get('prijmeni','')}, {r.get('jmeno','')}"
|
|
rc = r.get("rc","")
|
|
|
|
print(f"[{processed:>3}] {name} | RC {rc} | UUID {uuid}")
|
|
|
|
try:
|
|
success = create_flu_request_for_uuid(uuid)
|
|
if success:
|
|
mark_flag_success(conn, rid)
|
|
ok += 1
|
|
print(" ✓ vytvořeno + odesláno, DB flag nastaven")
|
|
else:
|
|
fail += 1
|
|
print(" ✗ nepodařilo se odeslat zprávu (tlačítko 'Odeslat' nenalezeno)")
|
|
except Exception as e:
|
|
fail += 1
|
|
conn.rollback()
|
|
print(f" ✗ chyba: {type(e).__name__}: {e}")
|
|
|
|
time.sleep(PAUSE_BETWEEN)
|
|
|
|
print(f"Hotovo. processed={processed}, ok={ok}, fail={fail}")
|
|
finally:
|
|
conn.close()
|
|
|
|
if __name__ == "__main__":
|
|
main()
|