""" ============================================================================== Skript: scan_files_v1.0.py Verze: 1.0 Datum: 2026-06-03 Autor: vladimir.buzalka Popis: Rekurzivni sken Dropbox slozek dvou studii a zapis metadat vsech souboru do MongoDB (db: soubory, kolekce = nazev studie). - cesty k Dropboxu se zjisti pres Knihovny.najdi_dropbox - pro kazdy soubor: stat, sha256, mime (podle pripony), parsing data v nazvu (12JAN2026, 2026-01-12, 12-01-2026 ...) - inkrementalni: pokud size+mtime souhlasi se zaznamem v DB, sha256 se nepocita znovu (jen se aktualizuje last_seen_at) - smazane soubory dostanou deleted_at pri behu, ve kterem uz nebyly videny - vynechavaji se: .dropbox*, Thumbs.db, desktop.ini, ~$*.* (Office lock), .DS_Store, *.tmp MongoDB: 192.168.1.76:27017, bez autentizace DB: soubory Kolekce: 42847922MDD3003, 77242113UCO3001 (extrahovano z rootu cesty) ============================================================================== """ from __future__ import annotations import hashlib import mimetypes import os import re import sys import time from datetime import datetime, timezone from pathlib import Path from pymongo import MongoClient, UpdateOne, ASCENDING # --- prida Knihovny do path ------------------------------------------------- HERE = Path(__file__).resolve().parent sys.path.insert(0, str(HERE.parent)) from Knihovny.najdi_dropbox import get_dropbox_root # noqa: E402 # --- konfigurace ------------------------------------------------------------ MONGO_URI = "mongodb://192.168.1.76:27017" DB_NAME = "soubory" STUDIES = { "42847922MDD3003": "!!42847922MDD3003", "77242113UCO3001": "!77242113UCO3001", } SKIP_NAME_PATTERNS = [ re.compile(r"^\.dropbox.*", re.IGNORECASE), re.compile(r"^Thumbs\.db$", re.IGNORECASE), re.compile(r"^desktop\.ini$", re.IGNORECASE), re.compile(r"^~\$.*", re.IGNORECASE), re.compile(r"^\.DS_Store$", re.IGNORECASE), ] SKIP_DIR_NAMES = {".dropbox.cache"} HASH_CHUNK = 1024 * 1024 # 1 MiB # --- parsovani datumu v nazvu 
# --- parsing of dates in file names ------------------------------------------
MONTHS = {
    "JAN": 1, "FEB": 2, "MAR": 3, "APR": 4, "MAY": 5, "JUN": 6,
    "JUL": 7, "AUG": 8, "SEP": 9, "OCT": 10, "NOV": 11, "DEC": 12,
}

# (compiled pattern, kind) pairs; the kind tells extract_dates how to read
# the three capture groups.
DATE_PATTERNS = [
    # 12JAN2026 / 12Jan2026
    (re.compile(r"(\d{1,2})([A-Za-z]{3})(\d{4})"), "dmonth"),
    # 2026-01-12 / 2026_01_12 / 2026.01.12
    (re.compile(r"(20\d{2})[-_.](\d{1,2})[-_.](\d{1,2})"), "ymd"),
    # 12-01-2026 / 12_01_2026 / 12.01.2026
    (re.compile(r"(\d{1,2})[-_.](\d{1,2})[-_.](20\d{2})"), "dmy"),
]


def extract_dates(name: str) -> list[str]:
    """Return the unique ISO dates (YYYY-MM-DD) found in *name*, sorted.

    Every pattern in DATE_PATTERNS is tried; matches that do not form a
    real calendar date (unknown month abbreviation, 31 February, ...) are
    silently ignored.
    """
    found: set[str] = set()
    for rx, kind in DATE_PATTERNS:
        for m in rx.finditer(name):
            try:
                if kind == "dmonth":
                    d = int(m.group(1))
                    mo = MONTHS.get(m.group(2).upper())
                    y = int(m.group(3))
                    if not mo:
                        continue
                elif kind == "ymd":
                    y, mo, d = int(m.group(1)), int(m.group(2)), int(m.group(3))
                else:  # dmy
                    d, mo, y = int(m.group(1)), int(m.group(2)), int(m.group(3))
                # Constructing the datetime validates the calendar date.
                datetime(y, mo, d)
                found.add(f"{y:04d}-{mo:02d}-{d:02d}")
            except ValueError:
                continue
    return sorted(found)


TOKEN_RX = re.compile(r"[A-Za-z0-9]+")


def tokenize(name: str) -> list[str]:
    """Split *name* into lower-cased runs of ASCII letters and digits."""
    return [t.lower() for t in TOKEN_RX.findall(name)]


def should_skip(name: str) -> bool:
    """Return True when the file name matches any of SKIP_NAME_PATTERNS."""
    return any(p.match(name) for p in SKIP_NAME_PATTERNS)


def sha256_of(path: Path) -> str:
    """Return the hex SHA-256 digest of the file, read in HASH_CHUNK pieces.

    Raises:
        OSError: if the file cannot be opened or read.
    """
    h = hashlib.sha256()
    with path.open("rb") as f:
        # iter() with a sentinel stops at the first empty read (EOF).
        for chunk in iter(lambda: f.read(HASH_CHUNK), b""):
            h.update(chunk)
    return h.hexdigest()


def to_dt(ts: float) -> datetime:
    """Convert a POSIX timestamp to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(ts, tz=timezone.utc)


def mtime_matches(stored: object, current: datetime) -> bool:
    """Return True when *stored* and *current* are the same millisecond.

    BSON datetimes carry only millisecond precision and PyMongo returns them
    as naive UTC datetimes unless the client was created with tz_aware=True.
    A plain ``==`` against the aware, microsecond-precision value from
    to_dt() is therefore always False. Both sides are normalised here before
    comparing.

    Args:
        stored: value read back from the database; anything that is not a
            datetime (e.g. None for a missing field) never matches.
        current: timezone-aware modification time of the file on disk.
    """
    if not isinstance(stored, datetime):
        return False
    if stored.tzinfo is None:
        # Naive values read from MongoDB are UTC.
        stored = stored.replace(tzinfo=timezone.utc)

    def floor_ms(dt: datetime) -> datetime:
        return dt.replace(microsecond=dt.microsecond // 1000 * 1000)

    return floor_ms(stored) == floor_ms(current)


def scan_study(study_code: str, study_root: Path, db, scan_started_at: datetime) -> dict:
    """Scan one study folder and synchronise its file metadata into MongoDB.

    Args:
        study_code: study identifier; also used as the collection name.
        study_root: directory that is walked recursively.
        db: database object; ``db[study_code]`` must yield a collection
            supporting create_index, find, bulk_write and update_many.
        scan_started_at: timestamp of this run, written to last_seen_at /
            first_seen_at / deleted_at.

    Returns:
        A summary dict with the keys ``study``, ``seen``, ``rehashed``,
        ``unchanged``, ``skipped``, ``marked_deleted`` and ``errors``
        (a list of ``(path, message)`` tuples).
    """
    coll = db[study_code]
    coll.create_index([("path", ASCENDING)], unique=True)
    for field in ("ext", "dates_in_name", "tokens", "sha256"):
        coll.create_index([(field, ASCENDING)])

    # existing records: path -> (size, mtime, sha256)
    existing = {
        d["path"]: (d.get("size_bytes"), d.get("mtime"), d.get("sha256"))
        for d in coll.find({}, {"path": 1, "size_bytes": 1, "mtime": 1, "sha256": 1})
    }

    ops: list[UpdateOne] = []
    seen = 0
    rehashed = 0
    skipped = 0
    errors: list[tuple[str, str]] = []

    def touch(path_str: str) -> UpdateOne:
        # Record that the file still exists without rewriting its metadata.
        return UpdateOne(
            {"path": path_str},
            {"$set": {"last_seen_at": scan_started_at},
             "$unset": {"deleted_at": ""}},
        )

    print(f"[{study_code}] sken: {study_root}")
    for root, dirs, files in os.walk(study_root):
        # prune skipped directories in place so os.walk does not descend
        dirs[:] = [d for d in dirs if d not in SKIP_DIR_NAMES]
        for fname in files:
            if should_skip(fname):
                skipped += 1
                continue
            fpath = Path(root) / fname
            try:
                st = fpath.stat()
            except OSError as e:
                errors.append((str(fpath), f"stat: {e}"))
                continue

            path_str = str(fpath)
            size = st.st_size
            mtime = to_dt(st.st_mtime)
            prev = existing.get(path_str)

            if prev and prev[0] == size and mtime_matches(prev[1], mtime) and prev[2]:
                # unchanged - only refresh last_seen_at and clear deleted_at
                ops.append(touch(path_str))
            else:
                try:
                    digest = sha256_of(fpath)
                except OSError as e:
                    errors.append((path_str, f"hash: {e}"))
                    if prev:
                        # stat() succeeded, so the file exists; keep the old
                        # record from being marked deleted by the sweep below.
                        ops.append(touch(path_str))
                    continue
                rehashed += 1
                rel = fpath.relative_to(study_root)
                doc = {
                    "path": path_str,
                    "study": study_code,
                    "rel_path": str(rel),
                    "dir": str(fpath.parent),
                    "rel_dir": str(rel.parent) if str(rel.parent) != "." else "",
                    "parent_folders": list(rel.parts[:-1]),
                    "name": fname,
                    "stem": fpath.stem,
                    "ext": fpath.suffix.lower().lstrip("."),
                    "size_bytes": size,
                    "mtime": mtime,
                    "ctime": to_dt(st.st_ctime),
                    "atime": to_dt(st.st_atime),
                    "sha256": digest,
                    "mime": mimetypes.guess_type(fname)[0],
                    "tokens": tokenize(fpath.stem),
                    "dates_in_name": extract_dates(fname),
                    "last_seen_at": scan_started_at,
                }
                ops.append(UpdateOne(
                    {"path": path_str},
                    {"$set": doc,
                     "$unset": {"deleted_at": ""},
                     "$setOnInsert": {"first_seen_at": scan_started_at}},
                    upsert=True,
                ))

            seen += 1
            # flush in batches to bound memory and report progress
            if len(ops) >= 500:
                coll.bulk_write(ops, ordered=False)
                ops.clear()
                print(f" ... {seen} souboru zpracovano")

    if ops:
        coll.bulk_write(ops, ordered=False)

    # mark as deleted everything that was not seen during this run
    res = coll.update_many(
        {"last_seen_at": {"$lt": scan_started_at}, "deleted_at": {"$exists": False}},
        {"$set": {"deleted_at": scan_started_at}},
    )

    return {
        "study": study_code,
        "seen": seen,
        "rehashed": rehashed,
        "unchanged": seen - rehashed,
        "skipped": skipped,
        "marked_deleted": res.modified_count,
        "errors": errors,
    }


def main() -> int:
    """Scan every configured study and print a summary; return exit code 0."""
    t0 = time.time()
    dropbox_root = Path(get_dropbox_root())
    print(f"Dropbox root: {dropbox_root}")

    client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
    results = []
    try:
        # fail fast when the server is unreachable
        client.admin.command("ping")
        db = client[DB_NAME]

        scan_started_at = datetime.now(tz=timezone.utc)
        for study_code, folder in STUDIES.items():
            study_root = dropbox_root / folder
            if not study_root.is_dir():
                print(f"[!] {study_root} neexistuje, preskakuji")
                continue
            results.append(scan_study(study_code, study_root, db, scan_started_at))
    finally:
        # release the connection pool even when a scan raises
        client.close()

    print("\n=== SHRNUTI ===")
    for r in results:
        print(f" {r['study']}: seen={r['seen']} rehashed={r['rehashed']} "
              f"unchanged={r['unchanged']} skipped={r['skipped']} "
              f"deleted={r['marked_deleted']} errors={len(r['errors'])}")
        for path, err in r["errors"][:5]:
            print(f" ! {err} ({path})")
        if len(r["errors"]) > 5:
            print(f" ... +{len(r['errors']) - 5} dalsich chyb")
    print(f"\nCelkem trvalo: {time.time() - t0:.1f} s")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())