"""
Rohlik.cz Price Scraper — API-based

Iterates leaf categories, fetches product IDs via listing API,
pulls details from 4 batch endpoints, upserts into MongoDB.

Usage:
    python scraper.py                                # all categories -> MongoDB
    python scraper.py --category "Ovoce a zelenina"  # one main category only
    python scraper.py --no-db                        # dry run, no DB writes
    python scraper.py --visible                      # show browser window
"""

import sys
import io
import argparse
import logging
from datetime import datetime, timezone
from urllib.parse import urlencode

from playwright.sync_api import sync_playwright

from config import BASE_URL
from test_login import ensure_logged_in
from db import get_db, ensure_indexes, upsert_products, upsert_categories, log_scrape_run


def _utf8_stream(stream):
    """Return *stream* set up to emit UTF-8, replacing unencodable characters.

    Prefers in-place ``reconfigure()`` so that only one text wrapper ever
    sits on top of the underlying buffer; falls back to wrapping ``.buffer``
    and, if neither is available, returns the stream untouched.
    """
    reconfigure = getattr(stream, "reconfigure", None)
    if reconfigure is not None:
        reconfigure(encoding="utf-8", errors="replace")
        return stream
    buffer = getattr(stream, "buffer", None)
    if buffer is not None:
        return io.TextIOWrapper(buffer, encoding="utf-8", errors="replace")
    return stream


# Must run before logging.basicConfig(): the stream handler it installs
# binds to whatever sys.stderr is at that moment.
sys.stdout = _utf8_stream(sys.stdout)
sys.stderr = _utf8_stream(sys.stderr)

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(message)s",
    datefmt="%H:%M:%S",
)
log = logging.getLogger(__name__)

PAGE_SIZE = 50  # page size requested from the listing endpoint
CHUNK = 30      # number of product IDs sent per batch-endpoint call

MAIN_CATS_URL = f"{BASE_URL}/api/v5/navigation/components/navigation-tabs/categories"
SUBCATS_URL = f"{BASE_URL}/api/v4/navigation/components/navigation-tabs/subcategories"

BATCH_ENDPOINTS = {
    "base": "/api/v1/products",
    "prices": "/api/v1/products/prices",
    "stock": "/api/v1/products/stock",
    "categories": "/api/v1/products/categories",
}


# ---------------------------------------------------------------------------
# helpers
# ---------------------------------------------------------------------------

def get_json(context, url, **params):
    """GET *url* through the context's request client and return parsed JSON.

    Keyword arguments are forwarded as query parameters (omitted entirely
    when none are given).

    Raises:
        RuntimeError: if the response status is anything other than 200.
    """
    resp = context.request.get(url, params=params or None)
    if resp.status != 200:
        raise RuntimeError(f"HTTP {resp.status}: {url[:120]}")
    return resp.json()


def _first_list(payload, keys):
    """Return *payload* if it is a list, else the first list found under *keys*.

    Returns an empty list when *payload* is neither a list nor a dict, or
    when none of the keys holds a list.
    """
    if isinstance(payload, list):
        return payload
    if isinstance(payload, dict):
        for key in keys:
            value = payload.get(key)
            if isinstance(value, list):
                return value
    return []


def as_list(payload):
    """Extract the record list from a batch-endpoint payload.

    Accepts a bare list or a dict wrapping the list under ``data``,
    ``products`` or ``items``; anything else yields ``[]``.
    """
    return _first_list(payload, ("data", "products", "items"))


def pick(d, *keys):
    """Return the first non-None value among the given keys.

    Returns None when *d* is not a dict or none of the keys has a value.
    """
    if not isinstance(d, dict):
        return None
    for k in keys:
        value = d.get(k)
        if value is not None:
            return value
    return None


# ---------------------------------------------------------------------------
# category tree — live from API
# ---------------------------------------------------------------------------

def normalize_main(payload):
    """Extract the list of main categories from the navigation payload.

    Accepts a bare list, a dict holding the list directly under one of
    several candidate keys, or a dict holding a nested dict that in turn
    holds the list. Any other shape (including None) yields ``[]``.
    """
    if isinstance(payload, list):
        return payload
    if not isinstance(payload, dict):
        return []
    for key in ("data", "categories", "items", "navigationTabs", "tabs"):
        value = payload.get(key)
        if isinstance(value, list):
            return value
        if isinstance(value, dict):
            for nested_key in ("categories", "items", "tabs"):
                nested = value.get(nested_key)
                if isinstance(nested, list):
                    return nested
    return []


def subs_from_payload(payload):
    """Extract the list of subcategories from a subcategories payload."""
    return _first_list(payload, ("data", "subcategories", "items", "categories"))


def fetch_children_recursive(context, parent_id, visited, depth=1, max_depth=6):
    """Fetch the subtree of categories below *parent_id*.

    Args:
        context: object exposing ``.request.get`` (see ``get_json``).
        parent_id: id of the category whose children are requested.
        visited: set of stringified ids already expanded; mutated in place
            so that no category is expanded twice within one tree fetch.
        depth: current recursion depth (1 for direct children of a main
            category).
        max_depth: recursion stops once *depth* exceeds this value.

    Returns:
        A list of ``{"id", "name", "url", "children"}`` nodes. Entries that
        are not dicts or carry no id are skipped, mirroring how main
        categories without an id are skipped in ``fetch_category_tree``.
    """
    parent_key = str(parent_id)
    if parent_key in visited or depth > max_depth:
        return []
    visited.add(parent_key)

    sub_payload = get_json(context, SUBCATS_URL, categoryIds=parent_key)
    out = []
    for s in subs_from_payload(sub_payload):
        if not isinstance(s, dict):
            continue
        sid = pick(s, "id", "categoryId")
        if not sid:
            # Without an id the node can neither be stored nor scraped.
            continue
        node = {
            "id": sid,
            "name": pick(s, "name", "title", "label"),
            "url": pick(s, "url", "link", "slug"),
            "children": [],
        }
        # Only descend when the payload itself advertises subcategories.
        if s.get("subcategoryIds"):
            node["children"] = fetch_children_recursive(
                context, sid, visited, depth + 1, max_depth
            )
        out.append(node)
    return out


def fetch_category_tree(context):
    """Fetch full category tree live from Rohlik API."""
    log.info("Fetching main categories ...")
    main_payload = get_json(context, MAIN_CATS_URL)
    main_cats = normalize_main(main_payload)
    log.info(" %d main categories", len(main_cats))

    tree = []
    visited = set()
    log.info("Fetching subcategories recursively ...")
    for cat in main_cats:
        cid = pick(cat, "id", "categoryId")
        cname = pick(cat, "name", "title", "label")
        curl = pick(cat, "url", "link", "slug")
        if not cid:
            continue
        children = fetch_children_recursive(context, cid, visited)
        tree.append({"id": cid, "name": cname, "url": curl, "children": children})
        log.info(" - %s -> %d subcategories", cname, count_nodes(children))

    log.info(" Total: %d categories (incl. main)", count_nodes(tree))
    return tree


def count_nodes(nodes):
    """Return the number of nodes in *nodes*, including all descendants."""
    total = len(nodes)
    for n in nodes:
        total += count_nodes(n.get("children") or [])
    return total


def collect_leaves(nodes, path=None):
    """Return flat list of leaf nodes with their full path.

    Each returned dict is a copy of the leaf node with an extra ``path``
    key: the list of category names from the root down to the leaf.
    """
    if path is None:
        path = []
    leaves = []
    for n in nodes:
        current = path + [n["name"]]
        children = n.get("children") or []
        if children:
            leaves.extend(collect_leaves(children, current))
        else:
            leaves.append({**n, "path": current})
    return leaves


def tree_to_db_docs(nodes, parent_id=None, path=None, path_names=None):
    """Convert tree nodes to flat category docs for MongoDB.

    Every node (leaf or not) becomes one doc keyed by its id, carrying the
    id path and name path from the root, its parent id and an ``isLeaf``
    flag. Parents are emitted before their descendants.
    """
    if path is None:
        path = []
    if path_names is None:
        path_names = []
    docs = []
    for n in nodes:
        cur_path = path + [n["id"]]
        cur_names = path_names + [n["name"]]
        children = n.get("children") or []
        docs.append({
            "_id": n["id"],
            "name": n["name"],
            "slug": (n.get("url") or "").lstrip("/"),
            "path": cur_path,
            "pathNames": cur_names,
            "parentId": parent_id,
            "isLeaf": len(children) == 0,
        })
        if children:
            docs.extend(tree_to_db_docs(children, n["id"], cur_path, cur_names))
    return docs


# ---------------------------------------------------------------------------
# product fetching
# ---------------------------------------------------------------------------

def fetch_product_ids(context, category_id):
    """Paginate through listing API, return all product IDs for a leaf.

    Paging stops at the first short page, or as soon as a page contributes
    no ID that has not already been seen — the latter guards against an
    endless loop should the endpoint keep returning the same full page.
    """
    all_ids = []
    seen = set()
    page = 0
    while True:
        url = (f"{BASE_URL}/api/v1/categories/normal/{category_id}/products"
               f"?page={page}&size={PAGE_SIZE}&sort=recommended&filter=&excludeProductIds=")
        data = get_json(context, url)
        ids = (data.get("productIds") if isinstance(data, dict) else None) or []
        fresh = [pid for pid in ids if pid not in seen]
        if not fresh:
            break
        seen.update(fresh)
        all_ids.extend(ids)
        if len(ids) < PAGE_SIZE:
            break
        page += 1
    return all_ids


def fetch_batch(context, endpoint, product_ids):
    """Call one batch endpoint for *product_ids* and return its record list.

    IDs are passed as repeated ``products=`` query parameters. An empty
    ID list returns ``[]`` without issuing a request.
    """
    if not product_ids:
        return []
    qs = urlencode([("products", pid) for pid in product_ids])
    url = f"{BASE_URL}{endpoint}?{qs}"
    return as_list(get_json(context, url))


def fetch_product_details(context, product_ids):
    """For a chunk of IDs, call 4 batch endpoints and return raw lists."""
    bases = fetch_batch(context, BATCH_ENDPOINTS["base"], product_ids)
    prices = fetch_batch(context, BATCH_ENDPOINTS["prices"], product_ids)
    stocks = fetch_batch(context, BATCH_ENDPOINTS["stock"], product_ids)
    cats = fetch_batch(context, BATCH_ENDPOINTS["categories"], product_ids)
    return bases, prices, stocks, cats


# ---------------------------------------------------------------------------
# console output
# ---------------------------------------------------------------------------

def print_header():
    """Log the run banner."""
    log.info("=" * 100)
    log.info(" ROHLIK.CZ PRICE SCRAPER")
    log.info("=" * 100)


def print_category_header(leaf, leaf_idx, total_leaves):
    """Log a separator with the leaf's position, full path and id."""
    # Names come straight from the API and may be None; stringify so that
    # a nameless category cannot abort the run from inside a log helper.
    path_str = " > ".join("?" if part is None else str(part) for part in leaf["path"])
    log.info("")
    log.info("-" * 100)
    log.info(" [%d/%d] %s (id=%s)", leaf_idx, total_leaves, path_str, leaf["id"])
    log.info("-" * 100)


def _index_by_product_id(records):
    """Map ``productId`` -> record, ignoring entries that lack one."""
    return {
        r["productId"]: r
        for r in records
        if isinstance(r, dict) and r.get("productId") is not None
    }


def _amount(record, key):
    """Return ``record[key]["amount"]`` or None if any level is missing."""
    money = record.get(key) if isinstance(record, dict) else None
    return money.get("amount") if isinstance(money, dict) else None


def print_products_table(bases, prices_list, stocks):
    """Print a compact table of products in this chunk.

    One log line per entry of *bases*: stock marker (``+`` / ``-`` / ``?``),
    id, price, price per unit, first sale price with its first badge title,
    and the name truncated to 50 characters. Missing or malformed fields are
    rendered as ``?`` or left blank rather than raising.
    """
    prices_map = _index_by_product_id(prices_list)
    stock_map = _index_by_product_id(stocks)

    for b in bases:
        if not isinstance(b, dict):
            continue
        pid = b.get("id")
        p = prices_map.get(pid) or {}
        s = stock_map.get(pid) or {}

        raw_name = b.get("name", "?")
        name = "?" if raw_name is None else str(raw_name)[:50]
        price = _amount(p, "price")
        ppu = _amount(p, "pricePerUnit")
        unit = b.get("unit") or ""

        in_stock = s.get("inStock")
        stock_str = "+" if in_stock else "-" if in_stock is False else "?"

        sale_str = ""
        sales = p.get("sales") or []
        if sales and isinstance(sales[0], dict):
            sp = _amount(sales[0], "price")
            badges = sales[0].get("badges") or [{}]
            badge = badges[0].get("title", "") if isinstance(badges[0], dict) else ""
            if sp and isinstance(sp, (int, float)):
                sale_str = f"{sp:.2f} {badge}"

        price_str = f"{price:.2f}" if isinstance(price, (int, float)) else "?"
        ppu_str = f"{ppu:.2f}/{unit}" if isinstance(ppu, (int, float)) else ""

        # %9s instead of %9d: identical output for integer ids, but does not
        # break the log record when an id is a string or missing.
        log.info(" %s %9s %8s %12s %14s %s",
                 stock_str, pid, price_str, ppu_str, sale_str, name)


def print_summary(stats):
    """Log the end-of-run summary from the *stats* dict."""
    log.info("")
    log.info("=" * 100)
    log.info(" DONE")
    log.info(" Categories: %d", stats["categories_scraped"])
    log.info(" Products: %d unique", stats["products_total"])
    log.info(" Duration: %.1f s", stats["duration_seconds"])
    if stats.get("errors"):
        log.info(" Errors: %d", stats["errors"])
    log.info("=" * 100)


# ---------------------------------------------------------------------------
# main
# ---------------------------------------------------------------------------

def _close_browser(context):
    """Close the browser owning *context*, or the context itself.

    Playwright reports ``context.browser`` as None for persistent contexts,
    in which case closing the context is what shuts the browser down.
    """
    browser = context.browser
    if browser is not None:
        browser.close()
    else:
        context.close()


def run_scraper(category_filter=None, headless=True, save_to_db=True):
    """Scrape every leaf category and optionally persist the results.

    Args:
        category_filter: case-insensitive substring; when given, only main
            categories whose name contains it are scraped.
        headless: passed through to ``ensure_logged_in``.
        save_to_db: when False, nothing is written to MongoDB (dry run).

    Returns:
        The stats dict that is also logged (and stored when the DB is on).

    Raises:
        SystemExit: if *category_filter* matches no main category.
    """
    db = None
    if save_to_db:
        db = get_db()
        ensure_indexes(db)

    with sync_playwright() as pw:
        context, _page = ensure_logged_in(pw, headless=headless)
        try:
            # fetch live category tree from API
            tree = fetch_category_tree(context)

            # filter to one main category if requested
            if category_filter:
                cf = category_filter.lower()
                tree = [t for t in tree if cf in (t["name"] or "").lower()]
                if not tree:
                    raise SystemExit(f"No main category matching '{category_filter}'")

            leaves = collect_leaves(tree)
            log.info("Scraping %d leaf categories", len(leaves))

            # save categories to MongoDB
            if db is not None:
                cat_docs = tree_to_db_docs(tree)
                upsert_categories(db, cat_docs)
                log.info("Upserted %d category docs", len(cat_docs))

            print_header()
            run_start = datetime.now(timezone.utc)
            seen_ids = set()
            total_products = 0
            errors = 0

            for i, leaf in enumerate(leaves, 1):
                print_category_header(leaf, i, len(leaves))
                try:
                    product_ids = fetch_product_ids(context, leaf["id"])
                    log.info(" %d product IDs", len(product_ids))
                    if not product_ids:
                        continue

                    # deduplicate within run (and within this listing),
                    # keeping the listing order
                    new_ids = list(dict.fromkeys(
                        pid for pid in product_ids if pid not in seen_ids
                    ))

                    # process in chunks
                    for j in range(0, len(new_ids), CHUNK):
                        chunk = new_ids[j:j + CHUNK]
                        bases, prices, stocks, cats = fetch_product_details(context, chunk)
                        print_products_table(bases, prices, stocks)
                        if db is not None:
                            upsert_products(db, bases, prices, stocks, cats)
                        # Mark as seen only once the chunk went through, so a
                        # failed chunk can still be picked up via another
                        # category that lists the same products.
                        seen_ids.update(chunk)
                        total_products += len(bases)
                except Exception:
                    # Per-leaf boundary: log with traceback and carry on.
                    log.exception(" ERROR in %s", leaf["name"])
                    errors += 1
        finally:
            # Also runs on SystemExit / unexpected errors above.
            _close_browser(context)

    run_end = datetime.now(timezone.utc)
    stats = {
        "startedAt": run_start,
        "finishedAt": run_end,
        "duration_seconds": (run_end - run_start).total_seconds(),
        "categories_scraped": len(leaves),
        "products_total": total_products,
        "errors": errors,
        "filter": category_filter,
    }

    if db is not None:
        log_scrape_run(db, stats)

    print_summary(stats)
    return stats


def main():
    """Parse command-line arguments and run the scraper."""
    parser = argparse.ArgumentParser(description="Rohlik.cz price scraper (API)")
    parser.add_argument("--category", type=str,
                        help="Scrape only this main category (e.g. 'Ovoce a zelenina')")
    parser.add_argument("--no-db", action="store_true",
                        help="Dry run — no MongoDB writes")
    parser.add_argument("--visible", action="store_true",
                        help="Show browser window")
    args = parser.parse_args()

    run_scraper(
        category_filter=args.category,
        headless=not args.visible,
        save_to_db=not args.no_db,
    )


if __name__ == "__main__":
    main()