Files
ordinaceprojekt/Medevio/40 agenda a požadavky/Report_AgendaPozadavky.py
T
administrator 57e60b4a47 Report_AgendaPozadavky: přidán list Požadavky kompletní (všechny záznamy)
- Nový sheet "Požadavky kompletní" — všechny požadavky z pozadavky bez filtru
- Sloupce: Date, Title, Patient, DOB, Request_ID, doneAt, removedAt
- 11 415 záznamů včetně uzavřených a smazaných

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-13 13:32:25 +02:00

395 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Full Medevio Report:
- Agenda (API, next 30 days)
- Otevřené požadavky (MySQL)
- Merged (Agenda + Open, deduplicated)
- Agenda kompletní (MySQL medevio_agenda, všechny záznamy)
- Vaccine sheets (from merged data)
"""
import sys
try:
sys.stdout.reconfigure(encoding="utf-8")
sys.stderr.reconfigure(encoding="utf-8")
except AttributeError:
import io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8")
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8")
import re
import json
import pymysql
import requests
import pandas as pd
from pathlib import Path
from datetime import datetime
from dateutil import parser, tz
from dateutil.relativedelta import relativedelta
from openpyxl import load_workbook
from openpyxl.styles import Font, Alignment, PatternFill, Border, Side
from openpyxl.utils import get_column_letter
from openpyxl.utils.dataframe import dataframe_to_rows
# ==================== CONFIG ====================
GRAPHQL_URL = "https://api.medevio.cz/graphql"
CALENDAR_ID = "144c4e12-347c-49ca-9ec0-8ca965a4470d"
CLINIC_SLUG = "mudr-buzalkova"
DB_CONFIG = {
"host": "192.168.1.76",
"port": 3306,
"user": "root",
"password": "Vlado9674+",
"database": "medevio",
"charset": "utf8mb4",
"cursorclass": pymysql.cursors.DictCursor,
}
EXPORT_DIR = Path(r"u:\Dropbox\!!!Days\Downloads Z230")
EXPORT_DIR.mkdir(exist_ok=True, parents=True)
# Delete previous reports
for old in EXPORT_DIR.glob("* Agenda + Požadavky.xlsx"):
old.unlink()
print(f"🗑️ Deleted old report: {old.name}")
timestamp = datetime.now().strftime("%Y-%m-%d %H-%M-%S")
xlsx_path = EXPORT_DIR / f"{timestamp} Agenda + Požadavky.xlsx"
# ==================== LOAD TOKEN ====================
TOKEN_PATH = Path(__file__).resolve().parent.parent / "token.txt"
gateway_token = TOKEN_PATH.read_text(encoding="utf-8").strip()
headers = {
"content-type": "application/json",
"authorization": f"Bearer {gateway_token}",
"origin": "https://my.medevio.cz",
"referer": "https://my.medevio.cz/",
}
# ==================== STYLING ====================
widths = {1: 11, 2: 13, 3: 45, 4: 30, 5: 15, 6: 15, 7: 30, 8: 15, 9: 37, 10: 37}
header_fill = PatternFill("solid", fgColor="FFFF00")
alt_fill = PatternFill("solid", fgColor="F2F2F2")
thin_border = Border(
left=Side(style="thin", color="000000"),
right=Side(style="thin", color="000000"),
top=Side(style="thin", color="000000"),
bottom=Side(style="thin", color="000000"),
)
REQUEST_URL_TEMPLATE = "https://my.medevio.cz/mudr-buzalkova/klinika/pozadavky?pozadavek={}"
link_font = Font(color="0563C1", underline="single")
def format_ws(ws, df):
"""Apply unified formatting to a worksheet."""
# Find Request_ID column index (1-based)
req_id_col = None
columns = list(df.columns)
if "Request_ID" in columns:
req_id_col = columns.index("Request_ID") + 1
for col_idx in range(1, len(df.columns) + 1):
col_letter = get_column_letter(col_idx)
cell = ws.cell(row=1, column=col_idx)
cell.font = Font(bold=True)
cell.alignment = Alignment(horizontal="center", vertical="center")
cell.fill = header_fill
cell.value = str(cell.value).upper()
cell.border = thin_border
ws.column_dimensions[col_letter].width = widths.get(col_idx, 20)
for r_idx, row in enumerate(ws.iter_rows(min_row=2, max_row=ws.max_row), start=2):
for cell in row:
cell.border = thin_border
if r_idx % 2 == 0:
cell.fill = alt_fill
# Add hyperlink to Request_ID cells
if req_id_col and cell.column == req_id_col and cell.value:
cell.hyperlink = REQUEST_URL_TEMPLATE.format(cell.value)
cell.font = link_font
ws.freeze_panes = "A2"
ws.auto_filter.ref = ws.dimensions
# ==================== 1️⃣ LOAD AGENDA (API) ====================
print("📡 Querying Medevio API for agenda...")
dnes = datetime.utcnow().date()
since = datetime.combine(dnes, datetime.min.time())
until = since + relativedelta(months=1)
payload = {
"operationName": "ClinicAgenda_ListClinicReservations",
"variables": {
"calendarIds": [CALENDAR_ID],
"clinicSlug": CLINIC_SLUG,
"since": since.isoformat() + "Z",
"until": until.isoformat() + "Z",
"locale": "cs",
"emptyCalendarIds": False,
},
"query": """query ClinicAgenda_ListClinicReservations(
$calendarIds: [UUID!], $clinicSlug: String!,
$locale: Locale!, $since: DateTime!, $until: DateTime!,
$emptyCalendarIds: Boolean!
) {
reservations: listClinicReservations(
clinicSlug: $clinicSlug, calendarIds: $calendarIds,
since: $since, until: $until
) @skip(if: $emptyCalendarIds) {
id start end note done color
request {
id displayTitle(locale: $locale)
extendedPatient {
name surname dob insuranceCompanyObject { shortName }
}
}
}
}""",
}
response = requests.post(GRAPHQL_URL, headers=headers, data=json.dumps(payload))
response.raise_for_status()
resp = response.json()
if "errors" in resp or "data" not in resp:
print("❌ API response:")
print(json.dumps(resp, indent=2, ensure_ascii=False))
raise SystemExit("API call failed - check token or query.")
reservations = resp["data"]["reservations"]
rows = []
for r in reservations:
req = r.get("request") or {}
patient = req.get("extendedPatient") or {}
insurance = patient.get("insuranceCompanyObject") or {}
try:
start_dt = parser.isoparse(r.get("start")).astimezone(tz.gettz("Europe/Prague"))
end_dt = parser.isoparse(r.get("end")).astimezone(tz.gettz("Europe/Prague"))
except Exception:
start_dt = end_dt = None
date_str = start_dt.strftime("%Y-%m-%d") if start_dt else ""
time_interval = (
f"{start_dt.strftime('%H:%M')}-{end_dt.strftime('%H:%M')}"
if start_dt and end_dt
else ""
)
rows.append(
{
"Date": date_str,
"Time": time_interval,
"Title": req.get("displayTitle") or "",
"Patient": f"{patient.get('surname','')} {patient.get('name','')}".strip(),
"DOB": patient.get("dob") or "",
"Insurance": insurance.get("shortName") or "",
"Note": r.get("note") or "",
"Color": r.get("color") or "",
"Request_ID": req.get("id") or "",
"Reservation_ID": r.get("id"),
}
)
df_agenda = pd.DataFrame(rows).sort_values(["Date", "Time"])
print(f"✅ Loaded {len(df_agenda)} agenda rows.")
# ==================== 2️⃣ LOAD OPEN REQUESTS + COMPLETE AGENDA (MySQL) ====================
print("📡 Loading open requests and complete agenda from MySQL...")
conn = pymysql.connect(**DB_CONFIG)
with conn.cursor() as cur:
cur.execute(
"""
SELECT
id AS Request_ID,
displayTitle AS Title,
pacient_prijmeni AS Pacient_Prijmeni,
pacient_jmeno AS Pacient_Jmeno,
pacient_rodnecislo AS DOB,
createdAt AS Created
FROM pozadavky
WHERE doneAt IS NULL AND removedAt IS NULL
ORDER BY createdAt DESC
"""
)
rows = cur.fetchall()
with conn.cursor() as cur:
cur.execute(
"""
SELECT
id AS Request_ID,
displayTitle AS Title,
pacient_prijmeni AS Pacient_Prijmeni,
pacient_jmeno AS Pacient_Jmeno,
pacient_rodnecislo AS DOB,
createdAt AS Created,
doneAt,
removedAt
FROM pozadavky
ORDER BY createdAt DESC
"""
)
pozadavky_all_rows = cur.fetchall()
with conn.cursor() as cur:
cur.execute(
"""
SELECT
date AS Date,
time_interval AS Time,
request_title AS Title,
patient_surname AS surname,
patient_name AS name,
patient_dob AS DOB,
insurance_short AS Insurance,
note AS Note,
color AS Color,
request_id AS Request_ID,
reservation_id AS Reservation_ID
FROM medevio_agenda
ORDER BY date, time_interval
"""
)
agenda_full_rows = cur.fetchall()
conn.close()
df_open = pd.DataFrame(rows)
if not df_open.empty:
df_open["Patient"] = (
df_open["Pacient_Prijmeni"].fillna("")
+ " "
+ df_open["Pacient_Jmeno"].fillna("")
).str.strip()
df_open["Date"] = df_open["Created"].astype(str).str[:10]
df_open["Time"] = ""
df_open["Insurance"] = ""
df_open["Note"] = ""
df_open["Color"] = ""
df_open["Reservation_ID"] = ""
df_open = df_open[
[
"Date",
"Time",
"Title",
"Patient",
"DOB",
"Insurance",
"Note",
"Color",
"Request_ID",
"Reservation_ID",
]
]
print(f"✅ Loaded {len(df_open)} open requests.")
df_agenda_full = pd.DataFrame(agenda_full_rows)
if not df_agenda_full.empty:
df_agenda_full["Patient"] = (
df_agenda_full["surname"].fillna("") + " " + df_agenda_full["name"].fillna("")
).str.strip()
df_agenda_full = df_agenda_full[
["Date", "Time", "Title", "Patient", "DOB", "Insurance", "Note", "Color", "Request_ID", "Reservation_ID"]
].fillna("")
df_agenda_full["Date"] = df_agenda_full["Date"].astype(str)
print(f"✅ Loaded {len(df_agenda_full)} rows from medevio_agenda.")
df_pozadavky_all = pd.DataFrame(pozadavky_all_rows)
if not df_pozadavky_all.empty:
df_pozadavky_all["Patient"] = (
df_pozadavky_all["Pacient_Prijmeni"].fillna("") + " " + df_pozadavky_all["Pacient_Jmeno"].fillna("")
).str.strip()
df_pozadavky_all["Date"] = df_pozadavky_all["Created"].astype(str).str[:10]
df_pozadavky_all["doneAt"] = df_pozadavky_all["doneAt"].astype(str).str[:10].replace("None", "")
df_pozadavky_all["removedAt"] = df_pozadavky_all["removedAt"].astype(str).str[:10].replace("None", "")
df_pozadavky_all = df_pozadavky_all[
["Date", "Title", "Patient", "DOB", "Request_ID", "doneAt", "removedAt"]
].fillna("")
print(f"✅ Loaded {len(df_pozadavky_all)} total requests from pozadavky.")
# ==================== 3️⃣ MERGE + DEDUPLICATE ====================
print("🟢 Merging and deduplicating (Agenda preferred)...")
df_agenda["Source"] = "Agenda"
df_open["Source"] = "Open"
df_merged = pd.concat([df_agenda, df_open], ignore_index=True).fillna("")
# "Agenda" < "Open" alphabetically → Agenda rows come first after sort
df_merged = df_merged.sort_values(["Source"], ascending=[True])
# Deduplicate by Request_ID only where non-empty (Agenda preferred over Open)
# Rows without Request_ID (pure reservations) are always kept as-is
mask_has_id = df_merged["Request_ID"] != ""
df_with_id = df_merged[mask_has_id].drop_duplicates(subset=["Request_ID"], keep="first")
df_no_id = df_merged[~mask_has_id]
df_merged = pd.concat([df_with_id, df_no_id], ignore_index=True)
df_merged = df_merged.drop(columns=["Source"], errors="ignore")
df_merged = df_merged.sort_values(["Date", "Time"], na_position="last").reset_index(
drop=True
)
print(f"✅ Total merged rows after deduplication: {len(df_merged)}")
# ==================== 4️⃣ WRITE BASE SHEETS ====================
with pd.ExcelWriter(xlsx_path, engine="openpyxl") as writer:
df_agenda.to_excel(writer, sheet_name="Agenda", index=False)
df_open.to_excel(writer, sheet_name="Otevřené požadavky", index=False)
df_merged.to_excel(writer, sheet_name="Merged", index=False)
df_agenda_full.to_excel(writer, sheet_name="Agenda kompletní", index=False)
df_pozadavky_all.to_excel(writer, sheet_name="Požadavky kompletní", index=False)
wb = load_workbook(xlsx_path)
for name, df_ref in [
("Agenda", df_agenda),
("Otevřené požadavky", df_open),
("Merged", df_merged),
("Agenda kompletní", df_agenda_full),
("Požadavky kompletní", df_pozadavky_all),
]:
ws = wb[name]
format_ws(ws, df_ref)
# ==================== 5️⃣ VACCINE SHEETS (from MERGED) ====================
VACCINE_SHEETS = {
"Chřipka": ["očkování", "chřipka"],
"COVID": ["očkování", "covid"],
"Pneumokok": ["očkování", "pneumo"],
"Hep A": ["očkování", "žloutenka a"],
"Hep B": ["očkování", "žloutenka b"],
"Hep A+B": ["očkování", "žloutenka a+b"],
"Klíšťovka": ["očkování", "klíšť"],
}
def kw_pattern(kw):
return rf"(?<!\w){re.escape(kw)}(?!\s*\+\s*\w)"
for sheet_name, keywords in VACCINE_SHEETS.items():
mask = pd.Series(True, index=df_merged.index)
title_series = df_merged["Title"].fillna("")
for kw in keywords:
pattern = kw_pattern(kw)
mask &= title_series.str.contains(pattern, flags=re.IGNORECASE, regex=True)
filtered_df = df_merged[mask].copy()
if filtered_df.empty:
print(f"️ No matches for '{sheet_name}'")
continue
ws_new = wb.create_sheet(title=sheet_name)
for r in dataframe_to_rows(filtered_df, index=False, header=True):
ws_new.append(r)
format_ws(ws_new, filtered_df)
print(f"🟡 Created sheet '{sheet_name}' ({len(filtered_df)} rows).")
# ==================== SAVE ====================
wb.save(xlsx_path)
print(f"📘 Exported full merged report:\n{xlsx_path}")