399 lines
16 KiB
Python
399 lines
16 KiB
Python
# ============================================================
|
|
# export_from_seaweed_v1.2.py
|
|
# Verze: 1.2
|
|
# Datum: 2026-06-15
|
|
# Popis: Sestaví na disk strom dokumentů studie ze SeaweedFS
|
|
# (zdroj pravdy = Mongo VTMF.documents + objekty ve Fileru).
|
|
# Pojmenování podle původních Dropbox pravidel (pipeline v1.5):
|
|
#
|
|
# <základní podadresář>\<Type>\<Subtype>\
|
|
# "YYYY-MM-DD Description [VTMF-xxx] [v1.0].<přípona>"
|
|
#
|
|
# Základní podadresář pod OUTPUT_ROOT podle úrovně:
|
|
# study -> STUDY
|
|
# country -> COUNTRY
|
|
# site -> <číslo centra> (dokument se zapíše do složky
|
|
# KAŽDÉHO centra, do kterého je referencovaný)
|
|
#
|
|
# PŘÍSLUŠNOST KE STUDII = pole scopes[] (ne studies[]!).
|
|
# Dokument patří studii X na úrovni L, pokud má scope
|
|
# "L|X|..." — tj. byl reálně natažen reportem té studie/úrovně.
|
|
# studies[] je jen M:N reference (kam všude je přilinkovaný)
|
|
# a pro výběr „TMF studie X" se NEpoužívá.
|
|
#
|
|
# RYCHLÉ NASTAVENÍ (nahoře): STUDIES + OUTPUT_ROOT_TMPL + přepínače
|
|
# EXPORT (True/False u study/country/site).
|
|
#
|
|
# Stahuje jen deleted=False, downloaded=True, placeholder!=True,
|
|
# se seaweed_path. Idempotentní — existující soubory přeskakuje
|
|
# (resume), takže lze pouštět opakovaně pro „aktuální verzi".
|
|
#
|
|
# v1.2: výběr podle scopes[] (ne studies[]); více studií najednou
|
|
# (seznam STUDIES). (v1.0/v1.1 v TRASH/.)
|
|
# v1.3: site jen dle pevného prefixu (SITE_PREFIX) + jen CZ (ONLY_COUNTRY);
|
|
# STUDY_DIRS = cíl per studie; cíl "dropbox:/cesta" = Dropbox API
|
|
# ČISTÝ MIRROR (nahraje chybějící, smaže přebytky; zdroj = SeaweedFS).
|
|
#
|
|
# Spuštění:
|
|
# & "...\.venv\Scripts\python.exe" "...\export_from_seaweed_v1.2.py"
|
|
# ============================================================
|
|
|
|
import os
|
|
import re
|
|
import sys
|
|
import time
|
|
import urllib.request
|
|
from pathlib import Path
|
|
|
|
from pymongo import MongoClient, ASCENDING
|
|
|
|
# ====================================================================
|
|
# RYCHLÉ NASTAVENÍ
|
|
# ====================================================================
|
|
# Které studie sestavit (podle scopes[]). Klidně víc najednou.
|
|
STUDIES = [
|
|
"77242113UCO3001",
|
|
"77242113UCO3002",
|
|
"77242113CRD3001",
|
|
"42847922MDD3003",
|
|
]
|
|
|
|
# Volitelně: napevno přiřaď konkrétní studii konkrétní adresář.
|
|
# Co tu NENÍ uvedeno, použije obecnou šablonu OUTPUT_ROOT_TMPL níže.
|
|
# Pod přiřazeným adresářem vznikají stejné podadresáře (STUDY / COUNTRY /
|
|
# číslo centra), jen kořen je tvůj vlastní.
|
|
STUDY_DIRS = {
|
|
# Dropbox mirror do podsložky "_vTMF" UVNITŘ ručně organizované složky
|
|
# studie. Mirror maže jen pod "_vTMF"; ostatní (ruční) složky studie
|
|
# (#010 Protocol, #020 ICF …) jsou vedle a zůstávají nedotčené.
|
|
"42847922MDD3003": r"dropbox:/!!42847922MDD3003/_vTMF",
|
|
"77242113UCO3001": r"dropbox:/!77242113UCO3001/_vTMF",
|
|
"77242113CRD3001": r"dropbox:/!77242113CRD3001/_vTMF",
|
|
"77242113UCO3002": r"dropbox:/77242113UCO3002/_vTMF",
|
|
}
|
|
|
|
# Kam pro studie, které NEJSOU v STUDY_DIRS: {study} se nahradí kódem
|
|
# studie. Pod tím vzniknou podadresáře dle úrovně (STUDY / COUNTRY /
|
|
# číslo centra).
|
|
OUTPUT_ROOT_TMPL = r"g:\x\{study}"
|
|
|
|
# Co stáhnout — přepni True/False:
|
|
EXPORT = {
|
|
"study": True, # -> <root>\STUDY\<Type>\<Subtype>\...
|
|
"country": True, # -> <root>\COUNTRY\<Type>\<Subtype>\...
|
|
"site": True, # -> <root>\<číslo centra>\<Type>\<Subtype>\...
|
|
}
|
|
|
|
# Prefix čísla centra pro každou studii. Číslo centra má tvar
|
|
# "<PREFIX>#####" (např. "S10-CZ10004"). POZOR: stejné fyzické centrum
|
|
# má v KAŽDÉ studii jiný kód a dokument nese v sites[] VŠECHNY (M:N),
|
|
# takže bez tohoto filtru soubor spadne i do složek center cizích studií.
|
|
# Bereme tedy ze sites[] jen kódy začínající prefixem DANÉ studie.
|
|
# (Heuristika podle name nestačí — nechá i cizí prefixy.)
|
|
SITE_PREFIX = {
|
|
"42847922MDD3003": "S10-CZ",
|
|
"77242113UCO3001": "DD5-CZ",
|
|
"77242113CRD3001": "DD6-CZ",
|
|
# 77242113UCO3002: zatím bez center
|
|
}
|
|
|
|
# Exportovat jen tuto zemi (country úroveň + scope u country/site).
|
|
# None = všechny země. Prefixy výše jsou CZ, proto necháváme CZ.
|
|
ONLY_COUNTRY = "Czech Republic"
|
|
|
|
OVERWRITE = False # True = přepsat i existující soubory
|
|
|
|
# Dropbox mirror: cíl ve tvaru "dropbox:/cesta" se nahraje přes Dropbox
|
|
# API a udržuje se jako ČISTÝ MIRROR — co chybí nahraje, co v cíli přebývá
|
|
# smaže. Zdroj pravdy je SeaweedFS, v Dropboxu se nic nezálohuje.
|
|
# Credentials (APP_KEY/SECRET/REFRESH_TOKEN) se berou z tohoto .env:
|
|
DROPBOX_ENV = Path(__file__).resolve().parents[1] / "EmailsImport" / ".env"
|
|
DROPBOX_MIRROR_DELETE = True # False = jen nahrávat/aktualizovat, nemazat
|
|
# ====================================================================
|
|
|
|
MONGO_URI = "mongodb://192.168.1.76:27017"
|
|
MONGO_DB = "VTMF"
|
|
MONGO_COLL = "documents"
|
|
|
|
BAD_CHARS_RE = re.compile(r"[<>:\"/\\|?*\x00-\x1f�]")
|
|
|
|
|
|
def log(msg):
|
|
print(msg, flush=True)
|
|
|
|
|
|
def clean(s):
|
|
s = BAD_CHARS_RE.sub("_", str(s or ""))
|
|
s = re.sub(r"\s+", " ", s)
|
|
s = re.sub(r"_{2,}", "_", s)
|
|
return s.strip(" ._")
|
|
|
|
|
|
def filename_for(doc):
|
|
"""'YYYY-MM-DD Description [VTMF-xxx] [v1.0].<přípona>'."""
|
|
ext = Path(doc.get("seaweed_path", "")).suffix
|
|
date = doc.get("date") or ""
|
|
date_prefix = (date + " ") if date else ""
|
|
version = f" [{doc['version']}]" if doc.get("version") else ""
|
|
desc = clean(doc.get("desc")) or clean(doc.get("vtmf"))
|
|
return f"{date_prefix}{desc} [{doc['vtmf']}]{version}{ext}"
|
|
|
|
|
|
def base_rels_for(doc, level, site_prefix):
|
|
"""Základní podsložky (relativně ke kořeni) pro daný dokument.
|
|
study -> ['STUDY'], country -> ['COUNTRY'], site -> [<centra této studie>].
|
|
|
|
POZOR: sites[] je M:N — stejné fyzické centrum má v každé studii jiný
|
|
kód (S10-CZ10001 / BX4-CZ10001 / CH1-CZ10001 …) a dokument nese VŠECHNY.
|
|
Správný kód pro TUTO studii poznáme podle pevného prefixu (SITE_PREFIX),
|
|
např. 'S10-CZ'. Bereme jen sites[] začínající tímto prefixem; ostatní
|
|
(kódy center cizích studií) ignorujeme. Když nic nematchuje, dokument
|
|
do žádné site složky nepatří (vrať [])."""
|
|
if level == "study":
|
|
return ["STUDY"]
|
|
if level == "country":
|
|
return ["COUNTRY"]
|
|
pref = (site_prefix or "").upper()
|
|
return [clean(s) for s in doc.get("sites", [])
|
|
if s and str(s).upper().startswith(pref) and clean(s)]
|
|
|
|
|
|
def rel_targets(doc, level, site_prefix):
|
|
"""Cílové cesty relativně ke kořeni, oddělovač '/' (společný pro disk
|
|
i Dropbox). Konkrétní sink ('/' -> '\\' na disku) si poradí sám."""
|
|
fname = filename_for(doc)
|
|
typ = clean(doc.get("type")) or "_"
|
|
sub = clean(doc.get("subtype")) or "_"
|
|
return [f"{base}/{typ}/{sub}/{fname}"
|
|
for base in base_rels_for(doc, level, site_prefix)]
|
|
|
|
|
|
# --------------------------------------------------------------------
|
|
# Sinky: kam a jak zapisovat (lokální disk vs. Dropbox API mirror)
|
|
# --------------------------------------------------------------------
|
|
class LocalSink:
|
|
"""Zápis na disk. Idempotentní (přeskakuje existující), bez mazání."""
|
|
|
|
def __init__(self, spec):
|
|
self.root = Path(spec)
|
|
self.label = str(self.root)
|
|
self.root.mkdir(parents=True, exist_ok=True)
|
|
|
|
def exists(self, rel):
|
|
return (self.root / rel).exists()
|
|
|
|
def keep(self, rel):
|
|
pass
|
|
|
|
def write(self, rel, data):
|
|
dest = self.root / rel
|
|
dest.parent.mkdir(parents=True, exist_ok=True)
|
|
dest.write_bytes(data)
|
|
|
|
def finalize(self):
|
|
return 0
|
|
|
|
|
|
class DropboxSink:
|
|
"""Zápis přes Dropbox API jako ČISTÝ MIRROR. Co chybí nahraje, co v cíli
|
|
přebývá (a není v tomto exportu) smaže. Dokumenty jsou neměnné verze
|
|
(VTMF-xxx vX.Y), takže existence cesty = shoda obsahu — existující se
|
|
nenahrávají znovu, obsah se neporovnává."""
|
|
|
|
def __init__(self, spec):
|
|
import dropbox
|
|
from dotenv import load_dotenv
|
|
load_dotenv(DROPBOX_ENV)
|
|
self._dbx = dropbox
|
|
self.dbx = dropbox.Dropbox(
|
|
app_key=os.getenv("DROPBOX_APP_KEY"),
|
|
app_secret=os.getenv("DROPBOX_APP_SECRET"),
|
|
oauth2_refresh_token=os.getenv("DROPBOX_APP_REFRESH_TOKEN"),
|
|
)
|
|
path = spec.split(":", 1)[1].strip().replace("\\", "/")
|
|
self.base = "/" + path.strip("/")
|
|
self.label = f"dropbox:{self.base}"
|
|
self.remote_files = {} # path_lower -> path_display (soubory)
|
|
self.remote_dirs = {} # path_lower -> path_display (složky)
|
|
self.seen = set() # path_lower, které ponecháváme
|
|
self._load_remote()
|
|
|
|
def _full(self, rel):
|
|
return f"{self.base}/{rel}"
|
|
|
|
def _load_remote(self):
|
|
files = self._dbx.files
|
|
try:
|
|
res = self.dbx.files_list_folder(self.base, recursive=True)
|
|
except self._dbx.exceptions.ApiError:
|
|
return # cíl ještě neexistuje — nic ke smazání
|
|
while True:
|
|
for e in res.entries:
|
|
if isinstance(e, files.FileMetadata):
|
|
self.remote_files[e.path_lower] = e.path_display
|
|
elif isinstance(e, files.FolderMetadata):
|
|
self.remote_dirs[e.path_lower] = e.path_display
|
|
if not res.has_more:
|
|
break
|
|
res = self.dbx.files_list_folder_continue(res.cursor)
|
|
|
|
def exists(self, rel):
|
|
return self._full(rel).lower() in self.remote_files
|
|
|
|
def keep(self, rel):
|
|
self.seen.add(self._full(rel).lower())
|
|
|
|
def write(self, rel, data):
|
|
full = self._full(rel)
|
|
self._upload(full, data)
|
|
self.remote_files[full.lower()] = full
|
|
|
|
def _upload(self, full, data):
|
|
files = self._dbx.files
|
|
wm = files.WriteMode.overwrite
|
|
chunk = 8 * 1024 * 1024
|
|
if len(data) <= chunk:
|
|
self.dbx.files_upload(data, full, mode=wm, mute=True)
|
|
return
|
|
# velké soubory přes upload session
|
|
start = self.dbx.files_upload_session_start(data[:chunk])
|
|
cursor = files.UploadSessionCursor(session_id=start.session_id, offset=chunk)
|
|
off = chunk
|
|
while len(data) - off > chunk:
|
|
self.dbx.files_upload_session_append_v2(data[off:off + chunk], cursor)
|
|
off += chunk
|
|
cursor.offset = off
|
|
self.dbx.files_upload_session_finish(
|
|
data[off:], cursor, files.CommitInfo(path=full, mode=wm, mute=True))
|
|
|
|
def finalize(self):
|
|
# potřebné složky = všichni předci ponechaných souborů (+ base)
|
|
base_low = self.base.lower()
|
|
needed = {base_low}
|
|
for low in self.seen:
|
|
p = low.rsplit("/", 1)[0]
|
|
while len(p) >= len(base_low):
|
|
needed.add(p)
|
|
if p == base_low:
|
|
break
|
|
p = p.rsplit("/", 1)[0]
|
|
# přebytky = soubory mimo seen + složky mimo needed; mažeme jen ty
|
|
# nejvyšší (rodič je potřebný), smazání složky odstraní i podstrom
|
|
extra = []
|
|
for low, disp in self.remote_files.items():
|
|
if low not in self.seen and low.rsplit("/", 1)[0] in needed:
|
|
extra.append(disp)
|
|
for low, disp in self.remote_dirs.items():
|
|
if low not in needed and low.rsplit("/", 1)[0] in needed:
|
|
extra.append(disp)
|
|
if not extra:
|
|
return 0
|
|
if not DROPBOX_MIRROR_DELETE:
|
|
log(f" [mirror] {len(extra)} přebytečných položek (mazání vypnuto)")
|
|
return 0
|
|
files = self._dbx.files
|
|
for i in range(0, len(extra), 1000):
|
|
chunk = [files.DeleteArg(path=p) for p in extra[i:i + 1000]]
|
|
res = self.dbx.files_delete_batch(chunk)
|
|
if res.is_async_job_id():
|
|
job = res.get_async_job_id()
|
|
while True:
|
|
st = self.dbx.files_delete_batch_check(job)
|
|
if st.is_complete() or st.is_failed():
|
|
break
|
|
time.sleep(1)
|
|
log(f" [mirror] smazáno {len(extra)} přebytečných položek")
|
|
return len(extra)
|
|
|
|
|
|
def make_sink(spec):
|
|
if str(spec).lower().startswith("dropbox:"):
|
|
return DropboxSink(spec)
|
|
return LocalSink(spec)
|
|
|
|
|
|
def export_study_level(coll, study, level, sink):
|
|
site_prefix = SITE_PREFIX.get(study)
|
|
if level == "site" and not site_prefix:
|
|
log(f" [site] {study}: chybí prefix v SITE_PREFIX — přeskakuji.")
|
|
return 0, 0, 0, 0
|
|
# příslušnost ke studii/úrovni přes scope "level|study|[country]"
|
|
# country/site filtrujeme i na zemi (ONLY_COUNTRY); study je globální
|
|
if level in ("country", "site") and ONLY_COUNTRY:
|
|
scope_prefix = re.escape(f"{level}|{study}|{ONLY_COUNTRY}")
|
|
else:
|
|
scope_prefix = re.escape(f"{level}|{study}|")
|
|
q = {"scopes": {"$regex": f"^{scope_prefix}"},
|
|
"deleted": False, "downloaded": True,
|
|
"placeholder": {"$ne": True}, "seaweed_path": {"$ne": None}}
|
|
docs = list(coll.find(q).sort([("vtmf", ASCENDING), ("version", ASCENDING)]))
|
|
log(f" [{level}] dokumentů: {len(docs)}")
|
|
if not docs:
|
|
return 0, 0, 0, 0
|
|
|
|
written = skipped = failed = 0
|
|
total_bytes = 0
|
|
for n, doc in enumerate(docs, 1):
|
|
rels = rel_targets(doc, level, site_prefix)
|
|
if not rels:
|
|
continue
|
|
for rel in rels:
|
|
sink.keep(rel) # i přeskočené si v mirroru ponecháme
|
|
need = rels if OVERWRITE else [rel for rel in rels if not sink.exists(rel)]
|
|
if not need:
|
|
skipped += len(rels)
|
|
continue
|
|
try:
|
|
with urllib.request.urlopen(doc["seaweed_url"], timeout=120) as resp:
|
|
data = resp.read()
|
|
for rel in need:
|
|
sink.write(rel, data)
|
|
written += 1
|
|
total_bytes += len(data)
|
|
skipped += len(rels) - len(need)
|
|
kb = len(data) / 1024
|
|
size = f"{kb:.0f} KB" if kb < 1024 else f"{kb / 1024:.1f} MB"
|
|
extra = f" (+{len(need)} kopií)" if len(need) > 1 else ""
|
|
log(f" [{n}/{len(docs)}] {need[0]} ({size}){extra}")
|
|
except Exception as e:
|
|
failed += 1
|
|
log(f" [!] {doc['_id']}: {e}")
|
|
return written, skipped, failed, total_bytes
|
|
|
|
|
|
def main():
|
|
coll = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)[MONGO_DB][MONGO_COLL]
|
|
levels = [lvl for lvl in ("study", "country", "site") if EXPORT.get(lvl)]
|
|
log(f"[i] Studie: {', '.join(STUDIES)}")
|
|
log(f"[i] Úrovně: {', '.join(levels) if levels else '(žádná)'}")
|
|
log(f"[i] Cíl: {OUTPUT_ROOT_TMPL} (Dropbox: cíl 'dropbox:/cesta')\n")
|
|
if not levels:
|
|
sys.exit(0)
|
|
|
|
gw = gs = gf = gb = gd = 0
|
|
for study in STUDIES:
|
|
spec = STUDY_DIRS.get(study) or OUTPUT_ROOT_TMPL.format(study=study)
|
|
# má studie přes scopes vůbec něco?
|
|
has = coll.count_documents({"scopes": {"$regex": f"^[a-z]+\\|{re.escape(study)}\\|"}})
|
|
if not has:
|
|
log(f"=== {study} -> {spec} ===")
|
|
log(f"[i] {study}: přes scopes nic — pipeline pro tuto studii "
|
|
f"zatím neproběhla, přeskakuji.\n")
|
|
continue
|
|
sink = make_sink(spec)
|
|
log(f"=== {study} -> {sink.label} (scoped dokumentů: {has}) ===")
|
|
for level in levels:
|
|
w, s, f, b = export_study_level(coll, study, level, sink)
|
|
gw += w; gs += s; gf += f; gb += b
|
|
gd += sink.finalize()
|
|
log("")
|
|
|
|
mb = gb / 1024 / 1024
|
|
log(f"[ok] Hotovo: {gw} souborů zapsáno ({mb:.1f} MB), "
|
|
f"{gs} přeskočeno (už existuje), {gd} smazáno (mirror), {gf} chyb.")
|
|
sys.exit(1 if gf else 0)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|