190 lines
6.5 KiB
Python
190 lines
6.5 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
import re
|
|
import time
|
|
import pymysql
|
|
from pymysql.cursors import DictCursor
|
|
from playwright.sync_api import sync_playwright, TimeoutError as PWTimeout
|
|
|
|
# =====================================================
|
|
STATE_FILE = Path("medevio_storage.json") # saved login state from your login script
|
|
|
|
MYSQL_CFG = dict(
|
|
host="192.168.1.76",
|
|
port=3307,
|
|
user="root",
|
|
password="Vlado9674+",
|
|
database="medevio",
|
|
cursorclass=DictCursor,
|
|
autocommit=False,
|
|
)
|
|
|
|
UUID_COLUMN = "rid" # column with Medevio UUID
|
|
FLAG_COLUMN = "pozchripkavytvoren" # bool flag we update after success
|
|
FLAG_TS_COL = "pozchripka_vytv_at" # optional timestamp when request created
|
|
|
|
MESSAGE_TEXT = (
|
|
"Dobrý den, vakcína proti chřipce je k dispozici, "
|
|
"zítra (úterý 23.9) budeme očkovat od 13-17 hodin, "
|
|
"prosím potvrďte jestli můžete přijít a jaký čas se Vám hodí."
|
|
)
|
|
PATIENT_URL_TMPL = "https://my.medevio.cz/mudr-buzalkova/klinika/pacienti?pacient={uuid}"
|
|
RC_DIGITS = re.compile(r"\D+")
|
|
# =====================================================
|
|
|
|
def normalize_rc(rc: str) -> str:
|
|
"""Return digits-only RC (removes slash/spaces)."""
|
|
return RC_DIGITS.sub("", rc or "")
|
|
|
|
def ensure_flag_columns(conn):
|
|
"""Create required columns if missing (works for all MySQL/MariaDB)."""
|
|
needed = {
|
|
FLAG_COLUMN: "TINYINT(1) NULL",
|
|
FLAG_TS_COL: "DATETIME NULL",
|
|
}
|
|
with conn.cursor() as cur:
|
|
for col, coldef in needed.items():
|
|
cur.execute("""
|
|
SELECT COUNT(*) AS cnt
|
|
FROM INFORMATION_SCHEMA.COLUMNS
|
|
WHERE TABLE_SCHEMA = DATABASE()
|
|
AND TABLE_NAME = 'patients_extracted'
|
|
AND COLUMN_NAME = %s
|
|
""", (col,))
|
|
if cur.fetchone()["cnt"] == 0:
|
|
ddl = f"ALTER TABLE `patients_extracted` ADD COLUMN `{col}` {coldef}"
|
|
print("Adding column:", ddl)
|
|
cur.execute(ddl)
|
|
conn.commit()
|
|
|
|
def fetch_uuid_by_rc(conn, rc_digits: str) -> dict | None:
|
|
"""
|
|
Return row with rid (primary key), medevio UUID, jmeno, prijmeni for the given RC.
|
|
Prints the query and parameter for debugging.
|
|
"""
|
|
sql = (
|
|
f"SELECT rid, `{UUID_COLUMN}` AS uuid, jmeno, prijmeni, rc "
|
|
"FROM patients_extracted "
|
|
"WHERE REPLACE(REPLACE(rc,'/',''),' ','') = %s "
|
|
"LIMIT 1"
|
|
)
|
|
print("DEBUG SQL:", sql, "| param:", rc_digits)
|
|
|
|
with conn.cursor() as cur:
|
|
cur.execute(sql, (rc_digits,))
|
|
row = cur.fetchone()
|
|
|
|
print("DEBUG result:", row)
|
|
return row
|
|
|
|
def mark_flag_success(conn, rid: str):
|
|
"""Update the flag once the Medevio request is created."""
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
f"UPDATE patients_extracted "
|
|
f"SET {FLAG_COLUMN}=1, {FLAG_TS_COL}=NOW() "
|
|
f"WHERE rid=%s",
|
|
(rid,)
|
|
)
|
|
conn.commit()
|
|
|
|
def create_flu_request_for_uuid(uuid: str) -> bool:
|
|
"""Automates Medevio UI to create 'Očkování - Chřipka' request and send MESSAGE_TEXT."""
|
|
with sync_playwright() as p:
|
|
browser = p.chromium.launch(headless=False, slow_mo=200)
|
|
context = browser.new_context(storage_state=str(STATE_FILE))
|
|
ptcard = context.new_page()
|
|
|
|
url = PATIENT_URL_TMPL.format(uuid=uuid)
|
|
ptcard.goto(url, wait_until="networkidle")
|
|
|
|
# ensure patient card loaded
|
|
ptcard.get_by_text("Historie požadavků").wait_for(timeout=15_000)
|
|
|
|
# create new request
|
|
ptcard.get_by_role("button", name="Nový požadavek").click()
|
|
ptcard.wait_for_timeout(300)
|
|
ptcard.keyboard.type("očkování - chřipka")
|
|
ptcard.locator("[role='option']", has_text="Očkování - Chřipka").first.click()
|
|
ptcard.get_by_role("button", name="Vytvořit požadavek").click()
|
|
time.sleep(5)
|
|
# wait until back on card
|
|
try:
|
|
ptcard.get_by_text("Historie požadavků").wait_for(timeout=7_000)
|
|
except PWTimeout:
|
|
ptcard.goto(url, wait_until="networkidle")
|
|
ptcard.get_by_text("Historie požadavků").wait_for(timeout=10_000)
|
|
|
|
ptcard.reload(wait_until="networkidle")
|
|
ptcard.get_by_text("Historie požadavků").wait_for(timeout=10_000)
|
|
time.sleep(2)
|
|
|
|
# open the new request
|
|
try:
|
|
ptcard.locator("div[data-testid='patient-request-item']").first.wait_for(timeout=10_000)
|
|
chripka_card = ptcard.locator("div[data-testid='patient-request-item']").filter(
|
|
has=ptcard.locator("h4:has-text('Očkování - Chřipka')")
|
|
).first
|
|
chripka_card.click(timeout=5_000)
|
|
except Exception:
|
|
ptcard.locator("div[data-testid='patient-request-item']").first.click(timeout=5_000)
|
|
|
|
# send the message
|
|
try:
|
|
ptcard.wait_for_url("**/pozadavky?pozadavek=*", timeout=10_000)
|
|
except PWTimeout:
|
|
pass
|
|
ptcard.get_by_placeholder("Napište odpověď").wait_for(timeout=10_000)
|
|
ptcard.get_by_placeholder("Napište odpověď").fill(MESSAGE_TEXT)
|
|
|
|
for sel in [
|
|
"button:has-text('Odeslat')",
|
|
"button:has-text('Odeslat zprávu')",
|
|
"button:has-text('Odeslat SMS')",
|
|
"button:has-text('Odeslat do aplikace')",
|
|
]:
|
|
try:
|
|
ptcard.click(sel, timeout=4000)
|
|
browser.close()
|
|
return True
|
|
except Exception:
|
|
continue
|
|
|
|
browser.close()
|
|
return False
|
|
|
|
def main():
|
|
rc_input = input("Zadejte RC (s/bez lomítka, Enter pro konec): ").strip()
|
|
# rc_input="320312460"
|
|
if not rc_input:
|
|
print("Konec.")
|
|
return
|
|
rc = normalize_rc(rc_input)
|
|
|
|
conn = pymysql.connect(**MYSQL_CFG)
|
|
try:
|
|
ensure_flag_columns(conn)
|
|
|
|
row = fetch_uuid_by_rc(conn, rc)
|
|
if not row or not row.get("uuid"):
|
|
print(f"✗ Pacient s RC {rc} nenalezen nebo nemá sloupec {UUID_COLUMN}.")
|
|
return
|
|
|
|
print(f"→ Nalezen: {row.get('prijmeni','')} {row.get('jmeno','')} "
|
|
f"| RC {row.get('rc','')} | UUID {row['uuid']} | rid {row['rid']}")
|
|
|
|
if create_flu_request_for_uuid(row["uuid"]):
|
|
mark_flag_success(conn, row["rid"])
|
|
print("✅ Požadavek chřipka vytvořen a DB aktualizována.")
|
|
else:
|
|
print("✗ Nepodařilo se odeslat zprávu v požadavku.")
|
|
|
|
finally:
|
|
conn.close()
|
|
|
|
if __name__ == "__main__":
|
|
main()
|