#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Full Medevio Report: - Agenda (API, next 30 days) - Otevřené požadavky (MySQL) - Merged (Agenda + Open, deduplicated) - Vaccine sheets (from merged data) """ import re import json import pymysql import requests import pandas as pd from pathlib import Path from datetime import datetime from dateutil import parser, tz from dateutil.relativedelta import relativedelta from openpyxl import load_workbook from openpyxl.styles import Font, Alignment, PatternFill, Border, Side from openpyxl.utils import get_column_letter from openpyxl.utils.dataframe import dataframe_to_rows # ==================== CONFIG ==================== GRAPHQL_URL = "https://api.medevio.cz/graphql" CALENDAR_ID = "144c4e12-347c-49ca-9ec0-8ca965a4470d" CLINIC_SLUG = "mudr-buzalkova" DB_CONFIG = { "host": "192.168.1.76", "port": 3307, "user": "root", "password": "Vlado9674+", "database": "medevio", "charset": "utf8mb4", "cursorclass": pymysql.cursors.DictCursor, } EXPORT_DIR = Path(r"u:\Dropbox\Ordinace\Reporty") EXPORT_DIR.mkdir(exist_ok=True, parents=True) timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") xlsx_path = EXPORT_DIR / f"{timestamp} Agenda + Pozadavky (Merged).xlsx" # ==================== LOAD TOKEN ==================== def load_gateway_token(storage_path="medevio_storage.json"): path = Path(storage_path) if not path.exists(): raise SystemExit(f"❌ Storage file not found: {path}") with path.open("r", encoding="utf-8") as f: state = json.load(f) token = next( (c["value"] for c in state["cookies"] if c["name"] == "gateway-access-token"), None, ) if not token: raise SystemExit("❌ gateway-access-token not found in storage file.") return token gateway_token = load_gateway_token() headers = { "content-type": "application/json", "authorization": f"Bearer {gateway_token}", "origin": "https://my.medevio.cz", "referer": "https://my.medevio.cz/", } # ==================== STYLING ==================== widths = {1: 11, 2: 13, 3: 45, 4: 30, 5: 15, 6: 15, 7: 30, 8: 15, 9: 37, 10: 37} header_fill = PatternFill("solid", fgColor="FFFF00") alt_fill = PatternFill("solid", fgColor="F2F2F2") thin_border = Border( left=Side(style="thin", color="000000"), right=Side(style="thin", color="000000"), top=Side(style="thin", color="000000"), bottom=Side(style="thin", color="000000"), ) def format_ws(ws, df): """Apply unified formatting to a worksheet.""" for col_idx in range(1, len(df.columns) + 1): col_letter = get_column_letter(col_idx) cell = ws.cell(row=1, column=col_idx) cell.font = Font(bold=True) cell.alignment = Alignment(horizontal="center", vertical="center") cell.fill = header_fill cell.value = str(cell.value).upper() cell.border = thin_border ws.column_dimensions[col_letter].width = widths.get(col_idx, 20) for r_idx, row in enumerate(ws.iter_rows(min_row=2, max_row=ws.max_row), start=2): for cell in row: cell.border = thin_border if r_idx % 2 == 0: cell.fill = alt_fill ws.freeze_panes = "A2" # ==================== 1️⃣ LOAD AGENDA (API) ==================== print("📡 Querying Medevio API for agenda...") dnes = datetime.utcnow().date() since = datetime.combine(dnes, datetime.min.time()) until = since + relativedelta(months=1) payload = { "operationName": "ClinicAgenda_ListClinicReservations", "variables": { "calendarIds": [CALENDAR_ID], "clinicSlug": CLINIC_SLUG, "since": since.isoformat() + "Z", "until": until.isoformat() + "Z", "locale": "cs", "emptyCalendarIds": False, }, "query": """query ClinicAgenda_ListClinicReservations( $calendarIds: [UUID!], $clinicSlug: String!, $locale: Locale!, $since: DateTime!, $until: DateTime!, $emptyCalendarIds: Boolean! ) { reservations: listClinicReservations( clinicSlug: $clinicSlug, calendarIds: $calendarIds, since: $since, until: $until ) @skip(if: $emptyCalendarIds) { id start end note done color request { id displayTitle(locale: $locale) extendedPatient { name surname dob insuranceCompanyObject { shortName } } } } }""", } r = requests.post(GRAPHQL_URL, headers=headers, data=json.dumps(payload)) r.raise_for_status() reservations = r.json()["data"]["reservations"] rows = [] for r in reservations: req = r.get("request") or {} patient = req.get("extendedPatient") or {} insurance = patient.get("insuranceCompanyObject") or {} try: start_dt = parser.isoparse(r.get("start")).astimezone(tz.gettz("Europe/Prague")) end_dt = parser.isoparse(r.get("end")).astimezone(tz.gettz("Europe/Prague")) except Exception: start_dt = end_dt = None date_str = start_dt.strftime("%Y-%m-%d") if start_dt else "" time_interval = ( f"{start_dt.strftime('%H:%M')}-{end_dt.strftime('%H:%M')}" if start_dt and end_dt else "" ) rows.append( { "Date": date_str, "Time": time_interval, "Title": req.get("displayTitle") or "", "Patient": f"{patient.get('surname','')} {patient.get('name','')}".strip(), "DOB": patient.get("dob") or "", "Insurance": insurance.get("shortName") or "", "Note": r.get("note") or "", "Color": r.get("color") or "", "Request_ID": req.get("id") or "", "Reservation_ID": r.get("id"), } ) df_agenda = pd.DataFrame(rows).sort_values(["Date", "Time"]) print(f"✅ Loaded {len(df_agenda)} agenda rows.") # ==================== 2️⃣ LOAD OPEN REQUESTS (MySQL) ==================== print("📡 Loading open requests from MySQL...") conn = pymysql.connect(**DB_CONFIG) with conn.cursor() as cur: cur.execute( """ SELECT id AS Request_ID, displayTitle AS Title, pacient_prijmeni AS Pacient_Prijmeni, pacient_jmeno AS Pacient_Jmeno, pacient_rodnecislo AS DOB, createdAt AS Created FROM pozadavky WHERE doneAt IS NULL AND removedAt IS NULL ORDER BY createdAt DESC """ ) rows = cur.fetchall() conn.close() df_open = pd.DataFrame(rows) if not df_open.empty: df_open["Patient"] = ( df_open["Pacient_Prijmeni"].fillna("") + " " + df_open["Pacient_Jmeno"].fillna("") ).str.strip() df_open["Date"] = df_open["Created"].astype(str).str[:10] df_open["Time"] = "" df_open["Insurance"] = "" df_open["Note"] = "" df_open["Color"] = "" df_open["Reservation_ID"] = "" df_open = df_open[ [ "Date", "Time", "Title", "Patient", "DOB", "Insurance", "Note", "Color", "Request_ID", "Reservation_ID", ] ] print(f"✅ Loaded {len(df_open)} open requests.") # ==================== 3️⃣ MERGE + DEDUPLICATE ==================== print("🟢 Merging and deduplicating (Agenda preferred)...") df_agenda["Source"] = "Agenda" df_open["Source"] = "Open" df_merged = pd.concat([df_agenda, df_open], ignore_index=True).fillna("") df_merged = df_merged.sort_values(["Source"], ascending=[True]) # drop duplicates — prefer Agenda if same Request_ID or same (Patient+Title) df_merged = df_merged.drop_duplicates( subset=["Request_ID", "Patient", "Title"], keep="first" ) df_merged = df_merged.drop(columns=["Source"], errors="ignore") df_merged = df_merged.sort_values(["Date", "Time"], na_position="last").reset_index( drop=True ) print(f"✅ Total merged rows after deduplication: {len(df_merged)}") # ==================== 4️⃣ WRITE BASE SHEETS ==================== with pd.ExcelWriter(xlsx_path, engine="openpyxl") as writer: df_agenda.to_excel(writer, sheet_name="Agenda", index=False) df_open.to_excel(writer, sheet_name="Otevřené požadavky", index=False) df_merged.to_excel(writer, sheet_name="Merged", index=False) wb = load_workbook(xlsx_path) for name, df_ref in [ ("Agenda", df_agenda), ("Otevřené požadavky", df_open), ("Merged", df_merged), ]: ws = wb[name] format_ws(ws, df_ref) # ==================== 5️⃣ VACCINE SHEETS (from MERGED) ==================== VACCINE_SHEETS = { "Chřipka": ["očkování", "chřipka"], "COVID": ["očkování", "covid"], "Pneumokok": ["očkování", "pneumo"], "Hep A": ["očkování", "žloutenka a"], "Hep B": ["očkování", "žloutenka b"], "Hep A+B": ["očkování", "žloutenka a+b"], "Klíšťovka": ["očkování", "klíšť"], } def kw_pattern(kw): return rf"(?