Files
medevio/40 agenda a požadavky/871 test.py
2025-12-07 12:11:50 +01:00

302 lines
9.6 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Full Medevio Report:
- Agenda (API, next 30 days)
- Otevřené požadavky (MySQL)
- Merged (Agenda + Open, deduplicated)
- Vaccine sheets (from merged data)
"""
import re
import json
import pymysql
import requests
import pandas as pd
from pathlib import Path
from datetime import datetime
from dateutil import parser, tz
from dateutil.relativedelta import relativedelta
from openpyxl import load_workbook
from openpyxl.styles import Font, Alignment, PatternFill, Border, Side
from openpyxl.utils import get_column_letter
from openpyxl.utils.dataframe import dataframe_to_rows
# ==================== CONFIG ====================
GRAPHQL_URL = "https://api.medevio.cz/graphql"
CALENDAR_ID = "144c4e12-347c-49ca-9ec0-8ca965a4470d"
CLINIC_SLUG = "mudr-buzalkova"
DB_CONFIG = {
"host": "192.168.1.76",
"port": 3307,
"user": "root",
"password": "Vlado9674+",
"database": "medevio",
"charset": "utf8mb4",
"cursorclass": pymysql.cursors.DictCursor,
}
EXPORT_DIR = Path(r"z:\Dropbox\Ordinace\Reporty")
EXPORT_DIR.mkdir(exist_ok=True, parents=True)
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
xlsx_path = EXPORT_DIR / f"{timestamp} Agenda + Pozadavky (Merged).xlsx"
# ==================== LOAD TOKEN ====================
def load_gateway_token(storage_path="medevio_storage.json"):
path = Path(storage_path)
if not path.exists():
raise SystemExit(f"❌ Storage file not found: {path}")
with path.open("r", encoding="utf-8") as f:
state = json.load(f)
token = next(
(c["value"] for c in state["cookies"] if c["name"] == "gateway-access-token"),
None,
)
if not token:
raise SystemExit("❌ gateway-access-token not found in storage file.")
return token
gateway_token = load_gateway_token()
headers = {
"content-type": "application/json",
"authorization": f"Bearer {gateway_token}",
"origin": "https://my.medevio.cz",
"referer": "https://my.medevio.cz/",
}
# ==================== STYLING ====================
widths = {1: 11, 2: 13, 3: 45, 4: 30, 5: 15, 6: 15, 7: 30, 8: 15, 9: 37, 10: 37}
header_fill = PatternFill("solid", fgColor="FFFF00")
alt_fill = PatternFill("solid", fgColor="F2F2F2")
thin_border = Border(
left=Side(style="thin", color="000000"),
right=Side(style="thin", color="000000"),
top=Side(style="thin", color="000000"),
bottom=Side(style="thin", color="000000"),
)
def format_ws(ws, df):
"""Apply unified formatting to a worksheet."""
for col_idx in range(1, len(df.columns) + 1):
col_letter = get_column_letter(col_idx)
cell = ws.cell(row=1, column=col_idx)
cell.font = Font(bold=True)
cell.alignment = Alignment(horizontal="center", vertical="center")
cell.fill = header_fill
cell.value = str(cell.value).upper()
cell.border = thin_border
ws.column_dimensions[col_letter].width = widths.get(col_idx, 20)
for r_idx, row in enumerate(ws.iter_rows(min_row=2, max_row=ws.max_row), start=2):
for cell in row:
cell.border = thin_border
if r_idx % 2 == 0:
cell.fill = alt_fill
ws.freeze_panes = "A2"
# ==================== 1⃣ LOAD AGENDA (API) ====================
print("📡 Querying Medevio API for agenda...")
dnes = datetime.utcnow().date()
since = datetime.combine(dnes, datetime.min.time())
until = since + relativedelta(months=1)
payload = {
"operationName": "ClinicAgenda_ListClinicReservations",
"variables": {
"calendarIds": [CALENDAR_ID],
"clinicSlug": CLINIC_SLUG,
"since": since.isoformat() + "Z",
"until": until.isoformat() + "Z",
"locale": "cs",
"emptyCalendarIds": False,
},
"query": """query ClinicAgenda_ListClinicReservations(
$calendarIds: [UUID!], $clinicSlug: String!,
$locale: Locale!, $since: DateTime!, $until: DateTime!,
$emptyCalendarIds: Boolean!
) {
reservations: listClinicReservations(
clinicSlug: $clinicSlug, calendarIds: $calendarIds,
since: $since, until: $until
) @skip(if: $emptyCalendarIds) {
id start end note done color
request {
id displayTitle(locale: $locale)
extendedPatient {
name surname dob insuranceCompanyObject { shortName }
}
}
}
}""",
}
r = requests.post(GRAPHQL_URL, headers=headers, data=json.dumps(payload))
r.raise_for_status()
reservations = r.json()["data"]["reservations"]
rows = []
for r in reservations:
req = r.get("request") or {}
patient = req.get("extendedPatient") or {}
insurance = patient.get("insuranceCompanyObject") or {}
try:
start_dt = parser.isoparse(r.get("start")).astimezone(tz.gettz("Europe/Prague"))
end_dt = parser.isoparse(r.get("end")).astimezone(tz.gettz("Europe/Prague"))
except Exception:
start_dt = end_dt = None
date_str = start_dt.strftime("%Y-%m-%d") if start_dt else ""
time_interval = (
f"{start_dt.strftime('%H:%M')}-{end_dt.strftime('%H:%M')}"
if start_dt and end_dt
else ""
)
rows.append(
{
"Date": date_str,
"Time": time_interval,
"Title": req.get("displayTitle") or "",
"Patient": f"{patient.get('surname','')} {patient.get('name','')}".strip(),
"DOB": patient.get("dob") or "",
"Insurance": insurance.get("shortName") or "",
"Note": r.get("note") or "",
"Color": r.get("color") or "",
"Request_ID": req.get("id") or "",
"Reservation_ID": r.get("id"),
}
)
df_agenda = pd.DataFrame(rows).sort_values(["Date", "Time"])
print(f"✅ Loaded {len(df_agenda)} agenda rows.")
# ==================== 2⃣ LOAD OPEN REQUESTS (MySQL) ====================
print("📡 Loading open requests from MySQL...")
conn = pymysql.connect(**DB_CONFIG)
with conn.cursor() as cur:
cur.execute(
"""
SELECT
id AS Request_ID,
displayTitle AS Title,
pacient_prijmeni AS Pacient_Prijmeni,
pacient_jmeno AS Pacient_Jmeno,
pacient_rodnecislo AS DOB,
createdAt AS Created
FROM pozadavky
WHERE doneAt IS NULL AND removedAt IS NULL
ORDER BY createdAt DESC
"""
)
rows = cur.fetchall()
conn.close()
df_open = pd.DataFrame(rows)
if not df_open.empty:
df_open["Patient"] = (
df_open["Pacient_Prijmeni"].fillna("")
+ " "
+ df_open["Pacient_Jmeno"].fillna("")
).str.strip()
df_open["Date"] = df_open["Created"].astype(str).str[:10]
df_open["Time"] = ""
df_open["Insurance"] = ""
df_open["Note"] = ""
df_open["Color"] = ""
df_open["Reservation_ID"] = ""
df_open = df_open[
[
"Date",
"Time",
"Title",
"Patient",
"DOB",
"Insurance",
"Note",
"Color",
"Request_ID",
"Reservation_ID",
]
]
print(f"✅ Loaded {len(df_open)} open requests.")
# ==================== 3⃣ MERGE + DEDUPLICATE ====================
print("🟢 Merging and deduplicating (Agenda preferred)...")
df_agenda["Source"] = "Agenda"
df_open["Source"] = "Open"
df_merged = pd.concat([df_agenda, df_open], ignore_index=True).fillna("")
df_merged = df_merged.sort_values(["Source"], ascending=[True])
# drop duplicates — prefer Agenda if same Request_ID or same (Patient+Title)
df_merged = df_merged.drop_duplicates(
subset=["Request_ID", "Patient", "Title"], keep="first"
)
df_merged = df_merged.drop(columns=["Source"], errors="ignore")
df_merged = df_merged.sort_values(["Date", "Time"], na_position="last").reset_index(
drop=True
)
print(f"✅ Total merged rows after deduplication: {len(df_merged)}")
# ==================== 4⃣ WRITE BASE SHEETS ====================
with pd.ExcelWriter(xlsx_path, engine="openpyxl") as writer:
df_agenda.to_excel(writer, sheet_name="Agenda", index=False)
df_open.to_excel(writer, sheet_name="Otevřené požadavky", index=False)
df_merged.to_excel(writer, sheet_name="Merged", index=False)
wb = load_workbook(xlsx_path)
for name, df_ref in [
("Agenda", df_agenda),
("Otevřené požadavky", df_open),
("Merged", df_merged),
]:
ws = wb[name]
format_ws(ws, df_ref)
# ==================== 5⃣ VACCINE SHEETS (from MERGED) ====================
VACCINE_SHEETS = {
"Chřipka": ["očkování", "chřipka"],
"COVID": ["očkování", "covid"],
"Pneumokok": ["očkování", "pneumo"],
"Hep A": ["očkování", "žloutenka a"],
"Hep B": ["očkování", "žloutenka b"],
"Hep A+B": ["očkování", "žloutenka a+b"],
"Klíšťovka": ["očkování", "klíšť"],
}
def kw_pattern(kw):
return rf"(?<!\w){re.escape(kw)}(?!\s*\+\s*\w)"
for sheet_name, keywords in VACCINE_SHEETS.items():
mask = pd.Series(True, index=df_merged.index)
title_series = df_merged["Title"].fillna("")
for kw in keywords:
pattern = kw_pattern(kw)
mask &= title_series.str.contains(pattern, flags=re.IGNORECASE, regex=True)
filtered_df = df_merged[mask].copy()
if filtered_df.empty:
print(f" No matches for '{sheet_name}'")
continue
ws_new = wb.create_sheet(title=sheet_name)
for r in dataframe_to_rows(filtered_df, index=False, header=True):
ws_new.append(r)
format_ws(ws_new, filtered_df)
print(f"🟡 Created sheet '{sheet_name}' ({len(filtered_df)} rows).")
# ==================== SAVE ====================
wb.save(xlsx_path)
print(f"📘 Exported full merged report:\n{xlsx_path}")