Files
rohlik/10PriceScraping/Rohlik/scrape_first_leaf.py
T
2026-06-01 06:02:34 +02:00

203 lines
7.0 KiB
Python

"""
Open the first leaf (deepest) subcategory from categories_live.json
and list all products in it via the same API calls the website itself makes.
Flow that mimics the real frontend:
1. GET /api/v1/categories/normal/{categoryId}/products?page=N
-> { productIds: [...] }
2. For each chunk of IDs, call 5 batch endpoints in the same way the site does:
/api/v1/products
/api/v1/products/prices
/api/v1/products/stock
/api/v1/products/categories
/api/v1/products/user-data
All use repeated query params: ?products=ID1&products=ID2&...
3. Merge results per productId into one record.
"""
import json
from pathlib import Path
from playwright.sync_api import sync_playwright
from config import BASE_URL
from test_login import ensure_logged_in
TREE_PATH = Path(__file__).parent / "categories_live.json"
PAGE_SIZE = 50
CHUNK = 30 # how many IDs per batch request
# Endpoints that the frontend calls in parallel for each set of product IDs.
PRODUCT_BATCH_ENDPOINTS = {
"base": "/api/v1/products",
"prices": "/api/v1/products/prices",
"stock": "/api/v1/products/stock",
"categories": "/api/v1/products/categories",
"user_data": "/api/v1/products/user-data",
}
def find_first_leaf(nodes, path=None):
if path is None:
path = []
for n in nodes:
current = path + [n["name"]]
children = n.get("children") or []
if not children:
return current, n
result = find_first_leaf(children, current)
if result:
return result
return None
def get_json(context, url):
resp = context.request.get(url)
if resp.status != 200:
raise RuntimeError(f"GET {url[:120]}... -> {resp.status}: {resp.text()[:200]}")
return resp.json()
def fetch_products_page(context, category_id, page):
url = (f"{BASE_URL}/api/v1/categories/normal/{category_id}/products"
f"?page={page}&size={PAGE_SIZE}&sort=recommended&filter=&excludeProductIds=")
return get_json(context, url)
def fetch_batch(context, path, product_ids):
"""Call a batch endpoint with ?products=ID&products=ID&... — like the frontend does."""
qs = "&".join(f"products={pid}" for pid in product_ids)
url = f"{BASE_URL}{path}?{qs}"
return get_json(context, url)
def as_list(payload):
"""Each batch endpoint returns either a list or a wrapper around one."""
if isinstance(payload, list):
return payload
if isinstance(payload, dict):
for k in ("data", "products", "items"):
v = payload.get(k)
if isinstance(v, list):
return v
return []
def index_by_id(items):
out = {}
for it in items:
if not isinstance(it, dict):
continue
pid = it.get("productId") or it.get("id")
if pid is not None:
out[int(pid)] = it
return out
def fetch_merged_products(context, product_ids):
"""For a chunk of IDs, call all 5 endpoints and merge per productId."""
results = {key: index_by_id(as_list(fetch_batch(context, path, product_ids)))
for key, path in PRODUCT_BATCH_ENDPOINTS.items()}
merged = []
for pid in product_ids:
record = {"productId": pid}
for key in PRODUCT_BATCH_ENDPOINTS:
data = results[key].get(int(pid))
if data is not None:
record[key] = data
merged.append(record)
return merged
def main():
if not TREE_PATH.exists():
raise SystemExit(f"Missing {TREE_PATH} — run scrape_categories.py first.")
data = json.loads(TREE_PATH.read_text(encoding="utf-8"))
path, leaf = find_first_leaf(data["tree"])
print(f"First leaf: {' > '.join(path)} (id={leaf['id']})")
print(f"URL: {BASE_URL}{leaf['url']}\n")
with sync_playwright() as pw:
context, page = ensure_logged_in(pw)
# Step 1: collect all product IDs across pages
all_ids = []
page_num = 0
while True:
print(f"Listing page {page_num} ...")
payload = fetch_products_page(context, leaf["id"], page_num)
ids = payload.get("productIds") or []
print(f" got {len(ids)} product IDs")
if not ids:
break
all_ids.extend(ids)
if len(ids) < PAGE_SIZE:
break
page_num += 1
print(f"\nTotal IDs: {len(all_ids)}")
if not all_ids:
context.browser.close()
return
# Step 2: per chunk, hit the 5 batch endpoints the frontend uses and merge
all_products = []
for i in range(0, len(all_ids), CHUNK):
chunk = all_ids[i:i + CHUNK]
print(f"Batch fetch for IDs {i}..{i + len(chunk) - 1} ({len(chunk)} items) ...")
merged = fetch_merged_products(context, chunk)
all_products.extend(merged)
print(f"\nTotal products: {len(all_products)}\n")
# Show one merged record so we see real field shapes
if all_products:
print("--- Sample merged product (first item, truncated) ---")
print(json.dumps(all_products[0], ensure_ascii=False, indent=2)[:2500])
print("--- end sample ---\n")
# Simple human-readable listing
print(f"{'ID':>9} {'Skladem':<8} {'Cena':>10} {'Za jedn.':>11} {'Akce':>10} Název (balení)")
print("-" * 100)
for p in all_products:
base = p.get("base") or {}
prices = p.get("prices") or {}
stock = p.get("stock") or {}
name = base.get("name") or "?"
unit = base.get("unit") or ""
textual = base.get("textualAmount") or ""
price = (prices.get("price") or {}).get("amount")
ppu = (prices.get("pricePerUnit") or {}).get("amount")
sale_price = None
sale_badge = ""
sales = prices.get("sales") or []
if sales:
first = sales[0]
sale_price = (first.get("price") or {}).get("amount")
badges = first.get("badges") or []
if badges:
sale_badge = badges[0].get("title") or first.get("type") or ""
else:
sale_badge = first.get("type") or ""
in_stock = stock.get("inStock")
stock_str = "ano" if in_stock else ("ne" if in_stock is False else "?")
price_str = f"{price:.2f}" if isinstance(price, (int, float)) else ""
ppu_str = f"{ppu:.2f}/{unit}" if isinstance(ppu, (int, float)) else ""
sale_str = f"{sale_price:.2f} {sale_badge}".strip() if isinstance(sale_price, (int, float)) else ""
print(f"{p['productId']:>9} {stock_str:<8} {price_str:>10} {ppu_str:>11} {sale_str:>10} {name} ({textual})")
out_path = Path(__file__).parent / f"products_{leaf['id']}.json"
out_path.write_text(json.dumps(all_products, ensure_ascii=False, indent=2), encoding="utf-8")
print(f"\nSaved -> {out_path} ({out_path.stat().st_size} bytes)")
context.browser.close()
if __name__ == "__main__":
main()