Files
janssen/Feasibility/doplnujici_dotazy_v1.0.py
T
2026-06-17 15:05:10 +02:00

255 lines
9.4 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
doplnujici_dotazy_v1.0.py
=========================
Verze: 1.0
Datum: 2026-06-17
Autor: Claude Code (pro MUDr. Vladimíra Buzalku)
Popis
-----
Správa kolekce `feasibility.doplnujici_dotazy` — evidence doplňujících dotazů na centra,
když v SIPIQ chybí odpověď a do dotazníku už NELZE vstoupit. Víme tak, ke kterému centru
(a ke které otázce) dotaz patří, a v jakém je stavu.
Model (domluva 17JUN2026): **1 dok = dotazová UDÁLOST** (může nést více otázek v `questions[]`).
Když centrum odpoví, odpověď se PROMÍTNE i do `sipiq_responses.answers_supplement{}`
(s příznakem source="doplneno"); původní Qualtrics `answers` se NEMĚNÍ.
Životní cyklus dotazu: open → asked → answered → closed / no_response.
Příkazy
-------
ensure
Založí kolekci + indexy (idempotentní).
add --center <email|prijmeni|R_id> [--country CZ|SK] --qcodes Q72_1,Q73_1
[--reason ""] [--asked-via ""] [--status asked] [--note ""] [--apply]
Založí novou dotazovou událost. Centrum + otázky se dohledají v sipiq_responses
/ sipiq_questions; identita se denormalizuje. Default dry-run.
answer --id <dotaz_id> --qcode Q72_1 --answer "8" [--source "email 18JUN2026"] [--apply]
Zapíše odpověď k jedné otázce události, promítne do sipiq_responses.answers_supplement,
přepočítá stav události. Default dry-run.
list [--center <email|prijmeni>] [--open]
Vypíše dotazy (volitelně jen otevřené / pro jedno centrum).
Mongo 192.168.1.76:27017, bez auth, pymongo.
"""
import argparse
import re
import sys
from datetime import datetime, timezone
from pymongo import MongoClient, ASCENDING
from bson import ObjectId
MONGO_URI = "mongodb://192.168.1.76:27017"
DB = "feasibility"
COL = "doplnujici_dotazy"
COL_R = "sipiq_responses"
COL_Q = "sipiq_questions"
OPEN_STATES = ("open", "asked")
def now_iso():
return datetime.now(timezone.utc).astimezone().isoformat(timespec="seconds")
def qbase(qcode):
m = re.match(r"(Q\d+)", qcode)
return m.group(1) if m else qcode
def db_conn():
c = MongoClient(MONGO_URI, serverSelectionTimeoutMS=8000)
c.admin.command("ping")
return c, c[DB]
def ensure(db):
db[COL].create_index([("investigator_oid", ASCENDING)])
db[COL].create_index([("response_id", ASCENDING)])
db[COL].create_index([("status", ASCENDING)])
db[COL].create_index([("questions.qcode", ASCENDING)])
db[COL].create_index([("questions.status", ASCENDING)])
print(f"OK: kolekce '{COL}' + indexy připraveny. Dokumentů: {db[COL].count_documents({})}")
def find_center(db, key, country=None):
"""Najde sipiq_responses dle ResponseId / pi_email / příjmení."""
if key.startswith("R_"):
d = db[COL_R].find_one({"_id": key})
if d:
return d
d = db[COL_R].find_one({"pi_email": key.lower()})
if d:
return d
flt = {"pi_last_name": re.compile(f"^{re.escape(key)}$", re.I)}
if country:
flt["site_country"] = {"CZ": "Czech Republic", "SK": "Slovakia"}.get(country, country)
cands = list(db[COL_R].find(flt))
if len(cands) == 1:
return cands[0]
if len(cands) > 1:
raise SystemExit(f"CHYBA: '{key}' je nejednoznačné ({len(cands)} center). Upřesni e-mailem nebo --country / R_id.")
raise SystemExit(f"CHYBA: centrum '{key}' nenalezeno v {COL_R}.")
def question_meta(db, qcode):
"""Text + sekce otázky z sipiq_questions (qcode může být leaf, např. Q72_1)."""
base = qbase(qcode)
q = db[COL_Q].find_one({"_id": base})
if not q:
return {"question_base": base, "question_text": None, "section": None}
text = q.get("text")
label = None
for it in q.get("items", []):
if it.get("key") == qcode:
label = it.get("label")
break
full = f"{text}{label}" if label else text
return {"question_base": base, "question_text": full, "section": q.get("section")}
def cmd_add(db, args, dry):
center = find_center(db, args.center, args.country)
qcodes = [q.strip() for q in args.qcodes.split(",") if q.strip()]
questions = []
for qc in qcodes:
meta = question_meta(db, qc)
questions.append({
"qcode": qc, "question_base": meta["question_base"],
"question_text": meta["question_text"], "section": meta["section"],
"answer": None, "answered_at": None, "answer_source": None, "status": "open",
})
ts = now_iso()
doc = {
"response_id": center["_id"],
"investigator_oid": center.get("investigator_oid"),
"pi_last_name": center.get("pi_last_name"),
"site_name": center.get("site_name"),
"site_country": center.get("site_country"),
"pi_email": center.get("pi_email"),
"status": args.status,
"asked_at": ts if args.status == "asked" else None,
"asked_via": args.asked_via,
"reason": args.reason or "neodpovězeno v SIPIQ; dotazník už uzavřen",
"note": args.note,
"questions": questions,
"created_at": ts, "updated_at": ts, "history": [],
}
print(f"Centrum: {doc['pi_last_name']} / {doc['site_name']} ({doc['site_country']}) resp={doc['response_id']}")
for q in questions:
print(f"{q['qcode']:10} [{q['section']}] {q['question_text']}")
if dry:
print("[DRY-RUN] Nezaloženo. Ostrý: --apply")
return
res = db[COL].insert_one(doc)
print(f"[APPLY] Založen dotaz _id={res.inserted_id}")
def cmd_answer(db, args, dry):
doc = db[COL].find_one({"_id": ObjectId(args.id)})
if not doc:
raise SystemExit(f"CHYBA: dotaz _id={args.id} nenalezen.")
qs = doc["questions"]
target = next((q for q in qs if q["qcode"] == args.qcode), None)
if not target:
raise SystemExit(f"CHYBA: otázka {args.qcode} není v tomto dotazu (má: {[q['qcode'] for q in qs]}).")
ts = now_iso()
print(f"Centrum: {doc['pi_last_name']} / {doc['site_name']} resp={doc['response_id']}")
print(f" {args.qcode}: {target.get('answer')!r} -> {args.answer!r} (zdroj: {args.source})")
print(f" + promítnutí do {COL_R}.answers_supplement.{args.qcode}")
if dry:
print("[DRY-RUN] Nezapsáno. Ostrý: --apply")
return
# 1) update otázky v události
for q in qs:
if q["qcode"] == args.qcode:
q["answer"] = args.answer
q["answered_at"] = ts
q["answer_source"] = args.source
q["status"] = "answered"
all_answered = all(q["status"] == "answered" for q in qs)
new_status = "answered" if all_answered else "asked"
db[COL].update_one({"_id": doc["_id"]}, {
"$set": {"questions": qs, "status": new_status, "updated_at": ts},
"$push": {"history": {"changed_at": ts, "action": "answer",
"qcode": args.qcode, "answer": args.answer, "source": args.source}},
})
# 2) promítnout do sipiq_responses.answers_supplement (původní answers NEMĚNÍM)
db[COL_R].update_one({"_id": doc["response_id"]}, {
"$set": {f"answers_supplement.{args.qcode}": {
"value": args.answer, "source": "doplneno",
"doplnujici_dotaz_id": doc["_id"], "answered_at": ts, "answer_source": args.source,
}}
})
print(f"[APPLY] Odpověď zapsána; stav události = {new_status}; promítnuto do {COL_R}.")
def cmd_list(db, args):
flt = {}
if args.open:
flt["status"] = {"$in": list(OPEN_STATES)}
if args.center:
key = args.center
if key.startswith("R_"):
flt["response_id"] = key
elif "@" in key:
flt["pi_email"] = key.lower()
else:
flt["pi_last_name"] = re.compile(f"^{re.escape(key)}$", re.I)
docs = list(db[COL].find(flt).sort("created_at", -1))
print(f"Dotazů: {len(docs)}")
for d in docs:
print(f"\n[{d['_id']}] {d['pi_last_name']} / {d['site_name']} ({d.get('site_country')}) — {d['status']}")
for q in d["questions"]:
a = q.get("answer")
print(f" {q['qcode']:10} {q['status']:9} {('= '+str(a)) if a else '(čeká)'} | {q.get('question_text')}")
def main():
ap = argparse.ArgumentParser()
sub = ap.add_subparsers(dest="cmd", required=True)
sub.add_parser("ensure")
pa = sub.add_parser("add")
pa.add_argument("--center", required=True)
pa.add_argument("--country")
pa.add_argument("--qcodes", required=True)
pa.add_argument("--reason")
pa.add_argument("--asked-via", dest="asked_via")
pa.add_argument("--status", default="open", choices=["open", "asked"])
pa.add_argument("--note")
pa.add_argument("--apply", action="store_true")
pn = sub.add_parser("answer")
pn.add_argument("--id", required=True)
pn.add_argument("--qcode", required=True)
pn.add_argument("--answer", required=True)
pn.add_argument("--source")
pn.add_argument("--apply", action="store_true")
pl = sub.add_parser("list")
pl.add_argument("--center")
pl.add_argument("--open", action="store_true")
args = ap.parse_args()
client, db = db_conn()
try:
if args.cmd == "ensure":
ensure(db)
elif args.cmd == "add":
cmd_add(db, args, dry=not args.apply)
elif args.cmd == "answer":
cmd_answer(db, args, dry=not args.apply)
elif args.cmd == "list":
cmd_list(db, args)
finally:
client.close()
if __name__ == "__main__":
main()