This commit is contained in:
2025-12-29 22:51:13 +01:00
parent 64472e59ba
commit 1124a1fbe1
4 changed files with 311 additions and 110 deletions

View File

@@ -1,71 +1,157 @@
import nntplib
import os
import time
import socket
from dotenv import load_dotenv
from datetime import datetime, UTC
from db import get_conn
from psycopg.types.json import Json
# ================== CONFIG ==================
# --- RAISED LIMITS FOR STABILITY ---
nntplib._MAXLINE = 1048576 # 1 MiB line limit so very long headers are accepted
socket.setdefaulttimeout(30) # 30 s default socket timeout; prevents waiting on the network forever
# ================= CONFIG =================
# Newsgroup whose articles this script works on.
GROUP = "alt.binaries.e-book.magazines"
# Text interpolated into the LIKE pattern applied to the stored subject.
SUBJECT_KEY = "PC Pro 2011-07.pdf"
# ============================================
BATCH_SIZE = 5000 # Smaller batch for more frequent feedback in the console
# =========================================
# Load settings from a .env file, then read the Eweka credentials from the environment.
load_dotenv()
EWEKA_USER = os.getenv("EWEKA_USER")
EWEKA_PASS = os.getenv("EWEKA_PASS")
# NOTE(review): these top-level statements look like the pre-commit side of the
# diff hunk (they run at import time) — confirm whether they are still wanted.
# Open a PostgreSQL connection and cursor through the project's get_conn() helper.
print("🔌 Connecting to PostgreSQL...")
conn = get_conn()
cur = conn.cursor()
# Select, in ascending order, the article numbers stored for GROUP whose JSON
# 'subject' field contains SUBJECT_KEY.
cur.execute("""
SELECT article_number
FROM articles
WHERE newsgroup = %s
AND metadata->>'subject' LIKE %s
ORDER BY article_number
""", (GROUP, f"%{SUBJECT_KEY}%"))
def get_current_frontier(cur, group=None, ceiling=10000000):
    """Return the highest stored article number below ``ceiling`` for a group.

    Args:
        cur: Open database cursor; only ``execute()`` and ``fetchone()`` are used.
        group: Newsgroup to inspect. Defaults to the module-level ``GROUP``.
        ceiling: Exclusive upper bound on ``article_number``. The default is
            the value that used to be hard-coded in the query.

    Returns:
        The maximum matching article number as ``int``, or ``None`` when the
        table holds no matching row.
    """
    if group is None:
        group = GROUP
    print(f"🔍 Hledám v databázi hranici spodního bloku pro skupinu {group}...")
    cur.execute(
        """
        SELECT MAX(article_number)
        FROM articles
        WHERE newsgroup = %s AND article_number < %s
        """,
        (group, ceiling),
    )
    res = cur.fetchone()[0]
    # MAX() yields NULL (None) when nothing matches; compare against None
    # explicitly so a legitimate value of 0 is not mistaken for "no rows".
    val = None if res is None else int(res)
    shown = 'Žádná (začínám od nuly)' if val is None else val
    print(f"📍 Nalezena hranice: {shown}")
    return val
# Flatten the single-column result rows into a list of article numbers and
# report how many were found.
article_numbers = [row[0] for row in cur.fetchall()]
total = len(article_numbers)
print(f"📦 Found {total} parts in DB")
def is_batch_missing(cur, start, end):
    """Tell whether the database has no article of ``GROUP`` in ``[start, end]``.

    Args:
        cur: Open database cursor.
        start: First article number of the range (inclusive).
        end: Last article number of the range (inclusive).

    Returns:
        ``True`` when not a single row exists in the range, ``False`` as soon
        as at least one does.
    """
    # Intentionally prints nothing: this runs once per batch and would flood
    # the console otherwise.
    existence_query = """
        SELECT EXISTS (
            SELECT 1 FROM articles
            WHERE newsgroup = %s AND article_number BETWEEN %s AND %s
        )
    """
    cur.execute(existence_query, (GROUP, start, end))
    (found,) = cur.fetchone()
    return not found
# Nothing matched in the database: report it and stop with a non-zero exit status.
if total == 0:
    print("❌ No articles found, aborting.")
    exit(1)
# Open an authenticated NNTP-over-TLS session to Eweka on port 563 in reader mode.
# NOTE(review): the body of this `with` statement is not contiguous in this view;
# this looks like the pre-commit side of the diff — confirm.
print("🔌 Connecting to Eweka NNTP...")
with nntplib.NNTP_SSL(
    "news.eweka.nl",
    563,
    EWEKA_USER,
    EWEKA_PASS,
    readermode=True,
) as nntp:
def _download_batch(nntp, cur, batch_start, batch_end):
    """Fetch overview headers for one article range and insert them into ``articles``.

    Args:
        nntp: Connected NNTP client with the target group already selected.
        cur: Open database cursor.
        batch_start: First article number to request (inclusive).
        batch_end: Last article number to request (inclusive).

    Returns:
        Number of rows handed to the INSERT. Rows that already exist are
        skipped by ``ON CONFLICT DO NOTHING`` but are still counted.
    """
    # --- NETWORK PART ---
    print(f" 📡 NNTP XOVER...", end=" ", flush=True)
    t_net_start = time.time()
    _, overviews = nntp.xover(batch_start, batch_end)
    t_net_end = time.time()
    print(f"OK ({len(overviews)} hlaviček za {t_net_end - t_net_start:.2f}s)")

    rows = [
        (
            GROUP, art_num, fields.get("message-id"),
            Json({"group": GROUP, "article_number": art_num, **fields}),
            datetime.now(UTC),
        )
        for art_num, fields in overviews
    ]

    # --- DATABASE PART ---
    if not rows:
        print(f" Žádná data k uložení (prázdný rozsah na serveru)")
        return 0

    print(f" 💾 POSTGRES INSERT...", end=" ", flush=True)
    t_db_start = time.time()
    cur.executemany(
        """
        INSERT INTO articles (newsgroup, article_number, message_id, metadata, fetched_at)
        VALUES (%s, %s, %s, %s, %s) ON CONFLICT DO NOTHING
        """, rows
    )
    t_db_end = time.time()
    print(f"OK ({t_db_end - t_db_start:.2f}s)")
    return len(rows)


def fast_fill():
    """Backfill overview headers of ``GROUP`` from Eweka into PostgreSQL.

    Starting right after the frontier reported by ``get_current_frontier``
    (or at the server's oldest article when the database has none), the
    group is walked in ranges of ``BATCH_SIZE`` article numbers. A range
    that already has at least one stored row is skipped; an empty range is
    downloaded with XOVER and inserted. A summary is always printed and the
    database handles are always closed, even when the connection fails.
    """
    print(f"🚀 Startuji proces: {datetime.now().strftime('%H:%M:%S')}")

    conn = get_conn()
    conn.autocommit = True
    cur = conn.cursor()

    current_id = get_current_frontier(cur)
    if current_id is None:
        print("⚠️ Varování: Spodní hranice nenalezena, zkusím se zeptat serveru na nejstarší článek.")

    # Initialised before the ``try`` so the summary in ``finally`` also works
    # when the NNTP connection cannot be established at all.
    total_downloaded = 0
    start_process_time = time.time()

    print(f"🔌 Připojuji se k Eweka NNTP SSL (news.eweka.nl)...")
    try:
        with nntplib.NNTP_SSL("news.eweka.nl", 563, user=EWEKA_USER, password=EWEKA_PASS) as nntp:
            _, count, first, last, name = nntp.group(GROUP)
            first, last = int(first), int(last)

            if current_id is None:
                # The loop starts at current_id + 1, so step one back to make
                # sure the server's oldest article itself is fetched.
                current_id = first - 1

            print(f"📊 INFO O SKUPINĚ:")
            print(f" Název: {name}")
            print(f" Článků na serveru: {count}")
            print(f" Rozsah: {first} - {last}")
            print(f" Aktuální pozice: {current_id}")
            print(f" Zbývá zpracovat cca: {last - current_id} článků")
            print("-" * 50)

            # Measure throughput from the moment real work begins.
            start_process_time = time.time()

            while current_id < last:
                batch_start = current_id + 1
                batch_end = min(batch_start + BATCH_SIZE - 1, last)

                # STEP 1: existence check.
                if not is_batch_missing(cur, batch_start, batch_end):
                    # Try to jump ahead: take the highest stored article number
                    # from the batch start up to 100000 past the batch end.
                    # NOTE: holes inside the skipped span are not revisited.
                    cur.execute(
                        "SELECT MAX(article_number) FROM articles WHERE newsgroup = %s AND article_number BETWEEN %s AND %s",
                        (GROUP, batch_start, batch_end + 100000))
                    res = cur.fetchone()[0]
                    if res:
                        current_id = int(res)
                        print(f"⏩ [SKIP] Data existují. Přeskakuji na ID: {current_id}", end="\r")
                    else:
                        current_id = batch_end  # Safety net
                    continue

                # STEP 2: download (verbose on purpose).
                print(f"\n💎 [DÍRA] Nalezena mezera: {batch_start} - {batch_end}")
                try:
                    total_downloaded += _download_batch(nntp, cur, batch_start, batch_end)
                except (OSError, EOFError):
                    # Timeout or dropped connection: every further batch would
                    # fail the same way, so let the outer handler end the run
                    # instead of skipping the rest of the group.
                    raise
                except Exception as e:
                    # Best effort for anything else (server-side error reply,
                    # database error): report it and move on to the next batch.
                    print(f"\n❌ CHYBA v bloku {batch_start}-{batch_end}: {e}")
                    print(" Zkouším popojít o jeden BATCH dál...")
                    time.sleep(2)  # Short pause after an error
                current_id = batch_end

    except Exception as e:
        print(f"\n💥 KRITICKÁ CHYBA PŘIPOJENÍ: {e}")
    finally:
        total_time = time.time() - start_process_time
        print("\n" + "=" * 50)
        print(f"🏁 KONEC RELACE")
        print(f"⏱️ Celkový čas: {total_time:.1f} sekund")
        print(f"📦 Celkem staženo nových hlaviček: {total_downloaded}")
        if total_time > 0:
            print(f"🚀 Průměrná rychlost: {total_downloaded / total_time:.1f} hlaviček/s")
        print("=" * 50)
        cur.close()
        conn.close()
# Run the backfill only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    fast_fill()