""" ============================================================================== Skript: enrich_fulltext_v1.2.py Verze: 1.2 Datum: 2026-06-03 Autor: vladimir.buzalka Popis: Vytahne PLNY TEXT z dokumentu odkazovanych v MongoDB (db: soubory) a ulozi ho do PostgreSQL (db: MongoSoubory) s GIN tsvector indexem. Zmeny proti v1.1: - PG tsvector ma tvrdy limit ~1 MB binarne -> velky XLSX (5 MB textu) ho prekrocil. v1.2 generuje tsv z prvnich 800 000 znaku body: left(body, 800000). Sloupec body zustava plny (max 5 MB pro nahled / snippet). - SCHEMA_SQL provadi migraci sloupce tsv: pokud uz existuje stara verze (bez `left`), dropne index+sloupec a vytvori znovu s truncated vyrazem. - extractor_version = "1.2" -> preparsuji se vsechny radky z v1.0/v1.1. Zachovano z v1.1: - NUL bajty (0x00) se strippuji z body i error - DOCX fallback na raw XML pres regex pri padu python-docx Cilove ulozeni: - MongoDB 192.168.1.76 db=soubory kolekce=42847922MDD3003, 77242113UCO3001 - PostgreSQL 192.168.1.76 db=MongoSoubory tabulka=documents Podporovane pripony: pdf, docx, xlsx, xlsm, pptx, eml, msg, txt, csv ============================================================================== """ from __future__ import annotations import email import email.policy import re import sys import time import traceback import zipfile from datetime import datetime, timezone from pathlib import Path import psycopg from pymongo import MongoClient # --- konfigurace ------------------------------------------------------------ MONGO_URI = "mongodb://192.168.1.76:27017" MONGO_DB = "soubory" MONGO_COLLECTIONS = ["42847922MDD3003", "77242113UCO3001"] PG_DSN = ("host=192.168.1.76 port=5432 dbname=MongoSoubory " "user=vladimir.buzalka password=Vlado7309208104++") EXTRACTOR_VERSION = "1.2" MAX_TEXT_BYTES = 5 * 1024 * 1024 MAX_PDF_BYTES = 500 * 1024 * 1024 MAX_XLSX_BYTES = 200 * 1024 * 1024 MAX_GENERIC_BYTES = 300 * 1024 * 1024 SUPPORTED = ("pdf", "docx", "xlsx", "xlsm", "pptx", "eml", "msg", "txt", "csv") # --- 
# --- SCHEMA ------------------------------------------------------------------
# DDL executed at start-up. It is written to be re-runnable: extensions, the
# text search configuration, the table and the indexes are only created when
# missing, and the tsv column is rebuilt only when its stored expression does
# not yet contain `left`.
#
# NOTE: the line structure inside this string matters. The `--` SQL comment
# runs to the end of its line, so it must stay on a line of its own; otherwise
# it would comment out the migration block and the CREATE INDEX statements.
SCHEMA_SQL = """
CREATE EXTENSION IF NOT EXISTS unaccent;
CREATE EXTENSION IF NOT EXISTS pg_trgm;

DO $$
BEGIN
    IF NOT EXISTS (SELECT 1 FROM pg_ts_config WHERE cfgname = 'soubory') THEN
        CREATE TEXT SEARCH CONFIGURATION soubory ( COPY = simple );
        ALTER TEXT SEARCH CONFIGURATION soubory
            ALTER MAPPING FOR hword, hword_part, word
            WITH unaccent, simple;
    END IF;
END$$;

CREATE TABLE IF NOT EXISTS documents (
    id                BIGSERIAL PRIMARY KEY,
    mongo_id          TEXT NOT NULL,
    study             TEXT NOT NULL,
    path              TEXT NOT NULL,
    rel_path          TEXT,
    name              TEXT,
    ext               TEXT,
    sha256            TEXT NOT NULL,
    size_bytes        BIGINT,
    mtime             TIMESTAMPTZ,
    body              TEXT,
    body_length       INT,
    tsv               tsvector GENERATED ALWAYS AS (
                          to_tsvector('soubory'::regconfig, left(coalesce(body, ''), 800000))
                      ) STORED,
    extracted_at      TIMESTAMPTZ DEFAULT now(),
    extractor_version TEXT,
    ok                BOOLEAN,
    error             TEXT,
    UNIQUE (study, path)
);

-- migrace tsv sloupce ze stareho vyrazu (bez `left`) na novy (s `left(..,800000)`)
DO $$
DECLARE
    cur_expr TEXT;
BEGIN
    SELECT pg_get_expr(d.adbin, d.adrelid) INTO cur_expr
    FROM pg_attribute a
    JOIN pg_class c ON c.oid = a.attrelid
    JOIN pg_attrdef d ON d.adrelid = a.attrelid AND d.adnum = a.attnum
    WHERE c.relname = 'documents' AND a.attname = 'tsv';

    IF cur_expr IS NOT NULL AND position('left' in cur_expr) = 0 THEN
        EXECUTE 'DROP INDEX IF EXISTS documents_tsv_gin';
        EXECUTE 'ALTER TABLE documents DROP COLUMN tsv';
        EXECUTE 'ALTER TABLE documents ADD COLUMN tsv tsvector GENERATED ALWAYS AS '
             || '(to_tsvector(''soubory''::regconfig, left(coalesce(body, ''''), 800000))) STORED';
    END IF;
END$$;

CREATE INDEX IF NOT EXISTS documents_tsv_gin ON documents USING gin(tsv);
CREATE INDEX IF NOT EXISTS documents_name_trgm ON documents USING gin(name gin_trgm_ops);
CREATE INDEX IF NOT EXISTS documents_sha256_idx ON documents(sha256);
CREATE INDEX IF NOT EXISTS documents_study_ext_idx ON documents(study, ext);
"""
# --- HELPERS -----------------------------------------------------------------

# Matches 0x00 and the other control characters below 0x20, except the
# whitespace ones (\t, \n, \r).
_CTRL_RX = re.compile(r"[\x00-\x08\x0b\x0c\x0e-\x1f]")


def _clean_for_pg(s: str) -> str:
    """Return *s* with NUL and other non-whitespace control characters removed.

    A falsy input (``None`` or ``""``) yields ``""``.
    """
    if not s:
        return ""
    return _CTRL_RX.sub("", s)


def _truncate(s: str) -> str:
    """Clean *s* and cap its UTF-8 encoding at ``MAX_TEXT_BYTES`` bytes."""
    s = _clean_for_pg(s or "")
    if not s:
        return ""
    b = s.encode("utf-8", errors="replace")
    if len(b) <= MAX_TEXT_BYTES:
        return s
    # errors="ignore" drops a multi-byte character cut in half by the slice.
    return b[:MAX_TEXT_BYTES].decode("utf-8", errors="ignore")


# --- EXTRACTORS --------------------------------------------------------------

def extract_pdf(path: Path) -> str:
    """Return the text of all pages of a PDF, joined by newlines.

    An encrypted file is tried with the empty password; if decrypting raises,
    the result is ``""``. Pages whose extraction raises are skipped.
    """
    from pypdf import PdfReader

    reader = PdfReader(str(path))
    if reader.is_encrypted:
        try:
            reader.decrypt("")
        except Exception:
            return ""
    parts = []
    total = 0
    for page in reader.pages:
        try:
            t = page.extract_text() or ""
        except Exception:
            continue
        parts.append(t)
        total += len(t)
        # Stop early once enough text is collected; _truncate applies the
        # exact byte limit afterwards.
        if total > MAX_TEXT_BYTES:
            break
    return _truncate("\n".join(parts))


# Regexes for the DOCX fallback: the text of <w:t ...>...</w:t> runs and the
# paragraph end tag </w:p>.
_DOCX_WT_RX = re.compile(r"<w:t[^>]*>([^<]*)</w:t>", re.DOTALL)
_DOCX_WP_END_RX = re.compile(r"</w:p>")


def _docx_raw_text(path: Path) -> str:
    """Fallback: read the text runs straight from word/document.xml in the ZIP.

    Returns ``""`` when the archive has no ``word/document.xml`` member.
    XML entities (e.g. ``&amp;``) are returned as-is, not unescaped.
    """
    with zipfile.ZipFile(str(path)) as z:
        try:
            xml = z.read("word/document.xml").decode("utf-8", errors="replace")
        except KeyError:
            return ""
    # NOTE(review): the newlines inserted here lie outside the <w:t> elements,
    # so they never reach the result; every text run ends up on its own line.
    xml = _DOCX_WP_END_RX.sub("\n", xml)
    return "\n".join(m.group(1) for m in _DOCX_WT_RX.finditer(xml))


def extract_docx(path: Path) -> str:
    """Return paragraph and table text of a DOCX file.

    Table rows are rendered as their cell texts joined by `` | ``. If
    python-docx raises for any reason, the raw-XML fallback is used instead.
    """
    from docx import Document

    try:
        doc = Document(str(path))
        parts = [p.text for p in doc.paragraphs if p.text]
        for tbl in doc.tables:
            for row in tbl.rows:
                parts.append(" | ".join(c.text for c in row.cells))
        return _truncate("\n".join(parts))
    except Exception:
        # Deliberate best-effort: fall back to the raw XML extract.
        return _truncate(_docx_raw_text(path))


def extract_xlsx(path: Path) -> str:
    """Return the cell values of every worksheet as tab-separated lines.

    Each sheet starts with a ``# <title>`` line; rows that are blank after
    joining are skipped. Reading stops once more than ``MAX_TEXT_BYTES``
    characters have been collected.
    """
    from openpyxl import load_workbook

    wb = load_workbook(str(path), read_only=True, data_only=True)
    parts = []
    total = 0
    try:
        for ws in wb.worksheets:
            parts.append(f"# {ws.title}")
            for row in ws.iter_rows(values_only=True):
                line = "\t".join("" if v is None else str(v) for v in row)
                if line.strip():
                    parts.append(line)
                    total += len(line)
                    if total > MAX_TEXT_BYTES:
                        break
            if total > MAX_TEXT_BYTES:
                break
    finally:
        # Close the workbook even when iterating a damaged sheet raises.
        wb.close()
    return _truncate("\n".join(parts))


def extract_pptx(path: Path) -> str:
    """Return the text of all slides, including speaker notes.

    Each slide starts with a ``# slide <n>`` line; notes are emitted as
    ``[notes] <text>``.
    """
    from pptx import Presentation

    prs = Presentation(str(path))
    parts = []
    for i, slide in enumerate(prs.slides, 1):
        parts.append(f"# slide {i}")
        for shape in slide.shapes:
            if shape.has_text_frame:
                for para in shape.text_frame.paragraphs:
                    line = "".join(run.text for run in para.runs)
                    if line.strip():
                        parts.append(line)
        if slide.has_notes_slide:
            notes = slide.notes_slide.notes_text_frame.text
            if notes:
                parts.append(f"[notes] {notes}")
    return _truncate("\n".join(parts))


def extract_eml(path: Path) -> str:
    """Return selected headers plus the plain-text content of an .eml file.

    For multipart messages only ``text/plain`` parts without a filename are
    taken. Parts whose content cannot be obtained are skipped.
    """
    with path.open("rb") as f:
        msg = email.message_from_binary_file(f, policy=email.policy.default)
    head = []
    for k in ("From", "To", "Cc", "Subject", "Date"):
        v = msg.get(k)
        if v:
            head.append(f"{k}: {v}")
    parts = ["\n".join(head)]
    if msg.is_multipart():
        for part in msg.walk():
            if part.get_content_type() == "text/plain" and not part.get_filename():
                try:
                    parts.append(part.get_content())
                except Exception:
                    # Best-effort: an undecodable part must not lose the rest.
                    pass
    else:
        try:
            parts.append(msg.get_content())
        except Exception:
            pass
    return _truncate("\n\n".join(parts))


def extract_msg(path: Path) -> str:
    """Return selected headers plus the body of an Outlook .msg file."""
    # Inside this function the name refers to the third-party module, not to
    # this function itself.
    import extract_msg

    with extract_msg.openMsg(str(path)) as m:
        head = []
        if m.subject:
            head.append(f"Subject: {m.subject}")
        if m.sender:
            head.append(f"From: {m.sender}")
        if m.to:
            head.append(f"To: {m.to}")
        if m.cc:
            head.append(f"Cc: {m.cc}")
        if m.date:
            head.append(f"Date: {m.date}")
        return _truncate("\n".join(head) + "\n\n" + (m.body or ""))


def extract_text(path: Path) -> str:
    """Return the first ``MAX_TEXT_BYTES`` bytes of a text file, decoded.

    Encodings are tried in the order utf-8-sig, cp1250, latin-1. Only the
    needed prefix of the file is read. When the prefix ends in the middle of a
    UTF-8 character, that incomplete character is dropped instead of letting
    the whole file fall through to cp1250.
    """
    # One byte past the cap is enough to tell whether the file was cut.
    with path.open("rb") as f:
        data = f.read(MAX_TEXT_BYTES + 1)
    truncated = len(data) > MAX_TEXT_BYTES
    data = data[:MAX_TEXT_BYTES]

    try:
        return _truncate(data.decode("utf-8-sig"))
    except UnicodeDecodeError as exc:
        # exc.object is the buffer the offsets refer to (it may lack the BOM).
        # A UTF-8 character is at most 4 bytes, so one cut by the slice starts
        # within the last 3 bytes; everything before exc.start decoded fine.
        if truncated and exc.start >= len(exc.object) - 3:
            return _truncate(exc.object[:exc.start].decode("utf-8"))

    for enc in ("cp1250", "latin-1"):
        try:
            return _truncate(data.decode(enc))
        except UnicodeDecodeError:
            continue
    return _truncate(data.decode("utf-8", errors="replace"))


# Extension -> (extractor function, maximum size of the source file in bytes).
EXTRACTORS = {
    "pdf": (extract_pdf, MAX_PDF_BYTES),
    "docx": (extract_docx, MAX_GENERIC_BYTES),
    "xlsx": (extract_xlsx, MAX_XLSX_BYTES),
    "xlsm": (extract_xlsx, MAX_XLSX_BYTES),
    "pptx": (extract_pptx, MAX_GENERIC_BYTES),
    "eml": (extract_eml, MAX_GENERIC_BYTES),
    "msg": (extract_msg, MAX_GENERIC_BYTES),
    "txt": (extract_text, MAX_GENERIC_BYTES),
    "csv": (extract_text, MAX_GENERIC_BYTES),
}


def _short(s, n=40):
    """Return *s* as a single line, cut to *n* characters plus ``...``."""
    if not s:
        return ""
    s = str(s).replace("\n", " ").replace("\r", " ").strip()
    return s if len(s) <= n else s[:n] + "..."


def _now() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(tz=timezone.utc)


# --- MAIN LOOP ---------------------------------------------------------------

def process_collection(pg: psycopg.Connection, mongo_coll, study: str) -> dict:
    """Extract text for one Mongo collection and upsert it into ``documents``.

    A document is skipped when PostgreSQL already holds a row for the same
    path with the same sha256, the current ``EXTRACTOR_VERSION`` and
    ``ok = true``. Every other candidate yields one row: extracted text on
    success, or an error marker (``file_missing``, ``too_big_>N`` or the
    exception text). Rows are written in batches of 50.

    Args:
        pg: open PostgreSQL connection.
        mongo_coll: Mongo collection to read candidates from.
        study: value stored in ``documents.study``; main() passes the
            collection name.

    Returns:
        Counters: ``study``, ``processed``, ``ok``, ``errors``, ``skipped``,
        ``too_big``.
    """
    with pg.cursor() as cur:
        cur.execute(
            "SELECT path, sha256, extractor_version, ok FROM documents WHERE study = %s",
            (study,),
        )
        # path -> (sha256, extractor_version, ok)
        existing = {row[0]: (row[1], row[2], row[3]) for row in cur.fetchall()}

    # One filter shared by the count and the scan so they cannot drift apart.
    query = {"ext": {"$in": list(EXTRACTORS)}, "deleted_at": {"$exists": False}}
    projection = {"_id": 1, "path": 1, "rel_path": 1, "name": 1, "ext": 1,
                  "sha256": 1, "size_bytes": 1, "mtime": 1}

    processed = ok = errors = skipped = too_big = 0
    queue: list[dict] = []

    # Count before opening the cursor: a failing count then leaves nothing
    # behind that would need closing.
    total_pending = mongo_coll.count_documents(query)
    print(f"[{study}] kandidatu v Mongo: {total_pending}")

    cursor = mongo_coll.find(query, projection, no_cursor_timeout=True)
    n = 0
    try:
        for doc in cursor:
            n += 1
            prev = existing.get(doc["path"])
            if prev and prev[0] == doc.get("sha256") and prev[1] == EXTRACTOR_VERSION and prev[2]:
                skipped += 1
                continue

            ext = doc["ext"]
            extractor, max_bytes = EXTRACTORS[ext]
            path = Path(doc["path"])
            row = {
                "mongo_id": str(doc["_id"]),
                "study": study,
                "path": doc["path"],
                "rel_path": doc.get("rel_path"),
                "name": doc.get("name"),
                "ext": ext,
                "sha256": doc.get("sha256"),
                "size_bytes": doc.get("size_bytes"),
                "mtime": doc.get("mtime"),
                "body": None,
                "body_length": 0,
                "extracted_at": _now(),
                "extractor_version": EXTRACTOR_VERSION,
                "ok": False,
                "error": None,
            }
            status = "OK "
            detail = ""
            size_bytes = doc.get("size_bytes") or 0
            size_mb = size_bytes / 1024 / 1024

            if not path.exists():
                row["error"] = "file_missing"
                status = "ERR"
                detail = "file_missing"
                errors += 1
            elif size_bytes > max_bytes:
                row["error"] = f"too_big_>{max_bytes}"
                status = "BIG"
                detail = f"too_big_>{max_bytes//1024//1024}MB"
                too_big += 1
            else:
                try:
                    body = extractor(path) or ""
                    row["body"] = body if body else None
                    row["body_length"] = len(body)
                    row["ok"] = True
                    ok += 1
                    detail = f"{len(body)} znaku {_short(body, 60)!r}"
                except Exception as e:
                    # One unreadable file must not stop the batch; the failure
                    # is recorded in the row instead.
                    row["error"] = f"{type(e).__name__}: {e}"[:500]
                    status = "ERR"
                    detail = row["error"][:80]
                    errors += 1

            queue.append(row)
            processed += 1
            print(f" [{n:>4}/{total_pending}] {status} {ext:<4} {size_mb:6.1f}MB "
                  f"{path.name} | {detail}", flush=True)

            if len(queue) >= 50:
                _flush(pg, queue)
                queue.clear()
    finally:
        # The cursor was opened with no_cursor_timeout, so close it explicitly.
        cursor.close()

    if queue:
        _flush(pg, queue)

    return {"study": study, "processed": processed, "ok": ok,
            "errors": errors, "skipped": skipped, "too_big": too_big}


# Insert-or-update keyed on the UNIQUE (study, path) constraint.
UPSERT_SQL = """
INSERT INTO documents
    (mongo_id, study, path, rel_path, name, ext, sha256, size_bytes, mtime,
     body, body_length, extracted_at, extractor_version, ok, error)
VALUES
    (%(mongo_id)s, %(study)s, %(path)s, %(rel_path)s, %(name)s, %(ext)s,
     %(sha256)s, %(size_bytes)s, %(mtime)s, %(body)s, %(body_length)s,
     %(extracted_at)s, %(extractor_version)s, %(ok)s, %(error)s)
ON CONFLICT (study, path) DO UPDATE SET
    mongo_id = EXCLUDED.mongo_id,
    rel_path = EXCLUDED.rel_path,
    name = EXCLUDED.name,
    ext = EXCLUDED.ext,
    sha256 = EXCLUDED.sha256,
    size_bytes = EXCLUDED.size_bytes,
    mtime = EXCLUDED.mtime,
    body = EXCLUDED.body,
    body_length = EXCLUDED.body_length,
    extracted_at = EXCLUDED.extracted_at,
    extractor_version = EXCLUDED.extractor_version,
    ok = EXCLUDED.ok,
    error = EXCLUDED.error
"""


def _flush(pg: psycopg.Connection, rows: list[dict]) -> None:
    """Upsert *rows* into ``documents`` and commit.

    On failure the transaction is rolled back and the exception re-raised, so
    the connection is not left in an aborted transaction.
    """
    # Last safety net: strip NUL once more, in case one slipped through.
    for r in rows:
        if r.get("body"):
            r["body"] = _clean_for_pg(r["body"])
        if r.get("error"):
            r["error"] = _clean_for_pg(r["error"])
    try:
        with pg.cursor() as cur:
            cur.executemany(UPSERT_SQL, rows)
        pg.commit()
    except Exception:
        pg.rollback()
        raise


def main() -> int:
    """Apply the schema, process every configured collection, print a summary.

    Returns:
        ``0`` on success; exceptions propagate to the caller. Both database
        connections are closed on every exit path.
    """
    t0 = time.time()
    print("Pripojuji se k PostgreSQL...")
    pg = psycopg.connect(PG_DSN, connect_timeout=10)
    try:
        with pg.cursor() as cur:
            cur.execute(SCHEMA_SQL)
        pg.commit()
        print("Schema OK.")

        print("Pripojuji se k MongoDB...")
        mongo = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
        try:
            # Fail fast when the server is unreachable.
            mongo.admin.command("ping")
            db = mongo[MONGO_DB]
            print("Mongo OK.")

            results = []
            for name in MONGO_COLLECTIONS:
                results.append(process_collection(pg, db[name], name))
        finally:
            mongo.close()
    finally:
        pg.close()

    print("\n=== SHRNUTI ===")
    for r in results:
        print(f" {r['study']}: processed={r['processed']} ok={r['ok']} "
              f"errors={r['errors']} skipped={r['skipped']} too_big={r['too_big']}")
    print(f"\nCelkem trvalo: {time.time() - t0:.1f} s")
    return 0


if __name__ == "__main__":
    try:
        raise SystemExit(main())
    except KeyboardInterrupt:
        print("\nPreruseno uzivatelem")
    except Exception:
        traceback.print_exc()
        sys.exit(1)