# ============================================================ # vtmf_pipeline_v1.0.py # Verze: 1.0 # Datum: 2026-06-12 # Popis: Kompletní workflow V-TMF (J&J Veeva Vault), studie # 77242113UCO3001. Jeden běh udělá: # 1) login do Vaultu (persistentní session + ruční 2FA), # 2) export reportu "Document Inventory Report - Study # Level" do Excelu (Data Only) do WhatToDownload/, # 3) parse reportu a synchronizaci do MongoDB # (Tower, db VTMF, kolekce documents, # klíč = VTMF číslo + verze): # - nové dokumenty se založí, # - změny polí se promítnou (+ history[]), # - dokumenty chybějící v reportu se označí # deleted=True a stažený soubor dostane ' [D]', # - znovuobjevené se vzkřísí a ' [D]' se odebere, # 4) stažení všech dosud nestažených dokumentů do # U:\Dropbox\!!!Days\Downloads Z230\VTMF-77242113UCO3001\ # \\"YYYY-MM-DD Description # [VTMF-x] [v1.0]." + zápis stavu do Mongo. # # Tracking stahování je KOMPLETNĚ v Mongo; starý # download_state.csv se při prvním běhu jednorázově # namigruje a přejmenuje na .imported. # # Vychází z download_vault_v2.1 (v TRASH/) — login, dialogy # a stahování beze změny; nové jsou kroky 2 a 3. # # Heslo se NIKDY nedává natvrdo do skriptu — čte se z .env # v rootu projektu Janssen (VAULT_USER / VAULT_PASS). # ============================================================ import csv import os import re import sys from datetime import datetime from pathlib import Path from playwright.sync_api import sync_playwright, TimeoutError as PWTimeout from pymongo import MongoClient, ASCENDING # --- Konfigurace ------------------------------------------------------- LOGIN_URL = ("https://fedlogin.jnj.com/idp/eyJ2c2lkIjoiam5qX3ZlZXZhIn0/" "startSSO.ping?PartnerSpId=janssenetmf.veevavault.com" "&IdpAdapterId=CompIWALDAPEXTFORM" "&TargetResource=https%3A%2F%2Fvtmf.veevavault.com%2F") # Report Document Inventory Report - Study Level, filtr na studii REPORT_URL = ("https://vtmf.veevavault.com/ui/#reporting/viewer/" "0RP000000000182?study__v%2C%2C%2CIN=0ST000000137008") VAULT_UI_PATTERN = "**vtmf.veevavault.com/ui**" # úspěšný vstup do Vaultu SCRIPT_DIR = Path(__file__).resolve().parent PROFILE_DIR = SCRIPT_DIR / "vault_profile" # perzistentní session ENV_FILE = SCRIPT_DIR.parent / ".env" # root projektu Janssen DEBUG_DIR = SCRIPT_DIR / "debug" # diagnostické výstupy EXCEL_DIR = SCRIPT_DIR / "WhatToDownload" # stažené reporty PROCESSED_DIR = EXCEL_DIR / "Zpracovano" # archiv zpracovaných OLD_STATE_FILE = SCRIPT_DIR / "download_state.csv" # legacy CSV (migrace) DOWNLOAD_ROOT = Path(r"U:\Dropbox\!!!Days\Downloads Z230\VTMF-77242113UCO3001") MONGO_URI = "mongodb://192.168.1.76:27017" MONGO_DB = "VTMF" MONGO_COLL = "documents" # Kolik dokumentů stáhnout v tomto běhu (None = všechny zbývající) LIMIT = 10 # Pole reportu, jejichž změny se promítají a verzují do history[] TRACKED_FIELDS = ("name", "status", "type", "subtype", "desc", "date", "url", "studies") MAX_ATTEMPTS = 2 # pokusy na jeden dokument RETRY_PAUSE_MS = 5000 # pauza před opakováním BETWEEN_DOCS_MS = 500 # pauza mezi dokumenty def log(msg): print(msg, flush=True) def load_env_file(path): """Načte KEY=VALUE řádky z .env do os.environ. Už nastavené env proměnné mají přednost, .env je nepřepisuje.""" if not path.exists(): log(f"[!] .env nenalezen: {path}") return for line in path.read_text(encoding="utf-8").splitlines(): line = line.strip() if not line or line.startswith("#") or "=" not in line: continue key, _, value = line.partition("=") key, value = key.strip(), value.strip().strip('"').strip("'") if value and key not in os.environ: os.environ[key] = value ENV_SECTION_HEADER = "# --- Veeva Vault (J&J V-TMF) — VTMFDownloadFiles/download_vault ---" ENV_KEYS = ("VAULT_USER", "VAULT_PASS") def ensure_credentials(): """Načte .env; pokud VAULT_USER/VAULT_PASS chybí, založí/doplní v .env šablonu, vyzve uživatele k doplnění a ukončí skript.""" load_env_file(ENV_FILE) if all(os.environ.get(k) for k in ENV_KEYS): return existing = ENV_FILE.read_text(encoding="utf-8") if ENV_FILE.exists() else "" missing_lines = [f"{k}=" for k in ENV_KEYS if not re.search(rf"^\s*{k}\s*=", existing, re.M)] if not ENV_FILE.exists(): ENV_FILE.write_text( "# .env — lokální přihlašovací údaje (NEVERZOVAT, je v .gitignore)\n\n" + ENV_SECTION_HEADER + "\n" + "\n".join(missing_lines) + "\n", encoding="utf-8") log(f"[i] Založil jsem nový .env: {ENV_FILE}") elif missing_lines: with open(ENV_FILE, "a", encoding="utf-8") as f: f.write("\n" + ENV_SECTION_HEADER + "\n" + "\n".join(missing_lines) + "\n") log(f"[i] Doplnil jsem chybějící řádky do .env: {ENV_FILE}") print("\n" + "=" * 60) print(" CHYBÍ PŘIHLAŠOVACÍ ÚDAJE.") print(f" Doplň VAULT_USER a VAULT_PASS do souboru:") print(f" {ENV_FILE}") print(" a spusť skript znovu.") print("=" * 60) sys.exit(1) # --- Parsování Excelu -------------------------------------------------- HYPERLINK_RE = re.compile(r'HYPERLINK\("([^"]+)"\s*,\s*"([^"]+)"\)') VERSION_RE = re.compile(r"\((v[^)]+)\)\s*$") # nepovolené znaky Windows názvů + řídicí znaky + unicode artefakt � BAD_CHARS_RE = re.compile(r"[<>:\"/\\|?*\x00-\x1f�]") def clean_filename(s): """Očistí string na platné jméno souboru/složky ve Windows.""" s = BAD_CHARS_RE.sub("_", str(s)) s = re.sub(r"\s+", " ", s) # vícenásobné mezery -> jedna s = re.sub(r"_{2,}", "_", s) # vícenásobná podtržítka -> jedno return s.strip(" ._") # okraje: mezery, tečky, podtržítka def display_text(cell): """Zobrazený text buňky — u =HYPERLINK vzorce druhý argument.""" raw = str(cell.value or "").strip() m = HYPERLINK_RE.search(raw) return m.group(2).strip() if m else raw def extract_doc_url(raw): """Z HYPERLINK hodnoty (nebo i rozbité URL) vytáhne čistou doc URL ve tvaru https:///ui/#doc_info///.""" m = re.search(r"(https://[^/\"]+/ui/#doc_info/\d+/\d+/\d+)", str(raw)) if not m: raise ValueError(f"Nenašel jsem doc URL v: {raw!r}") return m.group(1) def read_documents_from_excel(path): """Načte dokumenty z daného .xlsx reportu. Vrací list dictů: vtmf, version, url, name, status, type, subtype, desc, date, studies. Document Name/Number/Status jsou =HYPERLINK vzorce — URL i text se berou regexem. Report má rozbité deklarované rozměry, čte se přímou iterací řádků.""" from openpyxl import load_workbook log(f"[i] Parsování reportu: {path.name}") wb = load_workbook(path, data_only=False) # potřebujeme vzorce ws = wb[wb.sheetnames[0]] rows = ws.iter_rows() header = [c.value for c in next(rows)] try: i_num = header.index("Document Number") i_name = header.index("Document Name") i_status = header.index("Document Status") i_type = header.index("Type") i_sub = header.index("Subtype") i_desc = header.index("Description") i_date = header.index("Document Date") i_study = header.index("Study") except ValueError as e: raise RuntimeError(f"V reportu chybí očekávaný sloupec: {e}") docs, bad = [], [] for row in rows: cell = row[i_num] if cell.value is None: continue raw = str(cell.value) m = HYPERLINK_RE.search(raw) if m: url_raw, vtmf = m.group(1), m.group(2) elif cell.hyperlink: # pravý hyperlink místo vzorce url_raw, vtmf = cell.hyperlink.target, raw else: bad.append(raw) continue try: url = extract_doc_url(url_raw) except ValueError: bad.append(raw) continue name = display_text(row[i_name]) vm = VERSION_RE.search(name) version = vm.group(1) if vm else "v?" desc = clean_filename(display_text(row[i_desc])) if not desc: # fallback: Document Name bez koncové verze (jde zvlášť na konec) desc = clean_filename(VERSION_RE.sub("", name)) date = row[i_date].value # datetime nebo None docs.append({ "vtmf": vtmf.strip(), "version": version, "url": url, "name": name, "status": display_text(row[i_status]), "type": clean_filename(display_text(row[i_type])), "subtype": clean_filename(display_text(row[i_sub])), "desc": desc, "date": date if hasattr(date, "strftime") else None, "studies": display_text(row[i_study]), }) log(f"[i] Načteno {len(docs)} dokumentů" + (f", {len(bad)} řádků bez použitelné URL (přeskočeno)" if bad else "")) return docs def build_target_path(doc, suggested_filename): """Cílová cesta: DOWNLOAD_ROOT\\Type\\Subtype\\ 'YYYY-MM-DD Description [VTMF-xxx] [v1.0].'. Datum/verze se vynechají, když nejsou k dispozici.""" ext = Path(suggested_filename).suffix # skutečná přípona vč. tečky date_prefix = doc["date"].strftime("%Y-%m-%d") + " " if doc["date"] else "" version = f" [{doc['version']}]" if doc.get("version") else "" filename = f"{date_prefix}{doc['desc']} [{doc['vtmf']}]{version}{ext}" return DOWNLOAD_ROOT / doc["type"] / doc["subtype"] / filename def deleted_marker_path(path): """Jméno souboru s příznakem smazání: 'x.pdf' -> 'x [D].pdf'.""" p = Path(path) return p.with_name(f"{p.stem} [D]{p.suffix}") # --- MongoDB synchronizace --------------------------------------------- def doc_key(vtmf, version): return f"{vtmf}|{version}" def get_collection(): client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000) client.admin.command("ping") coll = client[MONGO_DB][MONGO_COLL] coll.create_index([("vtmf", ASCENDING), ("version", ASCENDING)], unique=True) coll.create_index([("deleted", ASCENDING), ("downloaded", ASCENDING)]) return coll def migrate_old_csv(coll): """Jednorázová migrace download_state.csv do Mongo: záznamy 'ok' se zapíší jako downloaded=True k odpovídajícímu VTMF (aktuální, nesmazané verzi). CSV se pak přejmenuje na .imported.""" if not OLD_STATE_FILE.exists(): return migrated = 0 with open(OLD_STATE_FILE, newline="", encoding="utf-8") as f: for row in csv.DictReader(f): if row["result"] != "ok": continue r = coll.update_one( {"vtmf": row["vtmf"], "deleted": False, "downloaded": {"$ne": True}}, {"$set": {"downloaded": True, "file": row["file"], "downloaded_at": row["timestamp"]}}) migrated += r.modified_count OLD_STATE_FILE.rename(OLD_STATE_FILE.with_suffix(".csv.imported")) log(f"[i] Migrace download_state.csv -> Mongo: {migrated} záznamů; " f"CSV přejmenováno na .imported") def sync_report_to_mongo(coll, docs): """Promítne aktuální report do kolekce documents. Klíč = (vtmf, version). Nové založí, změny polí promítne (s history[]), chybějící označí deleted + soubor přejmenuje s ' [D]', znovuobjevené vzkřísí a ' [D]' odebere.""" now = datetime.now() stats = {"new": 0, "updated": 0, "unchanged": 0, "resurrected": 0, "marked_deleted": 0} current_keys = set() for d in docs: key = doc_key(d["vtmf"], d["version"]) current_keys.add(key) existing = coll.find_one({"_id": key}) if existing is None: coll.insert_one({ "_id": key, **d, "first_seen": now, "last_seen": now, "deleted": False, "downloaded": False, "file": None, "history": [], }) stats["new"] += 1 continue changes = {} for fld in TRACKED_FIELDS: if existing.get(fld) != d.get(fld): changes[fld] = {"old": existing.get(fld), "new": d.get(fld)} update = {"$set": {**d, "last_seen": now, "deleted": False}} if changes: update["$push"] = {"history": {"ts": now, "changes": changes}} stats["updated"] += 1 else: stats["unchanged"] += 1 if existing.get("deleted"): # dokument se do reportu vrátil -> odebrat [D] ze souboru stats["resurrected"] += 1 stats["unchanged"] -= 0 # (počítá se výše jako updated/unchanged) old_file = existing.get("file") if old_file: marked = deleted_marker_path(old_file) if marked.exists() and not Path(old_file).exists(): marked.rename(old_file) log(f"[i] {key}: soubor vrácen z ' [D]' zpět.") update["$set"]["file"] = str(old_file) coll.update_one({"_id": key}, update) # dokumenty, které v aktuálním reportu nejsou -> deleted + ' [D]' for rec in coll.find({"deleted": False}): if rec["_id"] in current_keys: continue upd = {"deleted": True, "deleted_at": now} f = rec.get("file") if f and Path(f).exists(): marked = deleted_marker_path(f) try: Path(f).rename(marked) upd["file"] = str(marked) log(f"[i] {rec['_id']}: soubor označen ' [D]'.") except OSError as e: log(f"[!] {rec['_id']}: přejmenování na [D] selhalo: {e}") coll.update_one({"_id": rec["_id"]}, {"$set": upd, "$push": {"history": {"ts": now, "changes": {"deleted": { "old": False, "new": True}}}}}) stats["marked_deleted"] += 1 log(f"[ok] Mongo sync: {stats['new']} nových, {stats['updated']} změněných, " f"{stats['unchanged']} beze změny, {stats['resurrected']} obnovených, " f"{stats['marked_deleted']} označených deleted.") return stats # --- Přihlášení -------------------------------------------------------- def submit_login_form(page, password_box): """Odešle login formulář. Zkouší postupně tlačítka Sign On / Login / OK / submit input; když žádné nenajde, stiskne Enter v poli hesla.""" candidates = [ page.get_by_role("button", name=re.compile("sign\\s*on", re.I)), page.get_by_role("button", name=re.compile("log\\s*in|sign\\s*in", re.I)), page.locator("input[type='submit']"), page.locator("button[type='submit']"), page.get_by_role("button", name=re.compile("^ok$", re.I)), ] for loc in candidates: try: if loc.count() and loc.first.is_visible(): label = (loc.first.inner_text() or loc.first.get_attribute("value") or "submit").strip() log(f"[i] Odesílám formulář tlačítkem '{label}'...") loc.first.click() return except Exception: continue log("[i] Tlačítko nenalezeno, odesílám Enterem v poli hesla...") password_box.press("Enter") def login_if_needed(page): """Otevře login URL, vyplní jméno+heslo, detekuje 2FA a počká na ruční potvrzení. Pokud perzistentní session žije, login přeskočí.""" log(f"[i] Otevírám přihlašovací URL...") page.goto(LOGIN_URL, wait_until="domcontentloaded") if "vtmf.veevavault.com/ui" in page.url: log("[i] Už přihlášen (perzistentní session).") return user_box = page.locator("input[type='text']").first try: user_box.wait_for(timeout=8000) except PWTimeout: if "vtmf.veevavault.com/ui" in page.url: log("[i] Přihlášen bez formuláře (session redirect).") return raise RuntimeError( f"Nenašel jsem login formulář ani Vault. Aktuální URL: {page.url}") username = os.environ["VAULT_USER"] password = os.environ["VAULT_PASS"] log("[i] Vyplňuji přihlašovací údaje...") user_box.fill(username) password_box = page.locator("input[type='password']").first password_box.fill(password) submit_login_form(page, password_box) log("[i] Odeslán login, čekám na výsledek...") try: page.wait_for_url(VAULT_UI_PATTERN, timeout=15000) log("[ok] Přihlášen rovnou (bez 2FA).") return except PWTimeout: pass # nejsme ve Vaultu -> pravděpodobně 2FA výzva err = page.locator("text=/invalid|incorrect|failed/i") try: if err.count() and err.first.is_visible(): raise RuntimeError(f"Login selhal: {err.first.inner_text().strip()}") except PWTimeout: pass print("\n" + "=" * 60) print(" VYŽADOVÁNO OVĚŘENÍ NA TELEFONU (2FA).") print(" Potvrď přihlášení v mobilní aplikaci.") print("=" * 60) input(" Až to potvrdíš, stiskni ENTER pro pokračování... ") page.wait_for_url(VAULT_UI_PATTERN, timeout=120000) log("[ok] Přihlášení dokončeno.") def verify_inside(page): """Ověří, že jsme uvnitř Vaultu (URL na /ui).""" page.wait_for_url(VAULT_UI_PATTERN, timeout=30000) log(f"[ok] Uvnitř Vaultu: {page.url}") def dialog_visible(page): """True, pokud je na stránce viditelný jQuery UI dialog.""" try: dlg = page.locator(".ui-dialog") return bool(dlg.count() and dlg.first.is_visible()) except Exception: return False def save_dialog_debug(page, tag): """Uloží diagnostiku neumlčitelného dialogu: screenshot, HTML všech frames a výpis kandidátů na zavírací tlačítka. Vrátí cestu složky.""" out = DEBUG_DIR / datetime.now().strftime(f"%Y-%m-%d_%H-%M-%S_{tag}") out.mkdir(parents=True, exist_ok=True) try: page.screenshot(path=str(out / "screenshot.png"), full_page=False) except Exception as e: (out / "screenshot_error.txt").write_text(str(e), encoding="utf-8") report = [] for i, frame in enumerate(page.frames): report.append(f"=== frame[{i}] url={frame.url}") try: (out / f"frame_{i}.html").write_text(frame.content(), encoding="utf-8") for sel in (".ui-dialog", "a.ok.vv_button", ".ui-dialog-titlebar-close", "button", "input[type='button']"): n = frame.locator(sel).count() if n: report.append(f" {sel}: {n}x") except Exception as e: report.append(f" [chyba čtení framu: {e}]") (out / "frames_report.txt").write_text("\n".join(report), encoding="utf-8") log(f"[!] Diagnostika dialogu uložena do: {out}") return out # Viditelné OK tlačítko dialogu — je to , ne