255 lines
9.4 KiB
Python
255 lines
9.4 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
doplnujici_dotazy_v1.0.py
|
|
=========================
|
|
Verze: 1.0
|
|
Datum: 2026-06-17
|
|
Autor: Claude Code (pro MUDr. Vladimíra Buzalku)
|
|
|
|
Popis
|
|
-----
|
|
Správa kolekce `feasibility.doplnujici_dotazy` — evidence doplňujících dotazů na centra,
|
|
když v SIPIQ chybí odpověď a do dotazníku už NELZE vstoupit. Víme tak, ke kterému centru
|
|
(a ke které otázce) dotaz patří, a v jakém je stavu.
|
|
|
|
Model (domluva 17JUN2026): **1 dok = dotazová UDÁLOST** (může nést více otázek v `questions[]`).
|
|
Když centrum odpoví, odpověď se PROMÍTNE i do `sipiq_responses.answers_supplement{}`
|
|
(s příznakem source="doplneno"); původní Qualtrics `answers` se NEMĚNÍ.
|
|
|
|
Životní cyklus dotazu: open → asked → answered → closed / no_response.
|
|
|
|
Příkazy
|
|
-------
|
|
ensure
|
|
Založí kolekci + indexy (idempotentní).
|
|
|
|
add --center <email|prijmeni|R_id> [--country CZ|SK] --qcodes Q72_1,Q73_1
|
|
[--reason "…"] [--asked-via "…"] [--status asked] [--note "…"] [--apply]
|
|
Založí novou dotazovou událost. Centrum + otázky se dohledají v sipiq_responses
|
|
/ sipiq_questions; identita se denormalizuje. Default dry-run.
|
|
|
|
answer --id <dotaz_id> --qcode Q72_1 --answer "8" [--source "email 18JUN2026"] [--apply]
|
|
Zapíše odpověď k jedné otázce události, promítne do sipiq_responses.answers_supplement,
|
|
přepočítá stav události. Default dry-run.
|
|
|
|
list [--center <email|prijmeni|R_id>] [--open]
|
|
Vypíše dotazy (volitelně jen otevřené / pro jedno centrum).
|
|
|
|
Mongo 192.168.1.76:27017, bez auth, pymongo.
|
|
"""
|
|
import argparse
|
|
import re
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
|
|
from pymongo import MongoClient, ASCENDING
|
|
from bson import ObjectId
|
|
|
|
# Connection string of the MongoDB server used by every command.
MONGO_URI = "mongodb://192.168.1.76:27017"
# Name of the database that holds all collections below.
DB = "feasibility"
# Collection of follow-up query events managed by this script.
COL = "doplnujici_dotazy"
# Collection of questionnaire responses; centers are looked up here and
# answers are projected into its `answers_supplement` field.
COL_R = "sipiq_responses"
# Collection of question definitions (text, section, items).
COL_Q = "sipiq_questions"

# Event statuses selected by `list --open`.
OPEN_STATES = ("open", "asked")
|
|
|
|
|
|
def now_iso():
    """Return the current time as an ISO-8601 string with a UTC offset.

    The instant is taken in UTC, converted to the system's local time zone
    and rendered with second precision (no fractional seconds).
    """
    utc_now = datetime.now(timezone.utc)
    local_now = utc_now.astimezone()
    return local_now.isoformat(timespec="seconds")
|
|
|
|
|
|
def qbase(qcode):
    """Return the leading ``Q<digits>`` part of *qcode*.

    For a leaf code such as ``Q72_1`` this yields ``Q72``. When *qcode* does
    not start with ``Q`` followed by at least one digit, it is returned
    unchanged.
    """
    leading = re.match(r"(Q\d+)", qcode)
    if leading is None:
        return qcode
    return leading.group(1)
|
|
|
|
|
|
def db_conn():
    """Open a MongoDB client, verify the server answers, and return handles.

    Returns:
        tuple: ``(client, database)`` where ``client`` is the ``MongoClient``
        for ``MONGO_URI`` and ``database`` is ``client[DB]``. The caller is
        responsible for closing ``client``.

    Raises:
        Exception: Whatever the ``ping`` command raises is propagated
            unchanged; the client is closed first so nothing is leaked.
    """
    client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=8000)
    try:
        # Fail early here instead of at the first real query.
        client.admin.command("ping")
    except Exception:
        # The caller never receives the client on failure, so it could not
        # close it; release it here before re-raising.
        client.close()
        raise
    return client, client[DB]
|
|
|
|
|
|
def ensure(db):
    """Create the ascending single-field indexes on the ``COL`` collection.

    Indexed fields, in creation order: ``investigator_oid``, ``response_id``,
    ``status``, ``questions.qcode`` and ``questions.status``. Afterwards a
    confirmation line with the collection's document count is printed.

    Args:
        db: Database handle that contains the ``COL`` collection.
    """
    collection = db[COL]
    indexed_fields = (
        "investigator_oid",
        "response_id",
        "status",
        "questions.qcode",
        "questions.status",
    )
    for field in indexed_fields:
        collection.create_index([(field, ASCENDING)])
    total = collection.count_documents({})
    print(f"OK: kolekce '{COL}' + indexy připraveny. Dokumentů: {total}")
|
|
|
|
|
|
def find_center(db, key, country=None):
    """Find exactly one ``sipiq_responses`` document for a center identifier.

    Lookup order:

    1. If *key* starts with ``R_``, it is tried as the document ``_id``.
    2. *key* lower-cased is tried as ``pi_email``.
    3. *key* is matched case-insensitively against the whole
       ``pi_last_name``; when *country* is given the match is additionally
       restricted by ``site_country``.

    Args:
        db: Database handle that contains the ``COL_R`` collection.
        key: Response id, PI e-mail or PI last name. Surrounding whitespace
            is ignored.
        country: Optional country filter for the last-name lookup. The codes
            ``CZ`` and ``SK`` (any letter case) are expanded to the full
            country name; any other value is used verbatim.

    Returns:
        dict: The matching response document.

    Raises:
        SystemExit: If *key* is empty, matches no center, or the last name
            matches more than one center.
    """
    key = (key or "").strip()
    if not key:
        # An empty key would otherwise match documents whose e-mail or last
        # name is the empty string and silently pick the wrong center.
        raise SystemExit("CHYBA: prázdný identifikátor centra.")

    responses = db[COL_R]
    if key.startswith("R_"):
        by_id = responses.find_one({"_id": key})
        if by_id:
            return by_id

    by_email = responses.find_one({"pi_email": key.lower()})
    if by_email:
        return by_email

    flt = {"pi_last_name": re.compile(f"^{re.escape(key)}$", re.I)}
    if country:
        country = country.strip()
        # Accept "cz"/"sk" as well as "CZ"/"SK"; unknown values pass through.
        flt["site_country"] = {"CZ": "Czech Republic", "SK": "Slovakia"}.get(country.upper(), country)

    cands = list(responses.find(flt))
    if len(cands) == 1:
        return cands[0]
    if len(cands) > 1:
        raise SystemExit(f"CHYBA: '{key}' je nejednoznačné ({len(cands)} center). Upřesni e-mailem nebo --country / R_id.")
    raise SystemExit(f"CHYBA: centrum '{key}' nenalezeno v {COL_R}.")
|
|
|
|
|
|
def question_meta(db, qcode):
    """Return the text and section of a question from ``sipiq_questions``.

    *qcode* may be a leaf code (e.g. ``Q72_1``); the definition is looked up
    by its base code. If one of the definition's ``items`` has ``key`` equal
    to *qcode* and a truthy ``label``, that label is appended to the question
    text after an em dash.

    Args:
        db: Database handle that contains the ``COL_Q`` collection.
        qcode: Question code to describe.

    Returns:
        dict: Keys ``question_base``, ``question_text`` and ``section``; the
        latter two are ``None`` when no definition is found.
    """
    base = qbase(qcode)
    meta = {"question_base": base, "question_text": None, "section": None}

    definition = db[COL_Q].find_one({"_id": base})
    if not definition:
        return meta

    # Label of the first item whose key equals the requested code, if any.
    label = next(
        (item.get("label") for item in definition.get("items", []) if item.get("key") == qcode),
        None,
    )
    text = definition.get("text")
    meta["question_text"] = f"{text} — {label}" if label else text
    meta["section"] = definition.get("section")
    return meta
|
|
|
|
|
|
def cmd_add(db, args, dry):
    """Create a new follow-up query event for one center.

    The center is resolved with ``find_center`` and every requested question
    code is described with ``question_meta``; identifying fields of the
    center are copied into the event document. The planned event is always
    printed; it is inserted into the ``COL`` collection only when *dry* is
    false.

    Args:
        db: Database handle used for the lookups and the insert.
        args: Parsed CLI arguments; reads ``center``, ``country``, ``qcodes``
            (comma-separated), ``status``, ``asked_via``, ``reason`` and
            ``note``.
        dry: When true, only print what would be created.

    Raises:
        SystemExit: If ``qcodes`` contains no question code, or if the center
            cannot be resolved (raised by ``find_center``).
    """
    # dict.fromkeys drops repeated codes while keeping command-line order, so
    # one event never carries the same question twice.
    qcodes = list(dict.fromkeys(q.strip() for q in args.qcodes.split(",") if q.strip()))
    if not qcodes:
        # E.g. --qcodes "," would otherwise create an event with no questions.
        raise SystemExit("CHYBA: --qcodes neobsahuje žádný kód otázky.")

    center = find_center(db, args.center, args.country)

    questions = []
    for qc in qcodes:
        meta = question_meta(db, qc)
        questions.append({
            "qcode": qc, "question_base": meta["question_base"],
            "question_text": meta["question_text"], "section": meta["section"],
            "answer": None, "answered_at": None, "answer_source": None, "status": "open",
        })

    ts = now_iso()
    doc = {
        "response_id": center["_id"],
        "investigator_oid": center.get("investigator_oid"),
        "pi_last_name": center.get("pi_last_name"),
        "site_name": center.get("site_name"),
        "site_country": center.get("site_country"),
        "pi_email": center.get("pi_email"),
        "status": args.status,
        # The question counts as asked only when the event starts in "asked".
        "asked_at": ts if args.status == "asked" else None,
        "asked_via": args.asked_via,
        "reason": args.reason or "neodpovězeno v SIPIQ; dotazník už uzavřen",
        "note": args.note,
        "questions": questions,
        "created_at": ts, "updated_at": ts, "history": [],
    }

    print(f"Centrum: {doc['pi_last_name']} / {doc['site_name']} ({doc['site_country']}) resp={doc['response_id']}")
    for q in questions:
        print(f" • {q['qcode']:10} [{q['section']}] {q['question_text']}")

    if dry:
        print("[DRY-RUN] Nezaloženo. Ostrý: --apply")
        return

    res = db[COL].insert_one(doc)
    print(f"[APPLY] Založen dotaz _id={res.inserted_id}")
|
|
|
|
|
|
def cmd_answer(db, args, dry):
    """Record the answer to one question of a follow-up query event.

    The planned change is always printed. When *dry* is false the answer is
    stored on the matching entry of the event's ``questions`` list, the event
    status is recomputed (``answered`` once every question is answered,
    otherwise ``asked``), a history entry is appended, and the answer is
    written to ``answers_supplement.<qcode>`` of the linked ``COL_R``
    document. The original ``answers`` field of that document is not touched.

    Args:
        db: Database handle that contains the ``COL`` and ``COL_R`` collections.
        args: Parsed CLI arguments; reads ``id``, ``qcode``, ``answer`` and
            ``source``.
        dry: When true, only print what would be written.

    Raises:
        SystemExit: If ``id`` is not a valid ObjectId, the event does not
            exist, the question is not part of the event, or the linked
            response document is missing when the answer is projected.
    """
    if not ObjectId.is_valid(args.id):
        # ObjectId() would raise on malformed input; exit with the same
        # one-line error style as the other failures instead.
        raise SystemExit(f"CHYBA: '{args.id}' není platné _id dotazu.")

    doc = db[COL].find_one({"_id": ObjectId(args.id)})
    if not doc:
        raise SystemExit(f"CHYBA: dotaz _id={args.id} nenalezen.")

    qs = doc["questions"]
    target = next((q for q in qs if q["qcode"] == args.qcode), None)
    if not target:
        raise SystemExit(f"CHYBA: otázka {args.qcode} není v tomto dotazu (má: {[q['qcode'] for q in qs]}).")

    ts = now_iso()
    print(f"Centrum: {doc['pi_last_name']} / {doc['site_name']} resp={doc['response_id']}")
    print(f" {args.qcode}: {target.get('answer')!r} -> {args.answer!r} (zdroj: {args.source})")
    print(f" + promítnutí do {COL_R}.answers_supplement.{args.qcode}")
    if dry:
        print("[DRY-RUN] Nezapsáno. Ostrý: --apply")
        return

    # 1) Update the question inside the event (every entry with this qcode).
    for q in qs:
        if q["qcode"] == args.qcode:
            q["answer"] = args.answer
            q["answered_at"] = ts
            q["answer_source"] = args.source
            q["status"] = "answered"
    all_answered = all(q["status"] == "answered" for q in qs)
    new_status = "answered" if all_answered else "asked"
    db[COL].update_one({"_id": doc["_id"]}, {
        "$set": {"questions": qs, "status": new_status, "updated_at": ts},
        "$push": {"history": {"changed_at": ts, "action": "answer",
                              "qcode": args.qcode, "answer": args.answer, "source": args.source}},
    })

    # 2) Project into sipiq_responses.answers_supplement (original answers stay untouched).
    projected = db[COL_R].update_one({"_id": doc["response_id"]}, {
        "$set": {f"answers_supplement.{args.qcode}": {
            "value": args.answer, "source": "doplneno",
            "doplnujici_dotaz_id": doc["_id"], "answered_at": ts, "answer_source": args.source,
        }}
    })
    if projected.matched_count == 0:
        # The event is already updated at this point; make the missing
        # projection explicit instead of reporting success.
        raise SystemExit(
            f"CHYBA: {doc['response_id']} nenalezeno v {COL_R}; událost aktualizována "
            f"(stav = {new_status}), ale promítnutí do answers_supplement se NEPROVEDLO."
        )

    print(f"[APPLY] Odpověď zapsána; stav události = {new_status}; promítnuto do {COL_R}.")
|
|
|
|
|
|
def cmd_list(db, args):
    """Print follow-up query events, newest first.

    Args:
        db: Database handle that contains the ``COL`` collection.
        args: Parsed CLI arguments; ``open`` restricts the listing to events
            whose status is in ``OPEN_STATES``; ``center`` restricts it to
            one center, interpreted as a response id (prefix ``R_``), an
            e-mail (contains ``@``) or otherwise a case-insensitive exact
            last name.
    """
    flt = {}
    if args.open:
        flt["status"] = {"$in": list(OPEN_STATES)}
    if args.center:
        key = args.center
        if key.startswith("R_"):
            flt["response_id"] = key
        elif "@" in key:
            flt["pi_email"] = key.lower()
        else:
            flt["pi_last_name"] = re.compile(f"^{re.escape(key)}$", re.I)

    docs = list(db[COL].find(flt).sort("created_at", -1))
    print(f"Dotazů: {len(docs)}")
    for d in docs:
        print(f"\n[{d['_id']}] {d['pi_last_name']} / {d['site_name']} ({d.get('site_country')}) — {d['status']}")
        for q in d["questions"]:
            a = q.get("answer")
            # Only a missing answer means "waiting"; a recorded falsy answer
            # (e.g. an empty string or a numeric 0) must still be displayed.
            shown = ('= ' + str(a)) if a is not None else '(čeká)'
            print(f" {q['qcode']:10} {q['status']:9} {shown} | {q.get('question_text')}")
|
|
|
|
|
|
def main():
    """Parse the command line and run the selected sub-command.

    Sub-commands: ``ensure``, ``add``, ``answer`` and ``list``. ``add`` and
    ``answer`` run as a dry run unless ``--apply`` is given. The database
    connection is opened after argument parsing and is always closed again,
    even when the command fails.
    """
    parser = argparse.ArgumentParser()
    commands = parser.add_subparsers(dest="cmd", required=True)

    commands.add_parser("ensure")

    add_cmd = commands.add_parser("add")
    add_cmd.add_argument("--center", required=True)
    add_cmd.add_argument("--country")
    add_cmd.add_argument("--qcodes", required=True)
    add_cmd.add_argument("--reason")
    add_cmd.add_argument("--asked-via", dest="asked_via")
    add_cmd.add_argument("--status", default="open", choices=["open", "asked"])
    add_cmd.add_argument("--note")
    add_cmd.add_argument("--apply", action="store_true")

    answer_cmd = commands.add_parser("answer")
    answer_cmd.add_argument("--id", required=True)
    answer_cmd.add_argument("--qcode", required=True)
    answer_cmd.add_argument("--answer", required=True)
    answer_cmd.add_argument("--source")
    answer_cmd.add_argument("--apply", action="store_true")

    list_cmd = commands.add_parser("list")
    list_cmd.add_argument("--center")
    list_cmd.add_argument("--open", action="store_true")

    args = parser.parse_args()

    # One entry per sub-command; args.apply is only read for commands that
    # define it, because the lambdas are evaluated lazily.
    handlers = {
        "ensure": lambda database: ensure(database),
        "add": lambda database: cmd_add(database, args, dry=not args.apply),
        "answer": lambda database: cmd_answer(database, args, dry=not args.apply),
        "list": lambda database: cmd_list(database, args),
    }

    client, db = db_conn()
    try:
        handler = handlers.get(args.cmd)
        if handler is not None:
            handler(db)
    finally:
        client.close()
|
|
|
|
|
|
# Run the CLI only when the file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|