notebookvb

This commit is contained in:
Vladimir Buzalka
2026-04-29 06:55:23 +02:00
parent a9c143ba24
commit daad4adeab
113 changed files with 16563 additions and 0 deletions
@@ -0,0 +1,313 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Full Medevio Report:
- Agenda (API, next 30 days)
- Otevřené požadavky (MySQL)
- Merged (Agenda + Open, deduplicated)
- Vaccine sheets (from merged data)
"""
import re
import json
import pymysql
import requests
import pandas as pd
from pathlib import Path
from datetime import datetime
from dateutil import parser, tz
from dateutil.relativedelta import relativedelta
from openpyxl import load_workbook
from openpyxl.styles import Font, Alignment, PatternFill, Border, Side
from openpyxl.utils import get_column_letter
from openpyxl.utils.dataframe import dataframe_to_rows
# ==================== CONFIG ====================
GRAPHQL_URL = "https://api.medevio.cz/graphql"
CALENDAR_ID = "144c4e12-347c-49ca-9ec0-8ca965a4470d"
CLINIC_SLUG = "mudr-buzalkova"
DB_CONFIG = {
"host": "192.168.1.76",
"port": 3306,
"user": "root",
"password": "Vlado9674+",
"database": "medevio",
"charset": "utf8mb4",
"cursorclass": pymysql.cursors.DictCursor,
}
EXPORT_DIR = Path(r"u:\Dropbox\Ordinace\Reporty")
EXPORT_DIR.mkdir(exist_ok=True, parents=True)
# Delete previous reports
for old in EXPORT_DIR.glob("* Agenda + Požadavky.xlsx"):
old.unlink()
print(f"🗑️ Deleted old report: {old.name}")
timestamp = datetime.now().strftime("%Y-%m-%d %H-%M-%S")
xlsx_path = EXPORT_DIR / f"{timestamp} Agenda + Požadavky.xlsx"
# ==================== LOAD TOKEN ====================
TOKEN_PATH = Path(__file__).resolve().parent.parent / "token.txt"
gateway_token = TOKEN_PATH.read_text(encoding="utf-8").strip()
headers = {
"content-type": "application/json",
"authorization": f"Bearer {gateway_token}",
"origin": "https://my.medevio.cz",
"referer": "https://my.medevio.cz/",
}
# ==================== STYLING ====================
widths = {1: 11, 2: 13, 3: 45, 4: 30, 5: 15, 6: 15, 7: 30, 8: 15, 9: 37, 10: 37}
header_fill = PatternFill("solid", fgColor="FFFF00")
alt_fill = PatternFill("solid", fgColor="F2F2F2")
thin_border = Border(
left=Side(style="thin", color="000000"),
right=Side(style="thin", color="000000"),
top=Side(style="thin", color="000000"),
bottom=Side(style="thin", color="000000"),
)
REQUEST_URL_TEMPLATE = "https://my.medevio.cz/mudr-buzalkova/klinika/pozadavky?pozadavek={}"
link_font = Font(color="0563C1", underline="single")
def format_ws(ws, df):
"""Apply unified formatting to a worksheet."""
# Find Request_ID column index (1-based)
req_id_col = None
columns = list(df.columns)
if "Request_ID" in columns:
req_id_col = columns.index("Request_ID") + 1
for col_idx in range(1, len(df.columns) + 1):
col_letter = get_column_letter(col_idx)
cell = ws.cell(row=1, column=col_idx)
cell.font = Font(bold=True)
cell.alignment = Alignment(horizontal="center", vertical="center")
cell.fill = header_fill
cell.value = str(cell.value).upper()
cell.border = thin_border
ws.column_dimensions[col_letter].width = widths.get(col_idx, 20)
for r_idx, row in enumerate(ws.iter_rows(min_row=2, max_row=ws.max_row), start=2):
for cell in row:
cell.border = thin_border
if r_idx % 2 == 0:
cell.fill = alt_fill
# Add hyperlink to Request_ID cells
if req_id_col and cell.column == req_id_col and cell.value:
cell.hyperlink = REQUEST_URL_TEMPLATE.format(cell.value)
cell.font = link_font
ws.freeze_panes = "A2"
ws.auto_filter.ref = ws.dimensions
# ==================== 1️⃣ LOAD AGENDA (API) ====================
print("📡 Querying Medevio API for agenda...")
dnes = datetime.utcnow().date()
since = datetime.combine(dnes, datetime.min.time())
until = since + relativedelta(months=1)
payload = {
"operationName": "ClinicAgenda_ListClinicReservations",
"variables": {
"calendarIds": [CALENDAR_ID],
"clinicSlug": CLINIC_SLUG,
"since": since.isoformat() + "Z",
"until": until.isoformat() + "Z",
"locale": "cs",
"emptyCalendarIds": False,
},
"query": """query ClinicAgenda_ListClinicReservations(
$calendarIds: [UUID!], $clinicSlug: String!,
$locale: Locale!, $since: DateTime!, $until: DateTime!,
$emptyCalendarIds: Boolean!
) {
reservations: listClinicReservations(
clinicSlug: $clinicSlug, calendarIds: $calendarIds,
since: $since, until: $until
) @skip(if: $emptyCalendarIds) {
id start end note done color
request {
id displayTitle(locale: $locale)
extendedPatient {
name surname dob insuranceCompanyObject { shortName }
}
}
}
}""",
}
r = requests.post(GRAPHQL_URL, headers=headers, data=json.dumps(payload))
r.raise_for_status()
resp = r.json()
if "errors" in resp or "data" not in resp:
print("❌ API response:")
print(json.dumps(resp, indent=2, ensure_ascii=False))
raise SystemExit("API call failed - check token or query.")
reservations = resp["data"]["reservations"]
rows = []
for r in reservations:
req = r.get("request") or {}
patient = req.get("extendedPatient") or {}
insurance = patient.get("insuranceCompanyObject") or {}
try:
start_dt = parser.isoparse(r.get("start")).astimezone(tz.gettz("Europe/Prague"))
end_dt = parser.isoparse(r.get("end")).astimezone(tz.gettz("Europe/Prague"))
except Exception:
start_dt = end_dt = None
date_str = start_dt.strftime("%Y-%m-%d") if start_dt else ""
time_interval = (
f"{start_dt.strftime('%H:%M')}-{end_dt.strftime('%H:%M')}"
if start_dt and end_dt
else ""
)
rows.append(
{
"Date": date_str,
"Time": time_interval,
"Title": req.get("displayTitle") or "",
"Patient": f"{patient.get('surname','')} {patient.get('name','')}".strip(),
"DOB": patient.get("dob") or "",
"Insurance": insurance.get("shortName") or "",
"Note": r.get("note") or "",
"Color": r.get("color") or "",
"Request_ID": req.get("id") or "",
"Reservation_ID": r.get("id"),
}
)
df_agenda = pd.DataFrame(rows).sort_values(["Date", "Time"])
print(f"✅ Loaded {len(df_agenda)} agenda rows.")
# ==================== 2️⃣ LOAD OPEN REQUESTS (MySQL) ====================
print("📡 Loading open requests from MySQL...")
conn = pymysql.connect(**DB_CONFIG)
with conn.cursor() as cur:
cur.execute(
"""
SELECT
id AS Request_ID,
displayTitle AS Title,
pacient_prijmeni AS Pacient_Prijmeni,
pacient_jmeno AS Pacient_Jmeno,
pacient_rodnecislo AS DOB,
createdAt AS Created
FROM pozadavky
WHERE doneAt IS NULL AND removedAt IS NULL
ORDER BY createdAt DESC
"""
)
rows = cur.fetchall()
conn.close()
df_open = pd.DataFrame(rows)
if not df_open.empty:
df_open["Patient"] = (
df_open["Pacient_Prijmeni"].fillna("")
+ " "
+ df_open["Pacient_Jmeno"].fillna("")
).str.strip()
df_open["Date"] = df_open["Created"].astype(str).str[:10]
df_open["Time"] = ""
df_open["Insurance"] = ""
df_open["Note"] = ""
df_open["Color"] = ""
df_open["Reservation_ID"] = ""
df_open = df_open[
[
"Date",
"Time",
"Title",
"Patient",
"DOB",
"Insurance",
"Note",
"Color",
"Request_ID",
"Reservation_ID",
]
]
print(f"✅ Loaded {len(df_open)} open requests.")
# ==================== 3️⃣ MERGE + DEDUPLICATE ====================
print("🟢 Merging and deduplicating (Agenda preferred)...")
df_agenda["Source"] = "Agenda"
df_open["Source"] = "Open"
df_merged = pd.concat([df_agenda, df_open], ignore_index=True).fillna("")
df_merged = df_merged.sort_values(["Source"], ascending=[True])
# drop duplicates — prefer Agenda if same Request_ID or same (Patient+Title)
df_merged = df_merged.drop_duplicates(
subset=["Request_ID", "Patient", "Title"], keep="first"
)
df_merged = df_merged.drop(columns=["Source"], errors="ignore")
df_merged = df_merged.sort_values(["Date", "Time"], na_position="last").reset_index(
drop=True
)
print(f"✅ Total merged rows after deduplication: {len(df_merged)}")
# ==================== 4️⃣ WRITE BASE SHEETS ====================
with pd.ExcelWriter(xlsx_path, engine="openpyxl") as writer:
df_agenda.to_excel(writer, sheet_name="Agenda", index=False)
df_open.to_excel(writer, sheet_name="Otevřené požadavky", index=False)
df_merged.to_excel(writer, sheet_name="Merged", index=False)
wb = load_workbook(xlsx_path)
for name, df_ref in [
("Agenda", df_agenda),
("Otevřené požadavky", df_open),
("Merged", df_merged),
]:
ws = wb[name]
format_ws(ws, df_ref)
# ==================== 5️⃣ VACCINE SHEETS (from MERGED) ====================
VACCINE_SHEETS = {
"Chřipka": ["očkování", "chřipka"],
"COVID": ["očkování", "covid"],
"Pneumokok": ["očkování", "pneumo"],
"Hep A": ["očkování", "žloutenka a"],
"Hep B": ["očkování", "žloutenka b"],
"Hep A+B": ["očkování", "žloutenka a+b"],
"Klíšťovka": ["očkování", "klíšť"],
}
def kw_pattern(kw):
return rf"(?<!\w){re.escape(kw)}(?!\s*\+\s*\w)"
for sheet_name, keywords in VACCINE_SHEETS.items():
mask = pd.Series(True, index=df_merged.index)
title_series = df_merged["Title"].fillna("")
for kw in keywords:
pattern = kw_pattern(kw)
mask &= title_series.str.contains(pattern, flags=re.IGNORECASE, regex=True)
filtered_df = df_merged[mask].copy()
if filtered_df.empty:
print(f"️ No matches for '{sheet_name}'")
continue
ws_new = wb.create_sheet(title=sheet_name)
for r in dataframe_to_rows(filtered_df, index=False, header=True):
ws_new.append(r)
format_ws(ws_new, filtered_df)
print(f"🟡 Created sheet '{sheet_name}' ({len(filtered_df)} rows).")
# ==================== SAVE ====================
wb.save(xlsx_path)
print(f"📘 Exported full merged report:\n{xlsx_path}")
@@ -0,0 +1,33 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Check one request in MySQL."""
import pymysql
import json
DB_CONFIG = {
"host": "192.168.1.76",
"port": 3306,
"user": "root",
"password": "Vlado9674+",
"database": "medevio",
"charset": "utf8mb4",
"cursorclass": pymysql.cursors.DictCursor,
}
REQUEST_ID = "6b46b5a8-b080-4821-86b0-39adabeec86b"
conn = pymysql.connect(**DB_CONFIG)
with conn.cursor() as cur:
cur.execute("SELECT * FROM pozadavky WHERE id = %s", (REQUEST_ID,))
row = cur.fetchone()
conn.close()
if row:
# Convert datetime objects to strings for JSON serialization
for k, v in row.items():
if hasattr(v, 'isoformat'):
row[k] = v.isoformat()
print(json.dumps(row, indent=2, ensure_ascii=False, default=str))
else:
print(f"Not found: {REQUEST_ID}")
@@ -0,0 +1,63 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Quick check: fetch one request from Medevio API and print all fields."""
import json
import requests
from pathlib import Path
TOKEN_PATH = Path(__file__).resolve().parent.parent / "token.txt"
GRAPHQL_URL = "https://api.medevio.cz/graphql"
CLINIC_SLUG = "mudr-buzalkova"
REQUEST_ID = "6b46b5a8-b080-4821-86b0-39adabeec86b"
token = TOKEN_PATH.read_text(encoding="utf-8").strip()
headers = {
"content-type": "application/json",
"authorization": f"Bearer {token}",
"origin": "https://my.medevio.cz",
"referer": "https://my.medevio.cz/",
}
# Query with as many fields as possible
QUERY = """
query GetPatientRequest2($requestId: UUID!, $clinicSlug: String!, $locale: Locale!) {
request: getPatientRequest2(patientRequestId: $requestId, clinicSlug: $clinicSlug) {
id
displayTitle(locale: $locale)
createdAt
updatedAt
doneAt
removedAt
userNote
eventType
extendedPatient(clinicSlug: $clinicSlug) {
name
surname
dob
identificationNumber
insuranceCompanyObject { shortName }
}
ecrfFilledData(locale: $locale) {
name
groups {
label
fields { name label type value }
}
}
}
}
"""
payload = {
"operationName": "GetPatientRequest2",
"query": QUERY,
"variables": {
"requestId": REQUEST_ID,
"clinicSlug": CLINIC_SLUG,
"locale": "cs",
},
}
r = requests.post(GRAPHQL_URL, json=payload, headers=headers, timeout=30)
print(json.dumps(r.json(), indent=2, ensure_ascii=False))
@@ -0,0 +1 @@
{"cookies": [{"name": "gateway-access-token", "value": "YwBgkf8McREDKs7vCZj0EZD2fJsuV8RyDPtYx7WiDoz0nFJ9kxId8kcNEPBLFSwM+Tiz80+SOdFwo+oj", "domain": "my.medevio.cz", "path": "/", "expires": 1763372319, "httpOnly": false, "secure": false, "sameSite": "Lax"}, {"name": "aws-waf-token", "value": "b6a1d4eb-4350-40e5-8e52-1f5f9600fbb8:CgoAr9pC8c6zAAAA:OYwXLY5OyitSQPl5v2oIlS+hIxsrb5LxV4VjCyE2gJCFFE5PQu+0Zbxse2ZIofrNv5QKs0TYUDTmxPhZyTr9Qtjnq2gsVQxWHXzrbebv3Z7RbzB63u6Ymn3Fo8IbDev3CfCNcNuxCKltFEXLqSCjI2vqNY+7HZkgQBIqy2wMgzli3aSLq0w8lWYtZzyyot7q8RPXWMGTfaBUo2reY0SOSffm9rAivE9PszNfPid71CvNrGAAoxRbwb25eVujlyIcDVWe5vZ9Iw==", "domain": ".my.medevio.cz", "path": "/", "expires": 1761125920, "httpOnly": false, "secure": true, "sameSite": "Lax"}], "origins": [{"origin": "https://my.medevio.cz", "localStorage": [{"name": "awswaf_token_refresh_timestamp", "value": "1760780309860"}, {"name": "awswaf_session_storage", "value": "b6a1d4eb-4350-40e5-8e52-1f5f9600fbb8:CgoAr9pC8c+zAAAA:+vw//1NzmePjPpbGCJzUB+orCRivtJd098DbDX4AnABiGRw/+ql6ShqvFY4YdCY7w2tegb5mEPBdAmc4sNi22kNR9BuEoAgCUiMhkU1AZWfzM51zPfTh7SveCrREZ7xdvxcqKPMmfVLRYX5E4+UWh22z/LKQ7+d9VERp3J+wWCUW3dFFirkezy3N7b2FVjTlY/RxsZwhejQziTG/L3CkIFFP3mOReNgBvDpj7aKoM1knY4IL4TZ8E7zNv3nTsvzACLYvnUutVOUcofN1TfOzwZshSKsEXsMzrQn8PzLccX1jM5VSzce7gfEzl0zSPsT8NB3Sna+rhMIttDNYgvbW1HsfG2LIeKMR27Zf8hkslDRVVkcU/Kp2jLOEdhhrBKGjKY2o9/uX3NExdzh5MEKQSSRtmue01BpWYILPH23rMsz4YSmF+Ough5OeQoC95rkcYwVXMhwvUN9Zfp9UZ4xCNfFUex5dOrg9aJntYRnaceeocGUttNI5AdT0i3+osV6XHXzKxeqO8zLCS9BIsCzxaHfdqqem5DorMceuGKz+QqksatIQAA=="}, {"name": "Application.Intl.locale", "value": "cs"}, {"name": "Password.prefill", "value": "{\"username\":\"vladimir.buzalka@buzalka.cz\",\"type\":\"email\"}"}]}]}
@@ -0,0 +1,176 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Sync open requests: checks each request marked as open in MySQL (doneAt IS NULL
AND removedAt IS NULL) against the Medevio API. If the API shows the request is
closed (doneAt) or removed (removedAt), updates MySQL accordingly.
"""
import json
import sys
import time
import requests
import pymysql
from pathlib import Path
from datetime import datetime
# ==============================
# UTF-8 output (Windows friendly)
# ==============================
try:
sys.stdout.reconfigure(encoding="utf-8")
sys.stderr.reconfigure(encoding="utf-8")
except AttributeError:
import io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8")
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8")
# ==============================
# DRY RUN - set to True to only print what would be updated, False to actually update
# ==============================
DRY_RUN = False
# ==============================
# CONFIG
# ==============================
GRAPHQL_URL = "https://api.medevio.cz/graphql"
CLINIC_SLUG = "mudr-buzalkova"
TOKEN_PATH = Path(__file__).resolve().parent.parent / "token.txt"
gateway_token = TOKEN_PATH.read_text(encoding="utf-8").strip()
headers = {
"content-type": "application/json",
"authorization": f"Bearer {gateway_token}",
"origin": "https://my.medevio.cz",
"referer": "https://my.medevio.cz/",
}
DB_CONFIG = {
"host": "192.168.1.76",
"port": 3306,
"user": "root",
"password": "Vlado9674+",
"database": "medevio",
"charset": "utf8mb4",
"cursorclass": pymysql.cursors.DictCursor,
}
GRAPHQL_QUERY = """
query GetPatientRequest2($requestId: UUID!, $clinicSlug: String!) {
request: getPatientRequest2(patientRequestId: $requestId, clinicSlug: $clinicSlug) {
id
doneAt
removedAt
updatedAt
}
}
"""
def fix_datetime(dt_str):
if not dt_str:
return None
try:
return datetime.fromisoformat(dt_str.replace("Z", "+00:00"))
except Exception:
return None
def fetch_request(request_id):
payload = {
"operationName": "GetPatientRequest2",
"query": GRAPHQL_QUERY,
"variables": {
"requestId": request_id,
"clinicSlug": CLINIC_SLUG,
},
}
for attempt in range(3):
try:
r = requests.post(GRAPHQL_URL, json=payload, headers=headers, timeout=30)
break
except (requests.ConnectionError, requests.Timeout, requests.exceptions.RequestException) as e:
print(f" ⚠️ Attempt {attempt+1}/3 failed: {e}")
time.sleep(2)
else:
print(f" ❌ Connection failed after 3 attempts for {request_id}")
return None
if r.status_code != 200:
print(f" ❌ HTTP {r.status_code} for {request_id}")
return None
data = r.json()
if "errors" in data:
print(f" ❌ API error for {request_id}: {data['errors']}")
return None
return data.get("data", {}).get("request")
# ==============================
# MAIN
# ==============================
conn = pymysql.connect(**DB_CONFIG)
# 1) Read all open requests from MySQL
with conn.cursor() as cur:
cur.execute(
"SELECT id, displayTitle, pacient_prijmeni, pacient_jmeno "
"FROM pozadavky WHERE doneAt IS NULL AND removedAt IS NULL"
)
open_requests = cur.fetchall()
mode = "DRY RUN" if DRY_RUN else "LIVE"
print(f"🔧 Mode: {mode}")
print(f"📋 Found {len(open_requests)} open requests in MySQL.\n")
updated = 0
errors = 0
for i, req in enumerate(open_requests, 1):
rid = req["id"]
name = f"{req.get('pacient_prijmeni', '')} {req.get('pacient_jmeno', '')}".strip()
title = req.get("displayTitle", "")
print(f"[{i}/{len(open_requests)}] {name} {title} ({rid})")
api_data = fetch_request(rid)
if api_data is None:
errors += 1
continue
api_done = api_data.get("doneAt")
api_removed = api_data.get("removedAt")
api_updated = api_data.get("updatedAt")
if api_done or api_removed:
done_dt = fix_datetime(api_done)
removed_dt = fix_datetime(api_removed)
updated_dt = fix_datetime(api_updated)
status = "DONE" if api_done else "REMOVED"
if DRY_RUN:
print(f" 🔍 Would update → {status} (doneAt={api_done}, removedAt={api_removed})")
else:
with conn.cursor() as cur:
cur.execute(
"UPDATE pozadavky SET doneAt = %s, removedAt = %s, updatedAt = %s WHERE id = %s",
(done_dt, removed_dt, updated_dt, rid),
)
conn.commit()
print(f" ✅ Updated → {status}")
updated += 1
else:
print(f" ⏳ Still open")
# Be gentle with the API
time.sleep(1)
conn.close()
print(f"\n{'='*50}")
print(f"📊 Total open in MySQL: {len(open_requests)}")
print(f"✅ Updated (closed/removed): {updated}")
print(f"⏳ Still open: {len(open_requests) - updated - errors}")
print(f"❌ Errors: {errors}")