# ============================================================================ # central_logging_v1.0.py # Verze: 1.0 # Datum: 2026-06-08 # Autor: Vladimír Buzalka # Popis: Drop-in knihovna pro centrální logování do Grafana Loki přes # FastAPI bránu (log_gateway). Přidává se VEDLE stávajícího # souborového logování — jediným voláním setup_logging(). # # Návrh (proč takhle): # - JEN standardní knihovna (urllib) — nevyžaduje pip install ve všech # skriptech projektu. # - Neblokující: emit() jen vloží záznam do fronty, odesílá vlákno na # pozadí v dávkách (batch). Skript se logováním nezdrží. # - Odolné proti výpadku: když je gateway nedostupná, dávka spadne do # lokálního spool souboru (.ndjson) a pošle se při příštím úspěchu. # => žádné logy se neztratí, i kdyby server byl chvíli dole. # - keep_file=True ponechá původní souborové logování. Po měsíci, až # bude centrál ověřený, stačí zavolat s keep_file=False (nebo nastavit # ENV CENTRAL_LOG_KEEP_FILE=0) a soubory se přestanou psát. # # Použití (minimum): # from central_logging_v1.0 import setup_logging # log = setup_logging("parse_emails_graph") # log.info("start") # log.error("něco selhalo: %s", err) # # Konfigurace přes ENV (s rozumnými defaulty): # CENTRAL_LOG_GATEWAY http://192.168.1.76:8770 # CENTRAL_LOG_TOKEN sdílené tajemství (musí sedět s gateway) # CENTRAL_LOG_ENV prod | test | dev (default prod) # CENTRAL_LOG_KEEP_FILE 1 | 0 (default 1 = piš i soubory) # CENTRAL_LOG_LEVEL INFO | ERROR | ... (default INFO) # CENTRAL_LOG_SPOOL_DIR adresář pro spool (default vedle skriptu) # ============================================================================ from __future__ import annotations import os import sys import json import time import atexit import socket import logging import threading import urllib.request import urllib.error from collections import deque from logging.handlers import RotatingFileHandler from pathlib import Path from typing import Any, Deque, Dict, List, Optional # --------------------------------------------------------------------------- # Výchozí konfigurace # --------------------------------------------------------------------------- DEFAULT_GATEWAY = os.environ.get("CENTRAL_LOG_GATEWAY", "http://192.168.1.76:8770") DEFAULT_TOKEN = os.environ.get("CENTRAL_LOG_TOKEN", "change-this-shared-secret") DEFAULT_ENV = os.environ.get("CENTRAL_LOG_ENV", "prod") DEFAULT_LEVEL = os.environ.get("CENTRAL_LOG_LEVEL", "INFO").upper() FLUSH_INTERVAL = 2.0 # s — jak často odeslat nasbíranou dávku BATCH_MAX = 200 # max záznamů v jedné dávce QUEUE_MAX = 50_000 # ochrana proti přetečení paměti HTTP_TIMEOUT = 5.0 # s — timeout odeslání do gateway SPOOL_REPLAY_MAX = 1000 # max záznamů přehraných ze spoolu na jeden cyklus class _GatewaySender: """Vlákno na pozadí: sbírá záznamy z fronty a posílá je do gateway v dávkách. Při neúspěchu zapisuje do spool souboru a později přehraje.""" def __init__(self, app_name: str, gateway: str, token: str, env: str, spool_dir: Path): self.app_name = app_name self.host = socket.gethostname() self.gateway = gateway.rstrip("/") self.token = token self.env = env self.spool_file = spool_dir / f"central_logging_spool_{app_name}.ndjson" spool_dir.mkdir(parents=True, exist_ok=True) self._queue: Deque[Dict[str, Any]] = deque(maxlen=QUEUE_MAX) self._lock = threading.Lock() self._stop = threading.Event() self._thread = threading.Thread(target=self._run, name=f"central-log-{app_name}", daemon=True) self._thread.start() # -- veřejné -------------------------------------------------------- def submit(self, record: Dict[str, Any]) -> None: with self._lock: self._queue.append(record) def flush_and_stop(self, timeout: float = 5.0) -> None: self._stop.set() self._thread.join(timeout=timeout) # poslední pokus o odeslání toho, co zbylo self._drain_once(final=True) # -- vnitřní -------------------------------------------------------- def _run(self) -> None: while not self._stop.is_set(): time.sleep(FLUSH_INTERVAL) try: self._replay_spool() self._drain_once() except Exception: # noqa: BLE001 — logování se nikdy nesmí zhroutit pass def _pop_batch(self) -> List[Dict[str, Any]]: batch: List[Dict[str, Any]] = [] with self._lock: while self._queue and len(batch) < BATCH_MAX: batch.append(self._queue.popleft()) return batch def _drain_once(self, final: bool = False) -> None: while True: batch = self._pop_batch() if not batch: return ok = self._send(batch) if not ok: self._spool(batch) if final and not self._queue: return def _send(self, records: List[Dict[str, Any]]) -> bool: payload = json.dumps({ "app": self.app_name, "host": self.host, "env": self.env, "records": records, }, ensure_ascii=False).encode("utf-8") req = urllib.request.Request( f"{self.gateway}/log/batch", data=payload, method="POST", headers={ "Content-Type": "application/json", "Authorization": f"Bearer {self.token}", }, ) try: with urllib.request.urlopen(req, timeout=HTTP_TIMEOUT) as resp: return 200 <= resp.status < 300 except Exception: # noqa: BLE001 — síť/timeout/HTTP error -> spool return False # -- spool (fallback při výpadku) ----------------------------------- def _spool(self, records: List[Dict[str, Any]]) -> None: try: with open(self.spool_file, "a", encoding="utf-8") as f: for r in records: f.write(json.dumps(r, ensure_ascii=False) + "\n") except Exception: # noqa: BLE001 pass def _replay_spool(self) -> None: if not self.spool_file.exists() or self.spool_file.stat().st_size == 0: return # načti dávku ze spoolu try: with open(self.spool_file, "r", encoding="utf-8") as f: lines = f.readlines() except Exception: # noqa: BLE001 return if not lines: return chunk = lines[:SPOOL_REPLAY_MAX] records = [] for ln in chunk: ln = ln.strip() if ln: try: records.append(json.loads(ln)) except Exception: # noqa: BLE001 pass if records and self._send(records): # úspěch -> odeber přehrané řádky ze spoolu remaining = lines[SPOOL_REPLAY_MAX:] try: if remaining: with open(self.spool_file, "w", encoding="utf-8") as f: f.writelines(remaining) else: self.spool_file.unlink(missing_ok=True) except Exception: # noqa: BLE001 pass class CentralLogHandler(logging.Handler): """logging.Handler, který předává záznamy senderu na pozadí.""" def __init__(self, sender: _GatewaySender): super().__init__() self._sender = sender def emit(self, record: logging.LogRecord) -> None: try: # msg = jen samotná zpráva; čas/úroveň/logger jdou do labelů a polí rec: Dict[str, Any] = { "ts": record.created, "level": record.levelname, "msg": record.getMessage(), "logger": record.name, "func": record.funcName, "line": record.lineno, } # POZOR: formatException je metoda Formatteru, ne Handleru — # proto použij vlastní Formatter, jinak by AttributeError shodil # celý záznam (a tracebacky by se ztrácely). if record.exc_info: rec["exc"] = logging.Formatter().formatException(record.exc_info) self._sender.submit(rec) except Exception: # noqa: BLE001 — handler nikdy nesmí shodit aplikaci pass def setup_logging( app_name: str, *, log_file: Optional[str] = None, keep_file: Optional[bool] = None, level: Optional[str] = None, gateway: Optional[str] = None, token: Optional[str] = None, env: Optional[str] = None, quiet_loggers: Optional[List[str]] = None, fmt: str = "%(asctime)s | %(levelname)s | %(name)s | %(message)s", datefmt: str = "%Y-%m-%d %H:%M:%S", spool_dir: Optional[str] = None, ) -> logging.Logger: """Nastaví root logger se dvěma cíli: 1) souborový handler (RotatingFileHandler) — stávající chování, 2) centrální handler do Loki přes gateway (na pozadí). Args: app_name: label aplikace v Loki (např. "parse_emails_graph"). log_file: cesta k log souboru. Default .log vedle skriptu. keep_file: piš i do souboru? Default z ENV CENTRAL_LOG_KEEP_FILE (1). Po měsíci ověřování nastav False -> jen centrál. level: min. úroveň, default ENV CENTRAL_LOG_LEVEL nebo INFO. gateway/token/env: override ENV defaultů. Returns: nakonfigurovaný root logger (lze i logging.getLogger()). """ # Konfiguraci čteme z os.environ AŽ TADY (call-time), ne při importu modulu. # Důvod: skripty často načítají vlastní .env (do os.environ) až po importu # této knihovny — kdybychom četli při importu, token/gateway bychom minuli. gw = gateway or os.environ.get("CENTRAL_LOG_GATEWAY", "http://192.168.1.76:8770") tok = token or os.environ.get("CENTRAL_LOG_TOKEN", "change-this-shared-secret") ev = env or os.environ.get("CENTRAL_LOG_ENV", "prod") lvl_name = (level or os.environ.get("CENTRAL_LOG_LEVEL", "INFO")).upper() lvl = getattr(logging, lvl_name, logging.INFO) if keep_file is None: keep_file = os.environ.get("CENTRAL_LOG_KEEP_FILE", "1") not in ("0", "false", "False") root = logging.getLogger() root.setLevel(lvl) # Ztiš upovídané knihovny (jinak root@INFO chytá jejich šum do centrálu). # Předej quiet_loggers=[] pro vypnutí, nebo vlastní seznam. _default_quiet = ["httpx", "httpcore", "urllib3", "anthropic", "openai", "PIL", "asyncio", "fdb", "fontTools", "pdfminer"] for _name in (_default_quiet if quiet_loggers is None else quiet_loggers): logging.getLogger(_name).setLevel(logging.WARNING) # odstraň případné staré handlery (idempotentní setup) for h in list(root.handlers): root.removeHandler(h) formatter = logging.Formatter(fmt=fmt, datefmt=datefmt) # 1) Souborový handler (stávající způsob) ------------------------------- if keep_file: if log_file is None: base = Path(sys.argv[0]).resolve().parent if sys.argv and sys.argv[0] else Path.cwd() log_file = str(base / f"{app_name}.log") fh = RotatingFileHandler(log_file, maxBytes=10 * 1024 * 1024, backupCount=5, encoding="utf-8") fh.setLevel(lvl) fh.setFormatter(formatter) root.addHandler(fh) # 2) Centrální handler do Loki (na pozadí) ------------------------------ spool_base = Path(spool_dir) if spool_dir else ( Path(sys.argv[0]).resolve().parent if sys.argv and sys.argv[0] else Path.cwd() ) sender = _GatewaySender( app_name=app_name, gateway=gw, token=tok, env=ev, spool_dir=spool_base / "_log_spool", ) ch = CentralLogHandler(sender) ch.setLevel(lvl) ch.setFormatter(formatter) root.addHandler(ch) # při ukončení skriptu dolij frontu atexit.register(sender.flush_and_stop) root.info("central_logging v1.0 inicializováno (app=%s, keep_file=%s, gateway=%s)", app_name, keep_file, gw) return root if __name__ == "__main__": # rychlý self-test log = setup_logging("central_logging_selftest", level="DEBUG") log.info("ahoj z self-testu") log.warning("varování %d", 42) try: 1 / 0 except ZeroDivisionError: log.exception("zachycená výjimka") print("Self-test odeslán. Zkontroluj Grafanu / spool soubor.")