""" ============================================================================== Skript: enrich_fulltext_v1.0.py Verze: 1.0 Datum: 2026-06-03 Autor: vladimir.buzalka Popis: Vytahne PLNY TEXT z dokumentu odkazovanych v MongoDB (db: soubory) a ulozi ho do PostgreSQL (db: MongoSoubory) s GIN tsvector fulltext indexem. Zdroje: - MongoDB 192.168.1.76 db=soubory kolekce=42847922MDD3003, 77242113UCO3001 - PostgreSQL 192.168.1.76 db=MongoSoubory tabulka=documents Podporovane pripony: pdf, docx, xlsx, xlsm, pptx, eml, msg, txt, csv Inkrementalne: preskoci soubor, kde v PG existuje radek se shodnym sha256 a extractor_version a ok=true. Pri prvnim behu sam vytvori tabulku, indexy a textovou konfiguraci 'soubory' (unaccent + simple) - vyhleda case- a diakritika-insensitivni. ============================================================================== """ from __future__ import annotations import email import email.policy import sys import time import traceback from datetime import datetime, timezone from pathlib import Path import psycopg from pymongo import MongoClient # --- konfigurace ------------------------------------------------------------ MONGO_URI = "mongodb://192.168.1.76:27017" MONGO_DB = "soubory" MONGO_COLLECTIONS = ["42847922MDD3003", "77242113UCO3001"] PG_DSN = ("host=192.168.1.76 port=5432 dbname=MongoSoubory " "user=vladimir.buzalka password=Vlado7309208104++") EXTRACTOR_VERSION = "1.0" MAX_TEXT_BYTES = 5 * 1024 * 1024 # 5 MB textu na dokument max MAX_PDF_BYTES = 500 * 1024 * 1024 MAX_XLSX_BYTES = 200 * 1024 * 1024 MAX_GENERIC_BYTES = 300 * 1024 * 1024 SUPPORTED = ("pdf", "docx", "xlsx", "xlsm", "pptx", "eml", "msg", "txt", "csv") # --- SCHEMA ----------------------------------------------------------------- SCHEMA_SQL = """ CREATE EXTENSION IF NOT EXISTS unaccent; CREATE EXTENSION IF NOT EXISTS pg_trgm; DO $$ BEGIN IF NOT EXISTS (SELECT 1 FROM pg_ts_config WHERE cfgname = 'soubory') THEN CREATE TEXT SEARCH CONFIGURATION soubory ( COPY = simple ); ALTER TEXT SEARCH CONFIGURATION soubory ALTER MAPPING FOR hword, hword_part, word WITH unaccent, simple; END IF; END$$; CREATE TABLE IF NOT EXISTS documents ( id BIGSERIAL PRIMARY KEY, mongo_id TEXT NOT NULL, study TEXT NOT NULL, path TEXT NOT NULL, rel_path TEXT, name TEXT, ext TEXT, sha256 TEXT NOT NULL, size_bytes BIGINT, mtime TIMESTAMPTZ, body TEXT, body_length INT, tsv tsvector GENERATED ALWAYS AS ( to_tsvector('soubory'::regconfig, coalesce(body, '')) ) STORED, extracted_at TIMESTAMPTZ DEFAULT now(), extractor_version TEXT, ok BOOLEAN, error TEXT, UNIQUE (study, path) ); CREATE INDEX IF NOT EXISTS documents_tsv_gin ON documents USING gin(tsv); CREATE INDEX IF NOT EXISTS documents_name_trgm ON documents USING gin(name gin_trgm_ops); CREATE INDEX IF NOT EXISTS documents_sha256_idx ON documents(sha256); CREATE INDEX IF NOT EXISTS documents_study_ext_idx ON documents(study, ext); """ # --- EXTRAKTORY (vraci string, max MAX_TEXT_BYTES) -------------------------- def _truncate(s: str) -> str: if not s: return "" b = s.encode("utf-8", errors="replace") if len(b) <= MAX_TEXT_BYTES: return s return b[:MAX_TEXT_BYTES].decode("utf-8", errors="ignore") def extract_pdf(path: Path) -> str: from pypdf import PdfReader reader = PdfReader(str(path)) if reader.is_encrypted: try: reader.decrypt("") except Exception: return "" parts = [] total = 0 for page in reader.pages: try: t = page.extract_text() or "" except Exception: continue parts.append(t) total += len(t) if total > MAX_TEXT_BYTES: break return _truncate("\n".join(parts)) def extract_docx(path: Path) -> str: from docx import Document doc = Document(str(path)) parts = [p.text for p in doc.paragraphs if p.text] for tbl in doc.tables: for row in tbl.rows: parts.append(" | ".join(c.text for c in row.cells)) return _truncate("\n".join(parts)) def extract_xlsx(path: Path) -> str: from openpyxl import load_workbook wb = load_workbook(str(path), read_only=True, data_only=True) parts = [] total = 0 for ws in wb.worksheets: parts.append(f"# {ws.title}") for row in ws.iter_rows(values_only=True): line = "\t".join("" if v is None else str(v) for v in row) if line.strip(): parts.append(line) total += len(line) if total > MAX_TEXT_BYTES: break if total > MAX_TEXT_BYTES: break wb.close() return _truncate("\n".join(parts)) def extract_pptx(path: Path) -> str: from pptx import Presentation prs = Presentation(str(path)) parts = [] for i, slide in enumerate(prs.slides, 1): parts.append(f"# slide {i}") for shape in slide.shapes: if shape.has_text_frame: for para in shape.text_frame.paragraphs: line = "".join(run.text for run in para.runs) if line.strip(): parts.append(line) if slide.has_notes_slide: notes = slide.notes_slide.notes_text_frame.text if notes: parts.append(f"[notes] {notes}") return _truncate("\n".join(parts)) def extract_eml(path: Path) -> str: with path.open("rb") as f: msg = email.message_from_binary_file(f, policy=email.policy.default) head = [] for k in ("From", "To", "Cc", "Subject", "Date"): v = msg.get(k) if v: head.append(f"{k}: {v}") parts = ["\n".join(head)] if msg.is_multipart(): for part in msg.walk(): if part.get_content_type() == "text/plain" and not part.get_filename(): try: parts.append(part.get_content()) except Exception: pass else: try: parts.append(msg.get_content()) except Exception: pass return _truncate("\n\n".join(parts)) def extract_msg(path: Path) -> str: import extract_msg with extract_msg.openMsg(str(path)) as m: head = [] if m.subject: head.append(f"Subject: {m.subject}") if m.sender: head.append(f"From: {m.sender}") if m.to: head.append(f"To: {m.to}") if m.cc: head.append(f"Cc: {m.cc}") if m.date: head.append(f"Date: {m.date}") return _truncate("\n".join(head) + "\n\n" + (m.body or "")) def extract_text(path: Path) -> str: data = path.read_bytes()[:MAX_TEXT_BYTES] for enc in ("utf-8-sig", "cp1250", "latin-1"): try: return data.decode(enc) except UnicodeDecodeError: continue return data.decode("utf-8", errors="replace") EXTRACTORS = { "pdf": (extract_pdf, MAX_PDF_BYTES), "docx": (extract_docx, MAX_GENERIC_BYTES), "xlsx": (extract_xlsx, MAX_XLSX_BYTES), "xlsm": (extract_xlsx, MAX_XLSX_BYTES), "pptx": (extract_pptx, MAX_GENERIC_BYTES), "eml": (extract_eml, MAX_GENERIC_BYTES), "msg": (extract_msg, MAX_GENERIC_BYTES), "txt": (extract_text, MAX_GENERIC_BYTES), "csv": (extract_text, MAX_GENERIC_BYTES), } def _short(s, n=40): if not s: return "" s = str(s).replace("\n", " ").replace("\r", " ").strip() return s if len(s) <= n else s[:n] + "..." def _now() -> datetime: return datetime.now(tz=timezone.utc) # --- HLAVNI SMYCKA ---------------------------------------------------------- def process_collection(pg: psycopg.Connection, mongo_coll, study: str) -> dict: # nactu z PG existujici sha256 + verzi with pg.cursor() as cur: cur.execute( "SELECT path, sha256, extractor_version, ok FROM documents WHERE study = %s", (study,), ) existing = {row[0]: (row[1], row[2], row[3]) for row in cur.fetchall()} cursor = mongo_coll.find( {"ext": {"$in": list(EXTRACTORS.keys())}, "deleted_at": {"$exists": False}}, {"_id": 1, "path": 1, "rel_path": 1, "name": 1, "ext": 1, "sha256": 1, "size_bytes": 1, "mtime": 1}, no_cursor_timeout=True, ) processed = ok = errors = skipped = too_big = 0 queue = [] total_pending = mongo_coll.count_documents( {"ext": {"$in": list(EXTRACTORS.keys())}, "deleted_at": {"$exists": False}} ) print(f"[{study}] kandidatu v Mongo: {total_pending}") n = 0 try: for doc in cursor: n += 1 prev = existing.get(doc["path"]) if prev and prev[0] == doc.get("sha256") and prev[1] == EXTRACTOR_VERSION and prev[2]: skipped += 1 continue ext = doc["ext"] extractor, max_bytes = EXTRACTORS[ext] path = Path(doc["path"]) row = { "mongo_id": str(doc["_id"]), "study": study, "path": doc["path"], "rel_path": doc.get("rel_path"), "name": doc.get("name"), "ext": ext, "sha256": doc.get("sha256"), "size_bytes": doc.get("size_bytes"), "mtime": doc.get("mtime"), "body": None, "body_length": 0, "extracted_at": _now(), "extractor_version": EXTRACTOR_VERSION, "ok": False, "error": None, } status = "OK " detail = "" size_mb = (doc.get("size_bytes") or 0) / 1024 / 1024 if not path.exists(): row["error"] = "file_missing" status = "ERR"; detail = "file_missing"; errors += 1 elif (doc.get("size_bytes") or 0) > max_bytes: row["error"] = f"too_big_>{max_bytes}" status = "BIG"; detail = f"too_big_>{max_bytes//1024//1024}MB"; too_big += 1 else: try: body = extractor(path) or "" row["body"] = body if body else None row["body_length"] = len(body) row["ok"] = True ok += 1 detail = f"{len(body)} znaku {_short(body, 60)!r}" except Exception as e: row["error"] = f"{type(e).__name__}: {e}"[:500] status = "ERR"; detail = row["error"][:80]; errors += 1 queue.append(row) processed += 1 print(f" [{n:>4}/{total_pending}] {status} {ext:<4} {size_mb:6.1f}MB " f"{path.name} | {detail}", flush=True) if len(queue) >= 50: _flush(pg, queue); queue.clear() finally: cursor.close() if queue: _flush(pg, queue) return {"study": study, "processed": processed, "ok": ok, "errors": errors, "skipped": skipped, "too_big": too_big} UPSERT_SQL = """ INSERT INTO documents (mongo_id, study, path, rel_path, name, ext, sha256, size_bytes, mtime, body, body_length, extracted_at, extractor_version, ok, error) VALUES (%(mongo_id)s, %(study)s, %(path)s, %(rel_path)s, %(name)s, %(ext)s, %(sha256)s, %(size_bytes)s, %(mtime)s, %(body)s, %(body_length)s, %(extracted_at)s, %(extractor_version)s, %(ok)s, %(error)s) ON CONFLICT (study, path) DO UPDATE SET mongo_id = EXCLUDED.mongo_id, rel_path = EXCLUDED.rel_path, name = EXCLUDED.name, ext = EXCLUDED.ext, sha256 = EXCLUDED.sha256, size_bytes = EXCLUDED.size_bytes, mtime = EXCLUDED.mtime, body = EXCLUDED.body, body_length = EXCLUDED.body_length, extracted_at = EXCLUDED.extracted_at, extractor_version = EXCLUDED.extractor_version, ok = EXCLUDED.ok, error = EXCLUDED.error """ def _flush(pg: psycopg.Connection, rows: list[dict]) -> None: with pg.cursor() as cur: cur.executemany(UPSERT_SQL, rows) pg.commit() def main() -> int: t0 = time.time() print("Pripojuji se k PostgreSQL...") pg = psycopg.connect(PG_DSN, connect_timeout=10) with pg.cursor() as cur: cur.execute(SCHEMA_SQL) pg.commit() print("Schema OK.") print("Pripojuji se k MongoDB...") mongo = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000) mongo.admin.command("ping") db = mongo[MONGO_DB] print("Mongo OK.") results = [] for name in MONGO_COLLECTIONS: results.append(process_collection(pg, db[name], name)) pg.close() print("\n=== SHRNUTI ===") for r in results: print(f" {r['study']}: processed={r['processed']} ok={r['ok']} " f"errors={r['errors']} skipped={r['skipped']} too_big={r['too_big']}") print(f"\nCelkem trvalo: {time.time() - t0:.1f} s") return 0 if __name__ == "__main__": try: raise SystemExit(main()) except KeyboardInterrupt: print("\nPreruseno uzivatelem") except Exception: traceback.print_exc() sys.exit(1)