210311a7b3
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
255 lines
9.3 KiB
Python
255 lines
9.3 KiB
Python
"""
|
|
Rohlik.cz Price Scraper - Main Scraper
|
|
Version: 1.0.0
|
|
Date: 2026-05-31
|
|
|
|
Playwright-based scraper that iterates all leaf categories on Rohlik.cz,
|
|
scrolls to lazy-load every product card, and extracts pricing data from the DOM.
|
|
Supports authenticated scraping (prices differ for logged-in users).
|
|
|
|
Usage:
|
|
python scraper.py --no-db --visible # scrape to JSON, visible browser
|
|
python scraper.py --no-db --filter "Brambory" # scrape single category to JSON
|
|
python scraper.py # scrape to MongoDB
|
|
python scraper.py --visible # scrape to MongoDB, visible browser
|
|
"""
|
|
|
|
import re
|
|
import json
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
from playwright.sync_api import sync_playwright, Page
|
|
|
|
from config import (
|
|
BASE_URL, AUTH_STATE_PATH,
|
|
ROHLIK_EMAIL, ROHLIK_PASSWORD,
|
|
SCROLL_PAUSE, MAX_SCROLLS,
|
|
)
|
|
from categories import get_leaf_categories, get_all_categories_flat
|
|
from db import get_db, ensure_indexes, upsert_product, upsert_category, log_scrape_run
|
|
|
|
# Configure logging as an import-time side effect: INFO level, with each line
# carrying a timestamp and the level name ahead of the message.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
# Module-level logger used by every function in this file.
log = logging.getLogger(__name__)
|
|
|
|
|
|
def parse_price(raw: str | None) -> float | None:
|
|
if not raw:
|
|
return None
|
|
digits = re.sub(r"[^\d]", "", raw)
|
|
if not digits:
|
|
return None
|
|
return int(digits) / 100
|
|
|
|
|
|
def parse_original_price(raw: str | None) -> float | None:
|
|
if not raw:
|
|
return None
|
|
match = re.search(r"([\d\s]+[,.][\d]+)", raw.replace("\xa0", " "))
|
|
if match:
|
|
return float(match.group(1).replace(" ", "").replace(",", "."))
|
|
digits = re.sub(r"[^\d]", "", raw)
|
|
if digits:
|
|
return float(digits) / 100
|
|
return None
|
|
|
|
|
|
def login(page: Page):
    """Sign in through the site's login form and persist the session.

    Opens ``BASE_URL``, clicks the "Přihlásit se" entry, fills in the
    configured e-mail and password, submits the form and finally writes the
    browser context's storage state to ``AUTH_STATE_PATH``.

    Fixed waits are used between steps. The function does not check whether
    the login actually succeeded; the success message is logged
    unconditionally.

    Args:
        page: Playwright page used to drive the login flow.
    """
    credential_fields = (
        ('input[type="email"], input[name="email"]', ROHLIK_EMAIL),
        ('input[type="password"], input[name="password"]', ROHLIK_PASSWORD),
    )

    log.info("Logging in to Rohlik.cz...")
    page.goto(BASE_URL, wait_until="domcontentloaded", timeout=60000)
    page.wait_for_timeout(3000)

    # Open the login form and give it time to render.
    login_entry = page.locator('text="Přihlásit se"').first
    login_entry.click()
    page.wait_for_timeout(2000)

    for selector, value in credential_fields:
        page.locator(selector).first.fill(value)

    submit_button = page.locator('button[type="submit"]').first
    submit_button.click()
    page.wait_for_timeout(5000)

    # Persist the context's storage state so later runs can reuse the session.
    page.context.storage_state(path=AUTH_STATE_PATH)
    log.info("Login successful, auth state saved.")
|
|
|
|
|
|
def scroll_to_load_all(page: Page) -> int:
    """Scroll to the bottom repeatedly until no new product cards appear.

    After each scroll the function waits ``SCROLL_PAUSE`` seconds and counts
    the available-product cards. It stops once the count is unchanged from
    the previous pass (never before the fourth pass), or after
    ``MAX_SCROLLS`` passes.

    Args:
        page: Playwright page showing a category listing.

    Returns:
        The number of product cards counted on the last pass (0 if no pass
        ran).
    """
    cards = page.locator('[data-test^="productCard-AVAILABLE-"]')
    pause_ms = int(SCROLL_PAUSE * 1000)

    loaded = 0
    for attempt in range(MAX_SCROLLS):
        page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
        page.wait_for_timeout(pause_ms)

        visible_now = cards.count()
        stalled = visible_now == loaded
        loaded = visible_now
        # The first three passes always continue, even when the count is flat.
        if stalled and attempt >= 3:
            break
    return loaded
|
|
|
|
|
|
def extract_products(page: Page, category: dict) -> list[dict]:
    """Read every available product card on the page into a list of dicts.

    A single in-page script collects the raw text and attributes of each card;
    the Python side then parses prices, turns empty strings into ``None`` and
    attaches the category's id, name and path.

    Args:
        page: Playwright page with the category's product cards loaded.
        category: Category dict; ``"id"`` and ``"name"`` are read, and
            ``"path"`` when present (otherwise the name alone is the path).

    Returns:
        One dict per product card, in DOM order.
    """
    raw_cards = page.evaluate("""
        () => {
            const products = [];
            document.querySelectorAll('[data-test^="productCard-AVAILABLE-"]').forEach(card => {
                const id = card.getAttribute('data-test').replace('productCard-AVAILABLE-', '');
                const nameEl = card.querySelector('[data-test="productCard-body-name"]');
                const priceNoEl = card.querySelector('[data-test="productCard-body-price-priceNo"]');
                const saleEl = card.querySelector('[data-test="productCard-body-price-sale"]');
                const amountEl = card.querySelector('[data-test="productCard-footer-amount"]');
                const unitPriceEl = card.querySelector('[data-test="productCard-footer-unitPrice"]');
                const badgeEl = card.querySelector('[data-test="productCard-body-badge"]');
                const imgEl = card.querySelector('img');
                const linkEl = card.querySelector('a[href*="/"]');

                products.push({
                    product_id: id,
                    name: nameEl?.textContent?.trim() || '',
                    price_raw: priceNoEl?.textContent?.trim() || '',
                    original_price_raw: saleEl?.textContent?.trim() || '',
                    amount: amountEl?.textContent?.trim() || '',
                    unit_price_raw: unitPriceEl?.textContent?.trim() || '',
                    discount_badge: badgeEl?.textContent?.trim() || '',
                    image_url: imgEl?.src || '',
                    product_url: linkEl?.getAttribute('href') || '',
                });
            });
            return products;
        }
    """)

    def to_record(card: dict) -> dict:
        # Normalise one raw card: parse prices, map empty strings to None.
        href = card["product_url"]
        return {
            "product_id": card["product_id"],
            "name": card["name"],
            "price": parse_price(card["price_raw"]),
            "original_price": parse_original_price(card["original_price_raw"]),
            "discount_badge": card["discount_badge"] or None,
            "amount": card["amount"] or None,
            "unit_price": card["unit_price_raw"].strip() or None,
            "image_url": card["image_url"] or None,
            "product_url": f"{BASE_URL}{href}" if href else None,
            "category_id": category["id"],
            "category_name": category["name"],
            "category_path": " > ".join(category.get("path", [category["name"]])),
        }

    return [to_record(card) for card in raw_cards]
|
|
|
|
|
|
def scrape_leaf(page: Page, category: dict) -> list[dict]:
    """Scrape every product of one leaf category.

    Navigates to the category page, waits for the first product card, scrolls
    until all cards are loaded and extracts them.

    Args:
        page: Playwright page to navigate with.
        category: Category dict; ``"url"`` and ``"name"`` are read, and
            ``"path"`` when present.

    Returns:
        The extracted product dicts, or an empty list when no product card
        appears within 15 seconds.

    Raises:
        Exception: Any Playwright error other than the product-card wait
            timing out (for example a failed navigation) propagates to the
            caller instead of being reported as "no products".
    """
    # Imported here so the block is self-contained; the file-level import only
    # brings in ``sync_playwright`` and ``Page``.
    from playwright.sync_api import TimeoutError as PlaywrightTimeoutError

    url = f"{BASE_URL}{category['url']}"
    log.info("Scraping: %s (%s)", " > ".join(category.get("path", [category["name"]])), url)

    page.goto(url, wait_until="domcontentloaded", timeout=60000)
    page.wait_for_timeout(3000)

    try:
        page.wait_for_selector('[data-test^="productCard-AVAILABLE-"]', timeout=15000)
    except PlaywrightTimeoutError:
        # Only a timeout means "this category has no products"; other
        # failures must not be mislabelled as an empty category.
        log.warning(" No products found in %s, skipping.", category["name"])
        return []

    total = scroll_to_load_all(page)
    products = extract_products(page, category)
    log.info(" %d products extracted (loaded %d)", len(products), total)
    return products
|
|
|
|
|
|
def run_scraper(
    category_filter: str | None = None,
    headless: bool = True,
    save_to_db: bool = True,
):
    """Scrape all (or the filtered) leaf categories and return the products.

    Reuses the stored auth state when the file at ``AUTH_STATE_PATH`` exists
    and logs in when the page still shows the "Přihlásit se" entry and
    credentials are configured. Products are de-duplicated by ``product_id``
    across categories. With ``save_to_db`` enabled, categories, products and
    a run summary are written through the ``db`` helpers.

    Args:
        category_filter: Case-insensitive substring matched against each
            category path joined with ``" > "``; ``None`` or empty scrapes
            every leaf category.
        headless: Launch Chromium without a visible window.
        save_to_db: Persist results through the ``db`` helpers.

    Returns:
        The list of unique product dicts, in the order first seen.
    """
    leaves = get_leaf_categories()
    if category_filter:
        needle = category_filter.lower()
        # Fall back to the bare name when a category has no "path", matching
        # how the per-category functions in this file build the path.
        leaves = [
            c for c in leaves
            if needle in " > ".join(c.get("path", [c["name"]])).lower()
        ]

    log.info("Will scrape %d leaf categories", len(leaves))

    all_products: list[dict] = []

    with sync_playwright() as pw:
        ctx_args = {}
        if Path(AUTH_STATE_PATH).exists():
            ctx_args["storage_state"] = AUTH_STATE_PATH

        browser = pw.chromium.launch(headless=headless)
        try:
            context = browser.new_context(**ctx_args)
            page = context.new_page()

            page.goto(BASE_URL, wait_until="domcontentloaded", timeout=60000)
            page.wait_for_timeout(5000)

            # A visible login entry means the session is not authenticated.
            is_logged_in = page.locator('text="Přihlásit se"').count() == 0
            if not is_logged_in:
                if ROHLIK_EMAIL and ROHLIK_PASSWORD:
                    login(page)
                    # login() has saved the storage state, so the anonymous
                    # context can be released before the new one is opened.
                    context.close()
                    context = browser.new_context(storage_state=AUTH_STATE_PATH)
                    page = context.new_page()
                else:
                    log.warning("Not logged in! Prices may differ from member prices.")

            run_start = datetime.now(timezone.utc)
            seen_ids = set()

            db = None
            if save_to_db:
                db = get_db()
                ensure_indexes(db)
                for cat_data in get_all_categories_flat():
                    upsert_category(db, cat_data)

            for leaf in leaves:
                try:
                    for p in scrape_leaf(page, leaf):
                        if p["product_id"] in seen_ids:
                            continue
                        seen_ids.add(p["product_id"])
                        all_products.append(p)
                        # Compare with None explicitly: presumably get_db()
                        # returns a PyMongo Database, which raises on truth
                        # testing -- TODO confirm against db.py.
                        if db is not None:
                            upsert_product(db, p)
                except Exception:
                    # One failing category must not abort the whole run.
                    log.exception("Error scraping %s", leaf["name"])

            run_end = datetime.now(timezone.utc)
            run_data = {
                "started_at": run_start,
                "finished_at": run_end,
                "duration_seconds": (run_end - run_start).total_seconds(),
                "categories_scraped": len(leaves),
                "products_scraped": len(all_products),
            }

            if db is not None:
                log_scrape_run(db, run_data)

            log.info(
                "Done: %d unique products from %d categories in %.1fs",
                len(all_products), len(leaves), run_data["duration_seconds"],
            )
        finally:
            # Close the browser even when setup or persistence fails.
            browser.close()

    return all_products
|
|
|
|
|
|
def scrape_to_json(output_path: str = "products.json", **kwargs):
    """Run the scraper without database writes and dump the result as JSON.

    Args:
        output_path: Destination file, written as UTF-8 with two-space
            indentation and non-ASCII characters left unescaped.
        **kwargs: Forwarded to ``run_scraper`` (``save_to_db`` is always
            passed as ``False`` by this function).

    Returns:
        The list of scraped product dicts.
    """
    scraped = run_scraper(save_to_db=False, **kwargs)

    with open(output_path, "w", encoding="utf-8") as out_file:
        # default=str stringifies any value json cannot encode natively.
        serialized = json.dumps(scraped, ensure_ascii=False, indent=2, default=str)
        out_file.write(serialized)

    log.info("Saved %d products to %s", len(scraped), output_path)
    return scraped
|
|
|
|
|
|
if __name__ == "__main__":
    # Command-line entry point: parse the flags, then scrape either to JSON
    # (--no-db) or to the database (default).
    import argparse

    cli = argparse.ArgumentParser(description="Rohlik.cz price scraper")
    cli.add_argument(
        "--no-db",
        action="store_true",
        help="Save to JSON instead of MongoDB",
    )
    cli.add_argument(
        "--visible",
        action="store_true",
        help="Run browser in visible mode",
    )
    cli.add_argument(
        "--filter",
        type=str,
        help="Filter categories by name (e.g. 'Ovoce', 'Zelenina > Rajčata')",
    )
    options = cli.parse_args()

    # Both entry points take the same keyword arguments.
    entry_point = scrape_to_json if options.no_db else run_scraper
    entry_point(category_filter=options.filter, headless=not options.visible)
|