Z230
This commit is contained in:
@@ -2,24 +2,43 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""
|
||||
Fetches messages from Medevio API.
|
||||
|
||||
Modes:
|
||||
- Incremental (default): Only requests where messagesProcessed IS NULL or < updatedAt
|
||||
- Full resync (--full): Fetches ALL messages for ALL pozadavky
|
||||
Delta sync Medevio communication.
|
||||
Stáhne pouze zprávy změněné po messagesProcessed pro každý požadavek.
|
||||
"""
|
||||
|
||||
import zlib
|
||||
import json
|
||||
import requests
|
||||
import pymysql
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
import time
|
||||
import argparse
|
||||
import sys
|
||||
|
||||
# ==============================
|
||||
# 🔧 CONFIGURATION
|
||||
# UTF-8 SAFE OUTPUT
|
||||
# ==============================
|
||||
try:
|
||||
sys.stdout.reconfigure(encoding='utf-8')
|
||||
sys.stderr.reconfigure(encoding='utf-8')
|
||||
except AttributeError:
|
||||
import io
|
||||
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
|
||||
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
|
||||
|
||||
|
||||
def safe_print(text: str):
|
||||
enc = sys.stdout.encoding or ""
|
||||
if not enc.lower().startswith("utf"):
|
||||
text = ''.join(ch for ch in text if ord(ch) < 65536)
|
||||
try:
|
||||
print(text)
|
||||
except UnicodeEncodeError:
|
||||
text = ''.join(ch for ch in text if ord(ch) < 128)
|
||||
print(text)
|
||||
|
||||
|
||||
# ==============================
|
||||
# CONFIG
|
||||
# ==============================
|
||||
TOKEN_PATH = Path("token.txt")
|
||||
|
||||
@@ -35,7 +54,10 @@ DB_CONFIG = {
|
||||
|
||||
GRAPHQL_QUERY_MESSAGES = r"""
|
||||
query UseMessages_ListMessages($requestId: String!, $updatedSince: DateTime) {
|
||||
messages: listMessages(patientRequestId: $requestId, updatedSince: $updatedSince) {
|
||||
messages: listMessages(
|
||||
patientRequestId: $requestId,
|
||||
updatedSince: $updatedSince
|
||||
) {
|
||||
id
|
||||
createdAt
|
||||
updatedAt
|
||||
@@ -54,7 +76,6 @@ query UseMessages_ListMessages($requestId: String!, $updatedSince: DateTime) {
|
||||
contentType
|
||||
url
|
||||
downloadUrl
|
||||
token
|
||||
createdAt
|
||||
updatedAt
|
||||
}
|
||||
@@ -62,54 +83,67 @@ query UseMessages_ListMessages($requestId: String!, $updatedSince: DateTime) {
|
||||
}
|
||||
"""
|
||||
|
||||
|
||||
# ==============================
|
||||
# ⏱ DATETIME PARSER
|
||||
# HELPERS
|
||||
# ==============================
|
||||
def parse_dt(s):
|
||||
if not s:
|
||||
return None
|
||||
try:
|
||||
return datetime.fromisoformat(s.replace("Z", "+00:00"))
|
||||
except:
|
||||
pass
|
||||
try:
|
||||
return datetime.strptime(s[:19], "%Y-%m-%dT%H:%M:%S")
|
||||
except:
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
# ==============================
|
||||
# 🔐 TOKEN
|
||||
# ==============================
|
||||
|
||||
def read_token(path: Path) -> str:
|
||||
tok = path.read_text(encoding="utf-8").strip()
|
||||
return tok.replace("Bearer ", "")
|
||||
|
||||
|
||||
# ==============================
|
||||
# 📡 FETCH MESSAGES
|
||||
# FETCH MESSAGES (DELTA)
|
||||
# ==============================
|
||||
def fetch_messages(headers, request_id):
|
||||
def fetch_messages(headers, request_id, updated_since):
|
||||
payload = {
|
||||
"operationName": "UseMessages_ListMessages",
|
||||
"query": GRAPHQL_QUERY_MESSAGES,
|
||||
"variables": {"requestId": request_id, "updatedSince": None},
|
||||
"variables": {
|
||||
"requestId": request_id,
|
||||
"updatedSince": updated_since,
|
||||
},
|
||||
}
|
||||
|
||||
r = requests.post("https://api.medevio.cz/graphql", json=payload, headers=headers, timeout=30)
|
||||
r = requests.post(
|
||||
"https://api.medevio.cz/graphql",
|
||||
json=payload,
|
||||
headers=headers,
|
||||
timeout=30
|
||||
)
|
||||
|
||||
if r.status_code != 200:
|
||||
print("❌ HTTP", r.status_code, "for request", request_id)
|
||||
safe_print(f"❌ HTTP {r.status_code} for request {request_id}")
|
||||
return []
|
||||
return r.json().get("data", {}).get("messages", []) or []
|
||||
|
||||
j = r.json()
|
||||
if "errors" in j:
|
||||
safe_print(f"❌ GraphQL error for {request_id}: {j['errors']}")
|
||||
return []
|
||||
|
||||
return j.get("data", {}).get("messages", []) or []
|
||||
|
||||
|
||||
# ==============================
|
||||
# 💾 SAVE MESSAGE
|
||||
# INSERT MESSAGE
|
||||
# ==============================
|
||||
def insert_message(cur, req_id, msg):
|
||||
|
||||
sender = msg.get("sender") or {}
|
||||
sender_name = " ".join(
|
||||
x for x in [sender.get("name"), sender.get("surname")] if x
|
||||
) or None
|
||||
|
||||
mr = msg.get("medicalRecord") or {}
|
||||
|
||||
sql = """
|
||||
INSERT INTO medevio_conversation (
|
||||
id, request_id,
|
||||
@@ -130,8 +164,6 @@ def insert_message(cur, req_id, msg):
|
||||
attachment_content_type = VALUES(attachment_content_type)
|
||||
"""
|
||||
|
||||
mr = msg.get("medicalRecord") or {}
|
||||
|
||||
cur.execute(sql, (
|
||||
msg.get("id"),
|
||||
req_id,
|
||||
@@ -147,19 +179,16 @@ def insert_message(cur, req_id, msg):
|
||||
mr.get("contentType")
|
||||
))
|
||||
|
||||
|
||||
# ==============================
|
||||
# 💾 DOWNLOAD MESSAGE ATTACHMENT
|
||||
# INSERT ATTACHMENT (DEDUP)
|
||||
# ==============================
|
||||
def insert_download(cur, req_id, msg, existing_ids):
|
||||
|
||||
mr = msg.get("medicalRecord") or {}
|
||||
attachment_id = mr.get("id")
|
||||
if not attachment_id:
|
||||
if not attachment_id or attachment_id in existing_ids:
|
||||
return
|
||||
|
||||
if attachment_id in existing_ids:
|
||||
return # skip duplicates
|
||||
|
||||
url = mr.get("downloadUrl") or mr.get("url")
|
||||
if not url:
|
||||
return
|
||||
@@ -169,7 +198,7 @@ def insert_download(cur, req_id, msg, existing_ids):
|
||||
r.raise_for_status()
|
||||
data = r.content
|
||||
except Exception as e:
|
||||
print("⚠️ Failed to download:", e)
|
||||
safe_print(f"⚠️ Attachment download failed: {e}")
|
||||
return
|
||||
|
||||
filename = url.split("/")[-1].split("?")[0]
|
||||
@@ -196,17 +225,11 @@ def insert_download(cur, req_id, msg, existing_ids):
|
||||
|
||||
existing_ids.add(attachment_id)
|
||||
|
||||
|
||||
# ==============================
|
||||
# 🧠 MAIN
|
||||
# MAIN
|
||||
# ==============================
|
||||
def main():
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--full", action="store_true", help="Load messages for ALL pozadavky")
|
||||
# Force full mode ON
|
||||
args = parser.parse_args(args=["--full"])
|
||||
# args = parser.parse_args()
|
||||
|
||||
token = read_token(TOKEN_PATH)
|
||||
headers = {
|
||||
"Authorization": f"Bearer {token}",
|
||||
@@ -216,64 +239,55 @@ def main():
|
||||
|
||||
conn = pymysql.connect(**DB_CONFIG)
|
||||
|
||||
# ---- Load existing attachments
|
||||
# existing attachments
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("SELECT attachment_id FROM medevio_downloads")
|
||||
existing_ids = {row["attachment_id"] for row in cur.fetchall()}
|
||||
existing_ids = {r["attachment_id"] for r in cur.fetchall()}
|
||||
|
||||
print(f"📦 Already downloaded attachments: {len(existing_ids)}\n")
|
||||
|
||||
# ---- Select pozadavky to process
|
||||
# select requests needing sync
|
||||
with conn.cursor() as cur:
|
||||
if args.full:
|
||||
print("🔁 FULL REFRESH MODE: Fetching messages for ALL pozadavky!\n")
|
||||
cur.execute("SELECT id FROM pozadavky")
|
||||
else:
|
||||
print("📥 Incremental mode: Only syncing updated pozadavky.\n")
|
||||
cur.execute("""
|
||||
SELECT id FROM pozadavky
|
||||
WHERE messagesProcessed IS NULL
|
||||
OR messagesProcessed < updatedAt
|
||||
""")
|
||||
requests_to_process = cur.fetchall()
|
||||
cur.execute("""
|
||||
SELECT id, messagesProcessed
|
||||
FROM pozadavky
|
||||
WHERE messagesProcessed IS NULL
|
||||
OR messagesProcessed < updatedAt
|
||||
""")
|
||||
rows = cur.fetchall()
|
||||
|
||||
# =================================
|
||||
# ⏩ SKIP FIRST 3100 AS YESTERDAY
|
||||
# =================================
|
||||
safe_print(f"📋 Found {len(rows)} requests for message delta-sync\n")
|
||||
|
||||
SKIP = 3100
|
||||
if len(requests_to_process) > SKIP:
|
||||
print(f"⏩ Skipping first {SKIP} pozadavky (already processed yesterday).")
|
||||
requests_to_process = requests_to_process[SKIP:]
|
||||
else:
|
||||
print("⚠️ Not enough pozadavky to skip!")
|
||||
|
||||
|
||||
print(f"📋 Requests to process: {len(requests_to_process)}\n")
|
||||
|
||||
# ---- Process each request
|
||||
for idx, row in enumerate(requests_to_process, 1):
|
||||
for i, row in enumerate(rows, 1):
|
||||
req_id = row["id"]
|
||||
print(f"[{idx}/{len(requests_to_process)}] Processing {req_id} …")
|
||||
updated_since = row["messagesProcessed"]
|
||||
if updated_since:
|
||||
updated_since = updated_since.replace(microsecond=0).isoformat() + "Z"
|
||||
|
||||
messages = fetch_messages(headers, req_id)
|
||||
safe_print(f"[{i}/{len(rows)}] {req_id}")
|
||||
|
||||
messages = fetch_messages(headers, req_id, updated_since)
|
||||
if not messages:
|
||||
safe_print(" ⏭ No new messages")
|
||||
else:
|
||||
with conn.cursor() as cur:
|
||||
for msg in messages:
|
||||
insert_message(cur, req_id, msg)
|
||||
insert_download(cur, req_id, msg, existing_ids)
|
||||
conn.commit()
|
||||
safe_print(f" ✅ {len(messages)} new/updated messages")
|
||||
|
||||
with conn.cursor() as cur:
|
||||
for msg in messages:
|
||||
insert_message(cur, req_id, msg)
|
||||
insert_download(cur, req_id, msg, existing_ids)
|
||||
cur.execute(
|
||||
"UPDATE pozadavky SET messagesProcessed = NOW() WHERE id = %s",
|
||||
(req_id,)
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("UPDATE pozadavky SET messagesProcessed = NOW() WHERE id = %s", (req_id,))
|
||||
conn.commit()
|
||||
|
||||
print(f" ✅ {len(messages)} messages saved\n")
|
||||
time.sleep(0.25)
|
||||
|
||||
conn.close()
|
||||
print("🎉 Done!")
|
||||
safe_print("\n🎉 Delta message sync DONE")
|
||||
|
||||
|
||||
# ==============================
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user