307 lines
12 KiB
Python
307 lines
12 KiB
Python
# ============================================================================
|
|
# central_logging_v1.0.py
|
|
# Verze: 1.0
|
|
# Datum: 2026-06-08
|
|
# Autor: Vladimír Buzalka
|
|
# Popis: Drop-in knihovna pro centrální logování do Grafana Loki přes
|
|
# FastAPI bránu (log_gateway). Přidává se VEDLE stávajícího
|
|
# souborového logování — jediným voláním setup_logging().
|
|
#
|
|
# Návrh (proč takhle):
|
|
# - JEN standardní knihovna (urllib) — nevyžaduje pip install ve všech
|
|
# skriptech projektu.
|
|
# - Neblokující: emit() jen vloží záznam do fronty, odesílá vlákno na
|
|
# pozadí v dávkách (batch). Skript se logováním nezdrží.
|
|
# - Odolné proti výpadku: když je gateway nedostupná, dávka spadne do
|
|
# lokálního spool souboru (.ndjson) a pošle se při příštím úspěchu.
|
|
# => žádné logy se neztratí, i kdyby server byl chvíli dole.
|
|
# - keep_file=True ponechá původní souborové logování. Po měsíci, až
|
|
# bude centrál ověřený, stačí zavolat s keep_file=False (nebo nastavit
|
|
# ENV CENTRAL_LOG_KEEP_FILE=0) a soubory se přestanou psát.
|
|
#
|
|
# Použití (minimum):
|
|
# from central_logging_v1.0 import setup_logging
|
|
# log = setup_logging("parse_emails_graph")
|
|
# log.info("start")
|
|
# log.error("něco selhalo: %s", err)
|
|
#
|
|
# Konfigurace přes ENV (s rozumnými defaulty):
|
|
# CENTRAL_LOG_GATEWAY http://192.168.1.76:8770
|
|
# CENTRAL_LOG_TOKEN sdílené tajemství (musí sedět s gateway)
|
|
# CENTRAL_LOG_ENV prod | test | dev (default prod)
|
|
# CENTRAL_LOG_KEEP_FILE 1 | 0 (default 1 = piš i soubory)
|
|
# CENTRAL_LOG_LEVEL INFO | ERROR | ... (default INFO)
|
|
# CENTRAL_LOG_SPOOL_DIR adresář pro spool (default vedle skriptu)
|
|
# ============================================================================
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import time
|
|
import atexit
|
|
import socket
|
|
import logging
|
|
import threading
|
|
import urllib.request
|
|
import urllib.error
|
|
from collections import deque
|
|
from logging.handlers import RotatingFileHandler
|
|
from pathlib import Path
|
|
from typing import Any, Deque, Dict, List, Optional
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Výchozí konfigurace
|
|
# ---------------------------------------------------------------------------
|
|
DEFAULT_GATEWAY = os.environ.get("CENTRAL_LOG_GATEWAY", "http://192.168.1.76:8770")
|
|
DEFAULT_TOKEN = os.environ.get("CENTRAL_LOG_TOKEN", "change-this-shared-secret")
|
|
DEFAULT_ENV = os.environ.get("CENTRAL_LOG_ENV", "prod")
|
|
DEFAULT_LEVEL = os.environ.get("CENTRAL_LOG_LEVEL", "INFO").upper()
|
|
|
|
FLUSH_INTERVAL = 2.0 # s — jak často odeslat nasbíranou dávku
|
|
BATCH_MAX = 200 # max záznamů v jedné dávce
|
|
QUEUE_MAX = 50_000 # ochrana proti přetečení paměti
|
|
HTTP_TIMEOUT = 5.0 # s — timeout odeslání do gateway
|
|
SPOOL_REPLAY_MAX = 1000 # max záznamů přehraných ze spoolu na jeden cyklus
|
|
|
|
|
|
class _GatewaySender:
|
|
"""Vlákno na pozadí: sbírá záznamy z fronty a posílá je do gateway
|
|
v dávkách. Při neúspěchu zapisuje do spool souboru a později přehraje."""
|
|
|
|
def __init__(self, app_name: str, gateway: str, token: str, env: str, spool_dir: Path):
|
|
self.app_name = app_name
|
|
self.host = socket.gethostname()
|
|
self.gateway = gateway.rstrip("/")
|
|
self.token = token
|
|
self.env = env
|
|
self.spool_file = spool_dir / f"central_logging_spool_{app_name}.ndjson"
|
|
spool_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
self._queue: Deque[Dict[str, Any]] = deque(maxlen=QUEUE_MAX)
|
|
self._lock = threading.Lock()
|
|
self._stop = threading.Event()
|
|
self._thread = threading.Thread(target=self._run, name=f"central-log-{app_name}", daemon=True)
|
|
self._thread.start()
|
|
|
|
# -- veřejné --------------------------------------------------------
|
|
def submit(self, record: Dict[str, Any]) -> None:
|
|
with self._lock:
|
|
self._queue.append(record)
|
|
|
|
def flush_and_stop(self, timeout: float = 5.0) -> None:
|
|
self._stop.set()
|
|
self._thread.join(timeout=timeout)
|
|
# poslední pokus o odeslání toho, co zbylo
|
|
self._drain_once(final=True)
|
|
|
|
# -- vnitřní --------------------------------------------------------
|
|
def _run(self) -> None:
|
|
while not self._stop.is_set():
|
|
time.sleep(FLUSH_INTERVAL)
|
|
try:
|
|
self._replay_spool()
|
|
self._drain_once()
|
|
except Exception: # noqa: BLE001 — logování se nikdy nesmí zhroutit
|
|
pass
|
|
|
|
def _pop_batch(self) -> List[Dict[str, Any]]:
|
|
batch: List[Dict[str, Any]] = []
|
|
with self._lock:
|
|
while self._queue and len(batch) < BATCH_MAX:
|
|
batch.append(self._queue.popleft())
|
|
return batch
|
|
|
|
def _drain_once(self, final: bool = False) -> None:
|
|
while True:
|
|
batch = self._pop_batch()
|
|
if not batch:
|
|
return
|
|
ok = self._send(batch)
|
|
if not ok:
|
|
self._spool(batch)
|
|
if final and not self._queue:
|
|
return
|
|
|
|
def _send(self, records: List[Dict[str, Any]]) -> bool:
|
|
payload = json.dumps({
|
|
"app": self.app_name,
|
|
"host": self.host,
|
|
"env": self.env,
|
|
"records": records,
|
|
}, ensure_ascii=False).encode("utf-8")
|
|
|
|
req = urllib.request.Request(
|
|
f"{self.gateway}/log/batch",
|
|
data=payload,
|
|
method="POST",
|
|
headers={
|
|
"Content-Type": "application/json",
|
|
"Authorization": f"Bearer {self.token}",
|
|
},
|
|
)
|
|
try:
|
|
with urllib.request.urlopen(req, timeout=HTTP_TIMEOUT) as resp:
|
|
return 200 <= resp.status < 300
|
|
except Exception: # noqa: BLE001 — síť/timeout/HTTP error -> spool
|
|
return False
|
|
|
|
# -- spool (fallback při výpadku) -----------------------------------
|
|
def _spool(self, records: List[Dict[str, Any]]) -> None:
|
|
try:
|
|
with open(self.spool_file, "a", encoding="utf-8") as f:
|
|
for r in records:
|
|
f.write(json.dumps(r, ensure_ascii=False) + "\n")
|
|
except Exception: # noqa: BLE001
|
|
pass
|
|
|
|
def _replay_spool(self) -> None:
|
|
if not self.spool_file.exists() or self.spool_file.stat().st_size == 0:
|
|
return
|
|
# načti dávku ze spoolu
|
|
try:
|
|
with open(self.spool_file, "r", encoding="utf-8") as f:
|
|
lines = f.readlines()
|
|
except Exception: # noqa: BLE001
|
|
return
|
|
if not lines:
|
|
return
|
|
|
|
chunk = lines[:SPOOL_REPLAY_MAX]
|
|
records = []
|
|
for ln in chunk:
|
|
ln = ln.strip()
|
|
if ln:
|
|
try:
|
|
records.append(json.loads(ln))
|
|
except Exception: # noqa: BLE001
|
|
pass
|
|
if records and self._send(records):
|
|
# úspěch -> odeber přehrané řádky ze spoolu
|
|
remaining = lines[SPOOL_REPLAY_MAX:]
|
|
try:
|
|
if remaining:
|
|
with open(self.spool_file, "w", encoding="utf-8") as f:
|
|
f.writelines(remaining)
|
|
else:
|
|
self.spool_file.unlink(missing_ok=True)
|
|
except Exception: # noqa: BLE001
|
|
pass
|
|
|
|
|
|
class CentralLogHandler(logging.Handler):
|
|
"""logging.Handler, který předává záznamy senderu na pozadí."""
|
|
|
|
def __init__(self, sender: _GatewaySender):
|
|
super().__init__()
|
|
self._sender = sender
|
|
|
|
def emit(self, record: logging.LogRecord) -> None:
|
|
try:
|
|
# msg = jen samotná zpráva; čas/úroveň/logger jdou do labelů a polí
|
|
rec: Dict[str, Any] = {
|
|
"ts": record.created,
|
|
"level": record.levelname,
|
|
"msg": record.getMessage(),
|
|
"logger": record.name,
|
|
"func": record.funcName,
|
|
"line": record.lineno,
|
|
}
|
|
# POZOR: formatException je metoda Formatteru, ne Handleru —
|
|
# proto použij vlastní Formatter, jinak by AttributeError shodil
|
|
# celý záznam (a tracebacky by se ztrácely).
|
|
if record.exc_info:
|
|
rec["exc"] = logging.Formatter().formatException(record.exc_info)
|
|
self._sender.submit(rec)
|
|
except Exception: # noqa: BLE001 — handler nikdy nesmí shodit aplikaci
|
|
pass
|
|
|
|
|
|
def setup_logging(
|
|
app_name: str,
|
|
*,
|
|
log_file: Optional[str] = None,
|
|
keep_file: Optional[bool] = None,
|
|
level: Optional[str] = None,
|
|
gateway: Optional[str] = None,
|
|
token: Optional[str] = None,
|
|
env: Optional[str] = None,
|
|
fmt: str = "%(asctime)s | %(levelname)s | %(name)s | %(message)s",
|
|
datefmt: str = "%Y-%m-%d %H:%M:%S",
|
|
spool_dir: Optional[str] = None,
|
|
) -> logging.Logger:
|
|
"""Nastaví root logger se dvěma cíli:
|
|
1) souborový handler (RotatingFileHandler) — stávající chování,
|
|
2) centrální handler do Loki přes gateway (na pozadí).
|
|
|
|
Args:
|
|
app_name: label aplikace v Loki (např. "parse_emails_graph").
|
|
log_file: cesta k log souboru. Default <app_name>.log vedle skriptu.
|
|
keep_file: piš i do souboru? Default z ENV CENTRAL_LOG_KEEP_FILE (1).
|
|
Po měsíci ověřování nastav False -> jen centrál.
|
|
level: min. úroveň, default ENV CENTRAL_LOG_LEVEL nebo INFO.
|
|
gateway/token/env: override ENV defaultů.
|
|
|
|
Returns:
|
|
nakonfigurovaný root logger (lze i logging.getLogger()).
|
|
"""
|
|
lvl_name = (level or DEFAULT_LEVEL).upper()
|
|
lvl = getattr(logging, lvl_name, logging.INFO)
|
|
|
|
if keep_file is None:
|
|
keep_file = os.environ.get("CENTRAL_LOG_KEEP_FILE", "1") not in ("0", "false", "False")
|
|
|
|
root = logging.getLogger()
|
|
root.setLevel(lvl)
|
|
|
|
# odstraň případné staré handlery (idempotentní setup)
|
|
for h in list(root.handlers):
|
|
root.removeHandler(h)
|
|
|
|
formatter = logging.Formatter(fmt=fmt, datefmt=datefmt)
|
|
|
|
# 1) Souborový handler (stávající způsob) -------------------------------
|
|
if keep_file:
|
|
if log_file is None:
|
|
base = Path(sys.argv[0]).resolve().parent if sys.argv and sys.argv[0] else Path.cwd()
|
|
log_file = str(base / f"{app_name}.log")
|
|
fh = RotatingFileHandler(log_file, maxBytes=10 * 1024 * 1024, backupCount=5, encoding="utf-8")
|
|
fh.setLevel(lvl)
|
|
fh.setFormatter(formatter)
|
|
root.addHandler(fh)
|
|
|
|
# 2) Centrální handler do Loki (na pozadí) ------------------------------
|
|
spool_base = Path(spool_dir) if spool_dir else (
|
|
Path(sys.argv[0]).resolve().parent if sys.argv and sys.argv[0] else Path.cwd()
|
|
)
|
|
sender = _GatewaySender(
|
|
app_name=app_name,
|
|
gateway=gateway or DEFAULT_GATEWAY,
|
|
token=token or DEFAULT_TOKEN,
|
|
env=env or DEFAULT_ENV,
|
|
spool_dir=spool_base / "_log_spool",
|
|
)
|
|
ch = CentralLogHandler(sender)
|
|
ch.setLevel(lvl)
|
|
ch.setFormatter(formatter)
|
|
root.addHandler(ch)
|
|
|
|
# při ukončení skriptu dolij frontu
|
|
atexit.register(sender.flush_and_stop)
|
|
|
|
root.info("central_logging v1.0 inicializováno (app=%s, keep_file=%s, gateway=%s)",
|
|
app_name, keep_file, gateway or DEFAULT_GATEWAY)
|
|
return root
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# rychlý self-test
|
|
log = setup_logging("central_logging_selftest", level="DEBUG")
|
|
log.info("ahoj z self-testu")
|
|
log.warning("varování %d", 42)
|
|
try:
|
|
1 / 0
|
|
except ZeroDivisionError:
|
|
log.exception("zachycená výjimka")
|
|
print("Self-test odeslán. Zkontroluj Grafanu / spool soubor.")
|