Files

163 lines
5.8 KiB
Python

# ============================================================================
# log_gateway_v1.0.py
# Verze: 1.0
# Datum: 2026-06-08
# Autor: Vladimír Buzalka
# Popis: FastAPI brána pro centrální logování. Přijímá jednoduchý JSON
# od klientských skriptů (knihovna central_logging) a přeposílá
# do Grafana Loki přes jeho HTTP push API.
#
# Proč brána a ne push přímo do Loki:
# - klient nezná interní detaily Loki (ns timestampy, streamy)
# - jeden sdílený token (Bearer) místo přístupu do DB/Loki
# - lze kdykoli vyměnit backend bez zásahu do skriptů
# - centrální místo pro normalizaci labelů a rate-limit
#
# Endpoints:
# GET /health — liveness + dostupnost Loki
# POST /log — jeden log záznam
# POST /log/batch — dávka záznamů (preferováno klientem)
#
# Autorizace: hlavička Authorization: Bearer <LOG_TOKEN>
#
# ENV:
# LOKI_URL výchozí http://loki:3100
# LOG_TOKEN sdílené tajemství (musí sedět s klientem)
# GATEWAY_ENV label env, výchozí "prod"
# ============================================================================
from __future__ import annotations
import os
import time
import json
import logging
from typing import Any, Dict, List, Optional
import httpx
from fastapi import FastAPI, Header, HTTPException
from pydantic import BaseModel, Field
LOKI_URL = os.environ.get("LOKI_URL", "http://loki:3100").rstrip("/")
LOG_TOKEN = os.environ.get("LOG_TOKEN", "change-this-shared-secret")
DEFAULT_ENV = os.environ.get("GATEWAY_ENV", "prod")
# Loki: labely držíme s nízkou kardinalitou (jinak exploze sérií).
# Vše ostatní (logger, func, line, exc) jde do těla log řádku jako JSON.
ALLOWED_LABELS = ("app", "host", "level", "env")
logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s")
log = logging.getLogger("log_gateway")
app = FastAPI(title="Central Log Gateway", version="1.0")
class LogRecord(BaseModel):
ts: Optional[float] = Field(None, description="Unix čas v sekundách (float). Když chybí, doplní brána.")
level: str = "INFO"
msg: str = ""
logger: Optional[str] = None
func: Optional[str] = None
line: Optional[int] = None
exc: Optional[str] = None
extra: Optional[Dict[str, Any]] = None
class BatchPayload(BaseModel):
app: str
host: str = "unknown"
env: Optional[str] = None
records: List[LogRecord]
def _check_token(authorization: Optional[str]) -> None:
expected = f"Bearer {LOG_TOKEN}"
if not authorization or authorization != expected:
raise HTTPException(status_code=401, detail="Neplatný nebo chybějící token.")
def _to_loki_streams(payload: BatchPayload) -> Dict[str, Any]:
"""Seskupí záznamy podle (app, host, level, env) do Loki streamů.
Hodnota každé položky = [unix_nano_str, json_řádek]."""
env = payload.env or DEFAULT_ENV
streams: Dict[tuple, List[List[str]]] = {}
for r in payload.records:
ts = r.ts if r.ts is not None else time.time()
ts_nano = str(int(ts * 1_000_000_000))
line_obj: Dict[str, Any] = {"msg": r.msg}
if r.logger:
line_obj["logger"] = r.logger
if r.func:
line_obj["func"] = r.func
if r.line is not None:
line_obj["line"] = r.line
if r.exc:
line_obj["exc"] = r.exc
if r.extra:
line_obj["extra"] = r.extra
key = (payload.app, payload.host, (r.level or "INFO").upper(), env)
streams.setdefault(key, []).append([ts_nano, json.dumps(line_obj, ensure_ascii=False)])
out_streams = []
for (app_name, host, level, env_v), values in streams.items():
# Loki vyžaduje values seřazené vzestupně dle času v rámci streamu
values.sort(key=lambda v: v[0])
out_streams.append({
"stream": {"app": app_name, "host": host, "level": level, "env": env_v},
"values": values,
})
return {"streams": out_streams}
async def _push_to_loki(body: Dict[str, Any]) -> None:
url = f"{LOKI_URL}/loki/api/v1/push"
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.post(url, json=body, headers={"Content-Type": "application/json"})
if resp.status_code >= 300:
log.error("Loki push selhal %s: %s", resp.status_code, resp.text[:500])
raise HTTPException(status_code=502, detail=f"Loki push selhal: {resp.status_code}")
@app.get("/health")
async def health() -> Dict[str, Any]:
loki_ok = False
try:
async with httpx.AsyncClient(timeout=3.0) as client:
r = await client.get(f"{LOKI_URL}/ready")
loki_ok = r.status_code == 200
except Exception as e: # noqa: BLE001
log.warning("Loki nedostupný: %s", e)
return {"status": "ok", "loki": "ready" if loki_ok else "unavailable", "version": "1.0"}
@app.post("/log/batch")
async def log_batch(payload: BatchPayload, authorization: Optional[str] = Header(None)) -> Dict[str, Any]:
_check_token(authorization)
if not payload.records:
return {"accepted": 0}
body = _to_loki_streams(payload)
await _push_to_loki(body)
return {"accepted": len(payload.records), "streams": len(body["streams"])}
@app.post("/log")
async def log_one(
record: LogRecord,
app_name: str,
host: str = "unknown",
env: Optional[str] = None,
authorization: Optional[str] = Header(None),
) -> Dict[str, Any]:
_check_token(authorization)
payload = BatchPayload(app=app_name, host=host, env=env, records=[record])
body = _to_loki_streams(payload)
await _push_to_loki(body)
return {"accepted": 1}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8770)