Add Outlook/Soubory/Clario/Feasibility scripts and reports; ignore Incoming, Outlook downloads & profile
This commit is contained in:
@@ -0,0 +1,481 @@
|
||||
"""
|
||||
==============================================================================
|
||||
Skript: enrich_fulltext_v1.2.py
|
||||
Verze: 1.2
|
||||
Datum: 2026-06-03
|
||||
Autor: vladimir.buzalka
|
||||
Popis: Vytahne PLNY TEXT z dokumentu odkazovanych v MongoDB (db: soubory)
|
||||
a ulozi ho do PostgreSQL (db: MongoSoubory) s GIN tsvector indexem.
|
||||
|
||||
Zmeny proti v1.1:
|
||||
- PG tsvector ma tvrdy limit ~1 MB binarne -> velky XLSX (5 MB textu) ho prekrocil.
|
||||
v1.2 generuje tsv z prvnich 800 000 znaku body: left(body, 800000).
|
||||
Sloupec body zustava plny (max 5 MB pro nahled / snippet).
|
||||
- SCHEMA_SQL provadi migraci sloupce tsv: pokud uz existuje stara verze
|
||||
(bez `left`), dropne index+sloupec a vytvori znovu s truncated vyrazem.
|
||||
- extractor_version = "1.2" -> preparsuji se vsechny radky z v1.0/v1.1.
|
||||
|
||||
Zachovano z v1.1:
|
||||
- NUL bajty (0x00) se strippuji z body i error
|
||||
- DOCX fallback na raw XML pres regex pri padu python-docx
|
||||
|
||||
Cilove ulozeni:
|
||||
- MongoDB 192.168.1.76 db=soubory kolekce=42847922MDD3003, 77242113UCO3001
|
||||
- PostgreSQL 192.168.1.76 db=MongoSoubory tabulka=documents
|
||||
|
||||
Podporovane pripony: pdf, docx, xlsx, xlsm, pptx, eml, msg, txt, csv
|
||||
==============================================================================
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import email
|
||||
import email.policy
|
||||
import re
|
||||
import sys
|
||||
import time
|
||||
import traceback
|
||||
import zipfile
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
import psycopg
|
||||
from pymongo import MongoClient
|
||||
|
||||
# --- konfigurace ------------------------------------------------------------
|
||||
MONGO_URI = "mongodb://192.168.1.76:27017"
|
||||
MONGO_DB = "soubory"
|
||||
MONGO_COLLECTIONS = ["42847922MDD3003", "77242113UCO3001"]
|
||||
|
||||
PG_DSN = ("host=192.168.1.76 port=5432 dbname=MongoSoubory "
|
||||
"user=vladimir.buzalka password=Vlado7309208104++")
|
||||
|
||||
EXTRACTOR_VERSION = "1.2"
|
||||
|
||||
MAX_TEXT_BYTES = 5 * 1024 * 1024
|
||||
MAX_PDF_BYTES = 500 * 1024 * 1024
|
||||
MAX_XLSX_BYTES = 200 * 1024 * 1024
|
||||
MAX_GENERIC_BYTES = 300 * 1024 * 1024
|
||||
|
||||
SUPPORTED = ("pdf", "docx", "xlsx", "xlsm", "pptx", "eml", "msg", "txt", "csv")
|
||||
|
||||
|
||||
# --- SCHEMA -----------------------------------------------------------------
|
||||
|
||||
SCHEMA_SQL = """
|
||||
CREATE EXTENSION IF NOT EXISTS unaccent;
|
||||
CREATE EXTENSION IF NOT EXISTS pg_trgm;
|
||||
|
||||
DO $$
|
||||
BEGIN
|
||||
IF NOT EXISTS (SELECT 1 FROM pg_ts_config WHERE cfgname = 'soubory') THEN
|
||||
CREATE TEXT SEARCH CONFIGURATION soubory ( COPY = simple );
|
||||
ALTER TEXT SEARCH CONFIGURATION soubory
|
||||
ALTER MAPPING FOR hword, hword_part, word
|
||||
WITH unaccent, simple;
|
||||
END IF;
|
||||
END$$;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS documents (
|
||||
id BIGSERIAL PRIMARY KEY,
|
||||
mongo_id TEXT NOT NULL,
|
||||
study TEXT NOT NULL,
|
||||
path TEXT NOT NULL,
|
||||
rel_path TEXT,
|
||||
name TEXT,
|
||||
ext TEXT,
|
||||
sha256 TEXT NOT NULL,
|
||||
size_bytes BIGINT,
|
||||
mtime TIMESTAMPTZ,
|
||||
body TEXT,
|
||||
body_length INT,
|
||||
tsv tsvector GENERATED ALWAYS AS (
|
||||
to_tsvector('soubory'::regconfig, left(coalesce(body, ''), 800000))
|
||||
) STORED,
|
||||
extracted_at TIMESTAMPTZ DEFAULT now(),
|
||||
extractor_version TEXT,
|
||||
ok BOOLEAN,
|
||||
error TEXT,
|
||||
UNIQUE (study, path)
|
||||
);
|
||||
|
||||
-- migrace tsv sloupce ze stareho vyrazu (bez `left`) na novy (s `left(..,800000)`)
|
||||
DO $$
|
||||
DECLARE
|
||||
cur_expr TEXT;
|
||||
BEGIN
|
||||
SELECT pg_get_expr(d.adbin, d.adrelid)
|
||||
INTO cur_expr
|
||||
FROM pg_attribute a
|
||||
JOIN pg_class c ON c.oid = a.attrelid
|
||||
JOIN pg_attrdef d ON d.adrelid = a.attrelid AND d.adnum = a.attnum
|
||||
WHERE c.relname = 'documents' AND a.attname = 'tsv';
|
||||
|
||||
IF cur_expr IS NOT NULL AND position('left' in cur_expr) = 0 THEN
|
||||
EXECUTE 'DROP INDEX IF EXISTS documents_tsv_gin';
|
||||
EXECUTE 'ALTER TABLE documents DROP COLUMN tsv';
|
||||
EXECUTE 'ALTER TABLE documents ADD COLUMN tsv tsvector GENERATED ALWAYS AS '
|
||||
|| '(to_tsvector(''soubory''::regconfig, left(coalesce(body, ''''), 800000))) STORED';
|
||||
END IF;
|
||||
END$$;
|
||||
|
||||
CREATE INDEX IF NOT EXISTS documents_tsv_gin ON documents USING gin(tsv);
|
||||
CREATE INDEX IF NOT EXISTS documents_name_trgm ON documents USING gin(name gin_trgm_ops);
|
||||
CREATE INDEX IF NOT EXISTS documents_sha256_idx ON documents(sha256);
|
||||
CREATE INDEX IF NOT EXISTS documents_study_ext_idx ON documents(study, ext);
|
||||
"""
|
||||
|
||||
|
||||
# --- HELPERY ----------------------------------------------------------------
|
||||
|
||||
# odstrani 0x00 a ostatni controly krome whitespace
|
||||
_CTRL_RX = re.compile(r"[\x00-\x08\x0b\x0c\x0e-\x1f]")
|
||||
|
||||
|
||||
def _clean_for_pg(s: str) -> str:
|
||||
if not s:
|
||||
return ""
|
||||
return _CTRL_RX.sub("", s)
|
||||
|
||||
|
||||
def _truncate(s: str) -> str:
|
||||
s = _clean_for_pg(s or "")
|
||||
if not s:
|
||||
return ""
|
||||
b = s.encode("utf-8", errors="replace")
|
||||
if len(b) <= MAX_TEXT_BYTES:
|
||||
return s
|
||||
return b[:MAX_TEXT_BYTES].decode("utf-8", errors="ignore")
|
||||
|
||||
|
||||
# --- EXTRAKTORY -------------------------------------------------------------
|
||||
|
||||
def extract_pdf(path: Path) -> str:
|
||||
from pypdf import PdfReader
|
||||
reader = PdfReader(str(path))
|
||||
if reader.is_encrypted:
|
||||
try:
|
||||
reader.decrypt("")
|
||||
except Exception:
|
||||
return ""
|
||||
parts = []
|
||||
total = 0
|
||||
for page in reader.pages:
|
||||
try:
|
||||
t = page.extract_text() or ""
|
||||
except Exception:
|
||||
continue
|
||||
parts.append(t)
|
||||
total += len(t)
|
||||
if total > MAX_TEXT_BYTES:
|
||||
break
|
||||
return _truncate("\n".join(parts))
|
||||
|
||||
|
||||
# regex pro DOCX fallback - vytahne <w:t>...</w:t>
|
||||
_DOCX_WT_RX = re.compile(r"<w:t[^>]*>([^<]*)</w:t>", re.DOTALL)
|
||||
_DOCX_WP_END_RX = re.compile(r"</w:p>")
|
||||
|
||||
|
||||
def _docx_raw_text(path: Path) -> str:
|
||||
"""Fallback - cte primo word/document.xml ze ZIPu."""
|
||||
with zipfile.ZipFile(str(path)) as z:
|
||||
try:
|
||||
xml = z.read("word/document.xml").decode("utf-8", errors="replace")
|
||||
except KeyError:
|
||||
return ""
|
||||
xml = _DOCX_WP_END_RX.sub("\n", xml)
|
||||
return "\n".join(m.group(1) for m in _DOCX_WT_RX.finditer(xml))
|
||||
|
||||
|
||||
def extract_docx(path: Path) -> str:
|
||||
from docx import Document
|
||||
try:
|
||||
doc = Document(str(path))
|
||||
parts = [p.text for p in doc.paragraphs if p.text]
|
||||
for tbl in doc.tables:
|
||||
for row in tbl.rows:
|
||||
parts.append(" | ".join(c.text for c in row.cells))
|
||||
return _truncate("\n".join(parts))
|
||||
except Exception:
|
||||
# fallback - raw XML extract
|
||||
return _truncate(_docx_raw_text(path))
|
||||
|
||||
|
||||
def extract_xlsx(path: Path) -> str:
|
||||
from openpyxl import load_workbook
|
||||
wb = load_workbook(str(path), read_only=True, data_only=True)
|
||||
parts = []
|
||||
total = 0
|
||||
for ws in wb.worksheets:
|
||||
parts.append(f"# {ws.title}")
|
||||
for row in ws.iter_rows(values_only=True):
|
||||
line = "\t".join("" if v is None else str(v) for v in row)
|
||||
if line.strip():
|
||||
parts.append(line)
|
||||
total += len(line)
|
||||
if total > MAX_TEXT_BYTES:
|
||||
break
|
||||
if total > MAX_TEXT_BYTES:
|
||||
break
|
||||
wb.close()
|
||||
return _truncate("\n".join(parts))
|
||||
|
||||
|
||||
def extract_pptx(path: Path) -> str:
|
||||
from pptx import Presentation
|
||||
prs = Presentation(str(path))
|
||||
parts = []
|
||||
for i, slide in enumerate(prs.slides, 1):
|
||||
parts.append(f"# slide {i}")
|
||||
for shape in slide.shapes:
|
||||
if shape.has_text_frame:
|
||||
for para in shape.text_frame.paragraphs:
|
||||
line = "".join(run.text for run in para.runs)
|
||||
if line.strip():
|
||||
parts.append(line)
|
||||
if slide.has_notes_slide:
|
||||
notes = slide.notes_slide.notes_text_frame.text
|
||||
if notes:
|
||||
parts.append(f"[notes] {notes}")
|
||||
return _truncate("\n".join(parts))
|
||||
|
||||
|
||||
def extract_eml(path: Path) -> str:
|
||||
with path.open("rb") as f:
|
||||
msg = email.message_from_binary_file(f, policy=email.policy.default)
|
||||
head = []
|
||||
for k in ("From", "To", "Cc", "Subject", "Date"):
|
||||
v = msg.get(k)
|
||||
if v:
|
||||
head.append(f"{k}: {v}")
|
||||
parts = ["\n".join(head)]
|
||||
if msg.is_multipart():
|
||||
for part in msg.walk():
|
||||
if part.get_content_type() == "text/plain" and not part.get_filename():
|
||||
try:
|
||||
parts.append(part.get_content())
|
||||
except Exception:
|
||||
pass
|
||||
else:
|
||||
try:
|
||||
parts.append(msg.get_content())
|
||||
except Exception:
|
||||
pass
|
||||
return _truncate("\n\n".join(parts))
|
||||
|
||||
|
||||
def extract_msg(path: Path) -> str:
|
||||
import extract_msg
|
||||
with extract_msg.openMsg(str(path)) as m:
|
||||
head = []
|
||||
if m.subject: head.append(f"Subject: {m.subject}")
|
||||
if m.sender: head.append(f"From: {m.sender}")
|
||||
if m.to: head.append(f"To: {m.to}")
|
||||
if m.cc: head.append(f"Cc: {m.cc}")
|
||||
if m.date: head.append(f"Date: {m.date}")
|
||||
return _truncate("\n".join(head) + "\n\n" + (m.body or ""))
|
||||
|
||||
|
||||
def extract_text(path: Path) -> str:
|
||||
data = path.read_bytes()[:MAX_TEXT_BYTES]
|
||||
for enc in ("utf-8-sig", "cp1250", "latin-1"):
|
||||
try:
|
||||
return _truncate(data.decode(enc))
|
||||
except UnicodeDecodeError:
|
||||
continue
|
||||
return _truncate(data.decode("utf-8", errors="replace"))
|
||||
|
||||
|
||||
EXTRACTORS = {
|
||||
"pdf": (extract_pdf, MAX_PDF_BYTES),
|
||||
"docx": (extract_docx, MAX_GENERIC_BYTES),
|
||||
"xlsx": (extract_xlsx, MAX_XLSX_BYTES),
|
||||
"xlsm": (extract_xlsx, MAX_XLSX_BYTES),
|
||||
"pptx": (extract_pptx, MAX_GENERIC_BYTES),
|
||||
"eml": (extract_eml, MAX_GENERIC_BYTES),
|
||||
"msg": (extract_msg, MAX_GENERIC_BYTES),
|
||||
"txt": (extract_text, MAX_GENERIC_BYTES),
|
||||
"csv": (extract_text, MAX_GENERIC_BYTES),
|
||||
}
|
||||
|
||||
|
||||
def _short(s, n=40):
|
||||
if not s:
|
||||
return ""
|
||||
s = str(s).replace("\n", " ").replace("\r", " ").strip()
|
||||
return s if len(s) <= n else s[:n] + "..."
|
||||
|
||||
|
||||
def _now() -> datetime:
|
||||
return datetime.now(tz=timezone.utc)
|
||||
|
||||
|
||||
# --- HLAVNI SMYCKA ----------------------------------------------------------
|
||||
|
||||
def process_collection(pg: psycopg.Connection, mongo_coll, study: str) -> dict:
|
||||
with pg.cursor() as cur:
|
||||
cur.execute(
|
||||
"SELECT path, sha256, extractor_version, ok FROM documents WHERE study = %s",
|
||||
(study,),
|
||||
)
|
||||
existing = {row[0]: (row[1], row[2], row[3]) for row in cur.fetchall()}
|
||||
|
||||
cursor = mongo_coll.find(
|
||||
{"ext": {"$in": list(EXTRACTORS.keys())}, "deleted_at": {"$exists": False}},
|
||||
{"_id": 1, "path": 1, "rel_path": 1, "name": 1, "ext": 1,
|
||||
"sha256": 1, "size_bytes": 1, "mtime": 1},
|
||||
no_cursor_timeout=True,
|
||||
)
|
||||
|
||||
processed = ok = errors = skipped = too_big = 0
|
||||
queue: list[dict] = []
|
||||
total_pending = mongo_coll.count_documents(
|
||||
{"ext": {"$in": list(EXTRACTORS.keys())}, "deleted_at": {"$exists": False}}
|
||||
)
|
||||
print(f"[{study}] kandidatu v Mongo: {total_pending}")
|
||||
|
||||
n = 0
|
||||
try:
|
||||
for doc in cursor:
|
||||
n += 1
|
||||
prev = existing.get(doc["path"])
|
||||
if prev and prev[0] == doc.get("sha256") and prev[1] == EXTRACTOR_VERSION and prev[2]:
|
||||
skipped += 1
|
||||
continue
|
||||
|
||||
ext = doc["ext"]
|
||||
extractor, max_bytes = EXTRACTORS[ext]
|
||||
path = Path(doc["path"])
|
||||
|
||||
row = {
|
||||
"mongo_id": str(doc["_id"]),
|
||||
"study": study,
|
||||
"path": doc["path"],
|
||||
"rel_path": doc.get("rel_path"),
|
||||
"name": doc.get("name"),
|
||||
"ext": ext,
|
||||
"sha256": doc.get("sha256"),
|
||||
"size_bytes": doc.get("size_bytes"),
|
||||
"mtime": doc.get("mtime"),
|
||||
"body": None,
|
||||
"body_length": 0,
|
||||
"extracted_at": _now(),
|
||||
"extractor_version": EXTRACTOR_VERSION,
|
||||
"ok": False,
|
||||
"error": None,
|
||||
}
|
||||
|
||||
status = "OK "
|
||||
detail = ""
|
||||
size_mb = (doc.get("size_bytes") or 0) / 1024 / 1024
|
||||
|
||||
if not path.exists():
|
||||
row["error"] = "file_missing"
|
||||
status = "ERR"; detail = "file_missing"; errors += 1
|
||||
elif (doc.get("size_bytes") or 0) > max_bytes:
|
||||
row["error"] = f"too_big_>{max_bytes}"
|
||||
status = "BIG"; detail = f"too_big_>{max_bytes//1024//1024}MB"; too_big += 1
|
||||
else:
|
||||
try:
|
||||
body = extractor(path) or ""
|
||||
row["body"] = body if body else None
|
||||
row["body_length"] = len(body)
|
||||
row["ok"] = True
|
||||
ok += 1
|
||||
detail = f"{len(body)} znaku {_short(body, 60)!r}"
|
||||
except Exception as e:
|
||||
row["error"] = f"{type(e).__name__}: {e}"[:500]
|
||||
status = "ERR"; detail = row["error"][:80]; errors += 1
|
||||
|
||||
queue.append(row)
|
||||
processed += 1
|
||||
print(f" [{n:>4}/{total_pending}] {status} {ext:<4} {size_mb:6.1f}MB "
|
||||
f"{path.name} | {detail}", flush=True)
|
||||
|
||||
if len(queue) >= 50:
|
||||
_flush(pg, queue); queue.clear()
|
||||
finally:
|
||||
cursor.close()
|
||||
|
||||
if queue:
|
||||
_flush(pg, queue)
|
||||
|
||||
return {"study": study, "processed": processed, "ok": ok,
|
||||
"errors": errors, "skipped": skipped, "too_big": too_big}
|
||||
|
||||
|
||||
UPSERT_SQL = """
|
||||
INSERT INTO documents
|
||||
(mongo_id, study, path, rel_path, name, ext, sha256, size_bytes, mtime,
|
||||
body, body_length, extracted_at, extractor_version, ok, error)
|
||||
VALUES
|
||||
(%(mongo_id)s, %(study)s, %(path)s, %(rel_path)s, %(name)s, %(ext)s, %(sha256)s,
|
||||
%(size_bytes)s, %(mtime)s, %(body)s, %(body_length)s, %(extracted_at)s,
|
||||
%(extractor_version)s, %(ok)s, %(error)s)
|
||||
ON CONFLICT (study, path) DO UPDATE SET
|
||||
mongo_id = EXCLUDED.mongo_id,
|
||||
rel_path = EXCLUDED.rel_path,
|
||||
name = EXCLUDED.name,
|
||||
ext = EXCLUDED.ext,
|
||||
sha256 = EXCLUDED.sha256,
|
||||
size_bytes = EXCLUDED.size_bytes,
|
||||
mtime = EXCLUDED.mtime,
|
||||
body = EXCLUDED.body,
|
||||
body_length = EXCLUDED.body_length,
|
||||
extracted_at = EXCLUDED.extracted_at,
|
||||
extractor_version = EXCLUDED.extractor_version,
|
||||
ok = EXCLUDED.ok,
|
||||
error = EXCLUDED.error
|
||||
"""
|
||||
|
||||
|
||||
def _flush(pg: psycopg.Connection, rows: list[dict]) -> None:
|
||||
# posledni pojistka - jeste jednou strip NUL (kdyby se necim prokrouzil)
|
||||
for r in rows:
|
||||
if r.get("body"):
|
||||
r["body"] = _clean_for_pg(r["body"])
|
||||
if r.get("error"):
|
||||
r["error"] = _clean_for_pg(r["error"])
|
||||
with pg.cursor() as cur:
|
||||
cur.executemany(UPSERT_SQL, rows)
|
||||
pg.commit()
|
||||
|
||||
|
||||
def main() -> int:
|
||||
t0 = time.time()
|
||||
print("Pripojuji se k PostgreSQL...")
|
||||
pg = psycopg.connect(PG_DSN, connect_timeout=10)
|
||||
with pg.cursor() as cur:
|
||||
cur.execute(SCHEMA_SQL)
|
||||
pg.commit()
|
||||
print("Schema OK.")
|
||||
|
||||
print("Pripojuji se k MongoDB...")
|
||||
mongo = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
|
||||
mongo.admin.command("ping")
|
||||
db = mongo[MONGO_DB]
|
||||
print("Mongo OK.")
|
||||
|
||||
results = []
|
||||
for name in MONGO_COLLECTIONS:
|
||||
results.append(process_collection(pg, db[name], name))
|
||||
|
||||
pg.close()
|
||||
|
||||
print("\n=== SHRNUTI ===")
|
||||
for r in results:
|
||||
print(f" {r['study']}: processed={r['processed']} ok={r['ok']} "
|
||||
f"errors={r['errors']} skipped={r['skipped']} too_big={r['too_big']}")
|
||||
print(f"\nCelkem trvalo: {time.time() - t0:.1f} s")
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
raise SystemExit(main())
|
||||
except KeyboardInterrupt:
|
||||
print("\nPreruseno uzivatelem")
|
||||
except Exception:
|
||||
traceback.print_exc()
|
||||
sys.exit(1)
|
||||
Reference in New Issue
Block a user