Files
medevio/Merevio05ReadWhetherRegisteredMedevio.py
2025-09-21 21:35:39 +02:00

189 lines
6.3 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import time
from datetime import datetime
from pathlib import Path

import pymysql
from playwright.sync_api import sync_playwright, TimeoutError as PWTimeout
from pymysql.cursors import DictCursor
# ========= CONFIG =========
# Keyword arguments passed verbatim to pymysql.connect().
# SECURITY NOTE(review): a root password was hard-coded here. It can now be
# supplied through the MEDEVIO_DB_PASSWORD environment variable; the literal
# is kept only as a backward-compatible fallback and should be rotated and
# removed from the source.
MYSQL_CFG = dict(
    host="192.168.1.76",
    port=3307,
    user="root",
    password=os.environ.get("MEDEVIO_DB_PASSWORD", "Vlado9674+"),
    database="medevio",
    cursorclass=DictCursor,  # rows come back as dicts keyed by column name
    autocommit=False,  # main() commits explicitly after every patient
)

# Column in patients_extracted that stores the Medevio UUID used in the URL.
UUID_COLUMN = "rid"  # <-- change if your column name differs

# Output columns. ensure_columns() adds them when missing; it checks
# INFORMATION_SCHEMA first because MySQL's ALTER TABLE ... ADD COLUMN has no
# IF NOT EXISTS clause.
REGISTERED_COL = "medevio_registered"  # TINYINT(1): NULL = not checked, 0 = no account, 1 = registered
CHECKED_AT_COL = "medevio_checked_at"  # DATETIME NULL: time of the last check attempt
ERROR_COL = "medevio_check_error"  # TEXT NULL: last error message, reset to NULL on success

# Medevio routing: {uuid} is replaced with the value read from UUID_COLUMN.
PATIENT_URL_TMPL = "https://my.medevio.cz/mudr-buzalkova/klinika/pacienti?pacient={uuid}"

# Playwright storage state (login session) produced by a separate login script.
STATE_FILE = Path("medevio_storage.json")

# Batch size and pacing.
BATCH_LIMIT = 5  # how many patients are selected per run
SLEEP_SECONDS = 3  # pause after each page load so the UI can settle
NAV_TIMEOUT = 20_000  # ms, default Playwright timeout for the page
TEXT_TIMEOUT = 15_000  # ms, how long to wait for the page anchor text

# Page texts indicating that the patient is NOT registered
# (feminine and masculine wording of the same sentence).
NOT_REGISTERED_STRINGS = [
    "Pacientka zatím nemá Medevio účet.",
    "Pacient zatím nemá Medevio účet.",
]
# ==========================
# Next batch of patients that have a UUID but no registration verdict yet.
SELECT_SQL = f"""
SELECT {UUID_COLUMN} AS uuid, jmeno, prijmeni, rc
FROM patients_extracted
WHERE {UUID_COLUMN} IS NOT NULL
AND {UUID_COLUMN} <> ''
AND {REGISTERED_COL} IS NULL
LIMIT {BATCH_LIMIT};
"""

# Successful check: store the 0/1 flag, stamp the time, clear any old error.
# Parameters: (registered_flag, uuid).
UPDATE_OK_SQL = f"""
UPDATE patients_extracted
SET {REGISTERED_COL}=%s, {CHECKED_AT_COL}=NOW(), {ERROR_COL}=NULL
WHERE {UUID_COLUMN}=%s
"""

# Failed check: keep the flag NULL (so the row is selected again on a later
# run), stamp the time and store the error text. Parameters: (message, uuid).
UPDATE_ERR_SQL = f"""
UPDATE patients_extracted
SET {REGISTERED_COL}=NULL, {CHECKED_AT_COL}=NOW(), {ERROR_COL}=%s
WHERE {UUID_COLUMN}=%s
"""

# (column name, SQL type) for every output column, in creation order.
_OUTPUT_COLUMNS = (
    (REGISTERED_COL, "TINYINT(1)"),
    (CHECKED_AT_COL, "DATETIME"),
    (ERROR_COL, "TEXT"),
)

# Query that counts how many columns of the given name patients_extracted has.
_COLUMN_EXISTS_TMPL = (
    "SELECT COUNT(*) AS cnt FROM INFORMATION_SCHEMA.COLUMNS "
    "WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = 'patients_extracted' "
    "AND COLUMN_NAME = '{column}'"
)

# ALTER statements that add each output column as a nullable column.
DDL_SQLS = [
    f"ALTER TABLE patients_extracted ADD COLUMN {col_name} {col_type} NULL"
    for col_name, col_type in _OUTPUT_COLUMNS
]

# Existence checks, index-aligned with DDL_SQLS.
CHECKS_FOR_DDL_SQLS = [
    _COLUMN_EXISTS_TMPL.format(column=col_name)
    for col_name, _col_type in _OUTPUT_COLUMNS
]
def ensure_columns(conn):
    """Add the output columns to patients_extracted when they are missing.

    Runs each existence query from CHECKS_FOR_DDL_SQLS and, when it reports
    a count of zero, executes the matching statement from DDL_SQLS.

    Args:
        conn: Open database connection whose cursors return dict rows
            (the result is read through the ``"cnt"`` key).
    """
    with conn.cursor() as cursor:
        for alter_stmt, probe_stmt in zip(DDL_SQLS, CHECKS_FOR_DDL_SQLS):
            cursor.execute(probe_stmt)
            found = cursor.fetchone()
            if found["cnt"] != 0:
                print("Column exists")
                continue
            print("Column missing")
            cursor.execute(alter_stmt)
            print(f"✓ Executed: {alter_stmt}")
    conn.commit()
def pick_registered_flag(page_text: str, markers=None) -> int:
    """Return 0 when the page says the patient has no account, otherwise 1.

    Args:
        page_text: Page HTML or visible text. ``None`` and the empty string
            are treated as text containing no marker.
        markers: Optional iterable of phrases that mean "not registered".
            Defaults to the module-level ``NOT_REGISTERED_STRINGS``.

    Returns:
        0 if any marker phrase occurs in ``page_text``, else 1. Absence of a
        marker is interpreted as "registered".
    """
    if markers is None:
        markers = NOT_REGISTERED_STRINGS
    # Collapse every whitespace run (spaces, newlines, non-breaking spaces)
    # to a single space on both sides, so layout differences in the page do
    # not hide the phrase and silently produce a false "registered".
    haystack = " ".join((page_text or "").split())
    if any(" ".join(marker.split()) in haystack for marker in markers):
        return 0
    return 1
def main():
    """Check one batch of patients in Medevio and store the verdicts in MySQL.

    Workflow:
      1. Ensure the output columns exist, then select up to BATCH_LIMIT
         patients whose REGISTERED_COL is still NULL.
      2. Open every patient page in a Chromium session restored from
         STATE_FILE and look for one of the "not registered" phrases.
      3. Write the 0/1 flag (or the error message) back to the database,
         committing after each patient.

    An exception raised while recording an error in the database is not
    caught and aborts the run; the browser and the connection are still
    closed.
    """
    # --- DB: fetch a batch to process ---
    conn = pymysql.connect(**MYSQL_CFG)
    try:
        ensure_columns(conn)
        with conn.cursor() as cur:
            cur.execute("SET NAMES utf8mb4 COLLATE utf8mb4_czech_ci")
            cur.execute("SET collation_connection = 'utf8mb4_czech_ci'")
            cur.execute(SELECT_SQL)
            rows = cur.fetchall()
        if not rows:
            print("No patients to check (all have medevio_registered filled).")
            return
        print(f"Will process {len(rows)} patients…")

        processed = ok = errs = 0
        # --- Playwright session ---
        with sync_playwright() as p:
            browser = p.chromium.launch(headless=False, slow_mo=0)
            try:
                context = browser.new_context(storage_state=str(STATE_FILE))
                page = context.new_page()
                page.set_default_timeout(NAV_TIMEOUT)
                for processed, r in enumerate(rows, start=1):
                    patient_uuid = r["uuid"]
                    name = f"{r.get('prijmeni','')}, {r.get('jmeno','')}"
                    rc = r.get("rc", "")
                    url = PATIENT_URL_TMPL.format(uuid=patient_uuid)
                    print(f"URL pro otevření pacienta je: {url}")
                    print(f"[{processed:>3}] {name} | RC {rc} | {patient_uuid} | {url}")
                    try:
                        page.goto(url, wait_until="domcontentloaded")
                        try:
                            # Anchor text used as a "page has rendered" signal.
                            page.get_by_text("Historie požadavků").wait_for(timeout=TEXT_TIMEOUT)
                        except PWTimeout:
                            # Deliberate best effort: fall through to the fixed sleep.
                            # NOTE(review): if the page never rendered (e.g. expired
                            # login), no marker is found and the patient is stored as
                            # registered — consider treating this as an error.
                            pass
                        # Fixed pause so the UI can settle before reading it.
                        time.sleep(SLEEP_SECONDS)

                        # Check both the serialized HTML and the visible text; the
                        # patient counts as not registered (0) if either contains a
                        # marker. The HTML alone can miss a phrase, e.g. when
                        # characters are serialized as entities.
                        html = page.content()
                        visible = page.inner_text("body")
                        registered = min(
                            pick_registered_flag(html),
                            pick_registered_flag(visible),
                        )

                        # The UPDATE statements filter on UUID_COLUMN, so the key
                        # parameter must be the uuid selected for this row.
                        with conn.cursor() as cur:
                            cur.execute(UPDATE_OK_SQL, (registered, patient_uuid))
                        conn.commit()
                        ok += 1
                        print("REGISTERED" if registered == 1 else "NOT REGISTERED")
                    except Exception as e:
                        # Per-patient boundary: record the failure and continue
                        # with the next patient.
                        conn.rollback()
                        errs += 1
                        msg = f"{type(e).__name__}: {e}"
                        with conn.cursor() as cur:
                            cur.execute(UPDATE_ERR_SQL, (msg[:1000], patient_uuid))
                        conn.commit()
                        print(f" ! ERROR → {msg}")
            finally:
                # Close the browser even when an unexpected error aborts the batch.
                browser.close()
        print(f"Done. processed={processed}, ok={ok}, errors={errs}")
    finally:
        conn.close()


if __name__ == "__main__":
    main()