notebookVB

This commit is contained in:
2026-06-01 05:34:31 +02:00
parent aa6562c921
commit 32e877ac81
4 changed files with 19076 additions and 30 deletions
File diff suppressed because it is too large Load Diff
+208
View File
@@ -0,0 +1,208 @@
"""
Scrape the live Rohlik.cz category tree (main categories + subcategories)
via the navigation API and save it as JSON.
Endpoints:
GET /api/v5/navigation/components/navigation-tabs/categories
GET /api/v4/navigation/components/navigation-tabs/subcategories?categoryIds=ID1,ID2,...
"""
import json
from pathlib import Path
from playwright.sync_api import sync_playwright
from config import BASE_URL
from test_login import ensure_logged_in
OUT_PATH = Path(__file__).parent / "categories_live.json"
MAIN_URL = f"{BASE_URL}/api/v5/navigation/components/navigation-tabs/categories"
SUB_URL = f"{BASE_URL}/api/v4/navigation/components/navigation-tabs/subcategories"
def fetch_json(context, url, **params):
    """GET ``url`` through the context's request client and decode the JSON body.

    Keyword arguments are sent as query parameters; when none are given,
    ``None`` is passed instead of an empty dict.

    Raises:
        RuntimeError: if the response status is not 200. The message carries
            the URL, the status and the first 200 characters of the body.
    """
    query = params if params else None
    response = context.request.get(url, params=query)
    if response.status == 200:
        return response.json()
    raise RuntimeError(f"GET {url} -> {response.status}: {response.text()[:200]}")
def normalize_main(payload):
    """Locate the list of main categories inside a navigation API payload.

    Accepted shapes, checked in this order:
      * the payload itself is a list;
      * a dict holding the list under "data", "categories", "items",
        "navigationTabs" or "tabs";
      * a dict whose value under one of those keys is itself a dict holding
        the list under "categories", "items" or "tabs".

    Returns:
        The first list found, or an empty list when the payload matches none
        of the shapes (including payloads that are neither list nor dict).
    """
    if isinstance(payload, list):
        return payload
    # A null or scalar JSON body has no keys to probe; without this guard the
    # ``.get`` call below would raise AttributeError.
    if not isinstance(payload, dict):
        return []
    for key in ("data", "categories", "items", "navigationTabs", "tabs"):
        value = payload.get(key)
        if isinstance(value, list):
            return value
        if isinstance(value, dict):
            for inner_key in ("categories", "items", "tabs"):
                inner = value.get(inner_key)
                if isinstance(inner, list):
                    return inner
    return []
def pick(d, *keys):
    """Return ``d[k]`` for the first ``k`` in ``keys`` whose value is not None.

    Returns None when ``d`` is not a dict or when no key yields a non-None
    value. Falsy values such as 0 or "" still count as found.
    """
    if not isinstance(d, dict):
        return None
    return next((d[k] for k in keys if d.get(k) is not None), None)
def find_subcats_for(payload, parent_id):
    """Find the subcategory list belonging to ``parent_id`` in ``payload``.

    ``parent_id`` is compared as a string. Three payload shapes are probed,
    in this order:
      1. a dict keyed by parent id, whose value is the list itself or a dict
         holding the list under "subcategories", "children", "items" or
         "categories";
      2. a dict wrapping such an id-keyed dict under "data", "subcategories",
         "categories" or "items" (only a list value is accepted here);
      3. a dict wrapping, under the same keys, a list of entries whose
         "parentId", "categoryId" or "id" equals the parent id and which
         hold the list under one of the child keys from shape 1.

    Returns:
        The first matching list, or an empty list if nothing matches.
    """
    child_keys = ("subcategories", "children", "items", "categories")
    wanted = str(parent_id)
    if not isinstance(payload, dict):
        return []

    # Shape 1: the payload itself is keyed by parent id.
    if wanted in payload and isinstance(payload[wanted], (list, dict)):
        direct = payload[wanted]
        if isinstance(direct, list):
            return direct
        for key in child_keys:
            if isinstance(direct.get(key), list):
                return direct[key]

    for wrapper in ("data", "subcategories", "categories", "items"):
        wrapped = payload.get(wrapper)
        # Shape 2: an id-keyed dict nested under a wrapper key.
        if isinstance(wrapped, dict) and wanted in wrapped:
            candidate = wrapped[wanted]
            if isinstance(candidate, list):
                return candidate
        # Shape 3: a list of entries, each naming its own parent/category id.
        if isinstance(wrapped, list):
            for entry in wrapped:
                if not isinstance(entry, dict):
                    continue
                if str(pick(entry, "parentId", "categoryId", "id")) != wanted:
                    continue
                for key in child_keys:
                    if isinstance(entry.get(key), list):
                        return entry[key]
    return []
def build_clean_tree(main_payload, sub_payload):
    """Combine both API payloads into a two-level category tree.

    Each main category becomes ``{"id", "name", "url", "children"}``; each
    child is ``{"id", "name", "url"}``. Field values are taken from the first
    non-None alternative key (e.g. "name", "title" or "label"). Subcategory
    entries that are not dicts are skipped.

    Returns:
        A list with one node per main category, in payload order.
    """
    tree = []
    for category in normalize_main(main_payload):
        category_id = pick(category, "id", "categoryId")
        children = [
            {
                "id": pick(child, "id", "categoryId"),
                "name": pick(child, "name", "title", "label"),
                "url": pick(child, "url", "slug", "link"),
            }
            for child in find_subcats_for(sub_payload, category_id)
            if isinstance(child, dict)
        ]
        tree.append({
            "id": category_id,
            "name": pick(category, "name", "title", "label"),
            "url": pick(category, "url", "slug", "link"),
            "children": children,
        })
    return tree
def subs_from_payload(payload):
    """Extract the subcategory list from a subcategories API payload.

    A list payload is returned as is. For a dict, the first value that is a
    list among the keys "data", "subcategories", "items", "categories"
    (checked in that order) is returned. Anything else yields an empty list.
    """
    if isinstance(payload, list):
        return payload
    if not isinstance(payload, dict):
        return []
    candidates = (
        payload.get(key)
        for key in ("data", "subcategories", "items", "categories")
    )
    return next((c for c in candidates if isinstance(c, list)), [])
def fetch_children_recursive(context, parent_id, visited, depth=1, max_depth=6):
    """Fetch the subcategory subtree below ``parent_id``.

    Args:
        context: object passed through to ``fetch_json`` for the request.
        parent_id: id of the category whose children are requested.
        visited: set of stringified ids already requested; updated in place
            so the same id is never requested twice.
        depth: current recursion level, starting at 1.
        max_depth: recursion stops once ``depth`` exceeds this value.

    Returns:
        A list of ``{"id", "name", "url", "children"}`` nodes; empty when the
        id was already visited or the depth limit is exceeded.
    """
    parent_key = str(parent_id)
    if parent_key in visited or depth > max_depth:
        return []
    visited.add(parent_key)

    payload = fetch_json(context, SUB_URL, categoryIds=parent_key)
    nodes = []
    for item in subs_from_payload(payload):
        if not isinstance(item, dict):
            continue
        item_id = pick(item, "id", "categoryId")
        # Only recurse if the item itself advertises children
        if item_id and item.get("subcategoryIds"):
            grandchildren = fetch_children_recursive(
                context, item_id, visited, depth + 1, max_depth
            )
        else:
            grandchildren = []
        nodes.append({
            "id": item_id,
            "name": pick(item, "name", "title", "label"),
            "url": pick(item, "url", "link", "slug"),
            "children": grandchildren,
        })
    return nodes
def print_tree(nodes, indent=0):
    """Print the tree as a nested bullet list, one line per node.

    Each line shows the node's name and id; each nesting level adds one
    more leading space.
    """
    pad = ' ' * indent
    for node in nodes:
        print(f"{pad}- {node['name']} (id={node['id']})")
        kids = node.get("children")
        if kids:
            print_tree(kids, indent + 1)
def count_nodes(nodes):
    """Return the number of nodes in ``nodes`` including all descendants.

    A node without a "children" key counts as having no children.
    """
    return len(nodes) + sum(
        count_nodes(node.get("children", [])) for node in nodes
    )
def main():
    """Scrape the full category tree and save it to ``OUT_PATH`` as JSON.

    Fetches the main categories, walks each one's subcategories recursively,
    prints a per-category summary and the full tree, then writes
    ``{"tree": ..., "raw_main": ...}`` to ``OUT_PATH``.

    Raises:
        RuntimeError: propagated from ``fetch_json`` on a non-200 response.
    """
    with sync_playwright() as pw:
        context, _page = ensure_logged_in(pw)
        try:
            print("\nFetching main categories ...")
            main_payload = fetch_json(context, MAIN_URL)
            main_cats = normalize_main(main_payload)
            print(f" Got {len(main_cats)} main categories")

            clean_tree = []
            # Shared across all main categories so no id is requested twice.
            visited = set()
            print("\nFetching subcategories recursively ...")
            for cat in main_cats:
                cid = pick(cat, "id", "categoryId")
                cname = pick(cat, "name", "title", "label")
                curl = pick(cat, "url", "link", "slug")
                if not cid:
                    # Without an id there is nothing to query subcategories for.
                    continue
                children = fetch_children_recursive(context, cid, visited)
                clean_tree.append({
                    "id": cid,
                    "name": cname,
                    "url": curl,
                    "children": children,
                })
                total = count_nodes(children)
                print(f" - {cname} (id={cid}) -> {len(children)} direct, {total} total descendants")

            print("\nFull category tree:")
            print_tree(clean_tree)
            grand_total = count_nodes(clean_tree)
            print(f"\nTotal nodes (incl. main): {grand_total}")

            # The raw main payload is kept next to the cleaned tree.
            tree = {
                "tree": clean_tree,
                "raw_main": main_payload,
            }
            OUT_PATH.write_text(json.dumps(tree, ensure_ascii=False, indent=2), encoding="utf-8")
            print(f"Saved -> {OUT_PATH} ({OUT_PATH.stat().st_size} bytes)")
        finally:
            # Close the browser even when a request fails part-way through.
            context.browser.close()


if __name__ == "__main__":
    main()
+124
View File
@@ -0,0 +1,124 @@
"""
Open the first leaf subcategory (the first node without children, in
depth-first order) from categories_live.json
and list all products in it via the Rohlik JSON API.
Endpoint:
GET /api/v1/categories/normal/{categoryId}/products?page=N&size=50&sort=recommended
"""
import json
from pathlib import Path
from playwright.sync_api import sync_playwright
from config import BASE_URL
from test_login import ensure_logged_in
TREE_PATH = Path(__file__).parent / "categories_live.json"
PAGE_SIZE = 50
def find_first_leaf(nodes, path=None):
    """Return ``(path, node)`` for the first childless node, depth-first.

    ``path`` is the list of node names from the top level down to and
    including the leaf. A node whose "children" entry is missing, None or
    empty is a leaf.

    Returns:
        The ``(path, node)`` tuple, or None when ``nodes`` is empty.
    """
    trail = [] if path is None else path
    for node in nodes:
        here = trail + [node["name"]]
        kids = node.get("children") or []
        found = find_first_leaf(kids, here) if kids else (here, node)
        if found:
            return found
    return None
def fetch_products_page(context, category_id, page):
    """Fetch one page of products for ``category_id`` and return the JSON body.

    Requests ``PAGE_SIZE`` items sorted by "recommended", with empty
    "filter" and "excludeProductIds" parameters.

    Raises:
        RuntimeError: if the response status is not 200. The message carries
            the URL, the status and the first 200 characters of the body.
    """
    endpoint = f"{BASE_URL}/api/v1/categories/normal/{category_id}/products"
    query = {
        "page": page,
        "size": PAGE_SIZE,
        "sort": "recommended",
        "filter": "",
        "excludeProductIds": "",
    }
    response = context.request.get(endpoint, params=query)
    if response.status == 200:
        return response.json()
    raise RuntimeError(f"GET {endpoint} -> {response.status}: {response.text()[:200]}")
def extract_products(payload):
    """Locate the product list inside a products API payload.

    A list payload is returned as is. For a dict, the keys "products",
    "data" and "items" are checked in order: a list value is returned
    directly, and a dict value is searched for a list under "products" or
    "items". Anything else yields an empty list.
    """
    if isinstance(payload, list):
        return payload
    if not isinstance(payload, dict):
        return []
    for outer_key in ("products", "data", "items"):
        value = payload.get(outer_key)
        if isinstance(value, list):
            return value
        if not isinstance(value, dict):
            continue
        for inner_key in ("products", "items"):
            nested = value.get(inner_key)
            if isinstance(nested, list):
                return nested
    return []
def _is_price_number(value):
    """Return True for int/float values, excluding booleans."""
    # bool is a subclass of int, so a JSON ``true`` would otherwise be
    # formatted as the price "1.00".
    return isinstance(value, (int, float)) and not isinstance(value, bool)


def format_price(p):
    """Format a product's price with two decimals, trying common fields.

    The keys "price", "amount" and "value" are checked in order. A numeric
    value is formatted directly; a dict value is searched for a number under
    "amount", "value" or "full". Boolean values are never treated as prices.

    Returns:
        The price as a string such as "12.50", or "" when ``p`` is not a
        dict or no numeric price field is found.
    """
    if not isinstance(p, dict):
        return ""
    for key in ("price", "amount", "value"):
        value = p.get(key)
        if _is_price_number(value):
            return f"{value:.2f}"
        if isinstance(value, dict):
            for inner_key in ("amount", "value", "full"):
                inner = value.get(inner_key)
                if _is_price_number(inner):
                    return f"{inner:.2f}"
    return ""
def main():
    """List and save all products of the first leaf category.

    Reads the category tree from ``TREE_PATH``, picks the first leaf via
    ``find_first_leaf``, pages through its products until an empty or short
    page is returned, prints them, and writes the raw product list to
    ``products_<leaf id>.json`` next to this file.

    Raises:
        SystemExit: if ``TREE_PATH`` is missing or the saved tree has no nodes.
        RuntimeError: propagated from ``fetch_products_page`` on a non-200
            response.
    """
    if not TREE_PATH.exists():
        raise SystemExit(f"Missing {TREE_PATH} — run scrape_categories.py first.")
    data = json.loads(TREE_PATH.read_text(encoding="utf-8"))
    tree = data["tree"]

    # find_first_leaf returns None for an empty tree; unpacking that directly
    # would fail with an opaque TypeError.
    first = find_first_leaf(tree)
    if first is None:
        raise SystemExit(f"No categories found in {TREE_PATH} — re-run scrape_categories.py.")
    path, leaf = first
    print(f"First leaf: {' > '.join(path)} (id={leaf['id']})")
    print(f"URL: {BASE_URL}{leaf['url']}\n")

    with sync_playwright() as pw:
        context, _page = ensure_logged_in(pw)
        try:
            all_products = []
            page_num = 0
            while True:
                print(f"Fetching page {page_num} ...")
                payload = fetch_products_page(context, leaf["id"], page_num)
                products = extract_products(payload)
                print(f" got {len(products)} products")
                if not products:
                    break
                all_products.extend(products)
                # A short page means there is nothing left to fetch.
                if len(products) < PAGE_SIZE:
                    break
                page_num += 1

            print(f"\nTotal products: {len(all_products)}\n")
            # Show first product raw structure so we can confirm field names
            if all_products:
                print("--- Sample raw product (first item, truncated) ---")
                print(json.dumps(all_products[0], ensure_ascii=False, indent=2)[:1500])
                print("--- end sample ---\n")

            print("Products in category:")
            for p in all_products:
                name = p.get("productName") or p.get("name") or p.get("title") or "?"
                pid = p.get("productId") or p.get("id") or "?"
                price = format_price(p)
                print(f" [{pid}] {name} {price}")

            out_path = Path(__file__).parent / f"products_{leaf['id']}.json"
            out_path.write_text(json.dumps(all_products, ensure_ascii=False, indent=2), encoding="utf-8")
            print(f"\nSaved raw products -> {out_path} ({out_path.stat().st_size} bytes)")
        finally:
            # Close the browser even when a request fails part-way through.
            context.browser.close()


if __name__ == "__main__":
    main()
+77 -27
View File
@@ -1,49 +1,99 @@
"""
Reuse saved browser state (cookies + localStorage) so the Usercentrics cookie
banner never appears and we stay logged in — same situation as a returning user.
Reusable login flow for Rohlik.cz:
- If auth_state.json is MISSING: opens a browser, you accept cookies + log in
manually, then press Enter to save the state.
- If auth_state.json EXISTS: loads it and just verifies (no banner, logged in).
1. Load saved session (auth_state.json) if it exists.
2. Open the site and check whether we're already logged in.
3. If yes -> continue.
4. If no -> log in via the JSON API, accept cookies, save the session, continue.
"""
import json
from pathlib import Path
from playwright.sync_api import sync_playwright
from config import BASE_URL, AUTH_STATE_PATH
from playwright.sync_api import sync_playwright, BrowserContext, Page
from config import BASE_URL, AUTH_STATE_PATH, ROHLIK_EMAIL, ROHLIK_PASSWORD
LOGIN_URL = f"{BASE_URL}/services/frontend-service/login"
def is_logged_in(page: Page) -> bool:
    """Report whether the page shows no "Přihlásit se" ("Log in") text.

    The absence of that text is treated as being logged in.
    """
    login_prompts = page.locator('text="Přihlásit se"')
    return login_prompts.count() == 0
def accept_cookies(page: Page) -> str:
    """Accept the Usercentrics consent banner via its official JS API.

    The in-page script polls up to 20 times, 250 ms apart, for
    ``window.UC_UI`` to report itself initialized, then calls
    ``acceptAllConsents()`` and ``closeCMP()``. Afterwards the function waits
    up to 5 seconds for the banner element to leave the DOM.

    Returns:
        "accepted" when the consent calls ran, otherwise
        "UC_UI not available".
    """
    # Imported locally so this function does not depend on the module-level
    # import list.
    from playwright.sync_api import TimeoutError as PlaywrightTimeoutError

    result = page.evaluate('''async () => {
for (let i = 0; i < 20; i++) {
if (window.UC_UI && window.UC_UI.isInitialized && window.UC_UI.isInitialized()) break;
await new Promise(r => setTimeout(r, 250));
}
if (window.UC_UI && typeof window.UC_UI.acceptAllConsents === 'function') {
await window.UC_UI.acceptAllConsents();
await window.UC_UI.closeCMP();
return "accepted";
}
return "UC_UI not available";
}''')
    # Wait for the banner to actually detach from the DOM (close animation ~1s)
    try:
        page.wait_for_selector('#usercentrics-cmp-ui', state='detached', timeout=5000)
    except PlaywrightTimeoutError:
        # Best effort: a banner that is slow to disappear must not abort the
        # login flow. Any other error is a real failure and is not swallowed.
        pass
    return result
def api_login(context: BrowserContext) -> int:
    """POST the configured credentials as JSON to ``LOGIN_URL``.

    The request goes through the context's request client.

    Returns:
        The HTTP status code of the login response.
    """
    credentials = {"email": ROHLIK_EMAIL, "password": ROHLIK_PASSWORD}
    request_headers = {"Content-Type": "application/json", "Accept": "application/json"}
    response = context.request.post(
        LOGIN_URL,
        data=json.dumps(credentials),
        headers=request_headers,
    )
    return response.status
def ensure_logged_in(pw) -> tuple[BrowserContext, Page]:
    """Open the site in a new browser and make sure the session is logged in.

    Steps:
      1. Launch a headed Chromium and create a context, loading the saved
         state from ``AUTH_STATE_PATH`` when that file exists.
      2. Open ``BASE_URL``; if the page already looks logged in, return.
      3. Otherwise log in through the JSON API, reload the site and accept
         the cookie banner.
      4. If the page now looks logged in, save the session to
         ``AUTH_STATE_PATH``.

    Args:
        pw: a started Playwright instance; its lifetime is managed by the
            caller.

    Returns:
        ``(context, page)``. They are returned even when the login did not
        succeed; in that case only a failure message is printed. The caller
        is responsible for closing the browser.
    """
    auth_path = Path(AUTH_STATE_PATH)
    have_state = auth_path.exists()

    browser = pw.chromium.launch(headless=False, args=["--start-maximized"])
    ctx_args = {"no_viewport": True}
    if have_state:
        ctx_args["storage_state"] = AUTH_STATE_PATH
    context = browser.new_context(**ctx_args)
    page = context.new_page()

    print(f"1) Opening site (saved session: {have_state}) ...")
    page.goto(BASE_URL, wait_until="domcontentloaded", timeout=60000)
    # Fixed pause after navigation before the page is inspected.
    page.wait_for_timeout(3000)

    if is_logged_in(page):
        print("2) Already logged in from saved session — continuing.")
        return context, page

    print("2) Not logged in — logging in via API ...")
    status = api_login(context)
    print(f" Login API status: {status}")

    # Reload so the page reflects the result of the API login.
    page.goto(BASE_URL, wait_until="domcontentloaded", timeout=60000)
    page.wait_for_timeout(3000)
    print(f"3) Accepting cookies: {accept_cookies(page)}")

    if is_logged_in(page):
        context.storage_state(path=AUTH_STATE_PATH)
        print("4) Logged in and session saved.")
    else:
        print("4) Login FAILED — check API status above.")
    return context, page
if __name__ == "__main__":
    # Manual check: run the login flow, report the resulting state, and keep
    # the browser open until Enter is pressed.
    with sync_playwright() as pw:
        context, page = ensure_logged_in(pw)
        try:
            print(f"\n -> logged in: {is_logged_in(page)}")
            print(f" -> cookie banner present: {page.locator('#usercentrics-cmp-ui').count() > 0}")
            print("\nReady to scrape. Press Enter to close browser...")
            input()
        finally:
            # The browser is reached through the context; no module-level
            # ``browser`` name exists here.
            context.browser.close()