Files
ordinaceprojekt/Medevio/40 agenda a požadavky/Report_AgendaPozadavky.py
T
administrator f2b3ebe0b4 Report_AgendaPozadavky: fix deduplication, UTF-8 output, export path
- Fix deduplication: deduplicate only by Request_ID (not Patient+Title combo)
- Reservations without Request_ID are kept as-is
- Add UTF-8 stdout fix for Windows console (emoji support)
- Change export dir to u:\Dropbox\!!!Days\Downloads Z230
- Rename response variable r → response to avoid collision with loop variable

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-13 13:17:26 +02:00

326 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Full Medevio Report:
- Agenda (API, from today until one calendar month ahead)
- Otevřené požadavky (MySQL)
- Merged (Agenda + Open, deduplicated)
- Vaccine sheets (from merged data)
"""
import sys
def ensure_utf8_stream(stream):
    """Return *stream* set up to encode its output as UTF-8.

    The script prints emoji and Czech text, which a console using a legacy
    code page cannot encode.

    - ``None`` (no console attached) is returned unchanged.
    - A stream offering ``reconfigure`` is switched in place and returned.
    - Otherwise, if the stream exposes a binary ``buffer``, a new
      ``io.TextIOWrapper`` over that buffer is returned.
    - A stream with neither capability is returned unchanged instead of
      raising, so start-up never fails because of console setup.
    """
    if stream is None:
        return None
    reconfigure = getattr(stream, "reconfigure", None)
    if reconfigure is not None:
        reconfigure(encoding="utf-8")
        return stream
    raw_buffer = getattr(stream, "buffer", None)
    if raw_buffer is None:
        return stream
    import io

    return io.TextIOWrapper(raw_buffer, encoding="utf-8")


# Each stream is handled on its own, so a problem with one of them does not
# cause the other to be re-wrapped.
sys.stdout = ensure_utf8_stream(sys.stdout)
sys.stderr = ensure_utf8_stream(sys.stderr)
import re
import json
import pymysql
import requests
import pandas as pd
from pathlib import Path
from datetime import datetime
from dateutil import parser, tz
from dateutil.relativedelta import relativedelta
from openpyxl import load_workbook
from openpyxl.styles import Font, Alignment, PatternFill, Border, Side
from openpyxl.utils import get_column_letter
from openpyxl.utils.dataframe import dataframe_to_rows
# ==================== CONFIG ====================
GRAPHQL_URL = "https://api.medevio.cz/graphql"
CALENDAR_ID = "144c4e12-347c-49ca-9ec0-8ca965a4470d"
CLINIC_SLUG = "mudr-buzalkova"
# NOTE(review): database credentials are committed in source; consider loading
# them from an environment variable or an untracked file instead.
DB_CONFIG = {
    "host": "192.168.1.76",
    "port": 3306,
    "user": "root",
    "password": "Vlado9674+",
    "database": "medevio",
    "charset": "utf8mb4",
    # Rows are returned as dicts keyed by column name/alias.
    "cursorclass": pymysql.cursors.DictCursor,
}
EXPORT_DIR = Path(r"u:\Dropbox\!!!Days\Downloads Z230")
EXPORT_DIR.mkdir(exist_ok=True, parents=True)
# Delete previous reports. A report that cannot be removed (for example
# because it is still open in another program) is reported and skipped so the
# new report can still be generated.
for old in EXPORT_DIR.glob("* Agenda + Požadavky.xlsx"):
    try:
        old.unlink()
    except OSError as exc:
        print(f"⚠️ Could not delete old report {old.name}: {exc}")
    else:
        print(f"🗑️ Deleted old report: {old.name}")
# The timestamp prefix gives every run its own file name.
timestamp = datetime.now().strftime("%Y-%m-%d %H-%M-%S")
xlsx_path = EXPORT_DIR / f"{timestamp} Agenda + Požadavky.xlsx"
# ==================== LOAD TOKEN ====================
# token.txt is read from the directory one level above this script's folder.
TOKEN_PATH = Path(__file__).resolve().parents[1] / "token.txt"
gateway_token = TOKEN_PATH.read_text(encoding="utf-8").strip()
# HTTP headers sent with every GraphQL request; the token is passed as a
# Bearer credential.
headers = dict(
    [
        ("content-type", "application/json"),
        ("authorization", "Bearer " + gateway_token),
        ("origin", "https://my.medevio.cz"),
        ("referer", "https://my.medevio.cz/"),
    ]
)
# ==================== STYLING ====================
# Column width per 1-based column index; columns beyond the tenth fall back to
# the default chosen in format_ws.
widths = dict(enumerate((11, 13, 45, 30, 15, 15, 30, 15, 37, 37), start=1))
# Yellow header row, light grey fill for alternating data rows.
header_fill = PatternFill("solid", fgColor="FFFF00")
alt_fill = PatternFill("solid", fgColor="F2F2F2")
# Thin black line on all four edges of a cell.
thin_border = Border(
    **{
        edge: Side(style="thin", color="000000")
        for edge in ("left", "right", "top", "bottom")
    }
)
# The "{}" placeholder receives the request id.
REQUEST_URL_TEMPLATE = "https://my.medevio.cz/mudr-buzalkova/klinika/pozadavky?pozadavek={}"
link_font = Font(color="0563C1", underline="single")
def format_ws(ws, df):
    """Apply the report's common look to worksheet ``ws``.

    ``df`` is the DataFrame whose columns were written to the sheet; it is
    used only for its column names and count.

    - Header row: bold, centred, yellow fill, upper-cased text, thin border,
      and a column width taken from ``widths`` (20 when not listed).
    - Data rows: thin border on every cell, grey fill on even sheet rows.
    - Non-empty cells in the ``Request_ID`` column become hyperlinks built
      from ``REQUEST_URL_TEMPLATE``.
    - The header row is frozen and an auto-filter spans the used range.
    """
    column_names = list(df.columns)
    # 1-based sheet column of Request_ID, or None when the frame has no such
    # column.
    try:
        req_id_col = column_names.index("Request_ID") + 1
    except ValueError:
        req_id_col = None

    # Header row.
    for col_idx in range(1, len(column_names) + 1):
        header = ws.cell(row=1, column=col_idx)
        header.value = str(header.value).upper()
        header.font = Font(bold=True)
        header.fill = header_fill
        header.border = thin_border
        header.alignment = Alignment(horizontal="center", vertical="center")
        ws.column_dimensions[get_column_letter(col_idx)].width = widths.get(col_idx, 20)

    # Data rows start at sheet row 2.
    for data_row in ws.iter_rows(min_row=2, max_row=ws.max_row):
        for cell in data_row:
            cell.border = thin_border
            if cell.row % 2 == 0:
                cell.fill = alt_fill
            in_request_column = req_id_col is not None and cell.column == req_id_col
            if in_request_column and cell.value:
                # Link font is applied last so it is what the cell ends up with.
                cell.hyperlink = REQUEST_URL_TEMPLATE.format(cell.value)
                cell.font = link_font

    ws.freeze_panes = "A2"
    ws.auto_filter.ref = ws.dimensions
# ==================== 1️⃣ LOAD AGENDA (API) ====================
print("📡 Querying Medevio API for agenda...")
# Column order of the agenda frame. It is passed to DataFrame explicitly so
# that an empty agenda still produces a frame with these columns (sorting a
# column-less frame by "Date" would raise KeyError).
AGENDA_COLUMNS = [
    "Date",
    "Time",
    "Title",
    "Patient",
    "DOB",
    "Insurance",
    "Note",
    "Color",
    "Request_ID",
    "Reservation_ID",
]
# Query window: midnight of today's UTC calendar date up to one month later.
# The values stay naive and get a literal "Z" suffix when serialised below.
dnes = datetime.now(tz.tzutc()).date()
since = datetime.combine(dnes, datetime.min.time())
until = since + relativedelta(months=1)
payload = {
    "operationName": "ClinicAgenda_ListClinicReservations",
    "variables": {
        "calendarIds": [CALENDAR_ID],
        "clinicSlug": CLINIC_SLUG,
        "since": since.isoformat() + "Z",
        "until": until.isoformat() + "Z",
        "locale": "cs",
        "emptyCalendarIds": False,
    },
    "query": """query ClinicAgenda_ListClinicReservations(
        $calendarIds: [UUID!], $clinicSlug: String!,
        $locale: Locale!, $since: DateTime!, $until: DateTime!,
        $emptyCalendarIds: Boolean!
    ) {
        reservations: listClinicReservations(
            clinicSlug: $clinicSlug, calendarIds: $calendarIds,
            since: $since, until: $until
        ) @skip(if: $emptyCalendarIds) {
            id start end note done color
            request {
                id displayTitle(locale: $locale)
                extendedPatient {
                    name surname dob insuranceCompanyObject { shortName }
                }
            }
        }
    }""",
}
# The timeout keeps the script from blocking indefinitely on a stalled
# connection.
response = requests.post(
    GRAPHQL_URL, headers=headers, data=json.dumps(payload), timeout=60
)
response.raise_for_status()
resp = response.json()
if "errors" in resp or resp.get("data") is None:
    print("❌ API response:")
    print(json.dumps(resp, indent=2, ensure_ascii=False))
    raise SystemExit("API call failed - check token or query.")
# A missing or null "reservations" field is treated as an empty agenda.
reservations = resp["data"].get("reservations") or []
# Looked up once instead of twice per reservation.
prague_tz = tz.gettz("Europe/Prague")
rows = []
for r in reservations:
    # Every nested object may be null in the response; fall back to {}.
    req = r.get("request") or {}
    patient = req.get("extendedPatient") or {}
    insurance = patient.get("insuranceCompanyObject") or {}
    try:
        start_dt = parser.isoparse(r.get("start")).astimezone(prague_tz)
        end_dt = parser.isoparse(r.get("end")).astimezone(prague_tz)
    except Exception:
        # Deliberate best effort: a missing or malformed timestamp leaves
        # Date/Time blank instead of aborting the report.
        start_dt = end_dt = None
    date_str = start_dt.strftime("%Y-%m-%d") if start_dt else ""
    time_interval = (
        f"{start_dt.strftime('%H:%M')}-{end_dt.strftime('%H:%M')}"
        if start_dt and end_dt
        else ""
    )
    # "or" (rather than a .get default) also covers keys that are present
    # with a null value, which would otherwise be rendered as "None".
    surname = patient.get("surname") or ""
    first_name = patient.get("name") or ""
    rows.append(
        {
            "Date": date_str,
            "Time": time_interval,
            "Title": req.get("displayTitle") or "",
            "Patient": f"{surname} {first_name}".strip(),
            "DOB": patient.get("dob") or "",
            "Insurance": insurance.get("shortName") or "",
            "Note": r.get("note") or "",
            "Color": r.get("color") or "",
            "Request_ID": req.get("id") or "",
            "Reservation_ID": r.get("id"),
        }
    )
df_agenda = pd.DataFrame(rows, columns=AGENDA_COLUMNS).sort_values(["Date", "Time"])
print(f"✅ Loaded {len(df_agenda)} agenda rows.")
# ==================== 2️⃣ LOAD OPEN REQUESTS (MySQL) ====================
print("📡 Loading open requests from MySQL...")
# Final column layout of the open-requests frame.
OPEN_COLUMNS = [
    "Date",
    "Time",
    "Title",
    "Patient",
    "DOB",
    "Insurance",
    "Note",
    "Color",
    "Request_ID",
    "Reservation_ID",
]
conn = pymysql.connect(**DB_CONFIG)
try:
    with conn.cursor() as cur:
        # Open request = neither done nor removed.
        cur.execute(
            """
            SELECT
                id AS Request_ID,
                displayTitle AS Title,
                pacient_prijmeni AS Pacient_Prijmeni,
                pacient_jmeno AS Pacient_Jmeno,
                pacient_rodnecislo AS DOB,
                createdAt AS Created
            FROM pozadavky
            WHERE doneAt IS NULL AND removedAt IS NULL
            ORDER BY createdAt DESC
            """
        )
        rows = cur.fetchall()
finally:
    # Close the connection even when the query fails.
    conn.close()
df_open = pd.DataFrame(rows)
if df_open.empty:
    # No open requests: still provide the expected columns so later steps can
    # address them by name.
    df_open = pd.DataFrame(columns=OPEN_COLUMNS)
else:
    # "Surname Name", with missing parts treated as empty strings.
    df_open["Patient"] = (
        df_open["Pacient_Prijmeni"].fillna("")
        + " "
        + df_open["Pacient_Jmeno"].fillna("")
    ).str.strip()
    # First ten characters of the creation timestamp's text form.
    df_open["Date"] = df_open["Created"].astype(str).str[:10]
    # Fields that exist only for agenda reservations stay blank here.
    df_open["Time"] = ""
    df_open["Insurance"] = ""
    df_open["Note"] = ""
    df_open["Color"] = ""
    df_open["Reservation_ID"] = ""
    df_open = df_open[OPEN_COLUMNS]
print(f"✅ Loaded {len(df_open)} open requests.")
# ==================== 3️⃣ MERGE + DEDUPLICATE ====================
print("🟢 Merging and deduplicating (Agenda preferred)...")
# Tag each row with its origin. Both frames are modified in place, so they
# keep the "Source" column after this step.
df_agenda["Source"] = "Agenda"
df_open["Source"] = "Open"
df_merged = pd.concat([df_agenda, df_open], ignore_index=True).fillna("")
# "Agenda" < "Open" alphabetically → Agenda rows come first after sort.
# A stable algorithm is requested explicitly: the default quicksort may
# reorder rows that share the same Source, which would make the row kept by
# drop_duplicates(keep="first") vary when several rows share a Request_ID.
df_merged = df_merged.sort_values(["Source"], ascending=[True], kind="mergesort")
# Deduplicate by Request_ID only where non-empty (Agenda preferred over Open).
# Rows without Request_ID (pure reservations) are always kept as-is.
mask_has_id = df_merged["Request_ID"] != ""
df_with_id = df_merged[mask_has_id].drop_duplicates(subset=["Request_ID"], keep="first")
df_no_id = df_merged[~mask_has_id]
df_merged = pd.concat([df_with_id, df_no_id], ignore_index=True)
# The helper column is not part of the merged sheet.
df_merged = df_merged.drop(columns=["Source"], errors="ignore")
df_merged = df_merged.sort_values(["Date", "Time"], na_position="last").reset_index(
    drop=True
)
print(f"✅ Total merged rows after deduplication: {len(df_merged)}")
# ==================== 4️⃣ WRITE BASE SHEETS ====================
# Sheet title -> frame written to it; insertion order is the sheet order.
base_sheets = {
    "Agenda": df_agenda,
    "Otevřené požadavky": df_open,
    "Merged": df_merged,
}
# Pass 1: let pandas create the workbook file with the raw data.
with pd.ExcelWriter(xlsx_path, engine="openpyxl") as writer:
    for sheet_title, frame in base_sheets.items():
        frame.to_excel(writer, sheet_name=sheet_title, index=False)
# Pass 2: reopen the file with openpyxl and apply formatting. The workbook is
# kept in ``wb`` for the steps that follow.
wb = load_workbook(xlsx_path)
for sheet_title, frame in base_sheets.items():
    format_ws(wb[sheet_title], frame)
# ==================== 5️⃣ VACCINE SHEETS (from MERGED) ====================
# Sheet title -> list of keywords. Every list starts with the common keyword
# "očkování" followed by the vaccine-specific keyword.
VACCINE_SHEETS = {
    sheet_title: ["očkování", vaccine_keyword]
    for sheet_title, vaccine_keyword in (
        ("Chřipka", "chřipka"),
        ("COVID", "covid"),
        ("Pneumokok", "pneumo"),
        ("Hep A", "žloutenka a"),
        ("Hep B", "žloutenka b"),
        ("Hep A+B", "žloutenka a+b"),
        ("Klíšťovka", "klíšť"),
    )
}
def kw_pattern(kw):
    """Build the regular expression used to look for keyword ``kw``.

    The keyword is escaped and matched literally. The match is rejected when
    the keyword is directly preceded by a word character, or when it is
    followed by optional whitespace, a ``+``, optional whitespace and a word
    character (so "žloutenka a" does not match inside "žloutenka a+b").
    Nothing is required after the keyword otherwise, so a keyword may match
    the beginning of a longer word.
    """
    not_after_word_char = r"(?<!\w)"
    not_before_plus_suffix = r"(?!\s*\+\s*\w)"
    return not_after_word_char + re.escape(kw) + not_before_plus_suffix
# The title column is the same for every vaccine sheet, so it is prepared once.
titles = df_merged["Title"].fillna("")
for sheet_name, keywords in VACCINE_SHEETS.items():
    # A row qualifies only when every keyword of the sheet is found in its
    # title (case-insensitive).
    mask = pd.Series(True, index=df_merged.index)
    for keyword in keywords:
        mask &= titles.str.contains(
            kw_pattern(keyword), flags=re.IGNORECASE, regex=True
        )
    filtered_df = df_merged[mask].copy()
    if not filtered_df.empty:
        # Header row first, then one sheet row per matching request.
        ws_new = wb.create_sheet(title=sheet_name)
        for sheet_row in dataframe_to_rows(filtered_df, index=False, header=True):
            ws_new.append(sheet_row)
        format_ws(ws_new, filtered_df)
        print(f"🟡 Created sheet '{sheet_name}' ({len(filtered_df)} rows).")
    else:
        # No sheet is created for a vaccine without any matching row.
        print(f"️ No matches for '{sheet_name}'")
# ==================== SAVE ====================
# Overwrites the file written earlier, now including formatting and the
# vaccine sheets.
wb.save(xlsx_path)
print(f"📘 Exported full merged report:\n{xlsx_path}")