This commit is contained in:
2026-04-24 07:44:05 +02:00
parent a1cde8b471
commit c2c2a7ef63
33 changed files with 89 additions and 21 deletions
+125
View File
@@ -0,0 +1,125 @@
#fcb2414b-067b-4ca2-91b2-6c36a86d4cbb = Vladimir Buzalka
#0210db7b-8fb0-4b47-b1d8-ec7a10849a63 = Vladko - testovací aplikace
#tento kód otevře pacienta podle jeho UUID a založí mu požadavek chřipka a finito
from pathlib import Path
from datetime import datetime
from playwright.sync_api import sync_playwright, TimeoutError as PWTimeout
import time
STATE_FILE = Path("../medevio_storage.json")
PATIENT_UUID = "0210db7b-8fb0-4b47-b1d8-ec7a10849a63"
PATIENT_URL = f"https://my.medevio.cz/mudr-buzalkova/klinika/pacienti?pacient={PATIENT_UUID}"
MESSAGE_TEXT = "Dobrý den, vakcína proti chřipce je k dispozici, zítra (úterý 23.9) budeme očkovat od 13-17 hodin, prosím potvrďte jestli můžete přijít a jaký čas se Vám hodí."
def savepage(name: str, page):
"""
Save the current HTML of a Playwright Page to
U:\Dropbox\!!!Days\Downloads Z230\Pages\<name>.html
"""
folder = Path(r"U:\Dropbox\!!!Days\Downloads Z230\Pages")
folder.mkdir(parents=True, exist_ok=True) # ensure the folder exists
# create sortable timestamp like 2025-09-19_14-05-33
ts = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
filepath = folder / f"{ts}_{name}.html"
with filepath.open("w", encoding="utf-8") as f:
f.write(page.content())
print(f"Saved page snapshot to {filepath}")
def main():
with sync_playwright() as p:
browser = p.chromium.launch(headless=False, slow_mo=200)
context = browser.new_context(storage_state=str(STATE_FILE))
# ---- keep a stable reference to the patient card page ----
ptcard = context.new_page()
ptcard.goto(PATIENT_URL, wait_until="networkidle")
#saving ptcard1
# savepage("ptcard1",ptcard)
ptcard.get_by_text("Historie požadavků").wait_for(timeout=15_000)
# 1) Create new request on the patient card
ptcard.get_by_role("button", name="Nový požadavek").click()
ptcard.wait_for_timeout(300) # small settle
# cursor is already in the "Začněte psát…" field
ptcard.keyboard.type("očkování - chřipka")
ptcard.locator("[role='option']", has_text="Očkování - Chřipka").first.click()
ptcard.get_by_role("button", name="Vytvořit požadavek").click()
#saving ptcard1
# savepage("ptcard2",ptcard)
# 2) Ensure we are back on the patient card again
# (some UIs rerender; either way we want a fresh list)
try:
ptcard.get_by_text("Historie požadavků").wait_for(timeout=7_000)
except PWTimeout:
# If for any reason we are not on the card, navigate back explicitly
ptcard.goto(PATIENT_URL, wait_until="networkidle")
ptcard.get_by_text("Historie požadavků").wait_for(timeout=10_000)
# Optional: hard refresh to get the just-created request at the top
ptcard.reload(wait_until="networkidle")
ptcard.get_by_text("Historie požadavků").wait_for(timeout=10_000)
time.sleep(5)
# 3) Open the “Očkování Chřipka …” request card by its H4 text
# (click the whole card container, not just the heading)
try:
# wait until at least one request card is rendered
ptcard.locator("div[data-testid='patient-request-item']").first.wait_for(timeout=10_000)
# locate the specific card that contains the H4 with "Očkování - Chřipka"
chripka_card = ptcard.locator("div[data-testid='patient-request-item']").filter(
has=ptcard.locator("h4:has-text('Očkování - Chřipka')")
).first
# ensure it's attached/visible then click it
chripka_card.wait_for(timeout=10_000)
chripka_card.click(timeout=5_000)
except Exception as e:
# Fallback: click the very first card on the list (newest)
try:
first_card = ptcard.locator("div[data-testid='patient-request-item']").first
first_card.click(timeout=5_000)
except Exception:
# if even that fails, save snapshot for inspection and raise
savepage("ptcard_click_fail", ptcard)
raise
# 4) Wait for request detail and send the message
# Were now on the detail page
try:
ptcard.wait_for_url("**/pozadavky?pozadavek=*", timeout=10_000)
except PWTimeout:
pass # URL may be SPA; rely on textarea presence
ptcard.get_by_placeholder("Napište odpověď").wait_for(timeout=10_000)
ptcard.get_by_placeholder("Napište odpověď").fill(MESSAGE_TEXT)
sent = False
for sel in ["button:has-text('Odeslat')",
"button:has-text('Odeslat zprávu')",
"button:has-text('Odeslat SMS')",
"button:has-text('Odeslat do aplikace')"]:
try:
ptcard.click(sel, timeout=4000)
sent = True
break
except Exception:
continue
if not sent:
raise RuntimeError("Nepodařilo se najít/kliknout tlačítko Odeslat.")
ptcard.wait_for_timeout(2000)
print("✅ Požadavek vytvořen, otevřen a zpráva odeslána.")
if __name__ == "__main__":
main()
+189
View File
@@ -0,0 +1,189 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from pathlib import Path
from datetime import datetime
import re
import time
import pymysql
from pymysql.cursors import DictCursor
from playwright.sync_api import sync_playwright, TimeoutError as PWTimeout
# =====================================================
STATE_FILE = Path("../medevio_storage.json") # saved login state from your login script
MYSQL_CFG = dict(
host="192.168.1.76",
port=3307,
user="root",
password="Vlado9674+",
database="medevio",
cursorclass=DictCursor,
autocommit=False,
)
UUID_COLUMN = "rid" # column with Medevio UUID
FLAG_COLUMN = "pozchripkavytvoren" # bool flag we update after success
FLAG_TS_COL = "pozchripka_vytv_at" # optional timestamp when request created
MESSAGE_TEXT = (
"Dobrý den, vakcína proti chřipce je k dispozici, "
"zítra (úterý 23.9) budeme očkovat od 13-17 hodin, "
"prosím potvrďte jestli můžete přijít a jaký čas se Vám hodí."
)
PATIENT_URL_TMPL = "https://my.medevio.cz/mudr-buzalkova/klinika/pacienti?pacient={uuid}"
RC_DIGITS = re.compile(r"\D+")
# =====================================================
def normalize_rc(rc: str) -> str:
"""Return digits-only RC (removes slash/spaces)."""
return RC_DIGITS.sub("", rc or "")
def ensure_flag_columns(conn):
"""Create required columns if missing (works for all MySQL/MariaDB)."""
needed = {
FLAG_COLUMN: "TINYINT(1) NULL",
FLAG_TS_COL: "DATETIME NULL",
}
with conn.cursor() as cur:
for col, coldef in needed.items():
cur.execute("""
SELECT COUNT(*) AS cnt
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = DATABASE()
AND TABLE_NAME = 'patients_extracted'
AND COLUMN_NAME = %s
""", (col,))
if cur.fetchone()["cnt"] == 0:
ddl = f"ALTER TABLE `patients_extracted` ADD COLUMN `{col}` {coldef}"
print("Adding column:", ddl)
cur.execute(ddl)
conn.commit()
def fetch_uuid_by_rc(conn, rc_digits: str) -> dict | None:
"""
Return row with rid (primary key), medevio UUID, jmeno, prijmeni for the given RC.
Prints the query and parameter for debugging.
"""
sql = (
f"SELECT rid, `{UUID_COLUMN}` AS uuid, jmeno, prijmeni, rc "
"FROM patients_extracted "
"WHERE REPLACE(REPLACE(rc,'/',''),' ','') = %s "
"LIMIT 1"
)
print("DEBUG SQL:", sql, "| param:", rc_digits)
with conn.cursor() as cur:
cur.execute(sql, (rc_digits,))
row = cur.fetchone()
print("DEBUG result:", row)
return row
def mark_flag_success(conn, rid: str):
"""Update the flag once the Medevio request is created."""
with conn.cursor() as cur:
cur.execute(
f"UPDATE patients_extracted "
f"SET {FLAG_COLUMN}=1, {FLAG_TS_COL}=NOW() "
f"WHERE rid=%s",
(rid,)
)
conn.commit()
def create_flu_request_for_uuid(uuid: str) -> bool:
"""Automates Medevio UI to create 'Očkování - Chřipka' request and send MESSAGE_TEXT."""
with sync_playwright() as p:
browser = p.chromium.launch(headless=False, slow_mo=200)
context = browser.new_context(storage_state=str(STATE_FILE))
ptcard = context.new_page()
url = PATIENT_URL_TMPL.format(uuid=uuid)
ptcard.goto(url, wait_until="networkidle")
# ensure patient card loaded
ptcard.get_by_text("Historie požadavků").wait_for(timeout=15_000)
# create new request
ptcard.get_by_role("button", name="Nový požadavek").click()
ptcard.wait_for_timeout(300)
ptcard.keyboard.type("očkování - chřipka")
ptcard.locator("[role='option']", has_text="Očkování - Chřipka").first.click()
ptcard.get_by_role("button", name="Vytvořit požadavek").click()
time.sleep(5)
# wait until back on card
try:
ptcard.get_by_text("Historie požadavků").wait_for(timeout=7_000)
except PWTimeout:
ptcard.goto(url, wait_until="networkidle")
ptcard.get_by_text("Historie požadavků").wait_for(timeout=10_000)
ptcard.reload(wait_until="networkidle")
ptcard.get_by_text("Historie požadavků").wait_for(timeout=10_000)
time.sleep(2)
# open the new request
try:
ptcard.locator("div[data-testid='patient-request-item']").first.wait_for(timeout=10_000)
chripka_card = ptcard.locator("div[data-testid='patient-request-item']").filter(
has=ptcard.locator("h4:has-text('Očkování - Chřipka')")
).first
chripka_card.click(timeout=5_000)
except Exception:
ptcard.locator("div[data-testid='patient-request-item']").first.click(timeout=5_000)
# send the message
try:
ptcard.wait_for_url("**/pozadavky?pozadavek=*", timeout=10_000)
except PWTimeout:
pass
ptcard.get_by_placeholder("Napište odpověď").wait_for(timeout=10_000)
ptcard.get_by_placeholder("Napište odpověď").fill(MESSAGE_TEXT)
for sel in [
"button:has-text('Odeslat')",
"button:has-text('Odeslat zprávu')",
"button:has-text('Odeslat SMS')",
"button:has-text('Odeslat do aplikace')",
]:
try:
ptcard.click(sel, timeout=4000)
browser.close()
return True
except Exception:
continue
browser.close()
return False
def main():
rc_input = input("Zadejte RC (s/bez lomítka, Enter pro konec): ").strip()
# rc_input="320312460"
if not rc_input:
print("Konec.")
return
rc = normalize_rc(rc_input)
conn = pymysql.connect(**MYSQL_CFG)
try:
ensure_flag_columns(conn)
row = fetch_uuid_by_rc(conn, rc)
if not row or not row.get("uuid"):
print(f"✗ Pacient s RC {rc} nenalezen nebo nemá sloupec {UUID_COLUMN}.")
return
print(f"→ Nalezen: {row.get('prijmeni','')} {row.get('jmeno','')} "
f"| RC {row.get('rc','')} | UUID {row['uuid']} | rid {row['rid']}")
if create_flu_request_for_uuid(row["uuid"]):
mark_flag_success(conn, row["rid"])
print("✅ Požadavek chřipka vytvořen a DB aktualizována.")
else:
print("✗ Nepodařilo se odeslat zprávu v požadavku.")
finally:
conn.close()
if __name__ == "__main__":
main()
+207
View File
@@ -0,0 +1,207 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from pathlib import Path
import re
import time
import pymysql
from pymysql.cursors import DictCursor
from playwright.sync_api import sync_playwright, TimeoutError as PWTimeout
# ================== CONFIG ==================
STATE_FILE = Path("../medevio_storage.json")
MYSQL_CFG = dict(
host="192.168.1.76",
port=3307,
user="root",
password="Vlado9674+",
database="medevio",
cursorclass=DictCursor,
autocommit=False,
)
# Column that goes into the Medevio URL.
# If your Medevio patient UUID is stored in a different column, change this:
UUID_COLUMN = "rid" # Medevio UUID in your table
FLAG_COLUMN = "pozchripkavytvoren" # set to 1 on success
FLAG_TS_COL = "pozchripka_vytv_at" # timestamp when created
MESSAGE_TEXT = (
"Dobrý den, vakcína proti chřipce je k dispozici, "
"dnes (úterý 23.9) budeme očkovat od 13-17 hodin, "
"prosím, otevřete si tento požadavek a vyberte si termín. Můžete si samozřejmě vybrat i kterýkoliv jiný den, ale hromadně očkujeme další 4 úterky. Další 4 úterky najdete spoustu termínů."
)
PATIENT_URL_TMPL = "https://my.medevio.cz/mudr-buzalkova/klinika/pacienti?pacient={uuid}"
BATCH_LIMIT = 2 # change if you want to limit how many to process in one run
PAUSE_BETWEEN = 1.0 # seconds between patients (UI courtesy)
# ===========================================
RC_DIGITS = re.compile(r"\D+")
def ensure_flag_columns(conn):
"""Create required columns if missing (portable)."""
needed = {
FLAG_COLUMN: "TINYINT(1) NULL",
FLAG_TS_COL: "DATETIME NULL",
}
with conn.cursor() as cur:
for col, coldef in needed.items():
cur.execute("""
SELECT COUNT(*) AS cnt
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = DATABASE()
AND TABLE_NAME = 'patients_extracted'
AND COLUMN_NAME = %s
""", (col,))
if cur.fetchone()["cnt"] == 0:
cur.execute(f"ALTER TABLE `patients_extracted` ADD COLUMN `{col}` {coldef}")
conn.commit()
def fetch_batch(conn):
"""
Get patients where:
- flu_reply = 'ano'
- mamedevioucet is true-ish
- rc starts with '8' (after removing slash/spaces)
- pozchripkavytvoren is NULL
- uuid column is present
"""
sql = f"""
SELECT
rid, jmeno, prijmeni, rc,
`{UUID_COLUMN}` AS uuid
FROM patients_extracted
WHERE flu_reply = 'ano'
AND (mamedevioucet = 1 OR mamedevioucet = TRUE OR mamedevioucet = '1')
AND REPLACE(REPLACE(rc,'/',''),' ','') LIKE '7%%'
AND {FLAG_COLUMN} IS NULL
AND `{UUID_COLUMN}` IS NOT NULL
AND `{UUID_COLUMN}` <> ''
ORDER BY prijmeni, jmeno
LIMIT %s
"""
with conn.cursor() as cur:
cur.execute(sql, (BATCH_LIMIT,))
return cur.fetchall()
def mark_flag_success(conn, rid: str):
with conn.cursor() as cur:
cur.execute(
f"UPDATE patients_extracted "
f"SET {FLAG_COLUMN}=1, {FLAG_TS_COL}=NOW() "
f"WHERE rid=%s",
(rid,)
)
conn.commit()
def create_flu_request_for_uuid(uuid: str) -> bool:
"""Automate Medevio UI for one patient: create 'Očkování - Chřipka' and send MESSAGE_TEXT."""
with sync_playwright() as p:
browser = p.chromium.launch(headless=False, slow_mo=200)
context = browser.new_context(storage_state=str(STATE_FILE))
ptcard = context.new_page()
url = PATIENT_URL_TMPL.format(uuid=uuid)
ptcard.goto(url, wait_until="networkidle")
# ensure patient card loaded
ptcard.get_by_text("Historie požadavků").wait_for(timeout=15_000)
# create new request
ptcard.get_by_role("button", name="Nový požadavek").click()
ptcard.wait_for_timeout(300)
ptcard.keyboard.type("očkování - chřipka")
ptcard.locator("[role='option']", has_text="Očkování - Chřipka").first.click()
ptcard.get_by_role("button", name="Vytvořit požadavek").click()
time.sleep(2)
# # wait until back on card
# try:
# ptcard.get_by_text("Historie požadavků").wait_for(timeout=7_000)
# except PWTimeout:
# ptcard.goto(url, wait_until="networkidle")
# ptcard.get_by_text("Historie požadavků").wait_for(timeout=10_000)
# ptcard.reload(wait_until="networkidle")
ptcard.get_by_text("Historie požadavků").wait_for(timeout=10_000)
time.sleep(2)
# open the new request
try:
ptcard.locator("div[data-testid='patient-request-item']").first.wait_for(timeout=10_000)
chripka_card = ptcard.locator("div[data-testid='patient-request-item']").filter(
has=ptcard.locator("h4:has-text('Očkování - Chřipka')")
).first
chripka_card.click(timeout=5_000)
except Exception:
ptcard.locator("div[data-testid='patient-request-item']").first.click(timeout=5_000)
# send the message
# try:
# ptcard.wait_for_url("**/pozadavky?pozadavek=*", timeout=10_000)
# except PWTimeout:
# pass
ptcard.get_by_placeholder("Napište odpověď").wait_for(timeout=10_000)
ptcard.get_by_placeholder("Napište odpověď").fill(MESSAGE_TEXT)
time.sleep(2)
for sel in [
"button:has-text('Odeslat')",
"button:has-text('Odeslat zprávu')",
"button:has-text('Odeslat SMS')",
"button:has-text('Odeslat do aplikace')",
]:
try:
ptcard.click(sel, timeout=4000)
browser.close()
return True
except Exception:
continue
browser.close()
return False
def main():
conn = pymysql.connect(**MYSQL_CFG)
try:
ensure_flag_columns(conn)
rows = fetch_batch(conn)
if not rows:
print("Nenalezen žádný pacient pro zpracování.")
return
print(f"Zpracujeme {len(rows)} pacientů…")
processed = ok = fail = 0
for r in rows:
processed += 1
rid = r["rid"]
uuid = r["uuid"]
name = f"{r.get('prijmeni','')}, {r.get('jmeno','')}"
rc = r.get("rc","")
print(f"[{processed:>3}] {name} | RC {rc} | UUID {uuid}")
try:
success = create_flu_request_for_uuid(uuid)
if success:
mark_flag_success(conn, rid)
ok += 1
print(" ✓ vytvořeno + odesláno, DB flag nastaven")
else:
fail += 1
print(" ✗ nepodařilo se odeslat zprávu (tlačítko 'Odeslat' nenalezeno)")
except Exception as e:
fail += 1
conn.rollback()
print(f" ✗ chyba: {type(e).__name__}: {e}")
time.sleep(PAUSE_BETWEEN)
print(f"Hotovo. processed={processed}, ok={ok}, fail={fail}")
finally:
conn.close()
if __name__ == "__main__":
main()
@@ -0,0 +1,322 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from pathlib import Path
import re
import time
import unicodedata
import pymysql
from pymysql.cursors import DictCursor
from playwright.sync_api import sync_playwright, TimeoutError as PWTimeout, Page
# ================== CONFIG ==================
STATE_FILE = Path("../medevio_storage.json")
MYSQL_CFG = dict(
host="192.168.1.76",
port=3307,
user="root",
password="Vlado9674+",
database="medevio",
cursorclass=DictCursor,
autocommit=False,
)
# Column that goes into the Medevio URL.
# If your Medevio patient UUID is stored in a different column, change this:
UUID_COLUMN = "rid" # Medevio UUID in your table
FLAG_COLUMN = "pozchripkavytvoren" # set to 1 on success
FLAG_TS_COL = "pozchripka_vytv_at" # timestamp when created
# Optional: set your personal RID here to test on a single card; set to None for batch mode
TEST_RID = None # e.g. "fcb2414b-067b-4ca2-91b2-6c36a86d4cbb"
# TEST_RID = "fcb2414b-067b-4ca2-91b2-6c36a86d4cbb"
MESSAGE_TEXT = (
"Dobrý den, vakcína proti chřipce je k dispozici. "
"Vy nemáte účet v Medeviu a tedy si nemůžete vybrat termín, takže to zkusíme udělat manuálně. "
"Hlavní očkovací dny jsou úterý 07/10 a úterý 14/10, kdy očkujeme i COVID, kdo chce. Chřipku samostatně možno i kdykoliv jindy. Tak dejte vědět, jaký termín se Vám hodí a já si to poznamenám."
)
PATIENT_URL_TMPL = "https://my.medevio.cz/mudr-buzalkova/klinika/pacienti?pacient={uuid}"
BATCH_LIMIT = 50 # change if you want to limit how many to process in one run
PAUSE_BETWEEN = 1.0 # seconds between patients (UI courtesy)
# ===========================================
RC_DIGITS = re.compile(r"\D+")
def mark_flag_skipped(conn, rid: str):
"""
Pokud už požadavek na chřipku existuje:
- nastaví pozchripkavytvoren = 1
- zapíše aktuální čas do pozchripka_vytv_at
"""
with conn.cursor() as cur:
cur.execute(
f"UPDATE patients_extracted "
f"SET {FLAG_COLUMN}=1, {FLAG_TS_COL}=NOW() "
f"WHERE rid=%s",
(rid,)
)
conn.commit()
# ---------- DB helpers ----------
def ensure_flag_columns(conn):
"""Create required columns if missing (portable)."""
needed = {
FLAG_COLUMN: "TINYINT(1) NULL",
FLAG_TS_COL: "DATETIME NULL",
}
with conn.cursor() as cur:
for col, coldef in needed.items():
cur.execute("""
SELECT COUNT(*) AS cnt
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = DATABASE()
AND TABLE_NAME = 'patients_extracted'
AND COLUMN_NAME = %s
""", (col,))
if cur.fetchone()["cnt"] == 0:
cur.execute(f"ALTER TABLE `patients_extracted` ADD COLUMN `{col}` {coldef}")
conn.commit()
def fetch_batch(conn):
"""
Batch mode (TEST_RID is None):
- flu_reply = 'ano'
- mamedevioucet true-ish
- rc starts with '7' (after removing slash/spaces) <-- keep/adjust as you need
- pozchripkavytvoren is NULL
- uuid column present
Test mode (TEST_RID set): returns only that rid.
"""
if TEST_RID:
sql = f"""
SELECT
rid, jmeno, prijmeni, rc,
`{UUID_COLUMN}` AS uuid
FROM patients_extracted
WHERE rid = %s
LIMIT 1
"""
with conn.cursor() as cur:
cur.execute(sql, (TEST_RID,))
return cur.fetchall()
sql = f"""
SELECT
rid, jmeno, prijmeni, rc,
`{UUID_COLUMN}` AS uuid
FROM patients_extracted
WHERE flu_reply = 'ano'
AND mamedevioucet = 0
AND {FLAG_COLUMN} IS NULL
AND `{UUID_COLUMN}` IS NOT NULL
AND `{UUID_COLUMN}` <> ''
ORDER BY prijmeni, jmeno
LIMIT %s
"""
with conn.cursor() as cur:
cur.execute(sql, (BATCH_LIMIT,))
return cur.fetchall()
def mark_flag_success(conn, rid: str):
with conn.cursor() as cur:
cur.execute(
f"UPDATE patients_extracted "
f"SET {FLAG_COLUMN}=1, {FLAG_TS_COL}=NOW() "
f"WHERE rid=%s",
(rid,)
)
conn.commit()
# ---------- UI helpers ----------
def _strip_diacritics(s: str) -> str:
"""Return s without diacritics (e.g., 'chřipka' -> 'chripka')."""
return ''.join(c for c in unicodedata.normalize('NFKD', s) if not unicodedata.combining(c))
def has_existing_chripka_request(page: Page, timeout_ms: int = 15000) -> bool:
"""
Detect an existing 'Očkování - Chřipka' request on the patient card.
- Checks both card view (data-testid='patient-request-item' h4) and
table/row view (data-testid='patient-request-row' strong).
- Case/diacritics-insensitive.
"""
try:
page.get_by_text("Historie požadavků").wait_for(timeout=timeout_ms)
except PWTimeout:
# Some layouts may render without this exact header proceed anyway.
pass
# Let the list render
page.wait_for_timeout(600)
titles = []
try:
titles += page.locator("[data-testid='patient-request-item'] h4").all_text_contents()
except Exception:
pass
try:
titles += page.locator("[data-testid='patient-request-row'] strong").all_text_contents()
except Exception:
pass
# Fallback if no headings were captured: read whole items/rows
if not titles:
try:
titles += page.locator("[data-testid='patient-request-item']").all_text_contents()
except Exception:
pass
try:
titles += page.locator("[data-testid='patient-request-row']").all_text_contents()
except Exception:
pass
if not titles:
return False
pat = re.compile(r"\bchripka\b", re.IGNORECASE)
for t in titles:
if pat.search(_strip_diacritics(t)):
return True
return False
def create_flu_request_for_uuid(uuid: str) -> str:
"""
Automate Medevio UI for one patient:
- Open patient card
- If a Chřipka request already exists, return 'skipped'
- Else create 'Očkování - Chřipka' and send MESSAGE_TEXT -> return 'created'
- On failure to send, return 'failed'
"""
with sync_playwright() as p:
browser = p.chromium.launch(headless=False, slow_mo=200)
context = browser.new_context(storage_state=str(STATE_FILE))
ptcard = context.new_page()
url = PATIENT_URL_TMPL.format(uuid=uuid)
ptcard.goto(url, wait_until="networkidle")
# Ensure the card loaded (best-effort)
try:
ptcard.get_by_text("Historie požadavků").wait_for(timeout=7_000)
except PWTimeout:
pass
# ----- pre-check for existing Chřipka request -----
if has_existing_chripka_request(ptcard):
browser.close()
return "skipped"
# ----- Create new request -----
ptcard.get_by_role("button", name="Nový požadavek").click()
ptcard.wait_for_timeout(300)
ptcard.keyboard.type("očkování - chřipka")
ptcard.locator("[role='option']", has_text="Očkování - Chřipka").first.click()
ptcard.get_by_role("button", name="Vytvořit požadavek").click()
time.sleep(2)
# Wait until back on card and the list is visible again
try:
ptcard.get_by_text("Historie požadavků").wait_for(timeout=10_000)
except PWTimeout:
pass
time.sleep(1.0)
# Open the new request (prefer the tile that mentions Chřipka)
try:
ptcard.locator("div[data-testid='patient-request-item']").first.wait_for(timeout=10_000)
chripka_card = ptcard.locator("div[data-testid='patient-request-item']").filter(
has=ptcard.locator("h4", has_text=re.compile(r"(?i)ch[řr]ipka"))
).first
if chripka_card.count() == 0:
ptcard.locator("div[data-testid='patient-request-item']").first.click(timeout=5_000)
else:
chripka_card.click(timeout=5_000)
except Exception:
# fallback: try the first request item
try:
ptcard.locator("div[data-testid='patient-request-item']").first.click(timeout=5_000)
except Exception:
browser.close()
return "failed"
# ----- Send the message -----
try:
ptcard.get_by_placeholder("Napište odpověď").wait_for(timeout=10_000)
ptcard.get_by_placeholder("Napište odpověď").fill(MESSAGE_TEXT)
time.sleep(1.2)
for sel in [
"button:has-text('Odeslat')",
"button:has-text('Odeslat zprávu')",
"button:has-text('Odeslat SMS')",
"button:has-text('Odeslat do aplikace')",
]:
try:
ptcard.click(sel, timeout=4000)
browser.close()
return "created"
except Exception:
continue
except Exception:
pass
browser.close()
return "failed"
# ---------- main ----------
def main():
conn = pymysql.connect(**MYSQL_CFG)
try:
ensure_flag_columns(conn)
rows = fetch_batch(conn)
if not rows:
print("Nenalezen žádný pacient pro zpracování.")
return
print(f"Zpracujeme {len(rows)} pacientů…")
processed = ok = fail = skipped = 0
for r in rows:
processed += 1
rid = r["rid"]
uuid = r["uuid"]
name = f"{r.get('prijmeni','')}, {r.get('jmeno','')}"
rc = r.get("rc", "")
print(f"[{processed:>3}] {name} | RC {rc} | UUID {uuid}")
try:
result = create_flu_request_for_uuid(uuid)
if result == "created":
mark_flag_success(conn, rid)
ok += 1
print(" ✓ vytvořeno + odesláno, DB flag nastaven")
elif result == "skipped":
mark_flag_skipped(conn, rid)
skipped += 1
print(" ↷ již existuje požadavek na chřipku přeskočeno")
else:
fail += 1
print(" ✗ nepodařilo se odeslat zprávu (tlačítko 'Odeslat' nenalezeno?)")
except Exception as e:
fail += 1
conn.rollback()
print(f" ✗ chyba: {type(e).__name__}: {e}")
time.sleep(PAUSE_BETWEEN)
print(f"Hotovo. processed={processed}, ok={ok}, skipped={skipped}, fail={fail}")
finally:
conn.close()
if __name__ == "__main__":
main()
+80
View File
@@ -0,0 +1,80 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Open the Medevio daily agenda calendar,
inspect the rendered HTML, and probe JS memory
to see what data is exposed.
"""
from playwright.sync_api import sync_playwright
STATE_FILE = "../medevio_storage.json"
AGENDA_URL = (
"https://my.medevio.cz/mudr-buzalkova/klinika/kalendar/agenda-dne/"
"?kalendar=144c4e12-347c-49ca-9ec0-8ca965a4470d&datum=2025-10-17"
)
def main():
with sync_playwright() as pw:
browser = pw.chromium.launch(headless=False, slow_mo=150)
context = browser.new_context(storage_state=STATE_FILE)
page = context.new_page()
print("🔗 Opening agenda-day calendar...")
page.goto(AGENDA_URL, wait_until="networkidle", timeout=90_000)
# -------- Check login --------
body = (page.text_content("body") or "").lower()
if any(x in body for x in ["přihlášení", "přihlásit", "sign in", "login"]):
raise SystemExit("❌ Not logged in refresh medevio_storage.json.")
# -------- Wait for appointments to render --------
page.wait_for_timeout(4000)
# -------- Dump a few appointment blocks --------
blocks = page.locator("div.rbc-event-inner-content, div[data-testid='Reservation']").evaluate_all(
"(els) => els.map(e => e.outerHTML)"
)
print(f"\n✅ Found {len(blocks)} appointment blocks. Showing first 3:\n")
for snippet in blocks[:3]:
print(snippet)
print("-" * 80)
# -------- Explore window memory --------
print("\n🔍 Inspecting global JS variables...")
keys = page.evaluate("Object.keys(window)")
interesting = [k for k in keys if any(w in k.lower() for w in ["mede", "cal", "react", "state", "reserv"])]
print("Interesting keys:", interesting[:20])
for candidate in [
"window.__INITIAL_STATE__",
"window.__INITIAL_DATA__",
"window.__REACT_DEVTOOLS_GLOBAL_HOOK__",
"window.medevioCalendar",
"window.calendarStore",
"window.reduxStore",
"window.reactProps",
]:
try:
data = page.evaluate(f"JSON.stringify({candidate}, null, 2)")
if data and len(data) > 200:
print(f"\n===== {candidate} =====\n{data[:1000]}...\n")
except Exception:
pass
# -------- Optionally: listen for network requests while you click --------
def log_request(req):
url = req.url
if any(x in url for x in ["pozadavek", "request", "api"]):
print("📡", url)
page.on("request", log_request)
print("\n👉 Now click manually on a few agenda items to open their detail cards.")
print(" Any backend calls will appear below.\n")
page.wait_for_timeout(40000) # give yourself ~40s to click around
browser.close()
if __name__ == "__main__":
main()
+104
View File
@@ -0,0 +1,104 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from playwright.sync_api import sync_playwright
import json
import requests
STATE_FILE = "../medevio_storage.json"
AGENDA_URL = (
"https://my.medevio.cz/mudr-buzalkova/klinika/kalendar/agenda-dne/"
"?kalendar=144c4e12-347c-49ca-9ec0-8ca965a4470d&datum=2025-10-17"
)
GRAPHQL_URL = "https://api.medevio.cz/graphql"
def extract_agenda_rows(page):
rows = page.locator("div[data-testid='reservation-row']")
if rows.count() == 0:
raise SystemExit("❌ No rows found — check selector or login session.")
results = []
print(f"\nFound {rows.count()} rows, showing sample structure:")
for row in rows.all()[:3]:
print("-" * 80)
print(row.evaluate("el => el.outerHTML")[:500], "...\n")
# Now extract data safely
for row in rows.all():
rid = row.get_attribute("data-id") or ""
# Try to read each cell dynamically
cells = row.locator("div.MuiDataGrid-cell")
record = {"id": rid}
for c in cells.all():
field = c.get_attribute("data-field") or "unknown"
text = (c.text_content() or "").strip()
record[field] = text
results.append(record)
return results
def step1_extract_appointments():
with sync_playwright() as pw:
browser = pw.chromium.launch(headless=False, slow_mo=150)
context = browser.new_context(storage_state=STATE_FILE)
page = context.new_page()
print("🔗 Opening Medevio agenda-day page...")
page.goto(AGENDA_URL, wait_until="networkidle", timeout=90_000)
page.wait_for_selector("div[data-testid='reservation-row']", timeout=30_000)
appointments = extract_agenda_rows(page)
browser.close()
print(f"✅ Extracted {len(appointments)} appointments")
for a in appointments:
print(f"{a.get('StartDateTime','?')} {a.get('Patient','?')}: {a.get('Reason','?')} ({a['id']})")
return appointments
def step2_fetch_detail(session_cookies, reservation_id):
headers = {
"content-type": "application/json",
"origin": "https://my.medevio.cz",
"referer": "https://my.medevio.cz/",
}
query = {
"operationName": "ReservationDetail",
"variables": {"id": reservation_id},
"query": """
query ReservationDetail($id: ID!) {
reservation(id: $id) {
id
reason
startDateTime
endDateTime
status
note
patient { id name age }
doctor { id name }
location { name }
}
}
""",
}
print(f"\n📡 Fetching GraphQL detail for {reservation_id}...")
response = requests.post(GRAPHQL_URL, headers=headers, cookies=session_cookies, data=json.dumps(query))
print("Status:", response.status_code)
print(json.dumps(response.json(), indent=2, ensure_ascii=False))
if __name__ == "__main__":
appointments = step1_extract_appointments()
if not appointments:
raise SystemExit("No appointments found.")
# Use first appointment for detail fetch
reservation_id = appointments[0]["id"]
# Load session cookies from storage
with open(STATE_FILE, "r", encoding="utf-8") as f:
state = json.load(f)
cookies = {c["name"]: c["value"] for c in state.get("cookies", []) if "medevio" in c["domain"]}
step2_fetch_detail(cookies, reservation_id)
+44
View File
@@ -0,0 +1,44 @@
import json, requests
GRAPHQL_URL = "https://api.medevio.cz/graphql"
FULL_INTROSPECTION_QUERY = """
query IntrospectionQuery {
__schema {
queryType { name }
mutationType { name }
subscriptionType { name }
types {
...FullType
}
}
}
fragment FullType on __Type {
kind
name
fields(includeDeprecated: true) {
name
}
}
"""
headers = {
"content-type": "application/json",
"origin": "https://my.medevio.cz",
"referer": "https://my.medevio.cz/",
}
# Load cookies from storage
state = json.load(open("../medevio_storage.json", encoding="utf-8"))
cookies = {c["name"]: c["value"] for c in state["cookies"] if "medevio" in c["domain"]}
payload = {"operationName": "IntrospectionQuery", "query": FULL_INTROSPECTION_QUERY}
r = requests.post(GRAPHQL_URL, headers=headers, cookies=cookies, data=json.dumps(payload))
print("Status:", r.status_code)
try:
data = r.json()
print(json.dumps(data, indent=2, ensure_ascii=False)[:2000])
except Exception as e:
print("Could not decode response:", e)
print(r.text)
+41
View File
@@ -0,0 +1,41 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from playwright.sync_api import sync_playwright
import json, os, time
STATE_FILE = "../medevio_storage.json"
GRAPHQL_LOG = f"graphql_capture_{int(time.time())}.jsonl"
with sync_playwright() as pw:
browser = pw.chromium.launch(headless=False, slow_mo=200)
context = browser.new_context(storage_state=STATE_FILE)
page = context.new_page()
def log_graphql(req):
if "graphql" in req.url and req.method == "POST":
try:
body = req.post_data or ""
data = json.loads(body)
with open(GRAPHQL_LOG, "a", encoding="utf-8") as f:
f.write(json.dumps(data, ensure_ascii=False) + "\n")
print(f"📡 {data.get('operationName')} saved")
except Exception:
pass
page.on("request", log_graphql)
print("🔗 Opening Medevio main page...")
page.goto("https://my.medevio.cz/mudr-buzalkova/klinika/kalendar/agenda-dne/"
"?kalendar=144c4e12-347c-49ca-9ec0-8ca965a4470d", wait_until="networkidle")
print("\n👉 Click various items in Medevio (calendar, reservations, requests, etc.).")
print(" Every GraphQL call will be saved to", GRAPHQL_LOG)
print(" Press Ctrl+C or close the browser when done.\n")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
browser.close()
print(f"\n✅ Finished — GraphQL calls saved to {GRAPHQL_LOG}")
+100
View File
@@ -0,0 +1,100 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Query Medevio for the full agenda of 17 Oct 2025 and print raw API response.
"""
import json
import requests
GRAPHQL_URL = "https://api.medevio.cz/graphql"
CALENDAR_ID = "144c4e12-347c-49ca-9ec0-8ca965a4470d"
CLINIC_SLUG = "mudr-buzalkova"
def load_gateway_token(storage_path="medevio_storage.json"):
"""Return Medevio gateway-access-token from saved Playwright storage."""
import json
from pathlib import Path
path = Path(storage_path)
if not path.exists():
raise SystemExit(f"❌ Storage file not found: {path}")
with path.open("r", encoding="utf-8") as f:
state = json.load(f)
token = next(
(c["value"] for c in state["cookies"]
if c["name"] == "gateway-access-token"), None
)
if not token:
raise SystemExit("❌ gateway-access-token not found in storage file.")
return token
gateway_token = load_gateway_token()
headers = {
"content-type": "application/json",
"origin": "https://my.medevio.cz",
"referer": "https://my.medevio.cz/",
"authorization": f"Bearer {gateway_token}",
}
payload = {
"operationName": "ClinicAgenda_ListClinicReservations",
"variables": {
"calendarIds": [CALENDAR_ID],
"clinicSlug": CLINIC_SLUG,
"since": "2025-10-16T22:00:00.000Z",
"until": "2025-10-17T21:59:59.999Z",
"locale": "cs",
"emptyCalendarIds": False,
},
"query": """query ClinicAgenda_ListClinicReservations(
$calendarIds: [UUID!],
$clinicSlug: String!,
$locale: Locale!,
$since: DateTime!,
$until: DateTime!,
$emptyCalendarIds: Boolean!
) {
reservations: listClinicReservations(
clinicSlug: $clinicSlug,
calendarIds: $calendarIds,
since: $since,
until: $until
) @skip(if: $emptyCalendarIds) {
id
start
end
note
done
color
request {
id
displayTitle(locale: $locale)
extendedPatient {
name
surname
dob
insuranceCompanyObject { shortName }
}
}
}
}""",
}
print("📡 Querying Medevio API for agenda...")
r = requests.post(GRAPHQL_URL, headers=headers, data=json.dumps(payload))
print("Status:", r.status_code)
try:
data = r.json()
print(json.dumps(data, indent=2, ensure_ascii=False))
except Exception as e:
print("❌ Could not parse JSON:", e)
print(r.text)
+176
View File
@@ -0,0 +1,176 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Query Medevio for the full agenda of 17 Oct 2025,
print raw API response, and export to Excel.
"""
import json
import time
from pathlib import Path
import requests
import pandas as pd
from openpyxl import load_workbook
from openpyxl.styles import Font, Alignment
from openpyxl.utils import get_column_letter
GRAPHQL_URL = "https://api.medevio.cz/graphql"
CALENDAR_ID = "144c4e12-347c-49ca-9ec0-8ca965a4470d"
CLINIC_SLUG = "mudr-buzalkova"
# ==================== Load Token ====================
def load_gateway_token(storage_path="medevio_storage.json"):
"""Return Medevio gateway-access-token from saved Playwright storage."""
from pathlib import Path
path = Path(storage_path)
if not path.exists():
raise SystemExit(f"❌ Storage file not found: {path}")
with path.open("r", encoding="utf-8") as f:
state = json.load(f)
token = next(
(c["value"] for c in state["cookies"]
if c["name"] == "gateway-access-token"), None
)
if not token:
raise SystemExit("❌ gateway-access-token not found in storage file.")
return token
gateway_token = load_gateway_token()
headers = {
"content-type": "application/json",
"origin": "https://my.medevio.cz",
"referer": "https://my.medevio.cz/",
"authorization": f"Bearer {gateway_token}",
}
payload = {
"operationName": "ClinicAgenda_ListClinicReservations",
"variables": {
"calendarIds": [CALENDAR_ID],
"clinicSlug": CLINIC_SLUG,
"since": "2025-10-20T00:00:00.001Z",
"until": "2025-10-27T21:59:59.999Z",
"locale": "cs",
"emptyCalendarIds": False,
},
"query": """query ClinicAgenda_ListClinicReservations(
$calendarIds: [UUID!],
$clinicSlug: String!,
$locale: Locale!,
$since: DateTime!,
$until: DateTime!,
$emptyCalendarIds: Boolean!
) {
reservations: listClinicReservations(
clinicSlug: $clinicSlug,
calendarIds: $calendarIds,
since: $since,
until: $until
) @skip(if: $emptyCalendarIds) {
id
start
end
note
done
color
request {
id
displayTitle(locale: $locale)
extendedPatient {
name
surname
dob
insuranceCompanyObject { shortName }
}
}
}
}""",
}
# ==================== Query API ====================
print("📡 Querying Medevio API for agenda...")
r = requests.post(GRAPHQL_URL, headers=headers, data=json.dumps(payload))
print("Status:", r.status_code)
try:
data = r.json()
except Exception as e:
print("❌ Could not parse JSON:", e)
print(r.text)
raise SystemExit()
if "data" not in data or "reservations" not in data["data"]:
raise SystemExit("⚠️ No 'reservations' data found in response.")
reservations = data["data"]["reservations"]
from datetime import datetime
from dateutil import parser, tz
# ===== Process reservations into table =====
rows = []
for r in reservations:
req = r.get("request") or {}
patient = req.get("extendedPatient") or {}
insurance = patient.get("insuranceCompanyObject") or {}
# parse datetimes (convert to local time)
try:
start_dt = parser.isoparse(r.get("start")).astimezone(tz.gettz("Europe/Prague"))
end_dt = parser.isoparse(r.get("end")).astimezone(tz.gettz("Europe/Prague"))
except Exception:
start_dt = end_dt = None
date_str = start_dt.strftime("%Y-%m-%d") if start_dt else ""
time_interval = f"{start_dt.strftime('%H:%M')}-{end_dt.strftime('%H:%M')}" if start_dt and end_dt else ""
rows.append({
"Date": date_str,
"Time": time_interval,
"Title": req.get("displayTitle") or "",
"Patient": f"{patient.get('surname','')} {patient.get('name','')}".strip(),
"DOB": patient.get("dob") or "",
"Insurance": insurance.get("shortName") or "",
"Note": r.get("note") or "",
"Color": r.get("color") or "",
"Request_ID": req.get("id") or "",
"Reservation_ID": r.get("id"),
})
df = pd.DataFrame(rows).sort_values(["Date", "Time"])
# ===== Excel export =====
EXPORT_DIR = Path(r"C:\Users\vlado\PycharmProjects\Medevio\exports")
EXPORT_DIR.mkdir(exist_ok=True)
timestamp = time.strftime("%Y-%m-%d %H-%M-%S")
xlsx_path = EXPORT_DIR / f"Medevio_agenda_{timestamp}.xlsx"
# remove old files
for old in EXPORT_DIR.glob("Medevio_agenda_*.xlsx"):
try:
old.unlink()
except Exception:
pass
df.to_excel(xlsx_path, index=False)
wb = load_workbook(xlsx_path)
ws = wb.active
# style header
for col in range(1, len(df.columns) + 1):
c = ws.cell(row=1, column=col)
c.font = Font(bold=True)
c.alignment = Alignment(horizontal="center")
ws.column_dimensions[get_column_letter(col)].width = 20
ws.freeze_panes = "A2"
wb.save(xlsx_path)
print(f"📘 Exported clean agenda view to:\n{xlsx_path}")
+299
View File
@@ -0,0 +1,299 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Query Medevio for the full agenda of 17 Oct 2025,
print raw API response, and export to Excel.
"""
import re
import json
import time
from pathlib import Path
import requests
import pandas as pd
from openpyxl import load_workbook
from openpyxl.styles import Font, Alignment, PatternFill, Border, Side
from openpyxl.utils import get_column_letter
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
from Functions import get_reports_folder
from openpyxl.utils.dataframe import dataframe_to_rows
GRAPHQL_URL = "https://api.medevio.cz/graphql"
CALENDAR_ID = "144c4e12-347c-49ca-9ec0-8ca965a4470d"
CLINIC_SLUG = "mudr-buzalkova"
# ==================== Load Token ====================
def load_gateway_token(storage_path="medevio_storage.json"):
"""Return Medevio gateway-access-token from saved Playwright storage."""
path = Path(storage_path)
if not path.exists():
raise SystemExit(f"❌ Storage file not found: {path}")
with path.open("r", encoding="utf-8") as f:
state = json.load(f)
token = next(
(c["value"] for c in state["cookies"]
if c["name"] == "gateway-access-token"), None
)
if not token:
raise SystemExit("❌ gateway-access-token not found in storage file.")
return token
gateway_token = load_gateway_token()
headers = {
"content-type": "application/json",
"origin": "https://my.medevio.cz",
"referer": "https://my.medevio.cz/",
"authorization": f"Bearer {gateway_token}",
}
# === Dynamic date range ===
dnes = datetime.utcnow().date()
since = datetime.combine(dnes, datetime.min.time()).replace(microsecond=1)
until = since + relativedelta(months=1) - timedelta(milliseconds=1)
since_iso = since.isoformat() + "Z"
until_iso = until.isoformat() + "Z"
payload = {
"operationName": "ClinicAgenda_ListClinicReservations",
"variables": {
"calendarIds": [CALENDAR_ID],
"clinicSlug": CLINIC_SLUG,
"since": since_iso,
"until": "2025-11-30T21:59:59.999Z",
"locale": "cs",
"emptyCalendarIds": False,
},
"query": """query ClinicAgenda_ListClinicReservations(
$calendarIds: [UUID!],
$clinicSlug: String!,
$locale: Locale!,
$since: DateTime!,
$until: DateTime!,
$emptyCalendarIds: Boolean!
) {
reservations: listClinicReservations(
clinicSlug: $clinicSlug,
calendarIds: $calendarIds,
since: $since,
until: $until
) @skip(if: $emptyCalendarIds) {
id
start
end
note
done
color
request {
id
displayTitle(locale: $locale)
extendedPatient {
name
surname
dob
insuranceCompanyObject { shortName }
}
}
}
}""",
}
print("since:", since_iso)
print("until:", until_iso)
# ==================== Query API ====================
print("📡 Querying Medevio API for agenda...")
r = requests.post(GRAPHQL_URL, headers=headers, data=json.dumps(payload))
print("Status:", r.status_code)
try:
data = r.json()
except Exception as e:
print("❌ Could not parse JSON:", e)
print(r.text)
raise SystemExit()
if "data" not in data or "reservations" not in data["data"]:
raise SystemExit("⚠️ No 'reservations' data found in response.")
reservations = data["data"]["reservations"]
from dateutil import parser, tz
# ===== Process reservations into table =====
rows = []
for r in reservations:
req = r.get("request") or {}
patient = req.get("extendedPatient") or {}
insurance = patient.get("insuranceCompanyObject") or {}
try:
start_dt = parser.isoparse(r.get("start")).astimezone(tz.gettz("Europe/Prague"))
end_dt = parser.isoparse(r.get("end")).astimezone(tz.gettz("Europe/Prague"))
except Exception:
start_dt = end_dt = None
date_str = start_dt.strftime("%Y-%m-%d") if start_dt else ""
time_interval = f"{start_dt.strftime('%H:%M')}-{end_dt.strftime('%H:%M')}" if start_dt and end_dt else ""
rows.append({
"Date": date_str,
"Time": time_interval,
"Title": req.get("displayTitle") or "",
"Patient": f"{patient.get('surname','')} {patient.get('name','')}".strip(),
"DOB": patient.get("dob") or "",
"Insurance": insurance.get("shortName") or "",
"Note": r.get("note") or "",
"Color": r.get("color") or "",
"Request_ID": req.get("id") or "",
"Reservation_ID": r.get("id"),
})
df = pd.DataFrame(rows).sort_values(["Date", "Time"])
def kw_pattern(kw: str) -> str:
"""
Match the exact phrase kw (case-insensitive),
not as part of a '+something' continuation.
Examples:
'žloutenka a' ✅ matches '… žloutenka a …'
❌ NOT '… žloutenka a+b …'
'žloutenka a+b' ✅ matches exactly that phrase
"""
# start boundary: not preceded by a word char
# end guard: not followed by optional spaces + '+' + word
return rf"(?<!\w){re.escape(kw)}(?!\s*\+\s*\w)"
# ===== Excel export =====
EXPORT_DIR = Path(r"u:\Dropbox\Ordinace\Reporty")
EXPORT_DIR.mkdir(exist_ok=True, parents=True)
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
xlsx_path = EXPORT_DIR / f"{timestamp} Agenda (30 dní dopředu).xlsx"
# Safely delete older Agenda reports
for old in EXPORT_DIR.glob("*Agenda (30 dní dopředu).xlsx"):
try:
old.unlink()
except PermissionError:
print(f"⚠️ File is open, skipping delete: {old}")
except Exception as e:
print(f"⚠️ Could not delete {old}: {e}")
# Export DataFrame
df.to_excel(xlsx_path, index=False)
wb = load_workbook(xlsx_path)
ws = wb.active
ws.title = "Agenda" # ✅ rename sheet
# === Apply styling and custom column widths ===
widths = {
1: 11, # Date
2: 13, # Time
3: 45, # Title
4: 30, # Patient
5: 15, # DOB
6: 15, # Insurance
7: 30, # Note
8: 15, # Color
9: 37, # Request_ID
10: 37 # Reservation_ID
}
# Define styles
header_fill = PatternFill("solid", fgColor="FFFF00") # real yellow
alt_fill = PatternFill("solid", fgColor="F2F2F2") # light grey alternate rows
thin_border = Border(
left=Side(style="thin", color="000000"),
right=Side(style="thin", color="000000"),
top=Side(style="thin", color="000000"),
bottom=Side(style="thin", color="000000")
)
# === Format header ===
for col_idx in range(1, len(df.columns) + 1):
col_letter = get_column_letter(col_idx)
cell = ws.cell(row=1, column=col_idx)
cell.font = Font(bold=True)
cell.alignment = Alignment(horizontal="center", vertical="center")
cell.fill = header_fill
cell.value = str(cell.value).upper()
cell.border = thin_border
ws.column_dimensions[col_letter].width = widths.get(col_idx, 20)
# === Format data rows ===
for r_idx, row in enumerate(ws.iter_rows(min_row=2, max_row=ws.max_row, max_col=ws.max_column), start=2):
for cell in row:
cell.border = thin_border
if r_idx % 2 == 0: # alternate row background
cell.fill = alt_fill
ws.freeze_panes = "A2"
from openpyxl.utils.dataframe import dataframe_to_rows
# === Vaccine sheet configuration ===
VACCINE_SHEETS = {
"Chřipka": ["očkování", "chřipka"],
"COVID": ["očkování", "covid"],
"Pneumokok": ["očkování", "pneumo"],
"Hep A": ["očkování", "žloutenka a"],
"Hep B": ["očkování", "žloutenka b"],
"Hep A+B": ["očkování", "žloutenka a+b"],
"Klíšťovka": ["očkování", "klíšť"]
}
# === Generate sheets based on keyword combinations ===
for sheet_name, keywords in VACCINE_SHEETS.items():
mask = pd.Series(True, index=df.index)
title_series = df["Title"].fillna("")
for kw in keywords:
pattern = kw_pattern(kw)
mask &= title_series.str.contains(pattern, flags=re.IGNORECASE, regex=True)
filtered_df = df[mask].copy()
if filtered_df.empty:
print(f"️ No matches for sheet '{sheet_name}' ({' AND '.join(keywords)})")
continue
ws_new = wb.create_sheet(title=sheet_name)
for r in dataframe_to_rows(filtered_df, index=False, header=True):
ws_new.append(r)
# === Apply formatting ===
for col_idx in range(1, len(filtered_df.columns) + 1):
col_letter = get_column_letter(col_idx)
c = ws_new.cell(row=1, column=col_idx)
c.font = Font(bold=True)
c.alignment = Alignment(horizontal="center", vertical="center")
c.fill = PatternFill("solid", fgColor="FFFF00") # bright yellow header
c.value = str(c.value).upper()
c.border = thin_border
ws_new.column_dimensions[col_letter].width = widths.get(col_idx, 20)
# Borders + alternating rows
for r_idx, row in enumerate(ws_new.iter_rows(min_row=2, max_row=ws_new.max_row, max_col=ws_new.max_column), start=2):
for cell in row:
cell.border = thin_border
if r_idx % 2 == 0:
cell.fill = PatternFill("solid", fgColor="F2F2F2")
ws_new.freeze_panes = "A2"
print(f"🟡 Created sheet '{sheet_name}' with {len(filtered_df)} rows ({' AND '.join(keywords)})")
wb.save(xlsx_path)
print(f"📘 Exported clean agenda view to:\n{xlsx_path}")
@@ -0,0 +1,117 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Read OPEN Medevio requests (požadavky) from local MySQL table `pozadavky`
and export to Excel in the same visual format as Agenda.
"""
import pymysql
import pandas as pd
from datetime import datetime
from pathlib import Path
from openpyxl import Workbook
from openpyxl.styles import Font, Alignment, PatternFill, Border, Side
from openpyxl.utils import get_column_letter
from openpyxl.utils.dataframe import dataframe_to_rows
# ==============================
# 🔧 CONFIGURATION
# ==============================
DB_CONFIG = {
"host": "192.168.1.76",
"port": 3307,
"user": "root",
"password": "Vlado9674+",
"database": "medevio",
"charset": "utf8mb4",
"cursorclass": pymysql.cursors.DictCursor,
}
EXPORT_DIR = Path(r"u:\Dropbox\Ordinace\Reporty")
EXPORT_DIR.mkdir(parents=True, exist_ok=True)
xlsx_path = EXPORT_DIR / f"{datetime.now():%Y-%m-%d_%H-%M-%S} Otevřené požadavky.xlsx"
# ==============================
# 📡 LOAD DATA
# ==============================
print("📡 Fetching open requests from MySQL...")
conn = pymysql.connect(**DB_CONFIG)
with conn.cursor() as cur:
cur.execute("""
SELECT id AS Request_ID,
displayTitle AS Title,
pacient_prijmeni AS Pacient_Prijmeni,
pacient_jmeno AS Pacient_Jmeno,
pacient_rodnecislo AS RodneCislo,
createdAt AS Created,
updatedAt AS Updated,
doneAt AS Done,
removedAt AS Removed
FROM pozadavky
WHERE doneAt IS NULL AND removedAt IS NULL
ORDER BY createdAt DESC
""")
rows = cur.fetchall()
conn.close()
if not rows:
print("⚠️ No open requests found.")
raise SystemExit()
df = pd.DataFrame(rows)
print(f"✅ Loaded {len(df)} open requests.")
# ==============================
# 🧩 CLEAN + PREPARE
# ==============================
df["Patient"] = (df["Pacient_Prijmeni"].fillna("") + " " + df["Pacient_Jmeno"].fillna("")).str.strip()
df = df.rename(columns={
"RodneCislo": "Rodné číslo",
"Request_ID": "Request ID",
})
df = df[["Created", "Title", "Patient", "Rodné číslo", "Request ID", "Updated"]]
# ==============================
# 🧾 EXPORT TO EXCEL
# ==============================
wb = Workbook()
ws = wb.active
ws.title = "Otevřené požadavky"
# === Styles ===
header_fill = PatternFill("solid", fgColor="00FF99")
alt_fill = PatternFill("solid", fgColor="F2FFF2")
thin_border = Border(
left=Side(style="thin", color="000000"),
right=Side(style="thin", color="000000"),
top=Side(style="thin", color="000000"),
bottom=Side(style="thin", color="000000")
)
# === Write DataFrame ===
for r_idx, row in enumerate(dataframe_to_rows(df, index=False, header=True), start=1):
ws.append(row)
# === Header styling ===
for col_idx in range(1, len(df.columns) + 1):
c = ws.cell(row=1, column=col_idx)
c.font = Font(bold=True)
c.alignment = Alignment(horizontal="center", vertical="center")
c.fill = header_fill
c.border = thin_border
ws.column_dimensions[get_column_letter(col_idx)].width = 25
# === Data styling ===
for r_idx, row in enumerate(ws.iter_rows(min_row=2, max_row=ws.max_row, max_col=ws.max_column), start=2):
for cell in row:
cell.border = thin_border
if r_idx % 2 == 0:
cell.fill = alt_fill
ws.freeze_panes = "A2"
wb.save(xlsx_path)
print(f"📘 Exported {len(df)} open requests → {xlsx_path}")
+98
View File
@@ -0,0 +1,98 @@
# print_patients_first_page_ids.py
from pathlib import Path
import json, time, sys
from playwright.sync_api import sync_playwright, TimeoutError as PWTimeout
STATE_FILE = r"../medevio_storage.json"
PATIENTS_URL = "https://my.medevio.cz/mudr-buzalkova/klinika/pacienti"
def harvest_ids_on_page(page):
ids = set()
for sel in ["div[role='row'][data-id]", "div.MuiDataGrid-row[data-id]"]:
for row in page.locator(sel).all():
pid = row.get_attribute("data-id")
if pid:
ids.add(pid)
return ids
def set_page_size(page, value="100"):
# Open the page-size combobox
for loc in [
page.get_by_role("combobox", name="Řádků na stránce:"),
page.get_by_role("combobox", name="Rows per page:"),
page.locator("div.MuiTablePagination-root [role='combobox']"),
]:
if loc.count():
loc.first.click()
break
# Select option "100" (portal-safe)
opt = page.get_by_role("option", name=value)
if not opt.count():
opt = page.locator(f"//li[normalize-space(.)='{value}']")
opt.first.wait_for(state="visible", timeout=5000)
opt.first.click()
# Wait a moment for refresh
try:
page.wait_for_selector("div[role='row'][data-id]", timeout=10000)
except PWTimeout:
time.sleep(0.8)
def main():
sf = Path(STATE_FILE)
if not sf.exists():
print(f"ERROR: storage not found: {sf}")
sys.exit(1)
with sync_playwright() as p:
browser = p.chromium.launch(headless=True) # set False to watch
context = browser.new_context(storage_state=str(sf))
context.set_default_navigation_timeout(30000)
context.set_default_timeout(15000)
page = context.new_page()
try:
page.goto(PATIENTS_URL, wait_until="domcontentloaded")
except PWTimeout:
print("Warning: goto timeout; continuing…")
# Detect redirect to login
if "/prihlaseni" in page.url.lower():
print("You were redirected to the login page → saved session is expired. Re-run the login-save step.")
browser.close()
return
# (Optional) print pagination label before/after
try:
print("Before:", page.locator("p.MuiTablePagination-displayedRows").first.inner_text())
except Exception:
pass
try:
set_page_size(page, "100")
except Exception as e:
print(f"Could not set page size to 100: {e!r}")
try:
print("After :", page.locator("p.MuiTablePagination-displayedRows").first.inner_text())
except Exception:
pass
page.wait_for_selector("div[role='row'][data-id]", timeout=15000)
ids = sorted(harvest_ids_on_page(page))
print(f"\nCollected {len(ids)} IDs on first page:")
for pid in ids:
print(pid)
# Also save if you want
out_json = Path("patient_ids_first_page.json")
out_csv = Path("patient_ids_first_page.csv")
out_json.write_text(json.dumps(ids, ensure_ascii=False, indent=2), encoding="utf-8")
out_csv.write_text("patient_id\n" + "\n".join(ids), encoding="utf-8")
print(f"\nSaved → {out_json.resolve()}")
print(f"Saved → {out_csv.resolve()}")
browser.close()
if __name__ == "__main__":
main()
+98
View File
@@ -0,0 +1,98 @@
# print_patients_first_page_ids.py
from pathlib import Path
import json, time, sys
from playwright.sync_api import sync_playwright, TimeoutError as PWTimeout
STATE_FILE = r"../medevio_storage.json"
PATIENTS_URL = "https://my.medevio.cz/mudr-buzalkova/klinika/pacienti"
def harvest_ids_on_page(page):
ids = set()
for sel in ["div[role='row'][data-id]", "div.MuiDataGrid-row[data-id]"]:
for row in page.locator(sel).all():
pid = row.get_attribute("data-id")
if pid:
ids.add(pid)
return ids
def set_page_size(page, value="100"):
# Open the page-size combobox
for loc in [
page.get_by_role("combobox", name="Řádků na stránce:"),
page.get_by_role("combobox", name="Rows per page:"),
page.locator("div.MuiTablePagination-root [role='combobox']"),
]:
if loc.count():
loc.first.click()
break
# Select option "100" (portal-safe)
opt = page.get_by_role("option", name=value)
if not opt.count():
opt = page.locator(f"//li[normalize-space(.)='{value}']")
opt.first.wait_for(state="visible", timeout=5000)
opt.first.click()
# Wait a moment for refresh
try:
page.wait_for_selector("div[role='row'][data-id]", timeout=10000)
except PWTimeout:
time.sleep(0.8)
def main():
sf = Path(STATE_FILE)
if not sf.exists():
print(f"ERROR: storage not found: {sf}")
sys.exit(1)
with sync_playwright() as p:
browser = p.chromium.launch(headless=False) # set False to watch
context = browser.new_context(storage_state=str(sf))
context.set_default_navigation_timeout(30000)
context.set_default_timeout(15000)
page = context.new_page()
try:
page.goto(PATIENTS_URL, wait_until="domcontentloaded")
except PWTimeout:
print("Warning: goto timeout; continuing…")
# Detect redirect to login
if "/prihlaseni" in page.url.lower():
print("You were redirected to the login page → saved session is expired. Re-run the login-save step.")
browser.close()
return
# (Optional) print pagination label before/after
try:
print("Before:", page.locator("p.MuiTablePagination-displayedRows").first.inner_text())
except Exception:
pass
try:
set_page_size(page, "100")
except Exception as e:
print(f"Could not set page size to 100: {e!r}")
try:
print("After :", page.locator("p.MuiTablePagination-displayedRows").first.inner_text())
except Exception:
pass
page.wait_for_selector("div[role='row'][data-id]", timeout=15000)
ids = sorted(harvest_ids_on_page(page))
print(f"\nCollected {len(ids)} IDs on first page:")
for pid in ids:
print(pid)
# Also save if you want
out_json = Path("patient_ids_first_page.json")
out_csv = Path("patient_ids_first_page.csv")
out_json.write_text(json.dumps(ids, ensure_ascii=False, indent=2), encoding="utf-8")
out_csv.write_text("patient_id\n" + "\n".join(ids), encoding="utf-8")
print(f"\nSaved → {out_json.resolve()}")
print(f"Saved → {out_csv.resolve()}")
browser.close()
if __name__ == "__main__":
main()
+249
View File
@@ -0,0 +1,249 @@
# extract_patient_detail.py
# Usage:
# 1) Put your medevio_storage.json path into STATE_FILE.
# 2) Set PATIENT_ID to a real UUID from your list.
# 3) Run: python extract_patient_detail.py
#
# Output: prints a dict to console and saves patient_<ID>.json next to the script.
from pathlib import Path
import json, sys, time, re
from playwright.sync_api import sync_playwright, TimeoutError as PWTimeout
STATE_FILE = r"../medevio_storage.json"
BASE_URL = "https://my.medevio.cz/mudr-buzalkova/klinika/pacienti"
PATIENT_ID = "fcb2414b-067b-4ca2-91b2-6c36a86d4cbb" # <-- put target ID here
# ---------- helpers ----------
def wait_for_grid(page, timeout=15000):
try:
page.wait_for_selector("div[role='rowgroup']", timeout=timeout)
except PWTimeout:
pass
page.wait_for_selector("div[role='row'][data-id]", timeout=timeout)
def open_detail_via_query(page, patient_id):
# Try opening page with ?pacient=... (SPA should open drawer/detail)
target = f"{BASE_URL}?pacient={patient_id}"
page.goto(target, wait_until="domcontentloaded")
# Wait briefly for the drawer/dialog to render
if not wait_for_detail_open(page, quick=True):
# Some apps need a tiny delay to mount the panel
time.sleep(0.8)
return is_detail_open(page)
def is_detail_open(page):
# Look for a dialog/drawer that likely contains patient detail.
# Typical MUI patterns: role="dialog", or an aside/div with aria-modal etc.
selectors = [
"[role='dialog']",
"div.MuiDrawer-paper",
"div.MuiModal-root [role='dialog']",
"div[aria-modal='true']",
]
for sel in selectors:
loc = page.locator(sel)
if loc.count() and loc.first.is_visible():
return True
return False
def wait_for_detail_open(page, quick=False):
timeout = 4000 if quick else 15000
selectors = [
"[role='dialog']",
"div.MuiDrawer-paper",
"div.MuiModal-root [role='dialog']",
"div[aria-modal='true']",
]
for sel in selectors:
try:
page.wait_for_selector(sel, timeout=timeout, state="visible")
return True
except PWTimeout:
continue
return False
def open_detail_by_click(page, patient_id):
# Click the row with matching data-id (fallback)
wait_for_grid(page, timeout=15000)
row = page.locator(f"div[role='row'][data-id='{patient_id}']").first
if not row.count():
return False
row.click()
return wait_for_detail_open(page)
def find_detail_root(page):
# Return the locator that represents the open detail container
for sel in ["[role='dialog']", "div.MuiDrawer-paper", "div[aria-modal='true']"]:
loc = page.locator(sel)
if loc.count() and loc.first.is_visible():
return loc.first
# Fallback to the last visible modal-ish container
return page.locator("div.MuiModal-root, div.MuiDrawer-paper").last
def extract_text(el):
try:
return el.inner_text().strip()
except Exception:
return ""
def extract_field_by_label(root, label_texts):
"""
Try to find a field value by its label text (CZ/EN variants).
Looks for elements containing the label and then a sibling/value element.
"""
labels_xpath = " | ".join([f".//*[normalize-space()='{t}']" for t in label_texts])
loc = root.locator(f"xpath=({labels_xpath})")
if not loc.count():
# Try contains(label)
labels_xpath2 = " | ".join([f".//*[contains(normalize-space(), '{t}')]" for t in label_texts])
loc = root.locator(f"xpath=({labels_xpath2})")
if not loc.count():
return None
candidate = loc.first
# Value might be in parent/next sibling
parent = candidate.locator("xpath=..")
siblings = [
parent.locator("xpath=following-sibling::*[1]"),
candidate.locator("xpath=following-sibling::*[1]"),
parent.locator(".//*[(self::span or self::div) and string-length(normalize-space())>0]"),
]
for s in siblings:
if s.count():
text = extract_text(s.first)
# Clean common label-value formatting like "E-mail\nx@y.cz"
if text:
# If the label text is included, strip it
for t in label_texts:
text = re.sub(rf"^{re.escape(t)}\s*[:]?\s*", "", text, flags=re.I)
text = re.sub(r"\s+\n\s+", "", text).strip()
return text
# As a last fallback, try reading the parent block's text minus the label
block_text = extract_text(parent)
if block_text:
for t in label_texts:
block_text = re.sub(rf"{re.escape(t)}\s*[:]?\s*", "", block_text, flags=re.I)
return block_text.strip()
return None
def extract_all_text_pairs(root):
"""
Generic key-value sweep for components that render details as 2-column grids.
Returns a dict of guessed label->value pairs.
"""
result = {}
# Try common MUI grid/list patterns
blocks = root.locator("div.MuiGrid-container, dl, ul.MuiList-root")
for i in range(min(20, blocks.count())):
block = blocks.nth(i)
text = extract_text(block)
if not text:
continue
# naive split by newlines, pair neighbors "Label\nValue"
parts = [t.strip() for t in text.splitlines() if t.strip()]
for j in range(len(parts) - 1):
label, value = parts[j], parts[j+1]
# Heuristic: labels usually short, values not identical, ignore obvious noise
if len(label) <= 32 and label != value and ":" not in value:
if label not in result:
result[label] = value
return result
def extract_patient_detail(page, patient_id):
root = find_detail_root(page)
if not root:
return {"id": patient_id, "error": "detail_not_found"}
# Try to get a headline with the name
name = None
for sel in ["h1", "h2", "h3", "header h2", "[data-testid='PatientName']"]:
loc = root.locator(sel)
if loc.count():
nm = extract_text(loc.first)
if nm and len(nm) > 1:
name = nm
break
# Targeted fields (CZ + EN aliases)
fields = {
"Datum narození / Born": extract_field_by_label(root, ["Datum narození", "Datum nar.", "Date of birth", "Born"]),
"Rodné číslo": extract_field_by_label(root, ["Rodné číslo", "", "Personal ID"]),
"Telefon": extract_field_by_label(root, ["Telefon", "Tel.", "Phone", "Mobile"]),
"E-mail": extract_field_by_label(root, ["E-mail", "Email", "E-mail"]),
"Zdravotní pojišťovna": extract_field_by_label(root, ["Pojišťovna", "Zdravotní pojišťovna", "Insurer", "Insurance"]),
"Adresa": extract_field_by_label(root, ["Adresa", "Address"]),
"Poznámka": extract_field_by_label(root, ["Poznámka", "Note", "Notes"]),
"Pohlaví": extract_field_by_label(root, ["Pohlaví", "Gender", "Sex"]),
"Praktický lékař": extract_field_by_label(root, ["Praktický lékař", "GP", "General practitioner"]),
}
# Sweep for any extra key→value pairs we didnt explicitly target
extras = extract_all_text_pairs(root)
# Merge non-empty fields
data = {"id": patient_id}
if name: data["name"] = name
for k, v in fields.items():
if v and v.strip():
data[k] = v.strip()
# Add extras that aren't already present
for k, v in extras.items():
if k not in data and v and v.strip():
data[k] = v.strip()
return data
# ---------- main ----------
def main():
if not PATIENT_ID or len(PATIENT_ID) < 8:
print("Set PATIENT_ID to a valid patient UUID.")
sys.exit(1)
sf = Path(STATE_FILE)
if not sf.exists():
print(f"Storage file not found: {sf}")
sys.exit(1)
with sync_playwright() as p:
browser = p.chromium.launch(headless=False) # set False to watch
context = browser.new_context(storage_state=str(sf))
context.set_default_navigation_timeout(30000)
context.set_default_timeout(15000)
page = context.new_page()
# Try via query param first
opened = open_detail_via_query(page, PATIENT_ID)
# If not opened, go to base list and click the row
if not opened:
# Ensure the base grid exists
page.goto(BASE_URL, wait_until="domcontentloaded")
if "/prihlaseni" in page.url.lower():
print("Redirected to login — refresh your medevio_storage.json.")
browser.close()
return
if not open_detail_by_click(page, PATIENT_ID):
print("Could not open detail panel (neither via query nor by clicking).")
browser.close()
return
# At this point, detail should be open
data = extract_patient_detail(page, PATIENT_ID)
print("\n=== Patient detail ===")
print(json.dumps(data, ensure_ascii=False, indent=2))
out = Path(f"patient_{PATIENT_ID}.json")
out.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
print(f"\nSaved → {out.resolve()}")
browser.close()
if __name__ == "__main__":
main()
+111
View File
@@ -0,0 +1,111 @@
from playwright.sync_api import sync_playwright
import mysql.connector
import time
MYSQL_CFG = dict(
host="192.168.1.76",
port=3307,
user="root",
password="Vlado9674+",
database="medevio",
)
# --- load 3 patients from DB ---
conn = mysql.connector.connect(**MYSQL_CFG)
with conn.cursor() as cur:
cur.execute("""
SELECT rid, prijmeni, jmeno, rc
FROM patients_extracted
WHERE prijmeni IS NOT NULL and mamedevioucet is null
ORDER BY prijmeni ASC
LIMIT 3
""")
rows = cur.fetchall()
if not rows:
raise RuntimeError("No entries found in patients_extracted")
STATE_FILE = r"../medevio_storage.json"
BASE_URL = "https://my.medevio.cz/mudr-buzalkova/klinika/pacienti"
NOT_FOUND_SEL = "div[role='alert']:has-text('Pacient nebyl nalezen'), div:has-text('Pacient nebyl nalezen')"
DIALOG_SEL = "[role='dialog'], div.MuiDrawer-paper, div[aria-modal='true']"
def close_dialog_if_open(page):
dlg = page.locator(DIALOG_SEL)
try:
if dlg.count():
# Try a close button; if not, press Escape
try:
dlg.locator("button:has-text('Zavřít'), [aria-label='Zavřít'], [aria-label='Close'], [data-testid='CloseIcon']").first.click(timeout=1000)
except:
page.keyboard.press("Escape")
page.wait_for_selector(DIALOG_SEL, state="detached", timeout=1500)
except:
pass # best-effort close
def main():
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
try:
context = browser.new_context(storage_state=STATE_FILE)
page = context.new_page()
for rid, surname, name, rc in rows:
# 0) close any previous dialog to avoid stale matches
close_dialog_if_open(page)
target_url = f"{BASE_URL}?pacient={rid}"
page.goto(target_url, wait_until="domcontentloaded")
page.wait_for_load_state("networkidle")
# 1) Not-found toast?
try:
page.wait_for_selector(NOT_FOUND_SEL, timeout=3000)
print(f"{surname} {name} {rc} ⚠️ pacient s RID {rid} nebyl nalezen, přeskočeno")
# (optional) set mamedevioucet=NULL for this rid here
continue
except:
pass
# 2) Detail panel
try:
page.wait_for_selector(DIALOG_SEL, timeout=6000)
except:
print(f"⚠️ {surname} {name} {rc}: detailový panel se nenačetl, přeskočeno")
continue
# 3) Verify dialog belongs to current patient (avoid stale dialog)
detail = page.locator(DIALOG_SEL).first
detail_text = detail.inner_text()
if (surname not in detail_text) and (rc not in detail_text):
# Still looks wrong; give UI a moment and re-check once
page.wait_for_timeout(500)
detail_text = detail.inner_text()
if (surname not in detail_text) and (rc not in detail_text):
print(f"⚠️ {surname} {name} {rc}: detail neodpovídá (stará karta?), přeskočeno")
continue
# 4) Check Medevio account text
if "zatím nemá Medevio účet" in detail_text:
has_account = 0
print(f"{surname} {name} {rc} ❌ zatím nemá Medevio účet")
else:
has_account = 1
print(f"{surname} {name} {rc} ✅ má Medevio účet")
# Update DB by RID (or swap to rc if you prefer)
with conn.cursor() as c:
c.execute(
"UPDATE patients_extracted SET mamedevioucet = %s WHERE rid = %s",
(has_account, rid),
)
conn.commit()
time.sleep(0.5) # gentle pacing
finally:
browser.close()
if __name__ == "__main__":
main()
+101
View File
@@ -0,0 +1,101 @@
import time
from playwright.sync_api import sync_playwright
import mysql.connector
MYSQL_CFG = dict(
host="192.168.1.76",
port=3307,
user="root",
password="Vlado9674+",
database="medevio",
)
# --- load 3 patients from DB ---
conn = mysql.connector.connect(**MYSQL_CFG)
with conn.cursor() as cur:
cur.execute("""
SELECT rid, prijmeni, jmeno, rc
FROM patients_extracted
WHERE prijmeni IS NOT NULL and mamedevioucet is null
ORDER BY prijmeni ASC
LIMIT 300
""")
rows = cur.fetchall()
if not rows:
raise RuntimeError("No entries found in patients_extracted")
STATE_FILE = r"../medevio_storage.json"
BASE_URL = "https://my.medevio.cz/mudr-buzalkova/klinika/pacienti"
NOT_FOUND_SEL = "div[role='alert']:has-text('Pacient nebyl nalezen'), div:has-text('Pacient nebyl nalezen')"
DIALOG_SEL = "[role='dialog'], div.MuiDrawer-paper, div[aria-modal='true']"
def close_dialog_if_open(page):
dlg = page.locator(DIALOG_SEL)
try:
if dlg.count():
# Try a close button; if not, press Escape
try:
dlg.locator("button:has-text('Zavřít'), [aria-label='Zavřít'], [aria-label='Close'], [data-testid='CloseIcon']").first.click(timeout=1000)
except:
page.keyboard.press("Escape")
page.wait_for_selector(DIALOG_SEL, state="detached", timeout=1500)
except:
pass # best-effort close
def main():
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
context = browser.new_context(storage_state=STATE_FILE)
page = context.new_page()
for rid, surname, name, rc in rows:
print(f"\nProcessing {surname} {name} {rc}")
# 1️⃣ Navigation time
t0 = time.perf_counter()
page.goto(f"{BASE_URL}?pacient={rid}", wait_until="domcontentloaded")
# page.wait_for_load_state("networkidle")
t_nav = time.perf_counter() - t0
print(f" ⏱️ page.goto + networkidle: {t_nav:.2f}s")
# 2️⃣ Toast / dialog detection
t1 = time.perf_counter()
not_found = False
try:
page.wait_for_selector(NOT_FOUND_SEL, timeout=2500)
not_found = True
except:
pass
if not_found:
print(f" ⚠️ not-found toast detected after {time.perf_counter() - t1:.2f}s")
continue
try:
page.wait_for_selector(DIALOG_SEL, timeout=8000)
except:
print(f" ⚠️ dialog not found (waited {time.perf_counter() - t1:.2f}s)")
continue
t_dialog = time.perf_counter() - t1
print(f" ⏱️ toast/dialog detection: {t_dialog:.2f}s")
# 3️⃣ Account check + DB update
t2 = time.perf_counter()
text = page.locator(DIALOG_SEL).first.inner_text()
has_account = 0 if "zatím nemá Medevio účet" in text else 1
with conn.cursor() as c:
c.execute("UPDATE patients_extracted SET mamedevioucet=%s WHERE rid=%s",
(has_account, rid))
conn.commit()
t_db = time.perf_counter() - t2
print(f" ⏱️ DB update & text parse: {t_db:.2f}s")
# 4️⃣ Optional pacing
t3 = time.perf_counter()
# time.sleep(0.5)
print(f" ⏱️ explicit sleep: {time.perf_counter() - t3:.2f}s")
browser.close()
if __name__ == "__main__":
main()
+177
View File
@@ -0,0 +1,177 @@
#Tento kod se pripoji do kartoteky Medevio, zmeni na 100 pacientu na stranu, nactene
# medevio_dump_patients_html_to_mysql.py
import time
import json
from pathlib import Path
from datetime import datetime
from typing import Set
import mysql.connector
from mysql.connector import errorcode
from playwright.sync_api import sync_playwright, TimeoutError as PWTimeout
# ---------- CONFIG ----------
STATE_FILE = r"../medevio_storage.json"
BASE_LIST_URL = "https://my.medevio.cz/mudr-buzalkova/klinika/pacienti"
SAVE_DELAY_SECONDS = 10 # throttle: 10 sec per patient
# MySQL connection settings (fill in)
MYSQL_CFG = dict(
host="192.168.1.76",
port=3307,
user="root",
password="Vlado9674+",
database="medevio",
)
# ---------- DB helpers ----------
def db_connect():
try:
conn = mysql.connector.connect(**MYSQL_CFG)
return conn
except mysql.connector.Error as e:
raise SystemExit(f"MySQL connection failed: {e}")
# ---------- Playwright helpers ----------
def wait_for_grid_ready(page):
# grid present & at least one row (be generous on timeout)
page.wait_for_selector("div[role='rowgroup']", timeout=20000)
page.wait_for_selector("div[role='row'][data-id]", timeout=20000)
def set_page_size_100(page): #zde se nastavuje hodnota pacientu na stranu na 100, toto je jedno volani
# Click the page-size combobox (CZ/EN + generic)
for loc in [
page.get_by_role("combobox", name="Řádků na stránce:"),
page.get_by_role("combobox", name="Rows per page:"),
page.locator("div.MuiTablePagination-root [role='combobox']"),
]:
if loc.count():
loc.first.click()
break
# Select 100 (MUI menu often renders in a portal)
opt = page.get_by_role("option", name="100")
if not opt.count():
opt = page.locator("//li[normalize-space(.)='100']")
opt.first.wait_for(state="visible", timeout=5000)
opt.first.click()
# Wait for rows to refresh
try:
page.wait_for_selector("div[role='row'][data-id]", timeout=10000)
except PWTimeout:
time.sleep(0.8)
def click_next_page(page) -> bool: #toto je kliknuti, aby se nacetla dalsi stranka se 100 zaznamy
# Prefer ARIA label
nxt = page.get_by_role("button", name="Go to next page")
if nxt.count():
try:
if nxt.first.is_enabled():
nxt.first.click()
return True
except Exception:
pass
# Fallback (CZ)
nxt2 = page.get_by_role("button", name="Další")
if nxt2.count():
try:
if nxt2.first.is_enabled():
nxt2.first.click()
return True
except Exception:
pass
return False
# ---------- Main workflow ----------
def save_all_patient_htmls(conn,context,next_round): #toto ulozi do mysql vsechny html stranky z kartoteky, takze cca 19
page = context.new_page()
page.set_default_timeout(15000)
page.set_default_navigation_timeout(30000)
# Use domcontentloaded (SPAs often keep network busy)
page.goto(BASE_LIST_URL, wait_until="domcontentloaded")
if "/prihlaseni" in page.url.lower():
raise SystemExit("Session expired → refresh medevio_storage.json via the login script.")
wait_for_grid_ready(page)
# optional: print label like "125 z 1856"
try:
label = page.locator("p.MuiTablePagination-displayedRows").first.inner_text()
print("Pagination label BEFORE:", label)
except Exception:
pass
# Set 100/page
try:
set_page_size_100(page)
try:
label = page.locator("p.MuiTablePagination-displayedRows").first.inner_text()
print("Pagination label AFTER :", label)
except Exception:
pass
except Exception as e:
print(f"Warning: could not set page size to 100: {e!r}")
page_index = 1
while True:
wait_for_grid_ready(page)
#here I need code to save page into kartoteka_html
cur = conn.cursor()
cur.execute(
f"""INSERT INTO kartoteka_html (html,round)
VALUES (%s,%s)""",
(page.content(),next_round),
)
conn.commit()
cur.close()
print(f"DB saved page index {page_index}")
# Try to go next; if cannot, break
if not click_next_page(page):
break
# Wait for DOM to actually update (new rows)
try:
page.wait_for_load_state("domcontentloaded", timeout=10000)
except PWTimeout:
pass
time.sleep(0.5)
page_index += 1
page.close()
print(f"Total pages colleceted collected: {page_index}")
return
def main():
# Check storage exists
if not Path(STATE_FILE).exists():
raise SystemExit(f"Storage not found: {STATE_FILE}")
# DB ready
conn = db_connect()
#vymazat vsechny zaznamy z kartoteka_html, ktere nemaji hodnotu round
cur=conn.cursor()
cur.execute("delete from kartoteka_html where round=0")
conn.commit()
with conn.cursor() as cur:
cur.execute("SELECT MAX(`round`) AS max_round FROM kartoteka_html")
result = cur.fetchone()
# If table empty, use 0 as fallback
next_round = (result[0] or 0) + 1
print("Next round will be:", next_round)
with sync_playwright() as p:
browser = p.chromium.launch(headless=False) # set False to watch
context = browser.new_context(storage_state=STATE_FILE)
save_all_patient_htmls(conn, context,next_round)
browser.close()
conn.close()
print("Done.")
if __name__ == "__main__":
main()
+262
View File
@@ -0,0 +1,262 @@
#Tento kod se pripoji do kartoteky Medevio, zmeni na 100 pacientu na stranu, nactene
# medevio_dump_patients_html_to_mysql.py
import time
import json
from pathlib import Path
from datetime import datetime
from typing import Set
import mysql.connector
from mysql.connector import errorcode
from playwright.sync_api import sync_playwright, TimeoutError as PWTimeout
# ---------- CONFIG ----------
STATE_FILE = r"../medevio_storage.json"
BASE_LIST_URL = "https://my.medevio.cz/mudr-buzalkova/klinika/pacienti"
SAVE_DELAY_SECONDS = 10 # throttle: 10 sec per patient
# MySQL connection settings (fill in)
MYSQL_CFG = dict(
host="192.168.1.76",
port=3307,
user="root",
password="Vlado9674+",
database="medevio",
)
TABLE_NAME = "patients_html" # schema created automatically
# ---------- DB helpers ----------
def db_connect():
try:
conn = mysql.connector.connect(**MYSQL_CFG)
return conn
except mysql.connector.Error as e:
raise SystemExit(f"MySQL connection failed: {e}")
def db_ensure_table(conn):
ddl = f"""
CREATE TABLE IF NOT EXISTS `{TABLE_NAME}` (
patient_id VARCHAR(64) PRIMARY KEY,
html LONGTEXT NOT NULL,
fetched_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
"""
cur = conn.cursor()
cur.execute(ddl)
conn.commit()
cur.close()
def db_existing_ids(conn) -> Set[str]:
ids = set()
cur = conn.cursor()
cur.execute(f"SELECT patient_id FROM `{TABLE_NAME}`")
for (pid,) in cur.fetchall():
ids.add(pid)
cur.close()
return ids
def db_upsert_html(conn, patient_id: str, html: str):
cur = conn.cursor()
cur.execute(
f"""INSERT INTO `{TABLE_NAME}` (patient_id, html, fetched_at)
VALUES (%s, %s, NOW())
ON DUPLICATE KEY UPDATE html = VALUES(html), fetched_at = VALUES(fetched_at)""",
(patient_id, html),
)
conn.commit()
cur.close()
# ---------- Playwright helpers ----------
def wait_for_grid_ready(page):
# grid present & at least one row (be generous on timeout)
page.wait_for_selector("div[role='rowgroup']", timeout=20000)
page.wait_for_selector("div[role='row'][data-id]", timeout=20000)
def set_page_size_100(page):
# Click the page-size combobox (CZ/EN + generic)
for loc in [
page.get_by_role("combobox", name="Řádků na stránce:"),
page.get_by_role("combobox", name="Rows per page:"),
page.locator("div.MuiTablePagination-root [role='combobox']"),
]:
if loc.count():
loc.first.click()
break
# Select 100 (MUI menu often renders in a portal)
opt = page.get_by_role("option", name="100")
if not opt.count():
opt = page.locator("//li[normalize-space(.)='100']")
opt.first.wait_for(state="visible", timeout=5000)
opt.first.click()
# Wait for rows to refresh
try:
page.wait_for_selector("div[role='row'][data-id]", timeout=10000)
except PWTimeout:
time.sleep(0.8)
def harvest_ids_on_current_page(page) -> Set[str]:
ids = set()
for sel in ["div[role='row'][data-id]", "div.MuiDataGrid-row[data-id]"]:
for row in page.locator(sel).all():
pid = row.get_attribute("data-id")
if pid:
ids.add(pid)
return ids
def click_next_page(page) -> bool:
# Prefer ARIA label
nxt = page.get_by_role("button", name="Go to next page")
if nxt.count():
try:
if nxt.first.is_enabled():
nxt.first.click()
return True
except Exception:
pass
# Fallback (CZ)
nxt2 = page.get_by_role("button", name="Další")
if nxt2.count():
try:
if nxt2.first.is_enabled():
nxt2.first.click()
return True
except Exception:
pass
return False
def ensure_detail_open(page) -> bool:
# Detail drawer/dialog visible?
for sel in ["[role='dialog']", "div.MuiDrawer-paper", "div[aria-modal='true']"]:
loc = page.locator(sel)
if loc.count() and loc.first.is_visible():
return True
return False
# ---------- Main workflow ----------
def collect_all_patient_ids(context) -> Set[str]:
page = context.new_page()
page.set_default_timeout(15000)
page.set_default_navigation_timeout(30000)
# Use domcontentloaded (SPAs often keep network busy)
page.goto(BASE_LIST_URL, wait_until="domcontentloaded")
if "/prihlaseni" in page.url.lower():
raise SystemExit("Session expired → refresh medevio_storage.json via the login script.")
wait_for_grid_ready(page)
# optional: print label like "125 z 1856"
try:
label = page.locator("p.MuiTablePagination-displayedRows").first.inner_text()
print("Pagination label BEFORE:", label)
except Exception:
pass
# Set 100/page
try:
set_page_size_100(page)
try:
label = page.locator("p.MuiTablePagination-displayedRows").first.inner_text()
print("Pagination label AFTER :", label)
except Exception:
pass
except Exception as e:
print(f"Warning: could not set page size to 100: {e!r}")
all_ids: Set[str] = set()
page_index = 1
while True:
wait_for_grid_ready(page)
ids_now = harvest_ids_on_current_page(page)
print(f"Page {page_index}: harvested {len(ids_now)} ids")
all_ids |= ids_now
# Try to go next; if cannot, break
if not click_next_page(page):
break
# Wait for DOM to actually update (new rows)
try:
page.wait_for_load_state("domcontentloaded", timeout=10000)
except PWTimeout:
pass
time.sleep(0.5)
page_index += 1
page.close()
print(f"Total unique IDs collected: {len(all_ids)}")
return all_ids
def fetch_and_store_patient_html(context, conn, patient_id: str):
page = context.new_page()
page.set_default_timeout(15000)
page.set_default_navigation_timeout(30000)
url = f"{BASE_LIST_URL}?pacient={patient_id}"
page.goto(url, wait_until="domcontentloaded")
# If detail didnt open, fallback: go to list, click row
if not ensure_detail_open(page):
page.goto(BASE_LIST_URL, wait_until="domcontentloaded")
try:
page.wait_for_selector(f"div[role='row'][data-id='{patient_id}']", timeout=15000)
page.locator(f"div[role='row'][data-id='{patient_id}']").first.click()
# wait for drawer/dialog
page.wait_for_selector("[role='dialog'], div.MuiDrawer-paper, div[aria-modal='true']", timeout=12000)
except PWTimeout:
print(f"[{patient_id}] detail panel did not open — skipping")
page.close()
return
# Save full HTML of the page (includes the open detail drawer)
html = page.content()
db_upsert_html(conn, patient_id, html)
print(f"[{patient_id}] saved HTML ({len(html)} bytes) at {datetime.now().isoformat(timespec='seconds')}")
page.close()
# Throttle per your requirement
time.sleep(SAVE_DELAY_SECONDS)
def main():
# Check storage exists
if not Path(STATE_FILE).exists():
raise SystemExit(f"Storage not found: {STATE_FILE}")
# DB ready
conn = db_connect()
db_ensure_table(conn)
already = db_existing_ids(conn)
print(f"Already in DB: {len(already)} ids")
with sync_playwright() as p:
browser = p.chromium.launch(headless=False) # set False to watch
context = browser.new_context(storage_state=STATE_FILE)
# 1) Collect all IDs from the listing (all pages)
# all_ids = collect_all_patient_ids(context)
all_ids=db_existing_ids(conn)
# 2) Iterate and store HTML (skip existing)
todo = [pid for pid in sorted(all_ids) if pid not in already]
print(f"To fetch now: {len(todo)} ids (skipping {len(all_ids)-len(todo)} already saved)")
for i, pid in enumerate(todo, 1):
try:
fetch_and_store_patient_html(context, conn, pid)
except Exception as e:
print(f"[{pid}] ERROR: {e!r} — continuing with next")
browser.close()
conn.close()
print("Done.")
if __name__ == "__main__":
main()
+258
View File
@@ -0,0 +1,258 @@
# medevio_dump_patients_html_to_mysql.py
import time
import json
from pathlib import Path
from datetime import datetime
from typing import Set
import mysql.connector
from mysql.connector import errorcode
from playwright.sync_api import sync_playwright, TimeoutError as PWTimeout
# ---------- CONFIG ----------
STATE_FILE = r"../medevio_storage.json"
BASE_LIST_URL = "https://my.medevio.cz/mudr-buzalkova/klinika/pacienti"
SAVE_DELAY_SECONDS = 10 # throttle: 10 sec per patient
# MySQL connection settings (fill in)
MYSQL_CFG = dict(
host="192.168.1.74",
port=3307,
user="root",
password="Vlado9674+",
database="medevio",
)
TABLE_NAME = "patients_html" # schema created automatically
# ---------- DB helpers ----------
def db_connect():
try:
conn = mysql.connector.connect(**MYSQL_CFG)
return conn
except mysql.connector.Error as e:
raise SystemExit(f"MySQL connection failed: {e}")
def db_ensure_table(conn):
ddl = f"""
CREATE TABLE IF NOT EXISTS `{TABLE_NAME}` (
patient_id VARCHAR(64) PRIMARY KEY,
html LONGTEXT NOT NULL,
fetched_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
"""
cur = conn.cursor()
cur.execute(ddl)
conn.commit()
cur.close()
def db_existing_ids(conn) -> Set[str]:
ids = set()
cur = conn.cursor()
cur.execute(f"SELECT patient_id FROM `{TABLE_NAME}`")
for (pid,) in cur.fetchall():
ids.add(pid)
cur.close()
return ids
def db_upsert_html(conn, patient_id: str, html: str):
cur = conn.cursor()
cur.execute(
f"""INSERT INTO `{TABLE_NAME}` (patient_id, html, fetched_at)
VALUES (%s, %s, NOW())
ON DUPLICATE KEY UPDATE html = VALUES(html), fetched_at = VALUES(fetched_at)""",
(patient_id, html),
)
conn.commit()
cur.close()
# ---------- Playwright helpers ----------
def wait_for_grid_ready(page):
# grid present & at least one row (be generous on timeout)
page.wait_for_selector("div[role='rowgroup']", timeout=20000)
page.wait_for_selector("div[role='row'][data-id]", timeout=20000)
def set_page_size_100(page):
# Click the page-size combobox (CZ/EN + generic)
for loc in [
page.get_by_role("combobox", name="Řádků na stránce:"),
page.get_by_role("combobox", name="Rows per page:"),
page.locator("div.MuiTablePagination-root [role='combobox']"),
]:
if loc.count():
loc.first.click()
break
# Select 100 (MUI menu often renders in a portal)
opt = page.get_by_role("option", name="100")
if not opt.count():
opt = page.locator("//li[normalize-space(.)='100']")
opt.first.wait_for(state="visible", timeout=5000)
opt.first.click()
# Wait for rows to refresh
try:
page.wait_for_selector("div[role='row'][data-id]", timeout=10000)
except PWTimeout:
time.sleep(0.8)
def harvest_ids_on_current_page(page) -> Set[str]:
ids = set()
for sel in ["div[role='row'][data-id]", "div.MuiDataGrid-row[data-id]"]:
for row in page.locator(sel).all():
pid = row.get_attribute("data-id")
if pid:
ids.add(pid)
return ids
def click_next_page(page) -> bool:
# Prefer ARIA label
nxt = page.get_by_role("button", name="Go to next page")
if nxt.count():
try:
if nxt.first.is_enabled():
nxt.first.click()
return True
except Exception:
pass
# Fallback (CZ)
nxt2 = page.get_by_role("button", name="Další")
if nxt2.count():
try:
if nxt2.first.is_enabled():
nxt2.first.click()
return True
except Exception:
pass
return False
def ensure_detail_open(page) -> bool:
# Detail drawer/dialog visible?
for sel in ["[role='dialog']", "div.MuiDrawer-paper", "div[aria-modal='true']"]:
loc = page.locator(sel)
if loc.count() and loc.first.is_visible():
return True
return False
# ---------- Main workflow ----------
def collect_all_patient_ids(context) -> Set[str]:
page = context.new_page()
page.set_default_timeout(15000)
page.set_default_navigation_timeout(30000)
# Use domcontentloaded (SPAs often keep network busy)
page.goto(BASE_LIST_URL, wait_until="domcontentloaded")
if "/prihlaseni" in page.url.lower():
raise SystemExit("Session expired → refresh medevio_storage.json via the login script.")
wait_for_grid_ready(page)
# optional: print label like "125 z 1856"
try:
label = page.locator("p.MuiTablePagination-displayedRows").first.inner_text()
print("Pagination label BEFORE:", label)
except Exception:
pass
# Set 100/page
try:
set_page_size_100(page)
try:
label = page.locator("p.MuiTablePagination-displayedRows").first.inner_text()
print("Pagination label AFTER :", label)
except Exception:
pass
except Exception as e:
print(f"Warning: could not set page size to 100: {e!r}")
all_ids: Set[str] = set()
page_index = 1
while True:
wait_for_grid_ready(page)
ids_now = harvest_ids_on_current_page(page)
print(f"Page {page_index}: harvested {len(ids_now)} ids")
all_ids |= ids_now
# Try to go next; if cannot, break
if not click_next_page(page):
break
# Wait for DOM to actually update (new rows)
try:
page.wait_for_load_state("domcontentloaded", timeout=10000)
except PWTimeout:
pass
time.sleep(0.5)
page_index += 1
page.close()
print(f"Total unique IDs collected: {len(all_ids)}")
return all_ids
def fetch_and_store_patient_html(context, conn, patient_id: str):
page = context.new_page()
page.set_default_timeout(15000)
page.set_default_navigation_timeout(30000)
url = f"{BASE_LIST_URL}?pacient={patient_id}"
page.goto(url, wait_until="domcontentloaded")
# If detail didnt open, fallback: go to list, click row
if not ensure_detail_open(page):
page.goto(BASE_LIST_URL, wait_until="domcontentloaded")
try:
page.wait_for_selector(f"div[role='row'][data-id='{patient_id}']", timeout=15000)
page.locator(f"div[role='row'][data-id='{patient_id}']").first.click()
# wait for drawer/dialog
page.wait_for_selector("[role='dialog'], div.MuiDrawer-paper, div[aria-modal='true']", timeout=12000)
except PWTimeout:
print(f"[{patient_id}] detail panel did not open — skipping")
page.close()
return
# Save full HTML of the page (includes the open detail drawer)
html = page.content()
db_upsert_html(conn, patient_id, html)
print(f"[{patient_id}] saved HTML ({len(html)} bytes) at {datetime.now().isoformat(timespec='seconds')}")
page.close()
# Throttle per your requirement
time.sleep(SAVE_DELAY_SECONDS)
def main():
# Check storage exists
if not Path(STATE_FILE).exists():
raise SystemExit(f"Storage not found: {STATE_FILE}")
# DB ready
conn = db_connect()
db_ensure_table(conn)
already = db_existing_ids(conn)
print(f"Already in DB: {len(already)} ids")
with sync_playwright() as p:
browser = p.chromium.launch(headless=True) # set False to watch
context = browser.new_context(storage_state=STATE_FILE)
# 1) Collect all IDs from the listing (all pages)
all_ids = collect_all_patient_ids(context)
# 2) Iterate and store HTML (skip existing)
todo = [pid for pid in sorted(all_ids) if pid not in already]
print(f"To fetch now: {len(todo)} ids (skipping {len(all_ids)-len(todo)} already saved)")
for i, pid in enumerate(todo, 1):
try:
fetch_and_store_patient_html(context, conn, pid)
except Exception as e:
print(f"[{pid}] ERROR: {e!r} — continuing with next")
browser.close()
conn.close()
print("Done.")
if __name__ == "__main__":
main()
@@ -0,0 +1,110 @@
import mysql.connector
from bs4 import BeautifulSoup
import re
import time
MYSQL_CFG = dict(
host="192.168.1.76",
port=3307,
user="root",
password="Vlado9674+",
database="medevio",
)
#Helper functions
def is_valid_rc(rc: str) -> bool:
"""
Very basic RC check:
remove any slash
must be 9 or 10 digits
"""
rc_clean = rc.replace("/", "")
return bool(re.fullmatch(r"\d{9,10}", rc_clean))
conn = mysql.connector.connect(**MYSQL_CFG)
# --- get latest HTML (single-row result) ---
with conn.cursor() as cur:
cur.execute("""
SELECT html
FROM kartoteka_html
where round=3
ORDER BY `fetched-at` DESC
""")
rows = cur.fetchall()
if not rows:
raise RuntimeError("No HTML found in kartoteka_html")
for row in rows:
html = row[0]
soup = BeautifulSoup(html, "html.parser")
records = []
for row in soup.find_all("div", attrs={"role": "row", "data-id": True}):
data_id = row["data-id"]
# full name -> surname + rest
name_btn = row.find("button", class_="MuiTypography-root")
fullname = name_btn.get_text(strip=True) if name_btn else ""
parts = fullname.split()
surname = parts[0] if parts else ""
name = " ".join(parts[1:]) if len(parts) > 1 else ""
# RC
id_cell = row.find("div", attrs={"data-field": "IdentificationNumber"})
rc = (id_cell.get("title", "") if id_cell else "")
rc = rc.replace("/", "").replace("\\", "")
# Phone
ph_cell = row.find("div", attrs={"data-field": "Phone"})
raw_phone = ph_cell.get("title", "") if ph_cell else ""
raw_phone = raw_phone.replace("\u00A0", " ") # NBSP -> space
phone = re.sub(r"[^\d+]", "", raw_phone) # keep + and digits
# Insurance
ins_cell = row.find("div", attrs={"data-field": "InsuranceCompany"})
poj = ins_cell.get("title", "") if ins_cell else ""
# Skip rows with no name or no RC or not valid TC
if not fullname or not rc:
continue
if not is_valid_rc(rc):
continue
records.append((data_id, fullname, rc, phone, poj))
# --- per-patient lookup: use a fresh cursor each time (or buffered=True) ---
with conn.cursor(buffered=True) as cur2:
cur2.execute(
"""
SELECT *
FROM patients_extracted
WHERE rc=%s
""",
(rc,),
)
rows = cur2.fetchall()
# print(surname, name, rc, len(rows))
if len(rows) > 1:
print(f"Pacient {surname} {name} {rc} je v medeviu {len(rows)}x")
time.sleep(1)
if len(rows)==0:
print(f"Pacient {surname} {name} {rc} je v medeviu {len(rows)}x")
time.sleep(1)
if len(rows)==1 and rows[0][0]!=data_id:
print(f"Pacient {surname} {name} {rc} má v medeviu jiný id, v db je {rows[0][0]} and nyní je {data_id}")
time.sleep(.1)
if len(rows) == 1:
cur2.execute("""
Update patients_extracted set rid=%s where rc=%s""",(data_id,rc))
conn.commit()
# preview
# for r in records[:10]:
# print(f"ID: {r[0]} Name: {r[1]} RC: {r[2]} Phone: {r[3]} Pojistovna: {r[4]}")
#
# print("Total patients:", len(records))
View File
@@ -0,0 +1,188 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
from pathlib import Path
from datetime import datetime
import pymysql
from pymysql.cursors import DictCursor
from playwright.sync_api import sync_playwright, TimeoutError as PWTimeout
# ========= CONFIG =========
MYSQL_CFG = dict(
host="192.168.1.76",
port=3307,
user="root",
password="Vlado9674+",
database="medevio",
cursorclass=DictCursor,
autocommit=False, # we commit in batches
)
# Column in patients_extracted that stores Medevio UUID used in the URL:
UUID_COLUMN = "rid" # <-- change if your column name differs
# Output columns (will be created if missing; MySQL 8.0+ supports IF NOT EXISTS):
REGISTERED_COL = "medevio_registered" # TINYINT(1) NULL/0/1
CHECKED_AT_COL = "medevio_checked_at" # DATETIME NULL
ERROR_COL = "medevio_check_error" # TEXT NULL (optional)
# Medevio routing
PATIENT_URL_TMPL = "https://my.medevio.cz/mudr-buzalkova/klinika/pacienti?pacient={uuid}"
# Login session (created earlier with your script)
STATE_FILE = Path("../medevio_storage.json")
# Batch/pace
BATCH_LIMIT = 5 # how many patients per run
SLEEP_SECONDS = 3 # wait between patients (requested)
NAV_TIMEOUT = 20_000 # ms
TEXT_TIMEOUT = 15_000 # ms (for main area/heading)
# Texts indicating NOT registered:
NOT_REGISTERED_STRINGS = [
"Pacientka zatím nemá Medevio účet.",
"Pacient zatím nemá Medevio účet.",
]
# ==========================
SELECT_SQL = f"""
SELECT {UUID_COLUMN} AS uuid, jmeno, prijmeni, rc
FROM patients_extracted
WHERE {UUID_COLUMN} IS NOT NULL
AND {UUID_COLUMN} <> ''
AND {REGISTERED_COL} IS NULL
LIMIT {BATCH_LIMIT};
"""
UPDATE_OK_SQL = f"""
UPDATE patients_extracted
SET {REGISTERED_COL}=%s, {CHECKED_AT_COL}=NOW(), {ERROR_COL}=NULL
WHERE {UUID_COLUMN}=%s
"""
UPDATE_ERR_SQL = f"""
UPDATE patients_extracted
SET {REGISTERED_COL}=NULL, {CHECKED_AT_COL}=NOW(), {ERROR_COL}=%s
WHERE {UUID_COLUMN}=%s
"""
DDL_SQLS = [
f"ALTER TABLE patients_extracted ADD COLUMN {REGISTERED_COL} TINYINT(1) NULL",
f"ALTER TABLE patients_extracted ADD COLUMN {CHECKED_AT_COL} DATETIME NULL",
f"ALTER TABLE patients_extracted ADD COLUMN {ERROR_COL} TEXT NULL",
]
CHECKS_FOR_DDL_SQLS=[
f"SELECT COUNT(*) AS cnt FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = 'patients_extracted' AND COLUMN_NAME = '{REGISTERED_COL}'",
f"SELECT COUNT(*) AS cnt FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = 'patients_extracted' AND COLUMN_NAME = '{CHECKED_AT_COL}'",
f"SELECT COUNT(*) AS cnt FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = 'patients_extracted' AND COLUMN_NAME = '{ERROR_COL}'",
]
def ensure_columns(conn):
with conn.cursor() as cur:
for ddl,ddlcheck in zip(DDL_SQLS,CHECKS_FOR_DDL_SQLS):
cur.execute(ddlcheck)
row = cur.fetchone()
if row["cnt"] == 0:
print("Column missing")
cur.execute(ddl)
print(f"✓ Executed: {ddl}")
else:
print("Column exists")
conn.commit()
def pick_registered_flag(page_text: str) -> int:
text = page_text or ""
# If any NOT-registered phrase is present → 0; otherwise assume registered → 1
for marker in NOT_REGISTERED_STRINGS:
if marker in text:
return 0
return 1
def main():
# --- DB: fetch a batch to process ---
conn = pymysql.connect(**MYSQL_CFG)
try:
ensure_columns(conn)
with conn.cursor() as cur:
cur.execute("SET NAMES utf8mb4 COLLATE utf8mb4_czech_ci")
cur.execute("SET collation_connection = 'utf8mb4_czech_ci'")
cur.execute(SELECT_SQL)
rows = cur.fetchall()
if not rows:
print("No patients to check (all have medevio_registered filled).")
return
print(f"Will process {len(rows)} patients…")
# --- Playwright session ---
with sync_playwright() as p:
browser = p.chromium.launch(headless=False, slow_mo=0)
context = browser.new_context(storage_state=str(STATE_FILE))
page = context.new_page()
page.set_default_timeout(NAV_TIMEOUT)
processed = ok = errs = 0
for r in rows:
processed += 1
# pid = r["id"]
uuid = r["uuid"]
name = f"{r.get('prijmeni','')}, {r.get('jmeno','')}"
rc = r.get("rc","")
url = PATIENT_URL_TMPL.format(uuid=uuid)
print(f"URL pro otevření pacienta je: {url}0")
print(f"[{processed:>3}] {name} | RC {rc} | {uuid}{url}")
try:
page.goto(url, wait_until="domcontentloaded")
# Optionally wait for a stable anchor; fallback to sleep
try:
# A stable bit we saw earlier
page.get_by_text("Historie požadavků").wait_for(timeout=TEXT_TIMEOUT)
except PWTimeout:
pass
# Wait the requested 3 seconds for the UI to settle
time.sleep(SLEEP_SECONDS)
# Get full text and detect
full_text = page.content() # HTML; safer to check visible text too:
vis_text = page.inner_text("body")
registered = pick_registered_flag(full_text) if full_text else pick_registered_flag(vis_text)
with conn.cursor() as cur:
cur.execute(UPDATE_OK_SQL, (registered, pid))
conn.commit()
ok += 1
state = "REGISTERED" if registered == 1 else "NOT REGISTERED"
print(f"{state}")
except Exception as e:
conn.rollback()
errs += 1
msg = f"{type(e).__name__}: {e}"
with conn.cursor() as cur:
cur.execute(UPDATE_ERR_SQL, (msg[:1000], pid))
conn.commit()
print(f" ! ERROR → {msg}")
browser.close()
print(f"Done. processed={processed}, ok={ok}, errors={errs}")
finally:
conn.close()
if __name__ == "__main__":
main()
+118
View File
@@ -0,0 +1,118 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import re
from urllib.parse import urlparse, parse_qs
from playwright.sync_api import sync_playwright, TimeoutError as PWTimeout, Page
STATE_FILE = "../medevio_storage.json"
POZADAVKY_URL = "https://my.medevio.cz/mudr-buzalkova/klinika/pozadavky?neprirazene=1"
# ---------- helpers ----------
def get_uuid_from_href(href: str) -> str | None:
try:
q = parse_qs(urlparse(href).query)
val = q.get("pozadavek", [None])[0]
return val if val else None
except Exception:
return None
def is_flu_request(text: str) -> bool:
return bool(re.search(r"ch(r|ř)ipk", text, re.IGNORECASE))
def scrape_visible_rows(page: Page, seen: set) -> list[dict]:
"""Collect all *new* visible rows on the current screen."""
bucket: list[dict] = []
rows = page.locator('tr[data-testid="patient-request-row"]')
n = rows.count()
for i in range(n):
row = rows.nth(i)
href_el = row.locator('a[href*="pozadavky?pozadavek="]').first
href = href_el.get_attribute("href") if href_el.count() else None
req_id = get_uuid_from_href(href) if href else None
if not req_id or req_id in seen:
continue
name = (row.locator('td:nth-child(2) a span').first.text_content(timeout=0) or "").strip()
rc = (row.locator('a.MuiTypography-overline2').first.text_content(timeout=0) or "").strip()
text_p = row.locator('td:nth-child(3) p.MuiTypography-body1, td:nth-child(4) p.MuiTypography-body1').first
text_req = (text_p.text_content(timeout=0) or "").strip()
if not text_req:
aria = row.locator('td:nth-child(3) [aria-label], td:nth-child(4) [aria-label]').first
text_req = (aria.get_attribute("aria-label") or "").strip() if aria.count() else ""
avatar = row.locator('[data-testid="queue-avatar"]').first
assigned_to = (avatar.get_attribute("aria-label") or "").strip() if avatar.count() else ""
initials = (avatar.text_content(timeout=0) or "").strip() if avatar.count() else ""
seen.add(req_id)
bucket.append({
"id": req_id,
"name": name,
"rc": rc,
"text": text_req,
"assigned_to": assigned_to,
"initials": initials,
})
return bucket
def assign_request_to_buzalka(page: Page, request_uuid: str) -> None:
"""Open request detail by UUID and assign it to MUDr. Buzalka (já)."""
url = f"{POZADAVKY_URL.split('?')[0]}?pozadavek={request_uuid}"
page.goto(url, wait_until="domcontentloaded", timeout=60_000)
combo = page.locator('div[role="combobox"][aria-labelledby="queue-select-label"]')
combo.wait_for(state="visible")
combo.click()
option = page.get_by_role("option", name=re.compile(r"MUDr\.?\s*Buzalka", re.I))
option.click()
page.wait_for_load_state("networkidle")
page.locator("button.MuiDialog-close").click()
print(f"✔ Assigned to MUDr. Buzalka: {request_uuid}")
# ---------- main ----------
def main():
with sync_playwright() as pw:
browser = pw.chromium.launch(headless=False) # we want to see the page
context = browser.new_context(storage_state=STATE_FILE)
page = context.new_page()
page.goto(POZADAVKY_URL, wait_until="domcontentloaded", timeout=60_000)
# check login
body = (page.text_content("body") or "").lower()
if any(x in body for x in ["přihlášení", "přihlásit", "sign in", "login"]):
raise SystemExit("Not logged in refresh medevio_storage.json.")
try:
page.wait_for_selector('tr[data-testid="patient-request-row"]', timeout=20_000)
except PWTimeout:
raise SystemExit("Rows not found: tr[data-testid=patient-request-row].")
seen: set[str] = set()
assigned_count = 0
print("\n>>> Scroll the page manually. Press Enter here any time to scrape current view.")
print(" Press Ctrl+C to finish.\n")
while True:
input("Press Enter to scan visible rows...")
for item in scrape_visible_rows(page, seen):
text = item["text"]
initials = (item["initials"] or "").upper()
assigned_to = (item["assigned_to"] or "").lower()
if is_flu_request(text) and not ("buzalka" in assigned_to or initials == "VB"):
assign_request_to_buzalka(page, item["id"])
assigned_count += 1
print(f"Total newly assigned so far: {assigned_count}")
if __name__ == "__main__":
main()
+236
View File
@@ -0,0 +1,236 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from urllib.parse import urlparse, parse_qs
import re
import time
from playwright.sync_api import sync_playwright, TimeoutError as PWTimeout, Page
# ===== funkce pro přiřazení jednoho požadavku =====
def assign_request_to_buzalka(page: Page, request_uuid: str) -> None:
"""
Otevře kartu požadavku podle UUID a přiřadí ji MUDr. Buzalka (já).
Po uložení změny zavře dialog a vypíše potvrzení.
"""
url = f"https://my.medevio.cz/mudr-buzalkova/klinika/pozadavky?pozadavek={request_uuid}"
page.goto(url, wait_until="domcontentloaded", timeout=60_000)
combo = page.locator('div[role="combobox"][aria-labelledby="queue-select-label"]')
combo.wait_for(state="visible")
combo.click()
option = page.get_by_role("option", name=re.compile(r"MUDr\.?\s*Buzalka", re.I))
option.click()
page.wait_for_load_state("networkidle")
page.locator("button.MuiDialog-close").click()
print(f"✔ Požadavek {request_uuid} přiřazen: MUDr. Buzalka (já)")
# ===== hlavní část: projít listing a řešit chřipku =====
POZADAVKY_URL = "https://my.medevio.cz/mudr-buzalkova/klinika/pozadavky"
# POZADAVKY_URL = "https://my.medevio.cz/mudr-buzalkova/klinika/pozadavky?neprirazene=1"
STATE_FILE = "../medevio_storage.json"
from playwright.sync_api import Page
import time
def _find_scroll_container(page: Page):
"""Return an ElementHandle of the real scrollable container, or None -> use window."""
handle = page.evaluate_handle("""
() => {
const isScrollable = el => !!el && (el.scrollHeight > el.clientHeight || el.scrollWidth > el.clientWidth);
const row = document.querySelector('tr[data-testid="patient-request-row"]');
if (row) {
let el = row.parentElement;
while (el) {
const style = getComputedStyle(el);
const overflowY = style.overflowY;
if (isScrollable(el) && (overflowY === 'auto' || overflowY === 'scroll')) return el;
el = el.parentElement;
}
}
const guesses = [
'[role="rowgroup"]', '[role="table"]', '.MuiTableContainer-root',
'[data-testid="requests-table"]', '.MuiContainer-root', 'main'
];
for (const sel of guesses) {
const el = document.querySelector(sel);
if (el) {
const style = getComputedStyle(el);
const overflowY = style.overflowY;
if (isScrollable(el) && (overflowY === 'auto' || overflowY === 'scroll')) return el;
}
}
return null;
}
""")
# If JS returned null, convert to Python None
try:
if handle is None or handle.json_value() is None:
return None
except Exception:
return None
return handle
def _has_handle(page: Page, handle) -> bool:
"""Check the handle still points to a live element; else False -> use window."""
if not handle:
return False
try:
return bool(page.evaluate("(el)=>!!el", handle))
except Exception:
return False
def _scroll_step(page: Page, container_handle, px=800):
if _has_handle(page, container_handle):
try:
page.evaluate(
"(args) => { const [el, dy] = args; el.scrollBy(0, dy); }",
[container_handle, px]
)
return
except Exception:
pass
# Fallback to window
page.evaluate("dy => window.scrollBy(0, dy)", px)
def _scroll_to_bottom(page: Page, container_handle):
if _has_handle(page, container_handle):
try:
page.evaluate("(el) => el.scrollTo(0, el.scrollHeight)", container_handle)
return
except Exception:
pass
page.evaluate("() => window.scrollTo(0, document.body.scrollHeight)")
def _click_load_more_if_any(page: Page) -> bool:
btn = page.locator("button:has-text('Načíst více'), button:has-text('Zobrazit další'), button:has-text('Load more')")
if btn.count() and btn.is_visible():
btn.click()
return True
return False
def load_all_requests(page: Page, max_rounds: int = 200, stagnation_limit: int = 4) -> None:
"""
Incrementally loads the entire list of requests.
Stops after 'stagnation_limit' rounds without row growth, or after max_rounds.
"""
page.wait_for_selector('tr[data-testid="patient-request-row"]', timeout=20000)
container = _find_scroll_container(page)
prev_count = page.locator('tr[data-testid="patient-request-row"]').count()
stagnant = 0
for _ in range(max_rounds):
if _click_load_more_if_any(page):
page.wait_for_load_state("networkidle")
# small incremental scrolls
for _ in range(4):
_scroll_step(page, container, px=800)
time.sleep(0.15)
# touch bottom at least once
_scroll_to_bottom(page, container)
# settle
page.wait_for_load_state("networkidle")
spinners = page.locator('[role="progressbar"], .MuiCircularProgress-root')
if spinners.count():
try:
spinners.first.wait_for(state="detached", timeout=5000)
except Exception:
pass
# growth check
curr_count = page.locator('tr[data-testid="patient-request-row"]').count()
if curr_count <= prev_count:
stagnant += 1
else:
stagnant = 0
prev_count = curr_count
if stagnant >= stagnation_limit:
break
def get_uuid_from_href(href: str) -> str | None:
try:
q = parse_qs(urlparse(href).query)
val = q.get("pozadavek", [None])[0]
return val
except Exception:
return None
def is_flu_request(text: str) -> bool:
# libovolná varianta slova „chřipk“ (chřipka, chřipky, …), case-insensitive, s diakritikou i bez
return bool(re.search(r"ch(r|ř)ipk", text, re.IGNORECASE))
def main():
with sync_playwright() as pw:
browser = pw.chromium.launch(headless=False)
context = browser.new_context(storage_state=STATE_FILE)
page = context.new_page()
page.goto(POZADAVKY_URL, wait_until="domcontentloaded", timeout=60_000)
body = (page.text_content("body") or "").lower()
if any(x in body for x in ["přihlášení", "přihlásit", "sign in", "login"]):
raise SystemExit("Vypadá to, že nejsi přihlášený obnov prosím medevio_storage.json.")
try:
page.wait_for_selector('tr[data-testid="patient-request-row"]', timeout=20_000)
except PWTimeout:
raise SystemExit("Nenašel jsem řádky požadavků (selector tr[data-testid=patient-request-row]).")
# after navigating to the listing and ensuring first rows are visible:
load_all_requests(page)
rows = page.locator('tr[data-testid="patient-request-row"]')
print("Loaded rows:", rows.count())
for i in range(count):
row = rows.nth(i)
# UUID z href
a_with_req = row.locator('a[href*="pozadavky?pozadavek="]').first
href = a_with_req.get_attribute("href") if a_with_req.count() else None
req_id = get_uuid_from_href(href) if href else None
if not req_id:
continue
# Text požadavku (pro filtr „chřipka“)
text_p = row.locator('td:nth-child(3) p.MuiTypography-body1, td:nth-child(4) p.MuiTypography-body1').first
text_req = text_p.inner_text().strip() if text_p.count() else ""
if not text_req:
aria = row.locator('td:nth-child(3) [aria-label], td:nth-child(4) [aria-label]').first
text_req = (aria.get_attribute("aria-label") or "").strip() if aria.count() else ""
if not is_flu_request(text_req):
# není to chřipkový požadavek přeskočit
continue
# Zjištění přiřazení z avatara v listingu
avatar = row.locator('[data-testid="queue-avatar"]').first
assigned_to = (avatar.get_attribute("aria-label") or "").strip() if avatar.count() else ""
initials = avatar.inner_text().strip() if avatar.count() else ""
already_mine = ("buzalka" in assigned_to.lower()) or (initials.upper() == "VB")
if already_mine:
print(f"= SKIP (už přiřazeno mně): {req_id} | {text_req}")
continue
print(f"→ Přiřazuji chřipkový požadavek: {req_id} | {text_req}")
assign_request_to_buzalka(page, req_id)
time.sleep(1)
context.close()
browser.close()
if __name__ == "__main__":
main()
+41
View File
@@ -0,0 +1,41 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import re
from playwright.sync_api import sync_playwright
STATE_FILE = "../medevio_storage.json"
REQUEST_URL = (
"https://my.medevio.cz/mudr-buzalkova/klinika/"
"pozadavky?pozadavek=e28cbf71-8280-4078-a881-c44119bbccc2"
)
def main():
with sync_playwright() as pw:
browser = pw.chromium.launch(headless=False)
context = browser.new_context(storage_state=STATE_FILE)
page = context.new_page()
# otevři konkrétní požadavek
page.goto(REQUEST_URL, wait_until="domcontentloaded", timeout=60_000)
# combobox Fronta
combo = page.locator('div[role="combobox"][aria-labelledby="queue-select-label"]')
combo.wait_for(state="visible")
combo.click()
# vyber „MUDr. Buzalka (já)“
option = page.get_by_role("option", name=re.compile(r"MUDr\.?\s*Buzalka", re.I))
option.click()
# počkej, dokud síť neutichne (změna je odeslaná/uložená)
page.wait_for_load_state("networkidle")
# zavři dialog
page.locator("button.MuiDialog-close").click()
context.close()
browser.close()
if __name__ == "__main__":
main()
+36
View File
@@ -0,0 +1,36 @@
"""
Activate virtualenv for current interpreter:
Use exec(open(this_file).read(), {'__file__': this_file}).
This can be used when you must use an existing Python interpreter, not the virtualenv bin/python.
""" # noqa: D415
from __future__ import annotations
import os
import site
import sys
try:
abs_file = os.path.abspath(__file__)
except NameError as exc:
msg = "You must use exec(open(this_file).read(), {'__file__': this_file}))"
raise AssertionError(msg) from exc
bin_dir = os.path.dirname(abs_file)
base = bin_dir[: -len("Scripts") - 1] # strip away the bin part from the __file__, plus the path separator
# prepend bin to PATH (this file is inside the bin directory)
os.environ["PATH"] = os.pathsep.join([bin_dir, *os.environ.get("PATH", "").split(os.pathsep)])
os.environ["VIRTUAL_ENV"] = base # virtual env is right above bin directory
os.environ["VIRTUAL_ENV_PROMPT"] = "" or os.path.basename(base) # noqa: SIM222
# add the virtual environments libraries to the host python import mechanism
prev_length = len(sys.path)
for lib in "..\\Lib\\site-packages".split(os.pathsep):
path = os.path.realpath(os.path.join(bin_dir, lib))
site.addsitedir(path.decode("utf-8") if "" else path)
sys.path[:] = sys.path[prev_length:] + sys.path[0:prev_length]
sys.real_prefix = sys.prefix
sys.prefix = base
File diff suppressed because one or more lines are too long
Binary file not shown.

After

Width:  |  Height:  |  Size: 34 KiB

+315
View File
@@ -0,0 +1,315 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""
FAST FILE HASH INDEXER UNRAID (BLAKE3 ONLY, ALL SHARES)
- HARDCODED SINGLE SHARE MODE
- SQL OPTIMIZATION
- STRICT MODE (NO TOLERANCE) - Updates DB on any mismatch
"""
import os
import pymysql
import socket
import platform
from blake3 import blake3
# ==============================
# ENV / HOST
# ==============================
HOSTNAME = socket.gethostname()
OS_NAME = platform.system()
# ZDE JE TO NATVRDO PRO TESTOVÁNÍ:
# SCAN_ONLY_THIS = None #"#Fotky"
SCAN_ONLY_THIS = '#Library' # "#Fotky"
# ==============================
# CONFIG
# ==============================
EXCLUDED_SHARES = {"domains", "appdata", "system", "isos"}
# --- File size limits (bytes) ---
FILE_MIN_SIZE = 0
FILE_MAX_SIZE = 1024 * 1024 * 1024 * 1024 # 50MB
DB_CONFIG = {
"host": "192.168.1.76",
"port": 3307,
"user": "root",
"password": "Vlado9674+",
"database": "torrents",
"charset": "utf8mb4",
"autocommit": True,
}
CHUNK_SIZE = 4 * 1024 * 1024 # 4 MB
PRINT_SKIPPED = False
# ==============================
# HASH
# ==============================
def compute_blake3(path: str) -> bytes:
h = blake3()
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(CHUNK_SIZE), b""):
h.update(chunk)
return h.digest()
# ==============================
# SHARE / PATH HELPERS
# ==============================
def get_user_shares():
if SCAN_ONLY_THIS:
path = f"/mnt/user/{SCAN_ONLY_THIS}"
if os.path.isdir(path):
print(f"🎯 SINGLE SHARE MODE ACTIVE: Scanning only '{SCAN_ONLY_THIS}'")
return [SCAN_ONLY_THIS]
else:
print(f"⚠️ ERROR: Requested share '{SCAN_ONLY_THIS}' not found in /mnt/user!")
return []
shares = []
if not os.path.exists("/mnt/user"):
return []
for name in os.listdir("/mnt/user"):
if name.startswith("."):
continue
if name in EXCLUDED_SHARES:
continue
path = f"/mnt/user/{name}"
if os.path.isdir(path):
shares.append(name)
return sorted(shares)
def find_physical_roots(shares):
roots = []
if not os.path.exists("/mnt"):
return []
for disk in os.listdir("/mnt"):
if not disk.startswith("disk"):
continue
for share in shares:
path = f"/mnt/{disk}/{share}"
if os.path.isdir(path):
roots.append((share, path))
return sorted(roots)
def logical_path_from_disk_path(disk_path: str) -> str:
if not disk_path.startswith("/mnt/disk"):
raise ValueError(f"Unexpected disk path: {disk_path}")
parts = disk_path.split("/", 3)
return f"/mnt/user/{parts[3]}"
def size_allowed(size: int) -> bool:
if FILE_MIN_SIZE is not None and size < FILE_MIN_SIZE:
return False
if FILE_MAX_SIZE is not None and size > FILE_MAX_SIZE:
return False
return True
# ==============================
# MAIN
# ==============================
def main():
print("🚀 BLAKE3 indexer starting", flush=True)
print(f"🖥 Host: {HOSTNAME} | OS: {OS_NAME}", flush=True)
if FILE_MIN_SIZE or FILE_MAX_SIZE:
print(f"📏 File size limits: min={FILE_MIN_SIZE} max={FILE_MAX_SIZE}", flush=True)
shares = get_user_shares()
if not shares:
print("❌ No user shares to index!", flush=True)
return
print("📦 User shares to index:", flush=True)
for s in shares:
print(f" - {s}", flush=True)
scan_roots = find_physical_roots(shares)
if not scan_roots:
print("❌ No physical disk roots found!", flush=True)
return
print("📂 Physical scan roots:", flush=True)
for _, path in scan_roots:
print(f" - {path}", flush=True)
try:
db = pymysql.connect(**DB_CONFIG)
cur = db.cursor()
# === TOTO JE TEN PŘÍKAZ "NEPŘEMÝŠLEJ" ===
# Nastaví relaci na UTC. MySQL přestane posouvat časy o hodinu sem a tam.
# cur.execute("SET time_zone = '+00:00'")
# =========================================
except Exception as e:
print(f"❌ Database connection failed: {e}")
return
print("📥 Loading already indexed files into memory...", flush=True)
# === OPTIMALIZACE SQL ===
if SCAN_ONLY_THIS:
search_pattern = f"/mnt/user/{SCAN_ONLY_THIS}%"
print(f"⚡ OPTIMIZATION: Fetching only DB records for '{search_pattern}'", flush=True)
cur.execute("""
SELECT full_path, file_size, UNIX_TIMESTAMP(mtime)
FROM file_md5_index
WHERE host_name = %s AND full_path LIKE %s
""", (HOSTNAME, search_pattern))
else:
cur.execute("""
SELECT full_path, file_size, UNIX_TIMESTAMP(mtime)
FROM file_md5_index
WHERE host_name = %s
""", (HOSTNAME,))
# Načteme do slovníku pro rychlé vyhledávání
# Formát: { "cesta": (velikost, mtime) }
indexed_map = {row[0]: (row[1], row[2]) for row in cur.fetchall()}
print(f"✅ Loaded {len(indexed_map):,} indexed entries", flush=True)
print("======================================", flush=True)
new_files = 0
skipped = 0
filtered = 0
seen_paths = set()
# --- SCAN ---
for share, scan_root in scan_roots:
for root, _, files in os.walk(scan_root):
for fname in files:
disk_path = os.path.join(root, fname)
try:
stat = os.stat(disk_path)
except OSError:
continue
size = stat.st_size
if not size_allowed(size):
filtered += 1
continue
logical_path = logical_path_from_disk_path(disk_path)
if logical_path in seen_paths:
continue
seen_paths.add(logical_path)
mtime = int(stat.st_mtime)
# === PŘÍSNÁ KONTROLA (ŽÁDNÁ TOLERANCE) ===
# Pokud soubor v DB existuje a přesně sedí velikost i čas, přeskočíme ho.
# Vše ostatní (včetně posunu času o 1s) se považuje za změnu a aktualizuje se.
is_match = False
if logical_path in indexed_map:
db_size, db_mtime = indexed_map[logical_path]
if size == db_size and mtime == db_mtime:
is_match = True
if is_match:
skipped += 1
if PRINT_SKIPPED:
print(f"⏭ SKIP {logical_path}", flush=True)
continue
# ============================================
print(" NEW / UPDATED", flush=True)
print(f" File: {logical_path}", flush=True)
print(f" Size: {size:,} B", flush=True)
try:
b3 = compute_blake3(disk_path)
except Exception as e:
print(f"❌ BLAKE3 failed: {e}", flush=True)
continue
# Zde proběhne UPDATE mtime na hodnotu z disku
cur.execute("""
INSERT INTO file_md5_index
(os_name, host_name, full_path, file_name, directory,
file_size, mtime, blake3)
VALUES (%s, %s, %s, %s, %s, %s, FROM_UNIXTIME(%s), %s)
ON DUPLICATE KEY UPDATE
file_size = VALUES(file_size),
mtime = VALUES(mtime),
blake3 = VALUES(blake3),
updated_at = CURRENT_TIMESTAMP
""", (
OS_NAME,
HOSTNAME,
logical_path,
fname,
os.path.dirname(logical_path),
size,
mtime,
b3,
))
new_files += 1
print(f" B3 : {b3.hex()}", flush=True)
print("--------------------------------------", flush=True)
print("======================================", flush=True)
print(f"✅ New / updated : {new_files}", flush=True)
print(f"⏭ Skipped : {skipped}", flush=True)
print(f"🚫 Size filtered: {filtered}", flush=True)
print("🏁 Script finished", flush=True)
# ==============================
# DB CLEANUP REMOVE DELETED FILES
# ==============================
print("🧹 Checking for deleted files in DB...", flush=True)
db_paths = set(indexed_map.keys())
deleted_paths = db_paths - seen_paths
# Omezíme jen na aktuální share (pokud je aktivní)
if SCAN_ONLY_THIS:
prefix = f"/mnt/user/{SCAN_ONLY_THIS}/"
deleted_paths = {p for p in deleted_paths if p.startswith(prefix)}
if deleted_paths:
print(f"🗑 Removing {len(deleted_paths):,} deleted files from DB", flush=True)
BATCH_SIZE = 1000
deleted_paths = list(deleted_paths)
for i in range(0, len(deleted_paths), BATCH_SIZE):
batch = deleted_paths[i:i + BATCH_SIZE]
placeholders = ",".join(["%s"] * len(batch))
sql = f"""
DELETE FROM file_md5_index
WHERE host_name = %s
AND full_path IN ({placeholders})
"""
cur.execute(sql, (HOSTNAME, *batch))
print("✅ DB cleanup completed", flush=True)
else:
print("✅ No deleted files found in DB", flush=True)
cur.close()
db.close()
if __name__ == "__main__":
main()
+23
View File
@@ -0,0 +1,23 @@
import requests
from datetime import datetime
# Diafaan SMS Server settings
server_url = "http://192.168.1.138:9710/http/send-message"
username = "admin" # adjust if you changed it
password = "" # adjust if you changed it
phone_number = "+420775735276"
for i in range(1, 11):
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
message_text = f"{i}/10 {timestamp}"
params = {
"username": username,
"password": password,
"to": phone_number,
"message": message_text
}
try:
response = requests.get(server_url, params=params)
print(f"Sent '{message_text}': {response.text}")
except Exception as e:
print(f"Error sending {message_text}: {e}")
+18
View File
@@ -0,0 +1,18 @@
from playwright.sync_api import sync_playwright
LOGIN_URL = "https://my.medevio.cz/prihlaseni"
STATE_FILE = "../medevio_storage.json"
with sync_playwright() as p:
browser = p.chromium.launch(headless=False, slow_mo=150) # zobrazí prohlížeč
context = browser.new_context()
page = context.new_page()
page.goto(LOGIN_URL, wait_until="load")
print(">>> Přihlas se v otevřeném okně (Medevio).")
print(">>> Jakmile jsi na hlavní stránce po přihlášení, vrať se do PyCharm konzole a stiskni Enter.")
input()
context.storage_state(path=STATE_FILE)
print(f"Session uložena do: {STATE_FILE}")
browser.close()