# ============================================================ # export_from_seaweed_v1.2.py # Verze: 1.2 # Datum: 2026-06-15 # Popis: Sestaví na disk strom dokumentů studie ze SeaweedFS # (zdroj pravdy = Mongo VTMF.documents + objekty ve Fileru). # Pojmenování podle původních Dropbox pravidel (pipeline v1.5): # # \\\ # "YYYY-MM-DD Description [VTMF-xxx] [v1.0]." # # Základní podadresář pod OUTPUT_ROOT podle úrovně: # study -> STUDY # country -> COUNTRY # site -> <číslo centra> (dokument se zapíše do složky # KAŽDÉHO centra, do kterého je referencovaný) # # PŘÍSLUŠNOST KE STUDII = pole scopes[] (ne studies[]!). # Dokument patří studii X na úrovni L, pokud má scope # "L|X|..." — tj. byl reálně natažen reportem té studie/úrovně. # studies[] je jen M:N reference (kam všude je přilinkovaný) # a pro výběr „TMF studie X" se NEpoužívá. # # RYCHLÉ NASTAVENÍ (nahoře): STUDIES + OUTPUT_ROOT_TMPL + přepínače # EXPORT (True/False u study/country/site). # # Stahuje jen deleted=False, downloaded=True, placeholder!=True, # se seaweed_path. Idempotentní — existující soubory přeskakuje # (resume), takže lze pouštět opakovaně pro „aktuální verzi". # # v1.2: výběr podle scopes[] (ne studies[]); více studií najednou # (seznam STUDIES). (v1.0/v1.1 v TRASH/.) # v1.3: site jen dle pevného prefixu (SITE_PREFIX) + jen CZ (ONLY_COUNTRY); # STUDY_DIRS = cíl per studie; cíl "dropbox:/cesta" = Dropbox API # ČISTÝ MIRROR (nahraje chybějící, smaže přebytky; zdroj = SeaweedFS). # # Spuštění: # & "...\.venv\Scripts\python.exe" "...\export_from_seaweed_v1.2.py" # ============================================================ import os import re import sys import time import urllib.request from pathlib import Path from pymongo import MongoClient, ASCENDING # ==================================================================== # RYCHLÉ NASTAVENÍ # ==================================================================== # Které studie sestavit (podle scopes[]). Klidně víc najednou. STUDIES = [ "77242113UCO3001", "77242113UCO3002", "77242113CRD3001", "42847922MDD3003", ] # Volitelně: napevno přiřaď konkrétní studii konkrétní adresář. # Co tu NENÍ uvedeno, použije obecnou šablonu OUTPUT_ROOT_TMPL níže. # Pod přiřazeným adresářem vznikají stejné podadresáře (STUDY / COUNTRY / # číslo centra), jen kořen je tvůj vlastní. STUDY_DIRS = { # Dropbox mirror do podsložky "_vTMF" UVNITŘ ručně organizované složky # studie. Mirror maže jen pod "_vTMF"; ostatní (ruční) složky studie # (#010 Protocol, #020 ICF …) jsou vedle a zůstávají nedotčené. "42847922MDD3003": r"dropbox:/!!42847922MDD3003/_vTMF", "77242113UCO3001": r"dropbox:/!77242113UCO3001/_vTMF", "77242113CRD3001": r"dropbox:/!77242113CRD3001/_vTMF", "77242113UCO3002": r"dropbox:/77242113UCO3002/_vTMF", } # Kam pro studie, které NEJSOU v STUDY_DIRS: {study} se nahradí kódem # studie. Pod tím vzniknou podadresáře dle úrovně (STUDY / COUNTRY / # číslo centra). OUTPUT_ROOT_TMPL = r"g:\x\{study}" # Co stáhnout — přepni True/False: EXPORT = { "study": True, # -> \STUDY\\\... "country": True, # -> \COUNTRY\\\... "site": True, # -> \<číslo centra>\\\... } # Prefix čísla centra pro každou studii. Číslo centra má tvar # "#####" (např. "S10-CZ10004"). POZOR: stejné fyzické centrum # má v KAŽDÉ studii jiný kód a dokument nese v sites[] VŠECHNY (M:N), # takže bez tohoto filtru soubor spadne i do složek center cizích studií. # Bereme tedy ze sites[] jen kódy začínající prefixem DANÉ studie. # (Heuristika podle name nestačí — nechá i cizí prefixy.) SITE_PREFIX = { "42847922MDD3003": "S10-CZ", "77242113UCO3001": "DD5-CZ", "77242113CRD3001": "DD6-CZ", # 77242113UCO3002: zatím bez center } # Exportovat jen tuto zemi (country úroveň + scope u country/site). # None = všechny země. Prefixy výše jsou CZ, proto necháváme CZ. ONLY_COUNTRY = "Czech Republic" OVERWRITE = False # True = přepsat i existující soubory # Dropbox mirror: cíl ve tvaru "dropbox:/cesta" se nahraje přes Dropbox # API a udržuje se jako ČISTÝ MIRROR — co chybí nahraje, co v cíli přebývá # smaže. Zdroj pravdy je SeaweedFS, v Dropboxu se nic nezálohuje. # Credentials (APP_KEY/SECRET/REFRESH_TOKEN) se berou z tohoto .env: DROPBOX_ENV = Path(__file__).resolve().parents[1] / "EmailsImport" / ".env" DROPBOX_MIRROR_DELETE = True # False = jen nahrávat/aktualizovat, nemazat # ==================================================================== MONGO_URI = "mongodb://192.168.1.76:27017" MONGO_DB = "VTMF" MONGO_COLL = "documents" BAD_CHARS_RE = re.compile(r"[<>:\"/\\|?*\x00-\x1f�]") def log(msg): print(msg, flush=True) def clean(s): s = BAD_CHARS_RE.sub("_", str(s or "")) s = re.sub(r"\s+", " ", s) s = re.sub(r"_{2,}", "_", s) return s.strip(" ._") def filename_for(doc): """'YYYY-MM-DD Description [VTMF-xxx] [v1.0].'.""" ext = Path(doc.get("seaweed_path", "")).suffix date = doc.get("date") or "" date_prefix = (date + " ") if date else "" version = f" [{doc['version']}]" if doc.get("version") else "" desc = clean(doc.get("desc")) or clean(doc.get("vtmf")) return f"{date_prefix}{desc} [{doc['vtmf']}]{version}{ext}" def base_rels_for(doc, level, site_prefix): """Základní podsložky (relativně ke kořeni) pro daný dokument. study -> ['STUDY'], country -> ['COUNTRY'], site -> []. POZOR: sites[] je M:N — stejné fyzické centrum má v každé studii jiný kód (S10-CZ10001 / BX4-CZ10001 / CH1-CZ10001 …) a dokument nese VŠECHNY. Správný kód pro TUTO studii poznáme podle pevného prefixu (SITE_PREFIX), např. 'S10-CZ'. Bereme jen sites[] začínající tímto prefixem; ostatní (kódy center cizích studií) ignorujeme. Když nic nematchuje, dokument do žádné site složky nepatří (vrať []).""" if level == "study": return ["STUDY"] if level == "country": return ["COUNTRY"] pref = (site_prefix or "").upper() return [clean(s) for s in doc.get("sites", []) if s and str(s).upper().startswith(pref) and clean(s)] def rel_targets(doc, level, site_prefix): """Cílové cesty relativně ke kořeni, oddělovač '/' (společný pro disk i Dropbox). Konkrétní sink ('/' -> '\\' na disku) si poradí sám.""" fname = filename_for(doc) typ = clean(doc.get("type")) or "_" sub = clean(doc.get("subtype")) or "_" return [f"{base}/{typ}/{sub}/{fname}" for base in base_rels_for(doc, level, site_prefix)] # -------------------------------------------------------------------- # Sinky: kam a jak zapisovat (lokální disk vs. Dropbox API mirror) # -------------------------------------------------------------------- class LocalSink: """Zápis na disk. Idempotentní (přeskakuje existující), bez mazání.""" def __init__(self, spec): self.root = Path(spec) self.label = str(self.root) self.root.mkdir(parents=True, exist_ok=True) def exists(self, rel): return (self.root / rel).exists() def keep(self, rel): pass def write(self, rel, data): dest = self.root / rel dest.parent.mkdir(parents=True, exist_ok=True) dest.write_bytes(data) def finalize(self): return 0 class DropboxSink: """Zápis přes Dropbox API jako ČISTÝ MIRROR. Co chybí nahraje, co v cíli přebývá (a není v tomto exportu) smaže. Dokumenty jsou neměnné verze (VTMF-xxx vX.Y), takže existence cesty = shoda obsahu — existující se nenahrávají znovu, obsah se neporovnává.""" def __init__(self, spec): import dropbox from dotenv import load_dotenv load_dotenv(DROPBOX_ENV) self._dbx = dropbox self.dbx = dropbox.Dropbox( app_key=os.getenv("DROPBOX_APP_KEY"), app_secret=os.getenv("DROPBOX_APP_SECRET"), oauth2_refresh_token=os.getenv("DROPBOX_APP_REFRESH_TOKEN"), ) path = spec.split(":", 1)[1].strip().replace("\\", "/") self.base = "/" + path.strip("/") self.label = f"dropbox:{self.base}" self.remote_files = {} # path_lower -> path_display (soubory) self.remote_dirs = {} # path_lower -> path_display (složky) self.seen = set() # path_lower, které ponecháváme self._load_remote() def _full(self, rel): return f"{self.base}/{rel}" def _load_remote(self): files = self._dbx.files try: res = self.dbx.files_list_folder(self.base, recursive=True) except self._dbx.exceptions.ApiError: return # cíl ještě neexistuje — nic ke smazání while True: for e in res.entries: if isinstance(e, files.FileMetadata): self.remote_files[e.path_lower] = e.path_display elif isinstance(e, files.FolderMetadata): self.remote_dirs[e.path_lower] = e.path_display if not res.has_more: break res = self.dbx.files_list_folder_continue(res.cursor) def exists(self, rel): return self._full(rel).lower() in self.remote_files def keep(self, rel): self.seen.add(self._full(rel).lower()) def write(self, rel, data): full = self._full(rel) self._upload(full, data) self.remote_files[full.lower()] = full def _upload(self, full, data): files = self._dbx.files wm = files.WriteMode.overwrite chunk = 8 * 1024 * 1024 if len(data) <= chunk: self.dbx.files_upload(data, full, mode=wm, mute=True) return # velké soubory přes upload session start = self.dbx.files_upload_session_start(data[:chunk]) cursor = files.UploadSessionCursor(session_id=start.session_id, offset=chunk) off = chunk while len(data) - off > chunk: self.dbx.files_upload_session_append_v2(data[off:off + chunk], cursor) off += chunk cursor.offset = off self.dbx.files_upload_session_finish( data[off:], cursor, files.CommitInfo(path=full, mode=wm, mute=True)) def finalize(self): # potřebné složky = všichni předci ponechaných souborů (+ base) base_low = self.base.lower() needed = {base_low} for low in self.seen: p = low.rsplit("/", 1)[0] while len(p) >= len(base_low): needed.add(p) if p == base_low: break p = p.rsplit("/", 1)[0] # přebytky = soubory mimo seen + složky mimo needed; mažeme jen ty # nejvyšší (rodič je potřebný), smazání složky odstraní i podstrom extra = [] for low, disp in self.remote_files.items(): if low not in self.seen and low.rsplit("/", 1)[0] in needed: extra.append(disp) for low, disp in self.remote_dirs.items(): if low not in needed and low.rsplit("/", 1)[0] in needed: extra.append(disp) if not extra: return 0 if not DROPBOX_MIRROR_DELETE: log(f" [mirror] {len(extra)} přebytečných položek (mazání vypnuto)") return 0 files = self._dbx.files for i in range(0, len(extra), 1000): chunk = [files.DeleteArg(path=p) for p in extra[i:i + 1000]] res = self.dbx.files_delete_batch(chunk) if res.is_async_job_id(): job = res.get_async_job_id() while True: st = self.dbx.files_delete_batch_check(job) if st.is_complete() or st.is_failed(): break time.sleep(1) log(f" [mirror] smazáno {len(extra)} přebytečných položek") return len(extra) def make_sink(spec): if str(spec).lower().startswith("dropbox:"): return DropboxSink(spec) return LocalSink(spec) def export_study_level(coll, study, level, sink): site_prefix = SITE_PREFIX.get(study) if level == "site" and not site_prefix: log(f" [site] {study}: chybí prefix v SITE_PREFIX — přeskakuji.") return 0, 0, 0, 0 # příslušnost ke studii/úrovni přes scope "level|study|[country]" # country/site filtrujeme i na zemi (ONLY_COUNTRY); study je globální if level in ("country", "site") and ONLY_COUNTRY: scope_prefix = re.escape(f"{level}|{study}|{ONLY_COUNTRY}") else: scope_prefix = re.escape(f"{level}|{study}|") q = {"scopes": {"$regex": f"^{scope_prefix}"}, "deleted": False, "downloaded": True, "placeholder": {"$ne": True}, "seaweed_path": {"$ne": None}} docs = list(coll.find(q).sort([("vtmf", ASCENDING), ("version", ASCENDING)])) log(f" [{level}] dokumentů: {len(docs)}") if not docs: return 0, 0, 0, 0 written = skipped = failed = 0 total_bytes = 0 for n, doc in enumerate(docs, 1): rels = rel_targets(doc, level, site_prefix) if not rels: continue for rel in rels: sink.keep(rel) # i přeskočené si v mirroru ponecháme need = rels if OVERWRITE else [rel for rel in rels if not sink.exists(rel)] if not need: skipped += len(rels) continue try: with urllib.request.urlopen(doc["seaweed_url"], timeout=120) as resp: data = resp.read() for rel in need: sink.write(rel, data) written += 1 total_bytes += len(data) skipped += len(rels) - len(need) kb = len(data) / 1024 size = f"{kb:.0f} KB" if kb < 1024 else f"{kb / 1024:.1f} MB" extra = f" (+{len(need)} kopií)" if len(need) > 1 else "" log(f" [{n}/{len(docs)}] {need[0]} ({size}){extra}") except Exception as e: failed += 1 log(f" [!] {doc['_id']}: {e}") return written, skipped, failed, total_bytes def main(): coll = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)[MONGO_DB][MONGO_COLL] levels = [lvl for lvl in ("study", "country", "site") if EXPORT.get(lvl)] log(f"[i] Studie: {', '.join(STUDIES)}") log(f"[i] Úrovně: {', '.join(levels) if levels else '(žádná)'}") log(f"[i] Cíl: {OUTPUT_ROOT_TMPL} (Dropbox: cíl 'dropbox:/cesta')\n") if not levels: sys.exit(0) gw = gs = gf = gb = gd = 0 for study in STUDIES: spec = STUDY_DIRS.get(study) or OUTPUT_ROOT_TMPL.format(study=study) # má studie přes scopes vůbec něco? has = coll.count_documents({"scopes": {"$regex": f"^[a-z]+\\|{re.escape(study)}\\|"}}) if not has: log(f"=== {study} -> {spec} ===") log(f"[i] {study}: přes scopes nic — pipeline pro tuto studii " f"zatím neproběhla, přeskakuji.\n") continue sink = make_sink(spec) log(f"=== {study} -> {sink.label} (scoped dokumentů: {has}) ===") for level in levels: w, s, f, b = export_study_level(coll, study, level, sink) gw += w; gs += s; gf += f; gb += b gd += sink.finalize() log("") mb = gb / 1024 / 1024 log(f"[ok] Hotovo: {gw} souborů zapsáno ({mb:.1f} MB), " f"{gs} přeskočeno (už existuje), {gd} smazáno (mirror), {gf} chyb.") sys.exit(1 if gf else 0) if __name__ == "__main__": main()