#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
sipiq_import_v1.0.py
====================
Version: 1.0
Date:    2026-06-17
Author:  Claude Code (for MUDr. Vladimír Buzalka)

Description
-----------
Import of SIPIQ responses (Qualtrics CSV export, study 77242113UCO3002 /
ICONIC DAWN) into the MongoDB database `feasibility`. The goals are:
  (a) to enable "question x question" cross analysis (flat answers keyed by
      Qcode),
  (b) to make it possible to reconstruct the COMPLETE SIPIQ the way the
      investigator sees it in the PDF, only with the answers filled in
      (question dictionary with section / order / labels).

Two collections in db `feasibility`:
  * sipiq_questions - questionnaire dictionary (1 doc = 1 logical question;
    section, order, text, items[], type, options). Idempotent (upsert by _id).
  * sipiq_responses - 1 doc = 1 response (_id = Qualtrics ResponseId).
    Site / PI identity at the top level, flat answers{}, meta{}, soft-link
    investigator_oid, delta bookkeeping (content_sha256, history[],
    timestamps).

DELTA import (existing data is rewritten ONLY when something changed):
  - new response                -> insert
  - exists, no change           -> only last_seen_at (+ source_file) is updated
  - exists, something changed   -> the rebuilt document fields are written via
                                   $set and the field-level changes
                                   {key, old, new} are pushed to history[]

Soft-link to feasibility.investigators:
  - primarily pi_email == email / email2 (lowercase)
  - fallback: surname (without diacritics, lowercase) + country (CZ/SK)
  - non-destructive: the investigators collection is NOT modified, only
    investigator_oid is stored in the response.

Scope: default CZ + SK (--scope czsk). --scope all = all 276 responses.

Usage:
    python sipiq_import_v1.0.py --csv "<export.csv>" --dry-run
    python sipiq_import_v1.0.py --csv "<export.csv>" --apply

Dependencies: pymongo (.venv). Mongo 192.168.1.76:27017, no auth.
"""

import argparse
import csv
import hashlib
import json
import re
import sys
import unicodedata
from datetime import datetime, timezone

try:
    from pymongo import MongoClient
except ImportError:
    # Tell the user why the script cannot start, then let the error propagate.
    print("CHYBA: pymongo není nainstalován v aktuálním pythonu.", file=sys.stderr)
    raise

MONGO_URI = "mongodb://192.168.1.76:27017"
DB_NAME = "feasibility"
COL_Q = "sipiq_questions"
COL_R = "sipiq_responses"

# Qualtrics system meta columns (these do NOT go into answers)
META_COLS = {
    "StartDate", "EndDate", "Status", "IPAddress", "Progress",
    "Duration (in seconds)", "Finished", "RecordedDate", "ResponseId",
    "RecipientLastName", "RecipientFirstName", "RecipientEmail",
    "ExternalReference", "LocationLatitude", "LocationLongitude",
    "DistributionChannel", "UserLanguage",
}

# Embedded SDL fields promoted to the top level of the document
# (queryable identity)
PROMOTE = [
    "site_name", "site_address", "site_city", "site_state", "site_postcode",
    "site_country", "pi_first_name", "pi_last_name", "pi_phone", "pi_email",
    "sdl_site_id", "fire_site_id", "fire_investigator_id", "mailinglist_id",
    "survey_generated_by", "Date", "Time",
]

# Sections according to the verified catalogue
# (mapping: base Q-number -> section in the PDF)
SECTION_BY_QNUM = {}


def _sec(rng, name):
    """Register every question number in `rng` under section `name` in SECTION_BY_QNUM."""
    for n in rng:
        SECTION_BY_QNUM[n] = name


_sec([2], "J&J Internal Assessment")
_sec([6, 7, 8, 9, 10, 11, 12, 13], "Contact Information")
_sec(range(14, 22), "Confidentiality Statement")
_sec([25, 26, 27], "Interest")
_sec([29, 30, 31, 32, 33, 34], "Protocol Requirements")
_sec([36, 37, 38], "Enrollment")
_sec([40, 41, 42, 43], "Patient Demographics Overview")
_sec([45, 46, 47, 48, 49], "Site Overview")
_sec([51], "Operational Considerations")
_sec([53, 54], "Comments")
_sec([57, 58, 59, 60, 61], "Patient Population")
_sec([63, 64, 65, 66, 67], "Site Experience and Staffing")
_sec([69], "Equipment and Facility Requirements")
_sec([71, 72, 73, 74, 75], "Institutional Review Board, Ethics Committee, and Contracts")

# Full wording of questions that Qualtrics truncates in the CSV header with "..."
# Matrix-question stems, restored in full from the blank SIPIQ PDF
# (ICONIC ... _SipIQ_V1_13MAY2026.pdf).
STEM_OVERRIDE = {
    "Q31": "At your site, at what line(s) of treatment do you most commonly prescribe "
           "vedolizumab for patients with moderately to severely active ulcerative colitis?",
    "Q63": "Do you or your site staff have experience in performing the following types of "
           "study assessments/procedures?",
    "Q64": "The following personnel are required to run the study. "
           "Will your site have the following available?",
    "Q69": "The following equipment and facilities are required to run the studies. "
           "Are these available at your site?",
}

# Bookkeeping / link fields that are not response content: they are left out
# of the content hash and of the history diff.
NON_CONTENT_FIELDS = frozenset({
    "content_sha256", "first_imported_at", "last_seen_at", "last_updated_at",
    "history", "investigator_oid", "investigator_match", "source_file",
})


def now_iso():
    """Return the current local time as an ISO-8601 string with UTC offset (seconds precision)."""
    return datetime.now(timezone.utc).astimezone().isoformat(timespec="seconds")


def strip_accents(s):
    """Return `s` without combining marks (diacritics); empty string for falsy input."""
    if not s:
        return ""
    nfkd = unicodedata.normalize("NFKD", s)
    return "".join(c for c in nfkd if not unicodedata.combining(c))


def norm_name(s):
    """Normalise a name for matching: no accents, lowercase, whitespace runs collapsed, trimmed."""
    return re.sub(r"\s+", " ", strip_accents(s or "").lower()).strip()


def sanitize_key(qcode):
    """Turn a Qcode into a MongoDB-safe key for answers{}: '#' and '.' become '_'."""
    return qcode.replace("#", "_").replace(".", "_")


def qnum(qcode):
    """Return the question number of a Qcode (Q63#1_2 -> 63, Q40_6_TEXT -> 40), or None."""
    m = re.match(r"Q(\d+)", qcode)
    return int(m.group(1)) if m else None


def qbase(qcode):
    """Return the logical base of a Qcode (Q63#1_2 -> Q63, Q40_6 -> Q40, Q25 -> Q25).

    A code that does not start with 'Q<digits>' is returned unchanged.
    """
    m = re.match(r"(Q\d+)", qcode)
    return m.group(1) if m else qcode


def import_id(h3_cell):
    """Extract "ImportId" from a third-header-row cell holding a JSON object.

    Returns "" when the JSON object has no "ImportId" key. When the cell is
    not JSON, or the JSON is not an object, the cell is returned unchanged.
    """
    try:
        return json.loads(h3_cell).get("ImportId", "")
    except (ValueError, TypeError, AttributeError):
        # ValueError: not JSON; TypeError: not a string; AttributeError: JSON but not a dict.
        return h3_cell


def split_text(text):
    """Split a header text on ' - ' into (stem, item_label).

    The stem is the first part. The label is built from the remaining parts
    after dropping Qualtrics artefacts; it is None when nothing is left.
    """
    parts = [p.strip() for p in re.split(r"\s+-\s+", text)]
    stem = parts[0]
    if len(parts) == 1:
        return stem, None
    label_parts = [
        p for p in parts[1:]
        # "Selected Choice" is an artefact of single-choice questions with "Other"
        if p.lower() != "selected choice"
        # internal statement codes such as "Q63#1" carry no label information
        and not re.fullmatch(r"Q\d+#\d+", p)
    ]
    label = " - ".join(label_parts) if label_parts else None
    return stem, label


def detect_type(qcode, observed):
    """Guess the question type from the Qcode and the non-empty observed values.

    A '#' in the Qcode marks a matrix column. Returns one of "matrix_yesno",
    "matrix_percent", "matrix", "numeric", "yesno", "single_or_text".
    """
    has_hash = "#" in qcode
    vals = [v for v in observed if v]
    yesno = bool(vals) and all(v in ("Yes", "No") for v in vals)
    numeric = bool(vals) and all(re.fullmatch(r"-?\d+(\.\d+)?", v) for v in vals)
    if has_hash and yesno:
        return "matrix_yesno"
    if has_hash and numeric:
        return "matrix_percent"
    if has_hash:
        return "matrix"
    if numeric:
        return "numeric"
    if yesno:
        return "yesno"
    return "single_or_text"


def dominant_type(types):
    """Return the most frequent entry of `types`; ties go to the entry seen first.

    Iterating dict.fromkeys(types) instead of set(types) keeps first-seen
    order, so the result does not depend on string hash randomisation.
    An empty list yields "single_or_text".
    """
    if not types:
        return "single_or_text"
    return max(dict.fromkeys(types), key=types.count)


# ---------------------------------------------------------------------------
def load_csv(path):
    """Read a CSV export with three header rows.

    Row 1 supplies the column codes, row 2 the texts and row 3 the cells
    passed to import_id(). Returns (cols, data): `cols` is a list of
    {"i", "code", "text", "qid"} dicts and `data` the remaining rows.

    Raises:
        ValueError: the file has fewer than three rows.
    """
    with open(path, encoding="utf-8-sig", newline="") as fh:
        rows = list(csv.reader(fh))
    if len(rows) < 3:
        raise ValueError(
            f"CSV {path!r} neobsahuje tři hlavičkové řádky Qualtrics exportu "
            f"(nalezeno řádků: {len(rows)})"
        )
    h1, h2, h3 = rows[:3]
    cols = [
        {"i": i, "code": code, "text": text, "qid": import_id(imp)}
        for i, (code, text, imp) in enumerate(zip(h1, h2, h3))
    ]
    return cols, rows[3:]


def col_getter(cols, data):
    """Return (get, idx): idx maps column code -> index, get(row, code) reads a stripped cell.

    get() returns "" for an unknown code or a row that is too short.
    `data` is accepted for interface compatibility and is not used.
    """
    idx = {c["code"]: c["i"] for c in cols}

    def get(row, code):
        i = idx.get(code)
        return row[i].strip() if i is not None and i < len(row) else ""

    return get, idx


def is_question_col(code):
    """Return True when the column code starts with 'Q' followed by a digit."""
    return bool(re.match(r"Q\d", code))


# ---------------------------------------------------------------------------
def build_questions(cols, data):
    """Build the question dictionary: one document per logical question (Qcode base).

    Columns are grouped by qbase(); each group collects its items, the QID
    bases found in the column qids, and a type/options guess derived from
    the values observed in `data`. Documents are returned in CSV column
    order with "order" renumbered 0..N-1.
    """
    qcols = [c for c in cols if is_question_col(c["code"])]

    # observed non-empty values per Qcode (drive the type and options guess)
    observed = {c["code"]: set() for c in qcols}
    for row in data:
        for c in qcols:
            v = row[c["i"]].strip() if c["i"] < len(row) else ""
            if v:
                observed[c["code"]].add(v)

    groups = {}  # base -> document; dict order == first appearance in the CSV
    for c in qcols:
        base = qbase(c["code"])
        stem, label = split_text(c["text"])
        if base not in groups:
            groups[base] = {
                "_id": base,
                "order": c["i"],
                "qnum": qnum(c["code"]),
                "section": SECTION_BY_QNUM.get(qnum(c["code"]), "Other"),
                "qids": [],
                "text": stem,
                "items": [],
                "_obs": set(),
                "_types": [],
            }
        g = groups[base]
        base_qid = re.match(r"(QID\d+)", c["qid"] or "")
        if base_qid and base_qid.group(1) not in g["qids"]:
            g["qids"].append(base_qid.group(1))
        item = {"key": sanitize_key(c["code"]), "qcode": c["code"], "qid": c["qid"]}
        if label:
            item["label"] = label
        g["items"].append(item)
        g["_obs"] |= observed[c["code"]]
        g["_types"].append(detect_type(c["code"], observed[c["code"]]))

    out = []
    for n, (base, g) in enumerate(groups.items()):
        obs = sorted(g.pop("_obs"))
        # group type = most frequent column type, deterministic on ties
        gtype = dominant_type(g.pop("_types"))
        g["type"] = gtype
        # options only for categorical questions (yes/no, small single-choice)
        if gtype in ("yesno", "matrix_yesno"):
            g["options"] = ["Yes", "No"]
        elif gtype == "single_or_text" and obs and len(obs) <= 12:
            g["options"] = obs
        else:
            g["options"] = []
        if base in STEM_OVERRIDE:
            g["text"] = STEM_OVERRIDE[base]
        g["order"] = n  # renumber 0..N in CSV order
        # a single item without a label is a plain question: no items list
        if len(g["items"]) == 1 and "label" not in g["items"][0]:
            g["items"] = []
        out.append(g)
    return out


# ---------------------------------------------------------------------------
def build_response(cols, get, row, source_file):
    """Build one response document from a CSV row.

    The document has `_id` = ResponseId, identity fields at the top level,
    `meta` with the Qualtrics system fields, and `answers` holding every
    non-empty question cell keyed by sanitize_key(Qcode). The link fields
    `investigator_oid` / `investigator_match` are initialised to None.
    """
    answers = {}
    for c in cols:
        if is_question_col(c["code"]):
            v = row[c["i"]].strip() if c["i"] < len(row) else ""
            if v:
                answers[sanitize_key(c["code"])] = v

    def opt(code):
        # stripped cell value, or None when the cell is empty / missing
        return get(row, code) or None

    progress = get(row, "Progress")
    duration = get(row, "Duration (in seconds)")
    meta = {
        "start_date": opt("StartDate"),
        "end_date": opt("EndDate"),
        "recorded_date": opt("RecordedDate"),
        "status": opt("Status"),
        # integer when purely digits, otherwise the raw text (or None when empty)
        "progress": int(progress) if progress.isdigit() else (progress or None),
        "finished": get(row, "Finished") in ("True", "1", "TRUE"),
        "duration_sec": int(duration) if duration.isdigit() else None,
        "user_language": opt("UserLanguage"),
        "distribution_channel": opt("DistributionChannel"),
        "ip_address": opt("IPAddress"),
        "location_lat": opt("LocationLatitude"),
        "location_lng": opt("LocationLongitude"),
        "survey_date": opt("Date"),
        "survey_time": opt("Time"),
    }
    return {
        "_id": get(row, "ResponseId"),
        "study": "77242113UCO3002",
        "site_country": opt("site_country"),
        "site_name": opt("site_name"),
        "site_city": opt("site_city"),
        "site_state": opt("site_state"),
        "site_postcode": opt("site_postcode"),
        "site_address": opt("site_address"),
        "pi_first_name": opt("pi_first_name"),
        "pi_last_name": opt("pi_last_name"),
        "pi_email": get(row, "pi_email").lower() or None,
        "pi_phone": opt("pi_phone"),
        "sdl_site_id": opt("sdl_site_id"),
        "fire_site_id": opt("fire_site_id"),
        "fire_investigator_id": opt("fire_investigator_id"),
        "mailinglist_id": opt("mailinglist_id"),
        "survey_generated_by": opt("survey_generated_by"),
        "recipient_email": get(row, "RecipientEmail").lower() or None,
        "recipient_last_name": opt("RecipientLastName"),
        "recipient_first_name": opt("RecipientFirstName"),
        "meta": meta,
        # True when any answer key belongs to one of these question bases
        "is_full_sipiq": any(
            k.startswith(("Q57", "Q58", "Q59", "Q63", "Q66", "Q71")) for k in answers
        ),
        "interested": answers.get("Q25"),
        "answers": answers,
        "investigator_oid": None,
        "investigator_match": None,
        "source_file": source_file,
    }


def content_hash(doc):
    """Return the SHA-256 hex digest of the document content.

    Fields listed in NON_CONTENT_FIELDS (timestamps, history, investigator
    link, source file, the hash itself) are excluded, so they never cause a
    response to be reported as changed.
    """
    payload = {k: v for k, v in doc.items() if k not in NON_CONTENT_FIELDS}
    blob = json.dumps(payload, sort_keys=True, ensure_ascii=False, default=str)
    return hashlib.sha256(blob.encode("utf-8")).hexdigest()


# ---------------------------------------------------------------------------
def load_investigators(db):
    """Load investigators with zeme in Czech Republic / Slovakia and index them.

    Returns (inv, by_email, by_name):
      inv      - list of the loaded documents,
      by_email - lowercase email / email2 -> document (first one wins),
      by_name  - (normalised surname, zeme) -> list of documents.
    """
    inv = list(db.investigators.find(
        {"zeme": {"$in": ["Czech Republic", "Slovakia"]}},
        {"prijmeni": 1, "jmeno": 1, "email": 1, "email2": 1, "zeme": 1,
         "KROK": 1, "pracoviste": 1},
    ))
    by_email = {}
    by_name = {}
    for d in inv:
        for ef in ("email", "email2"):
            e = (d.get(ef) or "").lower().strip()
            if e:
                by_email.setdefault(e, d)
        nm = norm_name(d.get("prijmeni"))
        if nm:
            by_name.setdefault((nm, d.get("zeme")), []).append(d)
    return inv, by_email, by_name


def soft_link(doc, by_email, by_name):
    """Find the investigator matching a response document.

    Tries pi_email, then recipient_email, then normalised surname + country
    (only when exactly one candidate exists). Returns
    (investigator _id or None, description of how it matched, matched doc or None).
    """
    e = (doc.get("pi_email") or "").lower().strip()
    if e and e in by_email:
        d = by_email[e]
        return d["_id"], f"email:{e}", d
    e2 = (doc.get("recipient_email") or "").lower().strip()
    if e2 and e2 in by_email:
        d = by_email[e2]
        return d["_id"], f"recipient_email:{e2}", d
    nm = norm_name(doc.get("pi_last_name"))
    cand = by_name.get((nm, doc.get("site_country")), [])
    if len(cand) == 1:
        return cand[0]["_id"], f"prijmeni:{nm}", cand[0]
    if len(cand) > 1:
        # several investigators share the surname: do not guess
        return None, f"prijmeni_ambiguous:{nm}({len(cand)})", None
    return None, "NENALEZENO", None


# ---------------------------------------------------------------------------
def _print_report(docs, link_rows, to_insert, to_update, unchanged):
    """Print the soft-link table, the delta counts and a shortened sample document."""
    print("\n=== SOFT-LINK na investigators ===")
    matched_k7 = matched_other = unmatched = 0
    for doc, how, m in link_rows:
        krok = (m or {}).get("KROK", "")
        tag = "✓" if m else "✗"
        if m and str(krok).startswith("7"):
            matched_k7 += 1
        elif m:
            matched_other += 1
        else:
            unmatched += 1
        # site_country is always present in the document but may be None
        country = (doc.get("site_country") or "?")[:2]
        last_name = str(doc.get("pi_last_name"))[:18]
        email = str(doc.get("pi_email"))[:32]
        krok_txt = f"KROK {krok}" if m else ""
        print(f" {tag} {country} {last_name:18} {email:32} -> {how[:40]:40} {krok_txt}")
    print(f" Souhrn: napárováno KROK7={matched_k7}, jiný KROK={matched_other}, "
          f"nenapárováno={unmatched}")

    print("\n=== DELTA ===")
    print(f" INSERT (nové): {len(to_insert)}")
    print(f" UPDATE (změněné): {len(to_update)}")
    print(f" beze změny: {len(unchanged)}")

    # sample of one document, answers cut to the first six
    if docs:
        sample = dict(docs[0])
        all_answers = docs[0]["answers"]
        shown = list(all_answers)[:6]
        sample["answers"] = {k: all_answers[k] for k in shown}
        hidden = len(all_answers) - len(shown)
        if hidden > 0:
            sample["answers"]["…"] = f"(+{hidden} dalších)"
        print("\n=== UKÁZKA response dokumentu (zkráceno) ===")
        print(json.dumps(sample, ensure_ascii=False, indent=2, default=str)[:1800])


def _apply_changes(db, questions, docs, source_file):
    """Write the question dictionary (upsert) and the responses (delta) to the database."""
    # 1) question dictionary (idempotent upsert)
    nq = 0
    for q in questions:
        db[COL_Q].replace_one({"_id": q["_id"]}, q, upsert=True)
        nq += 1
    print(f"\n[APPLY] sipiq_questions: upsertnuto {nq}")

    # 2) responses (delta)
    ts = now_iso()
    ni = nu = ns = 0
    for d in docs:
        cur = db[COL_R].find_one({"_id": d["_id"]})
        if cur is None:
            d["first_imported_at"] = ts
            d["last_seen_at"] = ts
            d["last_updated_at"] = ts
            d["history"] = []
            db[COL_R].insert_one(d)
            ni += 1
        elif cur.get("content_sha256") != d["content_sha256"]:
            changes = diff_docs(cur, d)
            db[COL_R].update_one({"_id": d["_id"]}, {
                "$set": {**{k: d[k] for k in d if k != "_id"},
                         "last_seen_at": ts, "last_updated_at": ts},
                "$push": {"history": {"changed_at": ts, "source_file": source_file,
                                      "changes": changes}},
            })
            nu += 1
        else:
            # content unchanged: only record that the response was seen again
            db[COL_R].update_one({"_id": d["_id"]},
                                 {"$set": {"last_seen_at": ts, "source_file": source_file}})
            ns += 1
    print(f"[APPLY] sipiq_responses: insert={ni}, update={nu}, beze změny={ns}")


def main():
    """Command-line entry point: parse the CSV, report the delta, and write it with --apply.

    Without --apply (or with --dry-run) nothing is written. Passing both
    --apply and --dry-run is rejected, so a dry-run request can never write.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--csv", required=True)
    ap.add_argument("--scope", choices=["czsk", "all"], default="czsk")
    ap.add_argument("--apply", action="store_true", help="ostrý zápis (jinak dry-run)")
    ap.add_argument("--dry-run", action="store_true")
    args = ap.parse_args()
    if args.apply and args.dry_run:
        ap.error("--apply a --dry-run nelze použít současně")
    dry = not args.apply
    source_file = args.csv.replace("\\", "/").split("/")[-1]

    # the CSV is parsed once; the unfiltered rows feed the question dictionary
    cols, data_all = load_csv(args.csv)
    get, _idx = col_getter(cols, data_all)

    # scope filter
    data = data_all
    if args.scope == "czsk":
        data = [r for r in data
                if get(r, "site_country") in ("Czech Republic", "Slovakia")]

    # rows without a ResponseId (e.g. blank lines) would all share _id == ""
    with_id = [r for r in data if get(r, "ResponseId")]
    if len(with_id) != len(data):
        print(f"VAROVÁNÍ: přeskočeno řádků bez ResponseId: {len(data) - len(with_id)}",
              file=sys.stderr)
    data = with_id
    print(f"Zdroj: {source_file} | rozsah={args.scope} | odpovědí k importu: {len(data)}")

    # --- question dictionary (built from the FULL CSV, not just the scope) ---
    questions = build_questions(cols, data_all)
    print(f"Slovník otázek: {len(questions)} logických otázek "
          f"(z toho {sum(1 for q in questions if q['items'])} vícedílných).")

    # --- Mongo ---
    client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=8000)
    try:
        db = client[DB_NAME]
        client.admin.command("ping")

        inv, by_email, by_name = load_investigators(db)
        print(f"Investigatorů CZ+SK v DB: {len(inv)}")

        # --- response documents + soft-link ---
        docs = []
        link_rows = []
        for r in data:
            doc = build_response(cols, get, r, source_file)
            oid, how, matched = soft_link(doc, by_email, by_name)
            doc["investigator_oid"] = oid
            doc["investigator_match"] = how
            doc["content_sha256"] = content_hash(doc)
            docs.append(doc)
            link_rows.append((doc, how, matched))

        # --- delta against the DB ---
        existing = {d["_id"]: d for d in db[COL_R].find({}, {"content_sha256": 1})}
        to_insert, to_update, unchanged = [], [], []
        for d in docs:
            if d["_id"] not in existing:
                to_insert.append(d)
            elif existing[d["_id"]].get("content_sha256") != d["content_sha256"]:
                to_update.append(d)
            else:
                unchanged.append(d)

        _print_report(docs, link_rows, to_insert, to_update, unchanged)

        if dry:
            print("\n[DRY-RUN] Nic se nezapsalo. Ostrý běh: přidej --apply")
            return

        _apply_changes(db, questions, docs, source_file)
    finally:
        # close the connection on every exit path, including errors
        client.close()


def diff_docs(old, new):
    """Return the field-level differences between two response documents.

    The result is a list of {"key", "old", "new"} entries: first the
    `answers` and `meta` sub-documents (keys prefixed "answers." / "meta."),
    then every other top-level field in sorted order. `_id` and the fields
    in NON_CONTENT_FIELDS are ignored, so the diff covers the same content
    as content_hash().
    """
    changes = []

    def walk(prefix, o, n):
        o, n = o or {}, n or {}
        for k in sorted(set(o) | set(n)):
            ov, nv = o.get(k), n.get(k)
            ov_is_dict, nv_is_dict = isinstance(ov, dict), isinstance(nv, dict)
            # recurse only for dict-vs-dict or dict-vs-missing; a scalar on
            # one side and a dict on the other is recorded as one change
            if (ov_is_dict or nv_is_dict) and (ov_is_dict or ov is None) \
                    and (nv_is_dict or nv is None):
                walk(f"{prefix}{k}.", ov, nv)
            elif ov != nv:
                changes.append({"key": f"{prefix}{k}", "old": ov, "new": nv})

    for field in ("answers", "meta"):
        o_sub, n_sub = old.get(field), new.get(field)
        walk(f"{field}.",
             o_sub if isinstance(o_sub, dict) else {},
             n_sub if isinstance(n_sub, dict) else {})

    skip = NON_CONTENT_FIELDS | {"_id", "answers", "meta"}
    walk("",
         {k: v for k, v in old.items() if k not in skip},
         {k: v for k, v in new.items() if k not in skip})
    return changes


if __name__ == "__main__":
    main()