"""
==============================================================================
Script:  enrich_files_v1.0.py
Version: 1.0
Date:    2026-06-03
Author:  vladimir.buzalka

Description:
    Adds metadata extracted from file contents (PDF/DOCX/XLSX/PPTX/EML/MSG)
    to existing records in MongoDB (db: soubory).

    The fields are stored in the `content` sub-document:
      - common: ok (bool), error (str|None), parsed_at, parser_version,
                sha256_at_parse
      - pdf:    pages, author, title, subject, creator, producer,
                created, modified, encrypted,
                text_head (first page, max 2000 characters)
      - docx:   author, title, subject, last_modified_by,
                paragraphs, words, created, modified, text_head
      - xlsx:   sheets [{name, rows, cols}], total_sheets,
                author, title, subject, last_modified_by, created, modified
      - pptx:   slides, author, title, subject, last_modified_by,
                created, modified, text_head (text of the first 3 slides)
      - eml:    subject, from, to, cc, date, has_attachments,
                attachments [filenames], body_head
      - msg:    same as eml

    Incremental:
      - a file is skipped when content.sha256_at_parse == current sha256
        and content.parser_version == current version
      - when the content changes (different sha256) it is parsed again
      - on error, content.error is stored and content.ok=False

MongoDB: 192.168.1.76:27017
DB:      soubory
==============================================================================
"""

from __future__ import annotations

import email
import email.policy
import sys
import time
import traceback
from datetime import datetime, timezone
from pathlib import Path

from pymongo import MongoClient, UpdateOne

MONGO_URI = "mongodb://192.168.1.76:27017"
DB_NAME = "soubory"
COLLECTIONS = ["42847922MDD3003", "77242113UCO3001"]

PARSER_VERSION = "1.0"
TEXT_HEAD_LIMIT = 2000

# Size limits for large files - so the script does not choke on a 1 GB PDF.
MAX_PDF_BYTES = 500 * 1024 * 1024  # 500 MB
MAX_XLSX_BYTES = 200 * 1024 * 1024
MAX_GENERIC_BYTES = 300 * 1024 * 1024

# Number of pending updates collected before they are sent with bulk_write.
BULK_WRITE_BATCH = 50


def _now() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(tz=timezone.utc)


def _truncate(s: str | None, n: int = TEXT_HEAD_LIMIT) -> str | None:
    """Strip *s* and cut it to at most *n* characters; None passes through."""
    if s is None:
        return None
    s = s.strip()
    return s if len(s) <= n else s[:n]


def _to_dt(value):
    """Coerce *value* to a timezone-aware datetime, or None.

    Accepts a datetime or a non-empty ISO-8601 string (a trailing "Z" is
    read as +00:00). Naive values - from either source - are tagged as UTC so
    every returned datetime is timezone-aware. Unparseable strings and any
    other type give None.
    """
    if isinstance(value, str) and value:
        try:
            value = datetime.fromisoformat(value.replace("Z", "+00:00"))
        except ValueError:
            return None
    if isinstance(value, datetime):
        return value if value.tzinfo else value.replace(tzinfo=timezone.utc)
    return None


# --- PARSERS ----------------------------------------------------------------

def _pdf_meta(info, name: str):
    """Read one metadata attribute of *info*; None if missing or unreadable.

    The attribute is read inside a try block because metadata accessors may
    compute their value on access and raise on malformed input (for example
    an unparseable date string). One bad field should not discard the rest
    of the record.
    """
    try:
        return getattr(info, name, None)
    except Exception:
        return None


def parse_pdf(path: Path) -> dict:
    """Extract page count, document info and the first page's text of a PDF.

    Text extraction is best-effort: it is skipped for encrypted files and
    any extraction error leaves `text_head` as None.
    """
    from pypdf import PdfReader

    reader = PdfReader(str(path))
    info = reader.metadata or {}
    out = {
        "pages": len(reader.pages),
        "encrypted": reader.is_encrypted,
        "author": _pdf_meta(info, "author"),
        "title": _pdf_meta(info, "title"),
        "subject": _pdf_meta(info, "subject"),
        "creator": _pdf_meta(info, "creator"),
        "producer": _pdf_meta(info, "producer"),
        "created": _to_dt(_pdf_meta(info, "creation_date")),
        "modified": _to_dt(_pdf_meta(info, "modification_date")),
    }

    text_head = None
    try:
        if not reader.is_encrypted and reader.pages:
            text_head = reader.pages[0].extract_text()
    except Exception:
        text_head = None
    out["text_head"] = _truncate(text_head)
    return out


def parse_docx(path: Path) -> dict:
    """Extract core properties, paragraph/word counts and leading text."""
    from docx import Document

    doc = Document(str(path))
    core = doc.core_properties
    paragraphs = doc.paragraphs
    # Read each paragraph's text once; empty paragraphs are left out of the
    # joined text but still counted in "paragraphs".
    texts = [p.text for p in paragraphs]
    text = "\n".join(t for t in texts if t)
    return {
        "author": core.author,
        "title": core.title,
        "subject": core.subject,
        "last_modified_by": core.last_modified_by,
        "paragraphs": len(paragraphs),
        "words": len(text.split()),
        "created": _to_dt(core.created),
        "modified": _to_dt(core.modified),
        "text_head": _truncate(text),
    }


def parse_xlsx(path: Path) -> dict:
    """Extract sheet names/dimensions and workbook properties.

    The workbook is opened read-only and is always closed, also when reading
    a sheet or a property raises, so the underlying file is not left open.
    """
    from openpyxl import load_workbook

    wb = load_workbook(str(path), read_only=True, data_only=False)
    try:
        sheets = [
            {"name": ws.title, "rows": ws.max_row, "cols": ws.max_column}
            for ws in wb.worksheets
        ]
        props = wb.properties
        return {
            "total_sheets": len(sheets),
            "sheets": sheets,
            "author": props.creator,
            "title": props.title,
            "subject": props.subject,
            "last_modified_by": props.lastModifiedBy,
            "created": _to_dt(props.created),
            "modified": _to_dt(props.modified),
        }
    finally:
        wb.close()


def parse_pptx(path: Path) -> dict:
    """Extract slide count, core properties and text of the first 3 slides."""
    from pptx import Presentation

    prs = Presentation(str(path))
    core = prs.core_properties

    head_parts = []
    for slide in list(prs.slides)[:3]:
        for shape in slide.shapes:
            if not shape.has_text_frame:
                continue
            for para in shape.text_frame.paragraphs:
                head_parts.extend(run.text for run in para.runs if run.text)

    return {
        "slides": len(prs.slides),
        "author": core.author,
        "title": core.title,
        "subject": core.subject,
        "last_modified_by": core.last_modified_by,
        "created": _to_dt(core.created),
        "modified": _to_dt(core.modified),
        "text_head": _truncate(" ".join(head_parts)),
    }


def _text_content(part) -> str | None:
    """Return the decoded text of a message part, or None.

    Decoding is best-effort: any error gives None. Content that is not a str
    (e.g. bytes of a non-text part) is rejected as well, so callers can join
    the results safely.
    """
    try:
        content = part.get_content()
    except Exception:
        return None
    return content if isinstance(content, str) else None


def parse_eml(path: Path) -> dict:
    """Extract headers, attachment file names and leading plain-text body."""
    with path.open("rb") as f:
        msg = email.message_from_binary_file(f, policy=email.policy.default)

    attachments = []
    body_parts = []
    if msg.is_multipart():
        for part in msg.walk():
            disp = (part.get("Content-Disposition") or "").lower()
            fname = part.get_filename()
            if fname or "attachment" in disp:
                # Attachments without a file name are not listed.
                if fname:
                    attachments.append(fname)
            elif part.get_content_type() == "text/plain":
                text = _text_content(part)
                if text is not None:
                    body_parts.append(text)
    else:
        # A single-part message may carry non-text content; only text is
        # usable as body_head.
        text = _text_content(msg)
        if text is not None:
            body_parts.append(text)

    def _header(field):
        # Header values are converted to plain str before being stored.
        v = msg.get(field)
        return str(v) if v is not None else None

    def _addrs(field):
        # Empty address headers are stored as None.
        v = msg.get(field)
        return str(v) if v else None

    return {
        "subject": _header("Subject"),
        "from": _addrs("From"),
        "to": _addrs("To"),
        "cc": _addrs("Cc"),
        "date": _header("Date"),
        "has_attachments": bool(attachments),
        "attachments": attachments,
        "body_head": _truncate("\n".join(body_parts)),
    }


def parse_msg(path: Path) -> dict:
    """Extract headers, attachment file names and leading body of a .msg."""
    import extract_msg

    with extract_msg.openMsg(str(path)) as msg:
        attachments = []
        for att in msg.attachments or []:
            # Best-effort: an attachment whose name cannot be read is skipped.
            try:
                fname = att.longFilename or att.shortFilename
                if fname:
                    attachments.append(fname)
            except Exception:
                continue
        return {
            "subject": msg.subject,
            "from": msg.sender,
            "to": msg.to,
            "cc": msg.cc,
            "date": str(msg.date) if msg.date else None,
            "has_attachments": bool(attachments),
            "attachments": attachments,
            "body_head": _truncate(msg.body or ""),
        }


# Extension -> (parser function, maximum file size in bytes).
PARSERS = {
    "pdf": (parse_pdf, MAX_PDF_BYTES),
    "docx": (parse_docx, MAX_GENERIC_BYTES),
    "xlsx": (parse_xlsx, MAX_XLSX_BYTES),
    "xlsm": (parse_xlsx, MAX_XLSX_BYTES),
    "pptx": (parse_pptx, MAX_GENERIC_BYTES),
    "eml": (parse_eml, MAX_GENERIC_BYTES),
    "msg": (parse_msg, MAX_GENERIC_BYTES),
}


# --- CONSOLE SUMMARY --------------------------------------------------------

def _short(s, n=40):
    """Return *s* as one line of at most *n* characters plus "..." if cut."""
    if not s:
        return ""
    s = str(s).replace("\n", " ").replace("\r", " ").strip()
    return s if len(s) <= n else s[:n] + "..."


def _summary(content: dict, ext: str) -> str:
    """Build the short per-file description printed on the console."""
    if not content.get("ok"):
        return f"chyba: {_short(content.get('error'), 80)}"

    parts = []
    if ext == "pdf":
        parts.append(f"{content.get('pages')}p")
        if content.get("encrypted"):
            parts.append("enc")
        if content.get("author"):
            parts.append(f"by={_short(content['author'], 25)}")
        if content.get("title"):
            parts.append(f"t={_short(content['title'], 30)}")
    elif ext == "docx":
        parts.append(f"{content.get('paragraphs')}para")
        parts.append(f"{content.get('words')}w")
        if content.get("author"):
            parts.append(f"by={_short(content['author'], 25)}")
    elif ext in ("xlsx", "xlsm"):
        n = content.get("total_sheets", 0)
        sheets = content.get("sheets") or []
        # Show the first three sheet names and how many more there are.
        names = ",".join(_short(s["name"], 12) for s in sheets[:3])
        if n > 3:
            names += f",+{n-3}"
        parts.append(f"{n}sh[{names}]")
        if content.get("author"):
            parts.append(f"by={_short(content['author'], 20)}")
    elif ext == "pptx":
        parts.append(f"{content.get('slides')}slides")
        if content.get("author"):
            parts.append(f"by={_short(content['author'], 25)}")
        if content.get("title"):
            parts.append(f"t={_short(content['title'], 25)}")
    elif ext in ("eml", "msg"):
        if content.get("from"):
            parts.append(f"from={_short(content['from'], 25)}")
        if content.get("subject"):
            parts.append(f"subj={_short(content['subject'], 40)}")
        if content.get("has_attachments"):
            parts.append(f"att={len(content.get('attachments') or [])}")
    return " ".join(parts) if parts else "ok"


# --- MAIN LOOP --------------------------------------------------------------

# Outcome tag -> three-character status shown in the console listing.
_STATUS_LABELS = {"ok": "OK ", "too_big": "BIG", "errors": "ERR"}


def _build_content(doc: dict) -> tuple[dict, str]:
    """Parse the file behind one record and build its `content` sub-document.

    Returns:
        (content, outcome) where outcome is "ok", "too_big" or "errors"
        (a missing file and a parser exception both count as "errors").
    """
    parser, max_bytes = PARSERS[doc["ext"]]
    path = Path(doc["path"])
    content: dict = {
        "parser_version": PARSER_VERSION,
        "parsed_at": _now(),
        "sha256_at_parse": doc.get("sha256"),
    }

    # size_bytes may be absent or stored as None; both are treated as 0 so
    # the comparison below cannot raise.
    size_bytes = doc.get("size_bytes") or 0

    if not path.exists():
        content.update(ok=False, error="file_missing")
        return content, "errors"
    if size_bytes > max_bytes:
        content.update(ok=False, error=f"too_big_>{max_bytes}")
        return content, "too_big"

    try:
        payload = parser(path)
    except Exception as e:
        # A broken file must not stop the run; the error is stored instead.
        content["ok"] = False
        content["error"] = f"{type(e).__name__}: {e}"[:500]
        return content, "errors"

    content["ok"] = True
    content.update(payload)
    return content, "ok"


def enrich_collection(coll, study: str) -> dict:
    """Parse all pending files of one collection and store the results.

    A record is pending when its extension has a parser, it has no
    `deleted_at`, and its `content` is missing, was produced by a different
    parser version, or was parsed for a different sha256.

    Args:
        coll: the MongoDB collection to process.
        study: label used in console output and in the returned summary.

    Returns:
        dict with keys study, processed, ok, errors, too_big.
    """
    supported = list(PARSERS.keys())
    query = {
        "ext": {"$in": supported},
        "deleted_at": {"$exists": False},
        "$or": [
            {"content": {"$exists": False}},
            {"content.parser_version": {"$ne": PARSER_VERSION}},
            {"$expr": {"$ne": ["$content.sha256_at_parse", "$sha256"]}},
        ],
    }
    total_pending = coll.count_documents(query)
    print(f"[{study}] k zpracovani: {total_pending} souboru")

    ops: list[UpdateOne] = []
    processed = 0
    counts = {"ok": 0, "errors": 0, "too_big": 0}

    # NOTE(review): no_cursor_timeout does not necessarily protect against
    # the server-side session idle timeout on very long runs - confirm
    # against the pymongo/MongoDB documentation for the deployed version.
    cursor = coll.find(
        query,
        {"path": 1, "ext": 1, "size_bytes": 1, "sha256": 1},
        no_cursor_timeout=True,
    )
    try:
        for doc in cursor:
            ext = doc["ext"]
            content, outcome = _build_content(doc)
            counts[outcome] += 1

            ops.append(UpdateOne({"_id": doc["_id"]}, {"$set": {"content": content}}))
            processed += 1

            status = _STATUS_LABELS[outcome]
            size_mb = (doc.get("size_bytes") or 0) / 1024 / 1024
            detail = _summary(content, ext)
            name = Path(doc["path"]).name
            print(f" [{processed:>4}/{total_pending}] {status} {ext:<4} {size_mb:6.1f}MB {name} | {detail}", flush=True)

            if len(ops) >= BULK_WRITE_BATCH:
                coll.bulk_write(ops, ordered=False)
                ops.clear()
    finally:
        # The cursor was opened without a timeout, so it is closed explicitly.
        cursor.close()

    if ops:
        coll.bulk_write(ops, ordered=False)

    return {
        "study": study,
        "processed": processed,
        "ok": counts["ok"],
        "errors": counts["errors"],
        "too_big": counts["too_big"],
    }
def main() -> int:
    """Process every configured collection and print a summary.

    Connects to MongoDB, pings the server first so that an unreachable
    server is reported before any work starts, runs `enrich_collection` for
    each name in COLLECTIONS and prints the per-collection counters and the
    total run time.

    Returns:
        0 after all collections were processed. Exceptions from the ping or
        from `enrich_collection` are not caught here and propagate to the
        caller.
    """
    t0 = time.time()
    client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
    try:
        client.admin.command("ping")
        db = client[DB_NAME]

        results = [enrich_collection(db[name], name) for name in COLLECTIONS]
    finally:
        # Release the connection also when the ping or a collection fails.
        client.close()

    print("\n=== SHRNUTI ===")
    for r in results:
        print(f" {r['study']}: processed={r['processed']} ok={r['ok']} "
              f"errors={r['errors']} too_big={r['too_big']}")
    print(f"\nCelkem trvalo: {time.time() - t0:.1f} s")
    return 0


if __name__ == "__main__":
    try:
        raise SystemExit(main())
    except KeyboardInterrupt:
        # NOTE: an interrupted run only prints a message here, so the
        # process still ends with exit status 0.
        print("\nPreruseno uzivatelem")
    except Exception:
        traceback.print_exc()
        sys.exit(1)