Files
medevio/Medevio4.py
2025-09-22 07:19:26 +02:00

263 lines
8.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# This script connects to the Medevio patient registry, switches the listing to
# 100 patients per page, and stores the loaded HTML of each patient page in MySQL.
# medevio_dump_patients_html_to_mysql.py
import time
import json
from pathlib import Path
from datetime import datetime
from typing import Set
import mysql.connector
from mysql.connector import errorcode
from playwright.sync_api import sync_playwright, TimeoutError as PWTimeout
# ---------- CONFIG ----------
# Playwright storage-state file holding the logged-in session.
STATE_FILE = "medevio_storage.json"

# URL of the patient listing; patient detail URLs are derived from it.
BASE_LIST_URL = "https://my.medevio.cz/mudr-buzalkova/klinika/pacienti"

# Throttle: pause (in seconds) after each patient has been saved.
SAVE_DELAY_SECONDS = 10

# MySQL connection settings, passed as keyword arguments to
# mysql.connector.connect().
# NOTE(review): credentials are hard-coded in source; consider loading them
# from the environment or an untracked config file.
MYSQL_CFG = {
    "host": "192.168.1.76",
    "port": 3307,
    "user": "root",
    "password": "Vlado9674+",
    "database": "medevio",
}

# Target table; db_ensure_table() creates it when missing.
TABLE_NAME = "patients_html"
# ---------- DB helpers ----------
def db_connect():
    """Open a MySQL connection using the module-level ``MYSQL_CFG`` settings.

    Returns:
        The connection object returned by ``mysql.connector.connect``.

    Raises:
        SystemExit: if the connector raises ``mysql.connector.Error``.
    """
    try:
        return mysql.connector.connect(**MYSQL_CFG)
    except mysql.connector.Error as exc:
        # Abort the whole script with a readable message instead of a traceback.
        raise SystemExit(f"MySQL connection failed: {exc}")
def db_ensure_table(conn):
    """Create the ``TABLE_NAME`` table if it does not exist yet.

    The table has three columns: ``patient_id`` (primary key), ``html`` and
    ``fetched_at`` (defaults to the current timestamp).

    Args:
        conn: an open MySQL connection (as returned by ``db_connect``).
    """
    ddl = f"""
        CREATE TABLE IF NOT EXISTS `{TABLE_NAME}` (
            patient_id VARCHAR(64) PRIMARY KEY,
            html LONGTEXT NOT NULL,
            fetched_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
        ) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
    """
    cur = conn.cursor()
    try:
        cur.execute(ddl)
        conn.commit()
    finally:
        # Release the cursor even when the DDL or the commit fails.
        cur.close()
def db_existing_ids(conn) -> Set[str]:
    """Return the set of all ``patient_id`` values stored in ``TABLE_NAME``.

    Args:
        conn: an open MySQL connection (as returned by ``db_connect``).

    Returns:
        A set with one entry per row of the table; empty when the table is empty.
    """
    cur = conn.cursor()
    try:
        cur.execute(f"SELECT patient_id FROM `{TABLE_NAME}`")
        # Each fetched row is a 1-tuple holding the id.
        return {pid for (pid,) in cur.fetchall()}
    finally:
        # Release the cursor even when the query fails.
        cur.close()
def db_upsert_html(conn, patient_id: str, html: str):
    """Insert or overwrite the stored HTML for one patient and commit.

    A new row is inserted with ``fetched_at = NOW()``; when ``patient_id``
    already exists, its ``html`` and ``fetched_at`` are replaced by the new
    values.

    Args:
        conn: an open MySQL connection (as returned by ``db_connect``).
        patient_id: primary-key value of the row.
        html: page HTML to store.
    """
    cur = conn.cursor()
    try:
        cur.execute(
            f"""INSERT INTO `{TABLE_NAME}` (patient_id, html, fetched_at)
            VALUES (%s, %s, NOW())
            ON DUPLICATE KEY UPDATE html = VALUES(html), fetched_at = VALUES(fetched_at)""",
            (patient_id, html),
        )
        conn.commit()
    finally:
        # Release the cursor even when the statement or the commit fails.
        cur.close()
# ---------- Playwright helpers ----------
def wait_for_grid_ready(page):
    """Block until the data grid and at least one data row are present.

    Waits, in order, for the row-group container and then for a row carrying a
    ``data-id`` attribute, each with a generous 20-second timeout.

    Args:
        page: the Playwright page showing the patient listing.
    """
    grid_selectors = (
        "div[role='rowgroup']",
        "div[role='row'][data-id]",
    )
    for selector in grid_selectors:
        page.wait_for_selector(selector, timeout=20000)
def set_page_size_100(page):
    """Switch the listing's rows-per-page selector to 100.

    Args:
        page: the Playwright page showing the patient listing.
    """
    # Candidate locators for the page-size combobox: Czech label, English
    # label, then a generic pagination fallback. The first one that matches
    # anything gets clicked.
    candidates = [
        page.get_by_role("combobox", name="Řádků na stránce:"),
        page.get_by_role("combobox", name="Rows per page:"),
        page.locator("div.MuiTablePagination-root [role='combobox']"),
    ]
    combobox = next((cand for cand in candidates if cand.count()), None)
    if combobox is not None:
        combobox.first.click()

    # Pick the "100" entry; fall back to an XPath lookup when no element with
    # the option role matches.
    option = page.get_by_role("option", name="100")
    if option.count() == 0:
        option = page.locator("//li[normalize-space(.)='100']")
    option.first.wait_for(state="visible", timeout=5000)
    option.first.click()

    # Give the grid a chance to show rows again; on timeout just pause briefly.
    try:
        page.wait_for_selector("div[role='row'][data-id]", timeout=10000)
    except PWTimeout:
        time.sleep(0.8)
def harvest_ids_on_current_page(page) -> Set[str]:
    """Collect the ``data-id`` attribute of every grid row currently rendered.

    Two selectors are tried and their results merged; empty or missing
    attribute values are skipped.

    Args:
        page: the Playwright page showing the patient listing.

    Returns:
        The set of non-empty ``data-id`` values found.
    """
    row_selectors = ("div[role='row'][data-id]", "div.MuiDataGrid-row[data-id]")
    found = set()
    for selector in row_selectors:
        attributes = (
            row.get_attribute("data-id") for row in page.locator(selector).all()
        )
        found.update(value for value in attributes if value)
    return found
def click_next_page(page) -> bool:
    """Click the pagination "next page" button if one is available.

    The English ARIA label is tried first, then the Czech one.

    Args:
        page: the Playwright page showing the patient listing.

    Returns:
        True when a button was found, enabled and clicked; False otherwise.
    """
    for label in ("Go to next page", "Další"):
        button = page.get_by_role("button", name=label)
        if not button.count():
            continue
        try:
            if button.first.is_enabled():
                button.first.click()
                return True
        except Exception:
            # Best effort: any failure here just moves on to the next label.
            continue
    return False
def ensure_detail_open(page) -> bool:
    """Report whether a detail drawer/dialog is currently visible.

    Args:
        page: the Playwright page to inspect.

    Returns:
        True when the first match of any known dialog/drawer selector is
        visible; False otherwise.
    """
    panel_selectors = ("[role='dialog']", "div.MuiDrawer-paper", "div[aria-modal='true']")
    # Locators are built lazily, so checking stops at the first visible panel.
    panels = (page.locator(selector) for selector in panel_selectors)
    return any(panel.count() and panel.first.is_visible() for panel in panels)
# ---------- Main workflow ----------
def _print_pagination_label(page, prefix: str) -> None:
    """Print the pagination summary label; purely informational, never raises."""
    try:
        label = page.locator("p.MuiTablePagination-displayedRows").first.inner_text()
        print(prefix, label)
    except Exception:
        pass


def _wait_for_new_rows(page, previous_ids, timeout_s: float = 10.0) -> bool:
    """Poll until the grid shows a non-empty set of row ids other than ``previous_ids``.

    Returns True once the rows changed, False when ``timeout_s`` elapsed first.
    """
    selector = "div[role='row'][data-id], div.MuiDataGrid-row[data-id]"
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        # One in-browser snapshot per poll, so rows that are being re-rendered
        # cannot go stale between lookup and attribute read.
        snapshot = page.locator(selector).evaluate_all(
            "rows => rows.map(r => r.getAttribute('data-id'))"
        )
        current = {value for value in snapshot if value}
        if current and current != previous_ids:
            return True
        time.sleep(0.25)
    return False


def collect_all_patient_ids(context) -> Set[str]:
    """Walk every page of the patient listing and collect all row ids.

    Opens a new page in ``context``, loads ``BASE_LIST_URL``, tries to switch
    the listing to 100 rows per page, then harvests ids page by page until no
    enabled "next page" button can be clicked.

    Args:
        context: a Playwright browser context with a logged-in session.

    Returns:
        The set of unique ``data-id`` values seen across all pages.

    Raises:
        SystemExit: when the browser ends up on a URL containing
            ``/prihlaseni`` (the stored session is no longer valid).
    """
    page = context.new_page()
    try:
        page.set_default_timeout(15000)
        page.set_default_navigation_timeout(30000)
        # Use domcontentloaded (SPAs often keep the network busy).
        page.goto(BASE_LIST_URL, wait_until="domcontentloaded")
        if "/prihlaseni" in page.url.lower():
            raise SystemExit("Session expired → refresh medevio_storage.json via the login script.")
        wait_for_grid_ready(page)

        # Optional: print a label like "1–25 z 1856".
        _print_pagination_label(page, "Pagination label BEFORE:")

        # Set 100 rows per page; failing to do so only costs more page turns.
        try:
            set_page_size_100(page)
            _print_pagination_label(page, "Pagination label AFTER :")
        except Exception as e:
            print(f"Warning: could not set page size to 100: {e!r}")

        all_ids: Set[str] = set()
        page_index = 1
        while True:
            wait_for_grid_ready(page)
            ids_now = harvest_ids_on_current_page(page)
            print(f"Page {page_index}: harvested {len(ids_now)} ids")
            all_ids |= ids_now
            # Try to go next; if that is not possible, this was the last page.
            if not click_next_page(page):
                break
            # Waiting for a load state is not enough in an SPA: the old rows
            # stay in the DOM until the new ones arrive, so the next harvest
            # could re-read them and a page could be skipped. Wait until the
            # row ids actually change.
            if not _wait_for_new_rows(page, ids_now):
                print(f"Warning: rows did not change after leaving page {page_index}")
            time.sleep(0.5)  # let the grid finish rendering
            page_index += 1
    finally:
        # Close the tab on every exit path, including SystemExit and timeouts.
        page.close()
    print(f"Total unique IDs collected: {len(all_ids)}")
    return all_ids
def fetch_and_store_patient_html(context, conn, patient_id: str):
    """Open one patient's detail view and store the page HTML in MySQL.

    The detail is first opened through the ``?pacient=<id>`` URL. If no detail
    panel is visible afterwards, the listing is loaded and the patient's row is
    clicked instead. When the panel still does not appear, the patient is
    skipped. After a successful save the function sleeps for
    ``SAVE_DELAY_SECONDS``.

    Args:
        context: a Playwright browser context with a logged-in session.
        conn: an open MySQL connection (as returned by ``db_connect``).
        patient_id: the grid ``data-id`` of the patient to fetch.
    """
    page = context.new_page()
    try:
        page.set_default_timeout(15000)
        page.set_default_navigation_timeout(30000)
        url = f"{BASE_LIST_URL}?pacient={patient_id}"
        page.goto(url, wait_until="domcontentloaded")
        # NOTE(review): this check runs right after domcontentloaded; if the
        # drawer renders later, the fallback below is taken — confirm.
        if not ensure_detail_open(page):
            # Fallback: go to the list and click the patient's row. The row is
            # looked up on the listing as initially loaded.
            page.goto(BASE_LIST_URL, wait_until="domcontentloaded")
            row_selector = f"div[role='row'][data-id='{patient_id}']"
            try:
                page.wait_for_selector(row_selector, timeout=15000)
                page.locator(row_selector).first.click()
                # Wait for the drawer/dialog.
                page.wait_for_selector(
                    "[role='dialog'], div.MuiDrawer-paper, div[aria-modal='true']",
                    timeout=12000,
                )
            except PWTimeout:
                print(f"[{patient_id}] detail panel did not open — skipping")
                return
        # Save the full HTML of the page (includes the open detail drawer).
        html = page.content()
        db_upsert_html(conn, patient_id, html)
        print(f"[{patient_id}] saved HTML ({len(html)} bytes) at {datetime.now().isoformat(timespec='seconds')}")
    finally:
        # Close the tab on every exit path; previously a navigation timeout or
        # a DB error left it open, and the caller kept opening new ones.
        page.close()
    # Throttle after a successful save only (skips and errors do not wait).
    time.sleep(SAVE_DELAY_SECONDS)
def main():
    """Run the scrape: collect patient ids, then save HTML for the new ones.

    Steps:
        1. Verify the Playwright storage-state file exists.
        2. Connect to MySQL, ensure the table exists, and read the stored ids.
        3. Collect all ids from the listing.
        4. Fetch and store HTML for every id not yet in the table.

    Raises:
        SystemExit: when the storage file is missing (or from the helpers it
            calls, e.g. on a failed DB connection or an expired session).
    """
    # Check that the storage file exists.
    if not Path(STATE_FILE).exists():
        raise SystemExit(f"Storage not found: {STATE_FILE}")

    conn = db_connect()
    try:
        db_ensure_table(conn)
        already = db_existing_ids(conn)
        print(f"Already in DB: {len(already)} ids")

        with sync_playwright() as p:
            browser = p.chromium.launch(headless=False)  # headed, so the run can be watched
            try:
                context = browser.new_context(storage_state=STATE_FILE)

                # 1) Collect all ids from the listing (all pages). Reading them
                #    back from the DB instead would make `todo` always empty.
                all_ids = collect_all_patient_ids(context)

                # 2) Iterate and store HTML (skip ids already saved).
                todo = sorted(all_ids - already)
                print(f"To fetch now: {len(todo)} ids (skipping {len(all_ids)-len(todo)} already saved)")
                for pid in todo:
                    try:
                        fetch_and_store_patient_html(context, conn, pid)
                    except Exception as e:
                        # One failing patient must not abort the whole run.
                        print(f"[{pid}] ERROR: {e!r} — continuing with next")
            finally:
                browser.close()
    finally:
        conn.close()
    print("Done.")
# Run the scraper only when executed as a script, not when imported.
if __name__ == "__main__":
    main()