reporter
This commit is contained in:
301
40 agenda a požadavky/871 test.py
Normal file
301
40 agenda a požadavky/871 test.py
Normal file
@@ -0,0 +1,301 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""
|
||||
Full Medevio Report:
|
||||
- Agenda (API, next 30 days)
|
||||
- Otevřené požadavky (MySQL)
|
||||
- Merged (Agenda + Open, deduplicated)
|
||||
- Vaccine sheets (from merged data)
|
||||
"""
|
||||
|
||||
import re
|
||||
import json
|
||||
import pymysql
|
||||
import requests
|
||||
import pandas as pd
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
from dateutil import parser, tz
|
||||
from dateutil.relativedelta import relativedelta
|
||||
from openpyxl import load_workbook
|
||||
from openpyxl.styles import Font, Alignment, PatternFill, Border, Side
|
||||
from openpyxl.utils import get_column_letter
|
||||
from openpyxl.utils.dataframe import dataframe_to_rows
|
||||
|
||||
# ==================== CONFIG ====================
|
||||
GRAPHQL_URL = "https://api.medevio.cz/graphql"
|
||||
CALENDAR_ID = "144c4e12-347c-49ca-9ec0-8ca965a4470d"
|
||||
CLINIC_SLUG = "mudr-buzalkova"
|
||||
|
||||
DB_CONFIG = {
|
||||
"host": "192.168.1.76",
|
||||
"port": 3307,
|
||||
"user": "root",
|
||||
"password": "Vlado9674+",
|
||||
"database": "medevio",
|
||||
"charset": "utf8mb4",
|
||||
"cursorclass": pymysql.cursors.DictCursor,
|
||||
}
|
||||
|
||||
EXPORT_DIR = Path(r"z:\Dropbox\Ordinace\Reporty")
|
||||
EXPORT_DIR.mkdir(exist_ok=True, parents=True)
|
||||
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
|
||||
xlsx_path = EXPORT_DIR / f"{timestamp} Agenda + Pozadavky (Merged).xlsx"
|
||||
|
||||
# ==================== LOAD TOKEN ====================
|
||||
def load_gateway_token(storage_path="medevio_storage.json"):
|
||||
path = Path(storage_path)
|
||||
if not path.exists():
|
||||
raise SystemExit(f"❌ Storage file not found: {path}")
|
||||
with path.open("r", encoding="utf-8") as f:
|
||||
state = json.load(f)
|
||||
token = next(
|
||||
(c["value"] for c in state["cookies"] if c["name"] == "gateway-access-token"),
|
||||
None,
|
||||
)
|
||||
if not token:
|
||||
raise SystemExit("❌ gateway-access-token not found in storage file.")
|
||||
return token
|
||||
|
||||
|
||||
gateway_token = load_gateway_token()
|
||||
|
||||
headers = {
|
||||
"content-type": "application/json",
|
||||
"authorization": f"Bearer {gateway_token}",
|
||||
"origin": "https://my.medevio.cz",
|
||||
"referer": "https://my.medevio.cz/",
|
||||
}
|
||||
|
||||
# ==================== STYLING ====================
|
||||
widths = {1: 11, 2: 13, 3: 45, 4: 30, 5: 15, 6: 15, 7: 30, 8: 15, 9: 37, 10: 37}
|
||||
header_fill = PatternFill("solid", fgColor="FFFF00")
|
||||
alt_fill = PatternFill("solid", fgColor="F2F2F2")
|
||||
thin_border = Border(
|
||||
left=Side(style="thin", color="000000"),
|
||||
right=Side(style="thin", color="000000"),
|
||||
top=Side(style="thin", color="000000"),
|
||||
bottom=Side(style="thin", color="000000"),
|
||||
)
|
||||
|
||||
|
||||
def format_ws(ws, df):
|
||||
"""Apply unified formatting to a worksheet."""
|
||||
for col_idx in range(1, len(df.columns) + 1):
|
||||
col_letter = get_column_letter(col_idx)
|
||||
cell = ws.cell(row=1, column=col_idx)
|
||||
cell.font = Font(bold=True)
|
||||
cell.alignment = Alignment(horizontal="center", vertical="center")
|
||||
cell.fill = header_fill
|
||||
cell.value = str(cell.value).upper()
|
||||
cell.border = thin_border
|
||||
ws.column_dimensions[col_letter].width = widths.get(col_idx, 20)
|
||||
for r_idx, row in enumerate(ws.iter_rows(min_row=2, max_row=ws.max_row), start=2):
|
||||
for cell in row:
|
||||
cell.border = thin_border
|
||||
if r_idx % 2 == 0:
|
||||
cell.fill = alt_fill
|
||||
ws.freeze_panes = "A2"
|
||||
|
||||
|
||||
# ==================== 1️⃣ LOAD AGENDA (API) ====================
|
||||
print("📡 Querying Medevio API for agenda...")
|
||||
|
||||
dnes = datetime.utcnow().date()
|
||||
since = datetime.combine(dnes, datetime.min.time())
|
||||
until = since + relativedelta(months=1)
|
||||
|
||||
payload = {
|
||||
"operationName": "ClinicAgenda_ListClinicReservations",
|
||||
"variables": {
|
||||
"calendarIds": [CALENDAR_ID],
|
||||
"clinicSlug": CLINIC_SLUG,
|
||||
"since": since.isoformat() + "Z",
|
||||
"until": until.isoformat() + "Z",
|
||||
"locale": "cs",
|
||||
"emptyCalendarIds": False,
|
||||
},
|
||||
"query": """query ClinicAgenda_ListClinicReservations(
|
||||
$calendarIds: [UUID!], $clinicSlug: String!,
|
||||
$locale: Locale!, $since: DateTime!, $until: DateTime!,
|
||||
$emptyCalendarIds: Boolean!
|
||||
) {
|
||||
reservations: listClinicReservations(
|
||||
clinicSlug: $clinicSlug, calendarIds: $calendarIds,
|
||||
since: $since, until: $until
|
||||
) @skip(if: $emptyCalendarIds) {
|
||||
id start end note done color
|
||||
request {
|
||||
id displayTitle(locale: $locale)
|
||||
extendedPatient {
|
||||
name surname dob insuranceCompanyObject { shortName }
|
||||
}
|
||||
}
|
||||
}
|
||||
}""",
|
||||
}
|
||||
|
||||
r = requests.post(GRAPHQL_URL, headers=headers, data=json.dumps(payload))
|
||||
r.raise_for_status()
|
||||
reservations = r.json()["data"]["reservations"]
|
||||
|
||||
rows = []
|
||||
for r in reservations:
|
||||
req = r.get("request") or {}
|
||||
patient = req.get("extendedPatient") or {}
|
||||
insurance = patient.get("insuranceCompanyObject") or {}
|
||||
try:
|
||||
start_dt = parser.isoparse(r.get("start")).astimezone(tz.gettz("Europe/Prague"))
|
||||
end_dt = parser.isoparse(r.get("end")).astimezone(tz.gettz("Europe/Prague"))
|
||||
except Exception:
|
||||
start_dt = end_dt = None
|
||||
date_str = start_dt.strftime("%Y-%m-%d") if start_dt else ""
|
||||
time_interval = (
|
||||
f"{start_dt.strftime('%H:%M')}-{end_dt.strftime('%H:%M')}"
|
||||
if start_dt and end_dt
|
||||
else ""
|
||||
)
|
||||
rows.append(
|
||||
{
|
||||
"Date": date_str,
|
||||
"Time": time_interval,
|
||||
"Title": req.get("displayTitle") or "",
|
||||
"Patient": f"{patient.get('surname','')} {patient.get('name','')}".strip(),
|
||||
"DOB": patient.get("dob") or "",
|
||||
"Insurance": insurance.get("shortName") or "",
|
||||
"Note": r.get("note") or "",
|
||||
"Color": r.get("color") or "",
|
||||
"Request_ID": req.get("id") or "",
|
||||
"Reservation_ID": r.get("id"),
|
||||
}
|
||||
)
|
||||
|
||||
df_agenda = pd.DataFrame(rows).sort_values(["Date", "Time"])
|
||||
print(f"✅ Loaded {len(df_agenda)} agenda rows.")
|
||||
|
||||
|
||||
# ==================== 2️⃣ LOAD OPEN REQUESTS (MySQL) ====================
|
||||
print("📡 Loading open requests from MySQL...")
|
||||
conn = pymysql.connect(**DB_CONFIG)
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT
|
||||
id AS Request_ID,
|
||||
displayTitle AS Title,
|
||||
pacient_prijmeni AS Pacient_Prijmeni,
|
||||
pacient_jmeno AS Pacient_Jmeno,
|
||||
pacient_rodnecislo AS DOB,
|
||||
createdAt AS Created
|
||||
FROM pozadavky
|
||||
WHERE doneAt IS NULL AND removedAt IS NULL
|
||||
ORDER BY createdAt DESC
|
||||
"""
|
||||
)
|
||||
rows = cur.fetchall()
|
||||
conn.close()
|
||||
|
||||
df_open = pd.DataFrame(rows)
|
||||
if not df_open.empty:
|
||||
df_open["Patient"] = (
|
||||
df_open["Pacient_Prijmeni"].fillna("")
|
||||
+ " "
|
||||
+ df_open["Pacient_Jmeno"].fillna("")
|
||||
).str.strip()
|
||||
df_open["Date"] = df_open["Created"].astype(str).str[:10]
|
||||
df_open["Time"] = ""
|
||||
df_open["Insurance"] = ""
|
||||
df_open["Note"] = ""
|
||||
df_open["Color"] = ""
|
||||
df_open["Reservation_ID"] = ""
|
||||
df_open = df_open[
|
||||
[
|
||||
"Date",
|
||||
"Time",
|
||||
"Title",
|
||||
"Patient",
|
||||
"DOB",
|
||||
"Insurance",
|
||||
"Note",
|
||||
"Color",
|
||||
"Request_ID",
|
||||
"Reservation_ID",
|
||||
]
|
||||
]
|
||||
print(f"✅ Loaded {len(df_open)} open requests.")
|
||||
|
||||
|
||||
# ==================== 3️⃣ MERGE + DEDUPLICATE ====================
|
||||
print("🟢 Merging and deduplicating (Agenda preferred)...")
|
||||
|
||||
df_agenda["Source"] = "Agenda"
|
||||
df_open["Source"] = "Open"
|
||||
|
||||
df_merged = pd.concat([df_agenda, df_open], ignore_index=True).fillna("")
|
||||
df_merged = df_merged.sort_values(["Source"], ascending=[True])
|
||||
|
||||
# drop duplicates — prefer Agenda if same Request_ID or same (Patient+Title)
|
||||
df_merged = df_merged.drop_duplicates(
|
||||
subset=["Request_ID", "Patient", "Title"], keep="first"
|
||||
)
|
||||
|
||||
df_merged = df_merged.drop(columns=["Source"], errors="ignore")
|
||||
df_merged = df_merged.sort_values(["Date", "Time"], na_position="last").reset_index(
|
||||
drop=True
|
||||
)
|
||||
print(f"✅ Total merged rows after deduplication: {len(df_merged)}")
|
||||
|
||||
|
||||
# ==================== 4️⃣ WRITE BASE SHEETS ====================
|
||||
with pd.ExcelWriter(xlsx_path, engine="openpyxl") as writer:
|
||||
df_agenda.to_excel(writer, sheet_name="Agenda", index=False)
|
||||
df_open.to_excel(writer, sheet_name="Otevřené požadavky", index=False)
|
||||
df_merged.to_excel(writer, sheet_name="Merged", index=False)
|
||||
|
||||
wb = load_workbook(xlsx_path)
|
||||
for name, df_ref in [
|
||||
("Agenda", df_agenda),
|
||||
("Otevřené požadavky", df_open),
|
||||
("Merged", df_merged),
|
||||
]:
|
||||
ws = wb[name]
|
||||
format_ws(ws, df_ref)
|
||||
|
||||
|
||||
# ==================== 5️⃣ VACCINE SHEETS (from MERGED) ====================
|
||||
VACCINE_SHEETS = {
|
||||
"Chřipka": ["očkování", "chřipka"],
|
||||
"COVID": ["očkování", "covid"],
|
||||
"Pneumokok": ["očkování", "pneumo"],
|
||||
"Hep A": ["očkování", "žloutenka a"],
|
||||
"Hep B": ["očkování", "žloutenka b"],
|
||||
"Hep A+B": ["očkování", "žloutenka a+b"],
|
||||
"Klíšťovka": ["očkování", "klíšť"],
|
||||
}
|
||||
|
||||
|
||||
def kw_pattern(kw):
|
||||
return rf"(?<!\w){re.escape(kw)}(?!\s*\+\s*\w)"
|
||||
|
||||
|
||||
for sheet_name, keywords in VACCINE_SHEETS.items():
|
||||
mask = pd.Series(True, index=df_merged.index)
|
||||
title_series = df_merged["Title"].fillna("")
|
||||
for kw in keywords:
|
||||
pattern = kw_pattern(kw)
|
||||
mask &= title_series.str.contains(pattern, flags=re.IGNORECASE, regex=True)
|
||||
filtered_df = df_merged[mask].copy()
|
||||
if filtered_df.empty:
|
||||
print(f"ℹ️ No matches for '{sheet_name}'")
|
||||
continue
|
||||
ws_new = wb.create_sheet(title=sheet_name)
|
||||
for r in dataframe_to_rows(filtered_df, index=False, header=True):
|
||||
ws_new.append(r)
|
||||
format_ws(ws_new, filtered_df)
|
||||
print(f"🟡 Created sheet '{sheet_name}' ({len(filtered_df)} rows).")
|
||||
|
||||
|
||||
# ==================== SAVE ====================
|
||||
wb.save(xlsx_path)
|
||||
print(f"📘 Exported full merged report:\n{xlsx_path}")
|
||||
Reference in New Issue
Block a user