notebookvb

This commit is contained in:
Vladimir Buzalka
2026-05-14 05:57:16 +02:00
parent 68416deae6
commit d33c0da218
6 changed files with 605 additions and 79 deletions
+25
View File
@@ -0,0 +1,25 @@
CREATE TABLE IF NOT EXISTS medevio.medevio_pacient (
patient_id VARCHAR(36) NOT NULL PRIMARY KEY,
name VARCHAR(100),
surname VARCHAR(100),
identification_number VARCHAR(20) COMMENT 'Rodne cislo bez lomitka',
sex VARCHAR(10) COMMENT 'Male / Female',
dob DATE COMMENT 'Datum narozeni',
email VARCHAR(200),
phone VARCHAR(50),
insurance_code INT COMMENT 'Kod pojistovny (111=VZP, 207=OZP, ...)',
insurance_name VARCHAR(100) COMMENT 'Nazev pojistovny',
status VARCHAR(20) COMMENT 'ACTIVE / INACTIVE',
is_in_clinic TINYINT(1) DEFAULT 1,
has_mobile_app TINYINT(1) DEFAULT 0,
anamnesis_shared TINYINT(1) DEFAULT 0,
note TEXT COMMENT 'Interni poznamka lekare',
city VARCHAR(200),
street VARCHAR(200),
house_number VARCHAR(50),
user_id VARCHAR(36) COMMENT 'ID uzivatelskeho uctu (PatientUser)',
user_email VARCHAR(200) COMMENT 'Email spravujici osoby',
user_phone VARCHAR(50) COMMENT 'Telefon spravujici osoby',
created_at DATETIME COMMENT 'Kdy byl pacient vytvoren v Medeviu',
synced_at DATETIME NOT NULL COMMENT 'Cas posledni synchronizace'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
@@ -0,0 +1,337 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Sync Medevio patients to MySQL table medevio.medevio_pacient.
Stahuje seznam pacientů přes GraphQL API (stránkovaně po 50),
pro každého pacienta stáhne detail a uloží/aktualizuje v MySQL.
"""
import sys
try:
sys.stdout.reconfigure(encoding="utf-8")
sys.stderr.reconfigure(encoding="utf-8")
except AttributeError:
import io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8")
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8")
import json
import time
import pymysql
import requests
from pathlib import Path
from datetime import datetime
from dateutil import parser as dtparser, tz
# ==================== CONFIG ====================
GRAPHQL_URL = "https://api.medevio.cz/graphql"
CLINIC_SLUG = "mudr-buzalkova"
PAGE_SIZE = 50
DELAY_BETWEEN_REQUESTS = 0.3
TOKEN_PATH = Path(__file__).resolve().parent.parent / "token.txt"
DB_CONFIG = {
"host": "192.168.1.76",
"port": 3306,
"user": "root",
"password": "Vlado9674+",
"database": "medevio",
"charset": "utf8mb4",
"cursorclass": pymysql.cursors.DictCursor,
}
PRAGUE_TZ = tz.gettz("Europe/Prague")
# ==================== TABLE ====================
CREATE_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS medevio_pacient (
patient_id VARCHAR(36) NOT NULL PRIMARY KEY,
name VARCHAR(100),
surname VARCHAR(100),
identification_number VARCHAR(20) COMMENT 'Rodne cislo bez lomitka',
sex VARCHAR(10) COMMENT 'Male / Female',
dob DATE COMMENT 'Datum narozeni',
email VARCHAR(200),
phone VARCHAR(50),
insurance_code INT COMMENT 'Kod pojistovny (111=VZP, 207=OZP, ...)',
insurance_name VARCHAR(100) COMMENT 'Nazev pojistovny',
status VARCHAR(20) COMMENT 'ACTIVE / INACTIVE',
is_in_clinic TINYINT(1) DEFAULT 1,
has_mobile_app TINYINT(1) DEFAULT 0,
anamnesis_shared TINYINT(1) DEFAULT 0,
note TEXT COMMENT 'Interni poznamka lekare',
city VARCHAR(200),
street VARCHAR(200),
house_number VARCHAR(50),
user_id VARCHAR(36) COMMENT 'ID uzivatelskeho uctu (PatientUser)',
user_email VARCHAR(200) COMMENT 'Email spravujici osoby',
user_phone VARCHAR(50) COMMENT 'Telefon spravujici osoby',
created_at DATETIME COMMENT 'Kdy byl pacient vytvoren v Medeviu',
synced_at DATETIME NOT NULL COMMENT 'Cas posledni synchronizace'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
"""
# ==================== UPSERT ====================
UPSERT_SQL = """
INSERT INTO medevio_pacient (
patient_id, name, surname, identification_number,
sex, dob, email, phone,
insurance_code, insurance_name,
status, is_in_clinic, has_mobile_app, anamnesis_shared,
note, city, street, house_number,
user_id, user_email, user_phone,
created_at, synced_at
) VALUES (
%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s
)
ON DUPLICATE KEY UPDATE
name = VALUES(name),
surname = VALUES(surname),
identification_number = VALUES(identification_number),
sex = VALUES(sex),
dob = VALUES(dob),
email = VALUES(email),
phone = VALUES(phone),
insurance_code = VALUES(insurance_code),
insurance_name = VALUES(insurance_name),
status = VALUES(status),
is_in_clinic = VALUES(is_in_clinic),
has_mobile_app = VALUES(has_mobile_app),
anamnesis_shared = VALUES(anamnesis_shared),
note = VALUES(note),
city = VALUES(city),
street = VALUES(street),
house_number = VALUES(house_number),
user_id = VALUES(user_id),
user_email = VALUES(user_email),
user_phone = VALUES(user_phone),
created_at = VALUES(created_at),
synced_at = VALUES(synced_at)
"""
# ==================== GRAPHQL QUERIES ====================
LIST_PATIENTS_QUERY = """
query SyncListPatients($clinicSlug: String!, $pageInfo: PageInfo!, $filter: ListPatientFilter!) {
patientsList: listPatients(clinicSlug: $clinicSlug, filter: $filter, pageInfo: $pageInfo) {
count
patients {
id
identificationNumber
name
surname
sex
phone
status2
isInClinic
locale
insuranceCompanyObject { id shortName }
user { id phone name surname }
}
}
}
"""
DETAIL_PATIENT_QUERY = """
query GetPatientDetail($clinicSlug: String!, $patientId: String!) {
patient: getPatientForClinic(clinicSlug: $clinicSlug, patientId: $patientId) {
id
name
surname
identificationNumber
sex
dob
email
phone
status
isInClinic
hasMobileApp
anamnesisShared
note
city
street
houseNumber
createdAt
insuranceCompanyObject {
code
name
}
user {
id
email
phone
}
}
}
"""
def parse_dt(iso_str):
if not iso_str:
return None
try:
return dtparser.isoparse(iso_str).astimezone(PRAGUE_TZ).replace(tzinfo=None)
except Exception:
return None
def make_headers(token):
return {
"content-type": "application/json",
"authorization": f"Bearer {token}",
"origin": "https://my.medevio.cz",
"referer": "https://my.medevio.cz/",
}
def gql_request(headers, query, variables):
payload = {"query": query, "variables": variables}
resp = requests.post(GRAPHQL_URL, headers=headers, json=payload, timeout=30)
resp.raise_for_status()
data = resp.json()
if "errors" in data:
print(f" GQL errors: {data['errors']}")
return None
return data.get("data")
def fetch_patients_list(headers):
"""Stáhne seznam všech patient IDs stránkovaně přes listPatients."""
all_patients = []
offset = 0
total = None
while True:
data = gql_request(headers, LIST_PATIENTS_QUERY, {
"clinicSlug": CLINIC_SLUG,
"filter": {},
"pageInfo": {"first": PAGE_SIZE, "offset": offset},
})
if not data or "patientsList" not in data:
print(f" Chyba pri stahovani seznamu (offset={offset}).")
return None
result = data["patientsList"]
if total is None:
total = result.get("count", 0)
print(f" Celkem pacientu v Medevio: {total}")
patients = result.get("patients") or []
if not patients:
break
all_patients.extend(patients)
offset += PAGE_SIZE
print(f" Stazeno {len(all_patients)}/{total}...")
if offset >= total:
break
time.sleep(DELAY_BETWEEN_REQUESTS)
return all_patients
def fetch_patient_detail(headers, patient_id):
"""Stáhne detail jednoho pacienta."""
data = gql_request(headers, DETAIL_PATIENT_QUERY, {
"clinicSlug": CLINIC_SLUG,
"patientId": patient_id,
})
if data and "patient" in data:
return data["patient"]
return None
def patient_to_row(p):
"""Převede patient dict na tuple pro UPSERT."""
insurance = p.get("insuranceCompanyObject") or {}
user = p.get("user") or {}
now = datetime.now().replace(microsecond=0)
return (
p.get("id"),
p.get("name"),
p.get("surname"),
p.get("identificationNumber"),
p.get("sex"),
p.get("dob"),
p.get("email"),
p.get("phone"),
insurance.get("code"),
insurance.get("name"),
p.get("status") or p.get("status2"),
1 if p.get("isInClinic") else 0,
1 if p.get("hasMobileApp") else 0,
1 if p.get("anamnesisShared") else 0,
p.get("note"),
p.get("city"),
p.get("street"),
p.get("houseNumber"),
user.get("id"),
user.get("email"),
user.get("phone"),
parse_dt(p.get("createdAt")),
now,
)
def upsert_patients(conn, patients):
rows = [patient_to_row(p) for p in patients]
with conn.cursor() as cur:
cur.executemany(UPSERT_SQL, rows)
conn.commit()
return len(rows)
def main():
token = TOKEN_PATH.read_text(encoding="utf-8").strip()
if token.startswith("Bearer "):
token = token.split(" ", 1)[1]
headers = make_headers(token)
conn = pymysql.connect(**DB_CONFIG)
with conn.cursor() as cur:
cur.execute(CREATE_TABLE_SQL)
conn.commit()
print("Tabulka medevio_pacient pripravena.")
# --- Krok 1: stáhni seznam patient IDs ---
print("\nStahuji seznam pacientu...")
patients = fetch_patients_list(headers)
if patients is None or len(patients) == 0:
print("Nepodařilo se stáhnout seznam pacientů.")
conn.close()
return
# --- Krok 2: pro každého stáhni detail ---
print(f"\nStahuji detaily pro {len(patients)} pacientu...")
detailed = []
errors = 0
for i, p in enumerate(patients):
detail = fetch_patient_detail(headers, p["id"])
if detail:
detailed.append(detail)
else:
errors += 1
if (i + 1) % 50 == 0:
print(f" Detail {i + 1}/{len(patients)}...")
time.sleep(DELAY_BETWEEN_REQUESTS)
# --- Krok 3: upsert do MySQL ---
count = upsert_patients(conn, detailed)
print(f"\nHotovo — {count} pacientu upsertovano.")
if errors:
print(f" ({errors} pacientu se nepodařilo stáhnout)")
conn.close()
print("\nDokonceno.")
if __name__ == "__main__":
main()