749 lines
29 KiB
Python
749 lines
29 KiB
Python
# ============================================================
|
|
# vtmf_pipeline_v1.0.py
|
|
# Verze: 1.0
|
|
# Datum: 2026-06-12
|
|
# Popis: Kompletní workflow V-TMF (J&J Veeva Vault), studie
|
|
# 77242113UCO3001. Jeden běh udělá:
|
|
# 1) login do Vaultu (persistentní session + ruční 2FA),
|
|
# 2) export reportu "Document Inventory Report - Study
|
|
# Level" do Excelu (Data Only) do WhatToDownload/,
|
|
# 3) parse reportu a synchronizaci do MongoDB
|
|
# (Tower, db VTMF, kolekce documents,
|
|
# klíč = VTMF číslo + verze):
|
|
# - nové dokumenty se založí,
|
|
# - změny polí se promítnou (+ history[]),
|
|
# - dokumenty chybějící v reportu se označí
|
|
# deleted=True a stažený soubor dostane ' [D]',
|
|
# - znovuobjevené se vzkřísí a ' [D]' se odebere,
|
|
# 4) stažení všech dosud nestažených dokumentů do
|
|
# U:\Dropbox\!!!Days\Downloads Z230\VTMF-77242113UCO3001\
|
|
# <Type>\<Subtype>\"YYYY-MM-DD Description
|
|
# [VTMF-x] [v1.0].<přípona>" + zápis stavu do Mongo.
|
|
#
|
|
# Tracking stahování je KOMPLETNĚ v Mongo; starý
|
|
# download_state.csv se při prvním běhu jednorázově
|
|
# namigruje a přejmenuje na .imported.
|
|
#
|
|
# Vychází z download_vault_v2.1 (v TRASH/) — login, dialogy
|
|
# a stahování beze změny; nové jsou kroky 2 a 3.
|
|
#
|
|
# Heslo se NIKDY nedává natvrdo do skriptu — čte se z .env
|
|
# v rootu projektu Janssen (VAULT_USER / VAULT_PASS).
|
|
# ============================================================
|
|
|
|
import csv
|
|
import os
|
|
import re
|
|
import sys
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
from playwright.sync_api import sync_playwright, TimeoutError as PWTimeout
|
|
from pymongo import MongoClient, ASCENDING
|
|
|
|
# --- Konfigurace -------------------------------------------------------
|
|
|
|
LOGIN_URL = ("https://fedlogin.jnj.com/idp/eyJ2c2lkIjoiam5qX3ZlZXZhIn0/"
|
|
"startSSO.ping?PartnerSpId=janssenetmf.veevavault.com"
|
|
"&IdpAdapterId=CompIWALDAPEXTFORM"
|
|
"&TargetResource=https%3A%2F%2Fvtmf.veevavault.com%2F")
|
|
|
|
# Report Document Inventory Report - Study Level, filtr na studii
|
|
REPORT_URL = ("https://vtmf.veevavault.com/ui/#reporting/viewer/"
|
|
"0RP000000000182?study__v%2C%2C%2CIN=0ST000000137008")
|
|
|
|
VAULT_UI_PATTERN = "**vtmf.veevavault.com/ui**" # úspěšný vstup do Vaultu
|
|
|
|
SCRIPT_DIR = Path(__file__).resolve().parent
|
|
PROFILE_DIR = SCRIPT_DIR / "vault_profile" # perzistentní session
|
|
ENV_FILE = SCRIPT_DIR.parent / ".env" # root projektu Janssen
|
|
DEBUG_DIR = SCRIPT_DIR / "debug" # diagnostické výstupy
|
|
EXCEL_DIR = SCRIPT_DIR / "WhatToDownload" # stažené reporty
|
|
PROCESSED_DIR = EXCEL_DIR / "Zpracovano" # archiv zpracovaných
|
|
OLD_STATE_FILE = SCRIPT_DIR / "download_state.csv" # legacy CSV (migrace)
|
|
DOWNLOAD_ROOT = Path(r"U:\Dropbox\!!!Days\Downloads Z230\VTMF-77242113UCO3001")
|
|
|
|
MONGO_URI = "mongodb://192.168.1.76:27017"
|
|
MONGO_DB = "VTMF"
|
|
MONGO_COLL = "documents"
|
|
|
|
# Kolik dokumentů stáhnout v tomto běhu (None = všechny zbývající)
|
|
LIMIT = 10
|
|
# Pole reportu, jejichž změny se promítají a verzují do history[]
|
|
TRACKED_FIELDS = ("name", "status", "type", "subtype", "desc",
|
|
"date", "url", "studies")
|
|
|
|
MAX_ATTEMPTS = 2 # pokusy na jeden dokument
|
|
RETRY_PAUSE_MS = 5000 # pauza před opakováním
|
|
BETWEEN_DOCS_MS = 500 # pauza mezi dokumenty
|
|
|
|
|
|
def log(msg):
|
|
print(msg, flush=True)
|
|
|
|
|
|
def load_env_file(path):
|
|
"""Načte KEY=VALUE řádky z .env do os.environ.
|
|
Už nastavené env proměnné mají přednost, .env je nepřepisuje."""
|
|
if not path.exists():
|
|
log(f"[!] .env nenalezen: {path}")
|
|
return
|
|
for line in path.read_text(encoding="utf-8").splitlines():
|
|
line = line.strip()
|
|
if not line or line.startswith("#") or "=" not in line:
|
|
continue
|
|
key, _, value = line.partition("=")
|
|
key, value = key.strip(), value.strip().strip('"').strip("'")
|
|
if value and key not in os.environ:
|
|
os.environ[key] = value
|
|
|
|
|
|
ENV_SECTION_HEADER = "# --- Veeva Vault (J&J V-TMF) — VTMFDownloadFiles/download_vault ---"
|
|
ENV_KEYS = ("VAULT_USER", "VAULT_PASS")
|
|
|
|
|
|
def ensure_credentials():
|
|
"""Načte .env; pokud VAULT_USER/VAULT_PASS chybí, založí/doplní
|
|
v .env šablonu, vyzve uživatele k doplnění a ukončí skript."""
|
|
load_env_file(ENV_FILE)
|
|
if all(os.environ.get(k) for k in ENV_KEYS):
|
|
return
|
|
|
|
existing = ENV_FILE.read_text(encoding="utf-8") if ENV_FILE.exists() else ""
|
|
missing_lines = [f"{k}=" for k in ENV_KEYS
|
|
if not re.search(rf"^\s*{k}\s*=", existing, re.M)]
|
|
|
|
if not ENV_FILE.exists():
|
|
ENV_FILE.write_text(
|
|
"# .env — lokální přihlašovací údaje (NEVERZOVAT, je v .gitignore)\n\n"
|
|
+ ENV_SECTION_HEADER + "\n"
|
|
+ "\n".join(missing_lines) + "\n",
|
|
encoding="utf-8")
|
|
log(f"[i] Založil jsem nový .env: {ENV_FILE}")
|
|
elif missing_lines:
|
|
with open(ENV_FILE, "a", encoding="utf-8") as f:
|
|
f.write("\n" + ENV_SECTION_HEADER + "\n"
|
|
+ "\n".join(missing_lines) + "\n")
|
|
log(f"[i] Doplnil jsem chybějící řádky do .env: {ENV_FILE}")
|
|
|
|
print("\n" + "=" * 60)
|
|
print(" CHYBÍ PŘIHLAŠOVACÍ ÚDAJE.")
|
|
print(f" Doplň VAULT_USER a VAULT_PASS do souboru:")
|
|
print(f" {ENV_FILE}")
|
|
print(" a spusť skript znovu.")
|
|
print("=" * 60)
|
|
sys.exit(1)
|
|
|
|
|
|
# --- Parsování Excelu --------------------------------------------------
|
|
|
|
HYPERLINK_RE = re.compile(r'HYPERLINK\("([^"]+)"\s*,\s*"([^"]+)"\)')
|
|
VERSION_RE = re.compile(r"\((v[^)]+)\)\s*$")
|
|
# nepovolené znaky Windows názvů + řídicí znaky + unicode artefakt �
|
|
BAD_CHARS_RE = re.compile(r"[<>:\"/\\|?*\x00-\x1f�]")
|
|
|
|
|
|
def clean_filename(s):
|
|
"""Očistí string na platné jméno souboru/složky ve Windows."""
|
|
s = BAD_CHARS_RE.sub("_", str(s))
|
|
s = re.sub(r"\s+", " ", s) # vícenásobné mezery -> jedna
|
|
s = re.sub(r"_{2,}", "_", s) # vícenásobná podtržítka -> jedno
|
|
return s.strip(" ._") # okraje: mezery, tečky, podtržítka
|
|
|
|
|
|
def display_text(cell):
|
|
"""Zobrazený text buňky — u =HYPERLINK vzorce druhý argument."""
|
|
raw = str(cell.value or "").strip()
|
|
m = HYPERLINK_RE.search(raw)
|
|
return m.group(2).strip() if m else raw
|
|
|
|
|
|
def extract_doc_url(raw):
|
|
"""Z HYPERLINK hodnoty (nebo i rozbité URL) vytáhne čistou doc URL
|
|
ve tvaru https://<host>/ui/#doc_info/<id>/<major>/<minor>."""
|
|
m = re.search(r"(https://[^/\"]+/ui/#doc_info/\d+/\d+/\d+)", str(raw))
|
|
if not m:
|
|
raise ValueError(f"Nenašel jsem doc URL v: {raw!r}")
|
|
return m.group(1)
|
|
|
|
|
|
def read_documents_from_excel(path):
|
|
"""Načte dokumenty z daného .xlsx reportu. Vrací list dictů:
|
|
vtmf, version, url, name, status, type, subtype, desc, date, studies.
|
|
Document Name/Number/Status jsou =HYPERLINK vzorce — URL i text se
|
|
berou regexem. Report má rozbité deklarované rozměry, čte se
|
|
přímou iterací řádků."""
|
|
from openpyxl import load_workbook
|
|
|
|
log(f"[i] Parsování reportu: {path.name}")
|
|
wb = load_workbook(path, data_only=False) # potřebujeme vzorce
|
|
ws = wb[wb.sheetnames[0]]
|
|
|
|
rows = ws.iter_rows()
|
|
header = [c.value for c in next(rows)]
|
|
try:
|
|
i_num = header.index("Document Number")
|
|
i_name = header.index("Document Name")
|
|
i_status = header.index("Document Status")
|
|
i_type = header.index("Type")
|
|
i_sub = header.index("Subtype")
|
|
i_desc = header.index("Description")
|
|
i_date = header.index("Document Date")
|
|
i_study = header.index("Study")
|
|
except ValueError as e:
|
|
raise RuntimeError(f"V reportu chybí očekávaný sloupec: {e}")
|
|
|
|
docs, bad = [], []
|
|
for row in rows:
|
|
cell = row[i_num]
|
|
if cell.value is None:
|
|
continue
|
|
raw = str(cell.value)
|
|
m = HYPERLINK_RE.search(raw)
|
|
if m:
|
|
url_raw, vtmf = m.group(1), m.group(2)
|
|
elif cell.hyperlink: # pravý hyperlink místo vzorce
|
|
url_raw, vtmf = cell.hyperlink.target, raw
|
|
else:
|
|
bad.append(raw)
|
|
continue
|
|
try:
|
|
url = extract_doc_url(url_raw)
|
|
except ValueError:
|
|
bad.append(raw)
|
|
continue
|
|
|
|
name = display_text(row[i_name])
|
|
vm = VERSION_RE.search(name)
|
|
version = vm.group(1) if vm else "v?"
|
|
|
|
desc = clean_filename(display_text(row[i_desc]))
|
|
if not desc:
|
|
# fallback: Document Name bez koncové verze (jde zvlášť na konec)
|
|
desc = clean_filename(VERSION_RE.sub("", name))
|
|
|
|
date = row[i_date].value # datetime nebo None
|
|
docs.append({
|
|
"vtmf": vtmf.strip(),
|
|
"version": version,
|
|
"url": url,
|
|
"name": name,
|
|
"status": display_text(row[i_status]),
|
|
"type": clean_filename(display_text(row[i_type])),
|
|
"subtype": clean_filename(display_text(row[i_sub])),
|
|
"desc": desc,
|
|
"date": date if hasattr(date, "strftime") else None,
|
|
"studies": display_text(row[i_study]),
|
|
})
|
|
|
|
log(f"[i] Načteno {len(docs)} dokumentů"
|
|
+ (f", {len(bad)} řádků bez použitelné URL (přeskočeno)" if bad else ""))
|
|
return docs
|
|
|
|
|
|
def build_target_path(doc, suggested_filename):
|
|
"""Cílová cesta: DOWNLOAD_ROOT\\Type\\Subtype\\
|
|
'YYYY-MM-DD Description [VTMF-xxx] [v1.0].<skutečná přípona>'.
|
|
Datum/verze se vynechají, když nejsou k dispozici."""
|
|
ext = Path(suggested_filename).suffix # skutečná přípona vč. tečky
|
|
date_prefix = doc["date"].strftime("%Y-%m-%d") + " " if doc["date"] else ""
|
|
version = f" [{doc['version']}]" if doc.get("version") else ""
|
|
filename = f"{date_prefix}{doc['desc']} [{doc['vtmf']}]{version}{ext}"
|
|
return DOWNLOAD_ROOT / doc["type"] / doc["subtype"] / filename
|
|
|
|
|
|
def deleted_marker_path(path):
|
|
"""Jméno souboru s příznakem smazání: 'x.pdf' -> 'x [D].pdf'."""
|
|
p = Path(path)
|
|
return p.with_name(f"{p.stem} [D]{p.suffix}")
|
|
|
|
|
|
# --- MongoDB synchronizace ---------------------------------------------
|
|
|
|
def doc_key(vtmf, version):
|
|
return f"{vtmf}|{version}"
|
|
|
|
|
|
def get_collection():
|
|
client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
|
|
client.admin.command("ping")
|
|
coll = client[MONGO_DB][MONGO_COLL]
|
|
coll.create_index([("vtmf", ASCENDING), ("version", ASCENDING)],
|
|
unique=True)
|
|
coll.create_index([("deleted", ASCENDING), ("downloaded", ASCENDING)])
|
|
return coll
|
|
|
|
|
|
def migrate_old_csv(coll):
|
|
"""Jednorázová migrace download_state.csv do Mongo: záznamy 'ok'
|
|
se zapíší jako downloaded=True k odpovídajícímu VTMF (aktuální,
|
|
nesmazané verzi). CSV se pak přejmenuje na .imported."""
|
|
if not OLD_STATE_FILE.exists():
|
|
return
|
|
migrated = 0
|
|
with open(OLD_STATE_FILE, newline="", encoding="utf-8") as f:
|
|
for row in csv.DictReader(f):
|
|
if row["result"] != "ok":
|
|
continue
|
|
r = coll.update_one(
|
|
{"vtmf": row["vtmf"], "deleted": False,
|
|
"downloaded": {"$ne": True}},
|
|
{"$set": {"downloaded": True, "file": row["file"],
|
|
"downloaded_at": row["timestamp"]}})
|
|
migrated += r.modified_count
|
|
OLD_STATE_FILE.rename(OLD_STATE_FILE.with_suffix(".csv.imported"))
|
|
log(f"[i] Migrace download_state.csv -> Mongo: {migrated} záznamů; "
|
|
f"CSV přejmenováno na .imported")
|
|
|
|
|
|
def sync_report_to_mongo(coll, docs):
|
|
"""Promítne aktuální report do kolekce documents.
|
|
Klíč = (vtmf, version). Nové založí, změny polí promítne
|
|
(s history[]), chybějící označí deleted + soubor přejmenuje
|
|
s ' [D]', znovuobjevené vzkřísí a ' [D]' odebere."""
|
|
now = datetime.now()
|
|
stats = {"new": 0, "updated": 0, "unchanged": 0,
|
|
"resurrected": 0, "marked_deleted": 0}
|
|
current_keys = set()
|
|
|
|
for d in docs:
|
|
key = doc_key(d["vtmf"], d["version"])
|
|
current_keys.add(key)
|
|
existing = coll.find_one({"_id": key})
|
|
if existing is None:
|
|
coll.insert_one({
|
|
"_id": key, **d,
|
|
"first_seen": now, "last_seen": now,
|
|
"deleted": False, "downloaded": False,
|
|
"file": None, "history": [],
|
|
})
|
|
stats["new"] += 1
|
|
continue
|
|
|
|
changes = {}
|
|
for fld in TRACKED_FIELDS:
|
|
if existing.get(fld) != d.get(fld):
|
|
changes[fld] = {"old": existing.get(fld),
|
|
"new": d.get(fld)}
|
|
update = {"$set": {**d, "last_seen": now, "deleted": False}}
|
|
if changes:
|
|
update["$push"] = {"history": {"ts": now, "changes": changes}}
|
|
stats["updated"] += 1
|
|
else:
|
|
stats["unchanged"] += 1
|
|
|
|
if existing.get("deleted"):
|
|
# dokument se do reportu vrátil -> odebrat [D] ze souboru
|
|
stats["resurrected"] += 1
|
|
stats["unchanged"] -= 0 # (počítá se výše jako updated/unchanged)
|
|
old_file = existing.get("file")
|
|
if old_file:
|
|
marked = deleted_marker_path(old_file)
|
|
if marked.exists() and not Path(old_file).exists():
|
|
marked.rename(old_file)
|
|
log(f"[i] {key}: soubor vrácen z ' [D]' zpět.")
|
|
update["$set"]["file"] = str(old_file)
|
|
coll.update_one({"_id": key}, update)
|
|
|
|
# dokumenty, které v aktuálním reportu nejsou -> deleted + ' [D]'
|
|
for rec in coll.find({"deleted": False}):
|
|
if rec["_id"] in current_keys:
|
|
continue
|
|
upd = {"deleted": True, "deleted_at": now}
|
|
f = rec.get("file")
|
|
if f and Path(f).exists():
|
|
marked = deleted_marker_path(f)
|
|
try:
|
|
Path(f).rename(marked)
|
|
upd["file"] = str(marked)
|
|
log(f"[i] {rec['_id']}: soubor označen ' [D]'.")
|
|
except OSError as e:
|
|
log(f"[!] {rec['_id']}: přejmenování na [D] selhalo: {e}")
|
|
coll.update_one({"_id": rec["_id"]},
|
|
{"$set": upd,
|
|
"$push": {"history": {"ts": now,
|
|
"changes": {"deleted": {
|
|
"old": False,
|
|
"new": True}}}}})
|
|
stats["marked_deleted"] += 1
|
|
|
|
log(f"[ok] Mongo sync: {stats['new']} nových, {stats['updated']} změněných, "
|
|
f"{stats['unchanged']} beze změny, {stats['resurrected']} obnovených, "
|
|
f"{stats['marked_deleted']} označených deleted.")
|
|
return stats
|
|
|
|
|
|
# --- Přihlášení --------------------------------------------------------
|
|
|
|
def submit_login_form(page, password_box):
|
|
"""Odešle login formulář. Zkouší postupně tlačítka Sign On / Login /
|
|
OK / submit input; když žádné nenajde, stiskne Enter v poli hesla."""
|
|
candidates = [
|
|
page.get_by_role("button", name=re.compile("sign\\s*on", re.I)),
|
|
page.get_by_role("button", name=re.compile("log\\s*in|sign\\s*in", re.I)),
|
|
page.locator("input[type='submit']"),
|
|
page.locator("button[type='submit']"),
|
|
page.get_by_role("button", name=re.compile("^ok$", re.I)),
|
|
]
|
|
for loc in candidates:
|
|
try:
|
|
if loc.count() and loc.first.is_visible():
|
|
label = (loc.first.inner_text() or
|
|
loc.first.get_attribute("value") or "submit").strip()
|
|
log(f"[i] Odesílám formulář tlačítkem '{label}'...")
|
|
loc.first.click()
|
|
return
|
|
except Exception:
|
|
continue
|
|
log("[i] Tlačítko nenalezeno, odesílám Enterem v poli hesla...")
|
|
password_box.press("Enter")
|
|
|
|
|
|
def login_if_needed(page):
|
|
"""Otevře login URL, vyplní jméno+heslo, detekuje 2FA a počká na
|
|
ruční potvrzení. Pokud perzistentní session žije, login přeskočí."""
|
|
log(f"[i] Otevírám přihlašovací URL...")
|
|
page.goto(LOGIN_URL, wait_until="domcontentloaded")
|
|
|
|
if "vtmf.veevavault.com/ui" in page.url:
|
|
log("[i] Už přihlášen (perzistentní session).")
|
|
return
|
|
|
|
user_box = page.locator("input[type='text']").first
|
|
try:
|
|
user_box.wait_for(timeout=8000)
|
|
except PWTimeout:
|
|
if "vtmf.veevavault.com/ui" in page.url:
|
|
log("[i] Přihlášen bez formuláře (session redirect).")
|
|
return
|
|
raise RuntimeError(
|
|
f"Nenašel jsem login formulář ani Vault. Aktuální URL: {page.url}")
|
|
|
|
username = os.environ["VAULT_USER"]
|
|
password = os.environ["VAULT_PASS"]
|
|
|
|
log("[i] Vyplňuji přihlašovací údaje...")
|
|
user_box.fill(username)
|
|
password_box = page.locator("input[type='password']").first
|
|
password_box.fill(password)
|
|
submit_login_form(page, password_box)
|
|
|
|
log("[i] Odeslán login, čekám na výsledek...")
|
|
try:
|
|
page.wait_for_url(VAULT_UI_PATTERN, timeout=15000)
|
|
log("[ok] Přihlášen rovnou (bez 2FA).")
|
|
return
|
|
except PWTimeout:
|
|
pass # nejsme ve Vaultu -> pravděpodobně 2FA výzva
|
|
|
|
err = page.locator("text=/invalid|incorrect|failed/i")
|
|
try:
|
|
if err.count() and err.first.is_visible():
|
|
raise RuntimeError(f"Login selhal: {err.first.inner_text().strip()}")
|
|
except PWTimeout:
|
|
pass
|
|
|
|
print("\n" + "=" * 60)
|
|
print(" VYŽADOVÁNO OVĚŘENÍ NA TELEFONU (2FA).")
|
|
print(" Potvrď přihlášení v mobilní aplikaci.")
|
|
print("=" * 60)
|
|
input(" Až to potvrdíš, stiskni ENTER pro pokračování... ")
|
|
|
|
page.wait_for_url(VAULT_UI_PATTERN, timeout=120000)
|
|
log("[ok] Přihlášení dokončeno.")
|
|
|
|
|
|
def verify_inside(page):
|
|
"""Ověří, že jsme uvnitř Vaultu (URL na /ui)."""
|
|
page.wait_for_url(VAULT_UI_PATTERN, timeout=30000)
|
|
log(f"[ok] Uvnitř Vaultu: {page.url}")
|
|
|
|
|
|
def dialog_visible(page):
|
|
"""True, pokud je na stránce viditelný jQuery UI dialog."""
|
|
try:
|
|
dlg = page.locator(".ui-dialog")
|
|
return bool(dlg.count() and dlg.first.is_visible())
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def save_dialog_debug(page, tag):
|
|
"""Uloží diagnostiku neumlčitelného dialogu: screenshot, HTML všech
|
|
frames a výpis kandidátů na zavírací tlačítka. Vrátí cestu složky."""
|
|
out = DEBUG_DIR / datetime.now().strftime(f"%Y-%m-%d_%H-%M-%S_{tag}")
|
|
out.mkdir(parents=True, exist_ok=True)
|
|
try:
|
|
page.screenshot(path=str(out / "screenshot.png"), full_page=False)
|
|
except Exception as e:
|
|
(out / "screenshot_error.txt").write_text(str(e), encoding="utf-8")
|
|
report = []
|
|
for i, frame in enumerate(page.frames):
|
|
report.append(f"=== frame[{i}] url={frame.url}")
|
|
try:
|
|
(out / f"frame_{i}.html").write_text(frame.content(),
|
|
encoding="utf-8")
|
|
for sel in (".ui-dialog", "a.ok.vv_button",
|
|
".ui-dialog-titlebar-close",
|
|
"button", "input[type='button']"):
|
|
n = frame.locator(sel).count()
|
|
if n:
|
|
report.append(f" {sel}: {n}x")
|
|
except Exception as e:
|
|
report.append(f" [chyba čtení framu: {e}]")
|
|
(out / "frames_report.txt").write_text("\n".join(report),
|
|
encoding="utf-8")
|
|
log(f"[!] Diagnostika dialogu uložena do: {out}")
|
|
return out
|
|
|
|
|
|
# Viditelné OK tlačítko dialogu — je to <a>, ne <button>!
|
|
# Křížek .ui-dialog-titlebar-close je display:none → NEPOUŽÍVAT.
|
|
DIALOG_OK_SELECTOR = (".ui-dialog a.ok.vv_button, "
|
|
".vv_login_msg_dialog .vv_button.ok")
|
|
|
|
|
|
def dismiss_maintenance_popup(page, timeout=8000):
|
|
"""Zavře Veeva login/maintenance dialog kliknutím na viditelné OK
|
|
(<a class='ok vv_button'>). Dialog se objevuje SE ZPOŽDĚNÍM,
|
|
proto se na něj krátce čeká. Bezpečné volat vždy."""
|
|
ok = page.locator(DIALOG_OK_SELECTOR)
|
|
try:
|
|
ok.first.wait_for(state="visible", timeout=timeout)
|
|
except PWTimeout:
|
|
return False # okno se neobjevilo — pokračujeme
|
|
except Exception:
|
|
return False
|
|
|
|
closed = 0
|
|
for _ in range(5): # dialogy umí být ve frontě
|
|
try:
|
|
if ok.count() and ok.first.is_visible():
|
|
ok.first.click()
|
|
page.wait_for_timeout(300)
|
|
closed += 1
|
|
log("[i] Maintenance/login dialog zavřen (OK).")
|
|
continue
|
|
except Exception:
|
|
pass
|
|
break
|
|
|
|
if not dialog_visible(page):
|
|
return bool(closed)
|
|
|
|
page.keyboard.press("Escape")
|
|
page.wait_for_timeout(500)
|
|
log("[i] Zkusil jsem dialog zavřít klávesou Escape.")
|
|
|
|
if dialog_visible(page):
|
|
save_dialog_debug(page, "dialog")
|
|
print("\n" + "=" * 60)
|
|
print(" DIALOG SE NEPODAŘILO ZAVŘÍT AUTOMATICKY.")
|
|
print(" Zavři ho prosím ručně v prohlížeči.")
|
|
print("=" * 60)
|
|
input(" Po ručním zavření stiskni ENTER... ")
|
|
return bool(closed)
|
|
|
|
|
|
# --- Export reportu ----------------------------------------------------
|
|
|
|
def download_report(page):
|
|
"""Stáhne report (Export to Excel, Data Only) do WhatToDownload/
|
|
pod timestampovaným názvem. Vrátí cestu k souboru."""
|
|
log("[i] Otevírám report Document Inventory Report - Study Level...")
|
|
page.goto(REPORT_URL, wait_until="domcontentloaded")
|
|
dismiss_maintenance_popup(page, timeout=4000)
|
|
|
|
# report je hotový, když se objeví počet záznamů / statusy
|
|
try:
|
|
page.wait_for_selector("text=Returned", timeout=30000)
|
|
except PWTimeout:
|
|
page.wait_for_selector("text=Document Status:", timeout=30000)
|
|
log("[i] Report načten, otevírám menu akcí (⋯)...")
|
|
|
|
# menu tří teček vpravo nahoře u nadpisu reportu
|
|
actions = page.locator("button[title='Actions'], .report-actions button")
|
|
if not actions.count():
|
|
# fallback: poslední tlačítko v hlavičce reportu
|
|
actions = page.locator(".vv-report-header button")
|
|
actions.last.click()
|
|
|
|
page.get_by_text("Export to Excel", exact=True).click()
|
|
log("[i] Dialog Excel Export Options...")
|
|
|
|
# 'Data Only' bývá default, ale pro jistotu explicitně
|
|
try:
|
|
page.get_by_text("Data Only").first.click()
|
|
except Exception:
|
|
pass
|
|
|
|
# Export kliknout PRÁVĚ jednou (vícenásobné kliky = duplikáty)
|
|
with page.expect_download(timeout=120000) as dl_info:
|
|
page.get_by_role("button", name="Export").click()
|
|
download = dl_info.value
|
|
|
|
EXCEL_DIR.mkdir(parents=True, exist_ok=True)
|
|
ts = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
|
|
dest = EXCEL_DIR / f"{ts} {download.suggested_filename}"
|
|
download.save_as(str(dest))
|
|
log(f"[ok] Report uložen: {dest}")
|
|
return dest
|
|
|
|
|
|
def archive_report(path):
|
|
"""Po úspěšném zpracování přesune report do Zpracovano/."""
|
|
PROCESSED_DIR.mkdir(parents=True, exist_ok=True)
|
|
target = PROCESSED_DIR / path.name
|
|
path.rename(target)
|
|
log(f"[i] Report archivován: {target}")
|
|
|
|
|
|
# --- Stažení dokumentů -------------------------------------------------
|
|
|
|
def find_source_file_button(page):
|
|
"""Najde ikonu Source File (list papíru se šipkou dolů, vpravo nahoře).
|
|
Více fallback selektorů — DOM se může lišit podle typu dokumentu."""
|
|
candidates = [
|
|
"[title='Source File']",
|
|
"[aria-label='Source File']",
|
|
]
|
|
for sel in candidates:
|
|
loc = page.locator(sel)
|
|
if loc.count():
|
|
return loc.first
|
|
loc = page.get_by_role("button", name=re.compile("Source File", re.I))
|
|
if loc.count():
|
|
return loc.first
|
|
return None
|
|
|
|
|
|
def download_source_file(page, doc):
|
|
vtmf = doc["vtmf"]
|
|
log(f"[i] Otevírám dokument {vtmf} ({doc.get('version', '')}) ...")
|
|
page.goto(doc["url"], wait_until="domcontentloaded")
|
|
try:
|
|
page.wait_for_load_state("networkidle", timeout=30000)
|
|
except PWTimeout:
|
|
log("[!] networkidle nenastal do 30 s, zkouším pokračovat...")
|
|
dismiss_maintenance_popup(page, timeout=2000)
|
|
|
|
target = find_source_file_button(page)
|
|
if target is None:
|
|
raise RuntimeError(
|
|
f"Nenašel jsem ikonu 'Source File' na stránce dokumentu {vtmf}.")
|
|
|
|
log("[i] Klikám na Source File a čekám na download...")
|
|
with page.expect_download(timeout=60000) as dl_info:
|
|
target.click()
|
|
# Varianta s dropdownem (Source File + Viewable Rendition)
|
|
try:
|
|
item = page.get_by_role("menuitem",
|
|
name=re.compile("Source File", re.I))
|
|
if item.count() and item.first.is_visible():
|
|
log("[i] Otevřel se dropdown, vybírám 'Source File'...")
|
|
item.first.click()
|
|
except Exception:
|
|
pass
|
|
download = dl_info.value
|
|
|
|
dest = build_target_path(doc, download.suggested_filename)
|
|
dest.parent.mkdir(parents=True, exist_ok=True)
|
|
download.save_as(str(dest))
|
|
log(f"[ok] Uloženo: {dest}")
|
|
return dest
|
|
|
|
|
|
def download_missing(page, coll):
|
|
"""Stáhne všechny nesmazané dokumenty bez downloaded=True.
|
|
Výsledek každého se ihned zapíše do Mongo."""
|
|
todo = list(coll.find({"deleted": False, "downloaded": {"$ne": True}})
|
|
.sort([("vtmf", ASCENDING), ("version", ASCENDING)]))
|
|
if LIMIT:
|
|
todo = todo[:LIMIT]
|
|
log(f"\n[i] Ke stažení: {len(todo)} dokumentů"
|
|
+ (f" (LIMIT={LIMIT})" if LIMIT else ""))
|
|
|
|
ok_count, fail_count = 0, 0
|
|
for n, doc in enumerate(todo, 1):
|
|
key = doc["_id"]
|
|
log(f"\n--- [{n}/{len(todo)}] {key} | {doc['desc'][:70]}")
|
|
last_err = None
|
|
for attempt in range(1, MAX_ATTEMPTS + 1):
|
|
try:
|
|
dest = download_source_file(page, doc)
|
|
coll.update_one({"_id": key}, {"$set": {
|
|
"downloaded": True, "file": str(dest),
|
|
"downloaded_at": datetime.now(),
|
|
"last_error": None}})
|
|
ok_count += 1
|
|
last_err = None
|
|
break
|
|
except Exception as e:
|
|
last_err = e
|
|
log(f"[!] Pokus {attempt}/{MAX_ATTEMPTS} selhal: {e}")
|
|
if attempt < MAX_ATTEMPTS:
|
|
page.wait_for_timeout(RETRY_PAUSE_MS)
|
|
if last_err is not None:
|
|
coll.update_one({"_id": key}, {"$set": {
|
|
"last_error": str(last_err),
|
|
"error_at": datetime.now()}})
|
|
fail_count += 1
|
|
page.wait_for_timeout(BETWEEN_DOCS_MS)
|
|
return ok_count, fail_count
|
|
|
|
|
|
# --- Main --------------------------------------------------------------
|
|
|
|
def main():
|
|
ensure_credentials()
|
|
coll = get_collection()
|
|
log(f"[ok] Mongo připojeno: {MONGO_URI} / {MONGO_DB}.{MONGO_COLL}")
|
|
|
|
with sync_playwright() as p:
|
|
ctx = p.chromium.launch_persistent_context(
|
|
user_data_dir=str(PROFILE_DIR),
|
|
headless=False,
|
|
accept_downloads=True,
|
|
no_viewport=True, # okno se chová nativně
|
|
args=["--start-maximized"],
|
|
)
|
|
page = ctx.pages[0] if ctx.pages else ctx.new_page()
|
|
ok_count = fail_count = 0
|
|
try:
|
|
# 1) login
|
|
login_if_needed(page)
|
|
verify_inside(page)
|
|
dismiss_maintenance_popup(page)
|
|
|
|
# 2) export reportu
|
|
report_path = download_report(page)
|
|
|
|
# 3) parse + sync do Mongo
|
|
docs = read_documents_from_excel(report_path)
|
|
if not docs:
|
|
raise RuntimeError("Report neobsahuje žádné dokumenty — "
|
|
"sync přeskočen, nic se nemaže.")
|
|
sync_report_to_mongo(coll, docs)
|
|
migrate_old_csv(coll)
|
|
archive_report(report_path)
|
|
|
|
# 4) stažení chybějících
|
|
DOWNLOAD_ROOT.mkdir(parents=True, exist_ok=True)
|
|
ok_count, fail_count = download_missing(page, coll)
|
|
except KeyboardInterrupt:
|
|
log("\n[!] Přerušeno uživatelem — stav je v Mongo, příští běh naváže.")
|
|
finally:
|
|
total = coll.count_documents({})
|
|
have = coll.count_documents({"deleted": False, "downloaded": True})
|
|
active = coll.count_documents({"deleted": False})
|
|
log(f"\n[i] Výsledek běhu: {ok_count} staženo, {fail_count} chyb.")
|
|
log(f"[i] Mongo: {total} záznamů celkem, {active} aktivních, "
|
|
f"z toho staženo {have} ({active - have} zbývá).")
|
|
input("ENTER pro zavření prohlížeče...")
|
|
ctx.close()
|
|
sys.exit(1 if fail_count else 0)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|