408 lines
13 KiB
Python
408 lines
13 KiB
Python
"""
|
|
Rohlik.cz Price Scraper — API-based

Iterates leaf categories, fetches product IDs via the listing API,
pulls details from 4 batch endpoints, and upserts them into MongoDB.

Usage:
    python scraper.py                                  # all categories -> MongoDB
    python scraper.py --category "Ovoce a zelenina"    # one main category only
    python scraper.py --no-db                          # dry run, no DB writes
    python scraper.py --visible                        # show browser window
|
|
"""
|
|
|
|
import sys
|
|
import io
|
|
import argparse
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
|
|
from playwright.sync_api import sync_playwright
|
|
|
|
from config import BASE_URL
|
|
from test_login import ensure_logged_in
|
|
from db import get_db, ensure_indexes, upsert_products, upsert_categories, log_scrape_run
|
|
|
|
def _utf8_stream(stream):
    """Return *stream* configured to emit UTF-8, replacing unencodable characters.

    Prefers ``reconfigure`` so the existing stream object (and its buffering
    mode) is kept instead of stacking a second text wrapper on the same
    binary buffer. Falls back to wrapping ``stream.buffer`` when
    ``reconfigure`` is unavailable, and leaves the stream untouched when it
    is ``None`` or exposes neither (e.g. a replaced or detached stdout).
    """
    if stream is None:
        return stream
    if hasattr(stream, "reconfigure"):
        stream.reconfigure(encoding="utf-8", errors="replace")
        return stream
    if hasattr(stream, "buffer"):
        return io.TextIOWrapper(stream.buffer, encoding="utf-8", errors="replace")
    return stream


# Product and category names contain non-ASCII characters; make sure console
# output never fails on a narrow default encoding.
sys.stdout = _utf8_stream(sys.stdout)
sys.stderr = _utf8_stream(sys.stderr)
|
|
|
|
# Console logging: INFO and above, each record prefixed with a wall-clock
# time (hours:minutes:seconds).
logging.basicConfig(
    format="%(asctime)s %(message)s",
    datefmt="%H:%M:%S",
    level=logging.INFO,
)

# Module-level logger used by every function in this file.
log = logging.getLogger(__name__)
|
|
|
|
# Page size requested from the listing endpoint; a page shorter than this
# ends pagination in fetch_product_ids.
PAGE_SIZE = 50
# Number of product IDs sent to the batch endpoints per request.
CHUNK = 30

# Navigation endpoints used to build the category tree.
_NAV_TABS = "navigation/components/navigation-tabs"
MAIN_CATS_URL = f"{BASE_URL}/api/v5/{_NAV_TABS}/categories"
SUBCATS_URL = f"{BASE_URL}/api/v4/{_NAV_TABS}/subcategories"

# Paths (relative to BASE_URL) of the four batch endpoints queried for each
# chunk of product IDs.
BATCH_ENDPOINTS = dict(
    base="/api/v1/products",
    prices="/api/v1/products/prices",
    stock="/api/v1/products/stock",
    categories="/api/v1/products/categories",
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def get_json(context, url, **params):
    """GET *url* via ``context.request`` and return the decoded JSON body.

    Keyword arguments are forwarded as query parameters; when none are given
    ``params=None`` is passed rather than an empty dict.

    Raises:
        RuntimeError: if the response status is anything other than 200.
            The message contains the status and the first 120 characters of
            the URL.
    """
    query = params if params else None
    response = context.request.get(url, params=query)
    if response.status == 200:
        return response.json()
    raise RuntimeError(f"HTTP {response.status}: {url[:120]}")
|
|
|
|
|
|
def as_list(payload):
    """Coerce an API payload to a list of records.

    A list is returned unchanged. For a dict, the first of the keys
    ``data``, ``products``, ``items`` whose value is a list is returned.
    Anything else yields an empty list.
    """
    if isinstance(payload, list):
        return payload
    if not isinstance(payload, dict):
        return []
    candidates = (payload.get(key) for key in ("data", "products", "items"))
    return next((value for value in candidates if isinstance(value, list)), [])
|
|
|
|
|
|
def pick(d, *keys):
    """Return the first non-None value found in *d* under any of *keys*.

    Keys are tried in the order given. Returns None when *d* is not a dict
    or when none of the keys maps to a non-None value.
    """
    if not isinstance(d, dict):
        return None
    for key in keys:
        value = d.get(key)
        if value is not None:
            return value
    return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# category tree — live from API
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def normalize_main(payload):
    """Extract the list of main-category records from a navigation payload.

    Accepted shapes, checked in this order:

    * a list, returned unchanged;
    * a dict where one of ``data``, ``categories``, ``items``,
      ``navigationTabs`` or ``tabs`` holds a list;
    * a dict where one of those keys holds a nested dict that in turn has a
      list under ``categories``, ``items`` or ``tabs``.

    Any other payload (including ``None`` or a scalar) yields an empty list
    instead of raising, matching ``as_list`` and ``subs_from_payload``.
    """
    if isinstance(payload, list):
        return payload
    if not isinstance(payload, dict):
        # Previously this fell through to payload.get() and raised
        # AttributeError on None / str / number payloads.
        return []

    for key in ("data", "categories", "items", "navigationTabs", "tabs"):
        value = payload.get(key)
        if isinstance(value, list):
            return value
        if isinstance(value, dict):
            # One level of wrapping, e.g. {"data": {"categories": [...]}}.
            for inner_key in ("categories", "items", "tabs"):
                inner = value.get(inner_key)
                if isinstance(inner, list):
                    return inner
    return []
|
|
|
|
|
|
def subs_from_payload(payload):
    """Extract the list of subcategory records from a subcategories payload.

    A list is returned as-is. For a dict, the first of ``data``,
    ``subcategories``, ``items``, ``categories`` that holds a list wins.
    Every other input produces an empty list.
    """
    if isinstance(payload, list):
        return payload
    if isinstance(payload, dict):
        found = [
            payload[key]
            for key in ("data", "subcategories", "items", "categories")
            if isinstance(payload.get(key), list)
        ]
        if found:
            return found[0]
    return []
|
|
|
|
|
|
def fetch_children_recursive(context, parent_id, visited, depth=1, max_depth=6):
    """Fetch the subtree below *parent_id* from the subcategories endpoint.

    Args:
        context: object passed through to ``get_json`` for the HTTP call.
        parent_id: ID of the category whose children are requested.
        visited: set of stringified category IDs already expanded; mutated
            in place so a category is never requested twice.
        depth: current recursion depth (1 for children of a main category).
        max_depth: recursion stops once *depth* exceeds this value.

    Returns:
        A list of nodes ``{"id", "name", "url", "children"}``. The list is
        empty when *parent_id* was already visited or the depth limit is
        exceeded.

    Raises:
        RuntimeError: propagated from ``get_json`` on a non-200 response.
    """
    parent_key = str(parent_id)
    if parent_key in visited or depth > max_depth:
        return []
    visited.add(parent_key)

    payload = get_json(context, SUBCATS_URL, categoryIds=parent_key)

    nodes = []
    for sub in subs_from_payload(payload):
        if not isinstance(sub, dict):
            continue
        sub_id = pick(sub, "id", "categoryId")
        if not sub_id:
            # An entry without an ID can neither be expanded nor scraped and
            # would end up as a category document with `_id: None`;
            # fetch_category_tree skips ID-less main categories the same way.
            continue
        node = {
            "id": sub_id,
            "name": pick(sub, "name", "title", "label"),
            "url": pick(sub, "url", "link", "slug"),
            "children": [],
        }
        # Only issue another request when the entry advertises subcategories.
        if sub.get("subcategoryIds"):
            node["children"] = fetch_children_recursive(
                context, sub_id, visited, depth + 1, max_depth
            )
        nodes.append(node)
    return nodes
|
|
|
|
|
|
def fetch_category_tree(context):
    """Build the full category tree from the live navigation API.

    Requests the main categories, then expands each one with
    ``fetch_children_recursive``. Main-category entries without an ID are
    skipped. Progress and totals are written to the module logger.

    Returns:
        A list of root nodes ``{"id", "name", "url", "children"}``.
    """
    log.info("Fetching main categories ...")
    main_cats = normalize_main(get_json(context, MAIN_CATS_URL))
    log.info(" %d main categories", len(main_cats))

    # Shared across all roots so a category reachable from two parents is
    # only expanded once.
    visited = set()
    tree = []

    log.info("Fetching subcategories recursively ...")
    for entry in main_cats:
        cat_id = pick(entry, "id", "categoryId")
        cat_name = pick(entry, "name", "title", "label")
        cat_url = pick(entry, "url", "link", "slug")
        if not cat_id:
            continue

        subtree = fetch_children_recursive(context, cat_id, visited)
        tree.append(
            {"id": cat_id, "name": cat_name, "url": cat_url, "children": subtree}
        )
        log.info(" - %s -> %d subcategories", cat_name, count_nodes(subtree))

    log.info(" Total: %d categories (incl. main)", count_nodes(tree))
    return tree
|
|
|
|
|
|
def count_nodes(nodes):
    """Return the number of nodes in *nodes* plus all of their descendants.

    A node without a ``children`` key counts as having no children.
    """
    descendants = sum(count_nodes(node.get("children", [])) for node in nodes)
    return len(nodes) + descendants
|
|
|
|
|
|
def collect_leaves(nodes, path=None):
    """Flatten a category tree into its leaf nodes, depth-first.

    Each returned leaf is a copy of the node with an extra ``path`` key: the
    list of category names from the root down to and including the leaf.
    A node counts as a leaf when ``children`` is missing, None or empty.
    *path* is the list of ancestor names to prefix (defaults to none).
    """
    prefix = [] if path is None else path
    found = []
    for node in nodes:
        trail = prefix + [node["name"]]
        kids = node.get("children") or []
        if not kids:
            found.append({**node, "path": trail})
            continue
        found.extend(collect_leaves(kids, trail))
    return found
|
|
|
|
|
|
def tree_to_db_docs(nodes, parent_id=None, path=None, path_names=None):
    """Flatten tree nodes into category documents, parents before children.

    Each document has ``_id``, ``name``, ``slug`` (the node URL without
    leading slashes, or ""), ``path`` (IDs from the root to this node),
    ``pathNames`` (the matching names), ``parentId`` and ``isLeaf``.

    *parent_id*, *path* and *path_names* describe the ancestors of *nodes*
    and are filled in by the recursive calls.
    """
    id_prefix = [] if path is None else path
    name_prefix = [] if path_names is None else path_names

    docs = []
    for node in nodes:
        node_id = node["id"]
        node_name = node["name"]
        id_trail = id_prefix + [node_id]
        name_trail = name_prefix + [node_name]
        kids = node.get("children") or []

        doc = {
            "_id": node_id,
            "name": node_name,
            "slug": (node.get("url") or "").lstrip("/"),
            "path": id_trail,
            "pathNames": name_trail,
            "parentId": parent_id,
            "isLeaf": not kids,
        }
        docs.append(doc)

        if kids:
            docs += tree_to_db_docs(kids, node_id, id_trail, name_trail)
    return docs
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# product fetching
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def fetch_product_ids(context, category_id):
    """Page through the listing API and return the product IDs of a category.

    Pages of ``PAGE_SIZE`` IDs are requested starting at page 0. Pagination
    stops when a page is shorter than ``PAGE_SIZE`` or when a page adds no
    ID that has not been seen already — the latter guards against looping
    forever if the API keeps answering with the same full page.

    Returns:
        Product IDs in first-seen order, without duplicates.

    Raises:
        RuntimeError: propagated from ``get_json`` on a non-200 response.
    """
    seen = set()
    ordered_ids = []
    page = 0
    while True:
        url = (f"{BASE_URL}/api/v1/categories/normal/{category_id}/products"
               f"?page={page}&size={PAGE_SIZE}&sort=recommended&filter=&excludeProductIds=")
        data = get_json(context, url)
        ids = data.get("productIds") or []

        added = 0
        for pid in ids:
            if pid not in seen:
                seen.add(pid)
                ordered_ids.append(pid)
                added += 1

        # Short page: last page. No new IDs: the API is repeating itself.
        if len(ids) < PAGE_SIZE or added == 0:
            break
        page += 1
    return ordered_ids
|
|
|
|
|
|
def fetch_batch(context, endpoint, product_ids):
    """Query one batch endpoint for *product_ids* and return its records.

    The IDs are sent as repeated ``products=<id>`` query parameters appended
    to ``BASE_URL + endpoint``; the response is normalized with ``as_list``.
    """
    query = "&".join(map("products={}".format, product_ids))
    payload = get_json(context, f"{BASE_URL}{endpoint}?{query}")
    return as_list(payload)
|
|
|
|
|
|
def fetch_product_details(context, product_ids):
    """Call the four batch endpoints for one chunk of product IDs.

    Returns:
        A 4-tuple ``(bases, prices, stocks, cats)`` of raw record lists,
        fetched in that order.
    """
    return tuple(
        fetch_batch(context, BATCH_ENDPOINTS[kind], product_ids)
        for kind in ("base", "prices", "stock", "categories")
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# console output
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def print_header():
    """Log the scraper banner between two 100-character rules."""
    rule = "=" * 100
    for line in (rule, " ROHLIK.CZ PRICE SCRAPER", rule):
        log.info(line)
|
|
|
|
|
|
def print_category_header(leaf, leaf_idx, total_leaves):
    """Log a section header for one leaf category.

    Shows the running position (*leaf_idx* of *total_leaves*), the leaf's
    ``path`` joined with " > ", and its ``id``.
    """
    breadcrumb = " > ".join(leaf["path"])
    divider = "-" * 100

    log.info("")
    log.info(divider)
    log.info(" [%d/%d] %s (id=%s)", leaf_idx, total_leaves, breadcrumb, leaf["id"])
    log.info(divider)
|
|
|
|
|
|
def _amount(record, key):
    """Return ``record[key]["amount"]``, or None if either level is absent or null."""
    money = record.get(key) if isinstance(record, dict) else None
    return money.get("amount") if isinstance(money, dict) else None


def _by_product_id(records):
    """Index dict records by ``productId``, skipping entries that lack one."""
    return {
        r["productId"]: r
        for r in records
        if isinstance(r, dict) and r.get("productId") is not None
    }


def print_products_table(bases, prices_list, stocks):
    """Log one compact line per product in this chunk.

    Columns: stock marker (``+`` in stock, ``-`` out of stock, ``?``
    unknown), product ID, price, price per unit, first sale price with its
    badge title, and the name truncated to 50 characters.

    This is display-only code running inside the per-category error
    handler, so it tolerates null or missing fields instead of raising;
    a malformed record must not abort the upserts for its category.

    Args:
        bases: records from the base endpoint (``id``, ``name``, ``unit``).
        prices_list: records from the prices endpoint, keyed by ``productId``.
        stocks: records from the stock endpoint, keyed by ``productId``.
    """
    prices_map = _by_product_id(prices_list)
    stock_map = _by_product_id(stocks)

    for b in bases:
        if not isinstance(b, dict):
            continue
        pid = b.get("id")
        p = prices_map.get(pid, {})
        s = stock_map.get(pid, {})

        # `or` also covers keys that are present but null.
        name = str(b.get("name") or "?")[:50]
        unit = b.get("unit") or ""
        price = _amount(p, "price")
        ppu = _amount(p, "pricePerUnit")

        in_stock = s.get("inStock")
        stock_str = "+" if in_stock else "-" if in_stock is False else "?"

        sale_str = ""
        sales = p.get("sales") or []
        if isinstance(sales, list) and sales:
            sp = _amount(sales[0], "price")
            badges = sales[0].get("badges") if isinstance(sales[0], dict) else None
            first_badge = badges[0] if isinstance(badges, list) and badges else None
            badge = (first_badge.get("title") or "") if isinstance(first_badge, dict) else ""
            # Only format real, non-zero numbers, as for the price columns.
            if isinstance(sp, (int, float)) and sp:
                sale_str = f"{sp:.2f} {badge}"

        price_str = f"{price:.2f}" if isinstance(price, (int, float)) else "?"
        ppu_str = f"{ppu:.2f}/{unit}" if isinstance(ppu, (int, float)) else ""

        # %9s renders integer IDs exactly like %9d but does not trigger a
        # logging format error when an ID is a string or missing.
        log.info(" %s %9s %8s %12s %14s %s",
                 stock_str, pid, price_str, ppu_str, sale_str, name)
|
|
|
|
|
|
def print_summary(stats):
    """Log the end-of-run summary.

    Reads ``categories_scraped``, ``products_total`` and
    ``duration_seconds`` from *stats*; the error count is shown only when
    ``errors`` is present and non-zero.
    """
    rule = "=" * 100
    log.info("")
    log.info(rule)
    log.info(" DONE")

    for template, key in (
        (" Categories: %d", "categories_scraped"),
        (" Products: %d unique", "products_total"),
        (" Duration: %.1f s", "duration_seconds"),
    ):
        log.info(template, stats[key])

    error_count = stats.get("errors")
    if error_count:
        log.info(" Errors: %d", error_count)
    log.info(rule)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# main
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def run_scraper(category_filter=None, headless=True, save_to_db=True):
    """Scrape every leaf category and optionally persist the results.

    Steps: log in through Playwright, fetch the live category tree,
    optionally narrow it to main categories whose name contains
    *category_filter* (case-insensitive), then for each leaf fetch the
    product IDs and pull details in chunks of ``CHUNK`` IDs.

    Args:
        category_filter: substring of a main-category name, or None for all.
        headless: forwarded to ``ensure_logged_in``.
        save_to_db: when False nothing is written to MongoDB (dry run).

    Returns:
        The run statistics dict: ``startedAt``, ``finishedAt``,
        ``duration_seconds``, ``categories_scraped``, ``products_total``,
        ``errors`` and ``filter``.

    Raises:
        SystemExit: if *category_filter* matches no main category.
    """
    db = None
    if save_to_db:
        db = get_db()
        ensure_indexes(db)

    with sync_playwright() as pw:
        context, _page = ensure_logged_in(pw, headless=headless)
        try:
            # fetch live category tree from API
            tree = fetch_category_tree(context)

            # filter to one main category if requested
            if category_filter:
                needle = category_filter.lower()
                # A main category may come back without a name; treat it as
                # non-matching instead of crashing on None.lower().
                tree = [t for t in tree if needle in (t["name"] or "").lower()]
                if not tree:
                    raise SystemExit(f"No main category matching '{category_filter}'")

            leaves = collect_leaves(tree)
            log.info("Scraping %d leaf categories", len(leaves))

            # save categories to MongoDB
            if db is not None:
                cat_docs = tree_to_db_docs(tree)
                upsert_categories(db, cat_docs)
                log.info("Upserted %d category docs", len(cat_docs))

            print_header()

            run_start = datetime.now(timezone.utc)
            seen_ids = set()
            total_products = 0
            errors = 0

            for i, leaf in enumerate(leaves, 1):
                try:
                    print_category_header(leaf, i, len(leaves))

                    product_ids = fetch_product_ids(context, leaf["id"])
                    log.info(" %d product IDs", len(product_ids))
                    if not product_ids:
                        continue

                    # Deduplicate against earlier categories and within this
                    # listing, keeping the listing order.
                    new_ids = list(dict.fromkeys(
                        pid for pid in product_ids if pid not in seen_ids
                    ))

                    # process in chunks
                    for j in range(0, len(new_ids), CHUNK):
                        chunk = new_ids[j:j + CHUNK]
                        bases, prices, stocks, cats = fetch_product_details(context, chunk)

                        print_products_table(bases, prices, stocks)

                        if db is not None:
                            upsert_products(db, bases, prices, stocks, cats)

                        # Mark IDs as done only after their chunk succeeded,
                        # so products behind a failed chunk are retried when
                        # they show up in a later category.
                        seen_ids.update(chunk)
                        total_products += len(bases)

                except Exception:
                    # Per-category boundary: record the failure and move on.
                    log.exception(" ERROR in %s", leaf["name"])
                    errors += 1
        finally:
            # Always release the browser, including on SystemExit or an API
            # failure while building the tree. `context.browser` is None for
            # a persistent context, in which case the context itself is closed.
            browser = context.browser
            if browser is not None:
                browser.close()
            else:
                context.close()

    run_end = datetime.now(timezone.utc)
    stats = {
        "startedAt": run_start,
        "finishedAt": run_end,
        "duration_seconds": (run_end - run_start).total_seconds(),
        "categories_scraped": len(leaves),
        "products_total": total_products,
        "errors": errors,
        "filter": category_filter,
    }

    if db is not None:
        log_scrape_run(db, stats)

    print_summary(stats)
    return stats
|
|
|
|
|
|
if __name__ == "__main__":
    # Command-line entry point: map the flags onto run_scraper's arguments.
    cli = argparse.ArgumentParser(description="Rohlik.cz price scraper (API)")
    cli.add_argument(
        "--category",
        type=str,
        help="Scrape only this main category (e.g. 'Ovoce a zelenina')",
    )
    for flag, text in (
        ("--no-db", "Dry run — no MongoDB writes"),
        ("--visible", "Show browser window"),
    ):
        cli.add_argument(flag, action="store_true", help=text)
    opts = cli.parse_args()

    run_scraper(
        save_to_db=not opts.no_db,
        headless=not opts.visible,
        category_filter=opts.category,
    )
|