Files
medevio/Testy/06 Testy.py
2025-11-14 07:18:30 +01:00

204 lines
4.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
import requests
import pymysql
from pathlib import Path
# ==============================
# 🔧 KONFIGURACE
# ==============================
TOKEN_PATH = Path("token.txt")
CLINIC_SLUG = "mudr-buzalkova"
GRAPHQL_URL = "https://api.medevio.cz/graphql"
DB_CONFIG = {
"host": "192.168.1.76",
"port": 3307,
"user": "root",
"password": "Vlado9674+",
"database": "medevio",
"charset": "utf8mb4",
"cursorclass": pymysql.cursors.DictCursor,
}
# počet pacientů na stránku
PAGE_SIZE = 100
# ==============================
# 📌 GRAPHQL QUERY
# ==============================
QUERY = """
query PatientGridImpl_ListClinicPatients(
$clinicSlug: String!,
$filter: ListPatientFilter!,
$pageInfo: PageInfo!,
$sort: [ListPatientsSort!]
) {
patientsList: listPatients(
clinicSlug: $clinicSlug
filter: $filter
pageInfo: $pageInfo
sort: $sort
) {
count
patients {
id
identificationNumber
insuranceCompanyObject {
id
shortName
}
name
surname
phone
sex
status2
type
kind
isInClinic
isUnknownPatient
user {
registrationCompletedTime
}
clinics {
id
name
slug
}
tags(onlyImportant: false) {
id
name
color
icon
}
}
}
}
"""
from datetime import datetime
def normalize_dt(dt_str):
if not dt_str:
return None
# remove Z
dt_str = dt_str.replace("Z", "")
# replace T with space
dt_str = dt_str.replace("T", " ")
# remove fractional seconds
if "." in dt_str:
dt_str = dt_str.split(".")[0]
return dt_str # MySQL can accept "YYYY-MM-DD HH:MM:SS"
# ==============================
# 💾 ULOŽENÍ PACIENTA
# ==============================
def save_patient_summary(cur, p):
sql = """
REPLACE INTO medevio_pacienti (
id, jmeno, prijmeni, rodne_cislo, telefon, pohlavi,
pojistovna_id, pojistovna_nazev,
status, typ, kind,
is_in_clinic, is_unknown,
registration_time,
tags_json, clinics_json,
last_update
)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,NOW())
"""
ins = p.get("insuranceCompanyObject") or {}
user = p.get("user") or {}
cur.execute(sql, (
p.get("id"),
p.get("name"),
p.get("surname"),
p.get("identificationNumber"),
p.get("phone"),
p.get("sex"),
ins.get("id"),
ins.get("shortName"),
p.get("status2"),
p.get("type"),
p.get("kind"),
1 if p.get("isInClinic") else 0,
1 if p.get("isUnknownPatient") else 0,
normalize_dt(user.get("registrationCompletedTime")),
json.dumps(p.get("tags"), ensure_ascii=False),
json.dumps(p.get("clinics"), ensure_ascii=False),
))
# ==============================
# 🧠 MAIN
# ==============================
def main():
token = TOKEN_PATH.read_text().strip()
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
"Accept": "application/json",
}
conn = pymysql.connect(**DB_CONFIG)
cur = conn.cursor()
offset = 0
total = None
print("⏳ Starting patient sync...\n")
while True:
print(f"➡️ Fetching patients {offset}{offset + PAGE_SIZE} ...")
variables = {
"clinicSlug": CLINIC_SLUG,
"filter": {},
"pageInfo": {"first": PAGE_SIZE, "offset": offset},
"sort": [{"field": "ReverseFullName", "sort": "ASC"}],
}
response = requests.post(
GRAPHQL_URL,
json={"query": QUERY, "variables": variables},
headers=headers,
timeout=30
)
response.raise_for_status()
data = response.json()
block = data["data"]["patientsList"]
if total is None:
total = block["count"]
print(f"📌 Total patients: {total}\n")
patients = block["patients"]
if not patients:
print("✅ No more patients. Finished.")
break
# save each patient
for p in patients:
save_patient_summary(cur, p)
conn.commit()
print(f" ✓ Saved {len(patients)} patients.")
offset += PAGE_SIZE
if offset >= total:
print("\n✅ All patients downloaded.")
break
cur.close()
conn.close()
# ==============================
if __name__ == "__main__":
main()