"""MongoDB persistence helpers.

Covers index creation, category and product upserts, per-call price
history records, and scrape-run logging.
"""

from datetime import datetime, timezone

from pymongo import MongoClient, ASCENDING, DESCENDING, TEXT

from config import MONGO_URI, MONGO_DB


def get_db():
    """Return a handle to the ``MONGO_DB`` database reached via ``MONGO_URI``.

    Every call constructs a new ``MongoClient``; callers that need the
    database repeatedly should keep and reuse the returned handle.
    """
    client = MongoClient(MONGO_URI)
    return client[MONGO_DB]


def ensure_indexes(db) -> None:
    """Create the indexes on the collections this module writes to.

    Touches ``categories``, ``products``, ``price_history`` and
    ``scrape_runs``.
    """
    db.categories.create_index("parentId")
    db.categories.create_index("isLeaf")

    db.products.create_index("mainCategoryId")
    db.products.create_index([("archived", ASCENDING), ("lastSeen", DESCENDING)])
    db.products.create_index([("name", TEXT)])

    db.price_history.create_index(
        [("productId", ASCENDING), ("scrapedAt", DESCENDING)]
    )
    db.price_history.create_index([("scrapedAt", DESCENDING)])

    db.scrape_runs.create_index([("startedAt", DESCENDING)])


def upsert_category(db, cat: dict) -> None:
    """Insert or update one category document, matched on ``cat["_id"]``.

    Raises:
        KeyError: if ``cat`` has no ``"_id"`` key.
    """
    db.categories.update_one(
        {"_id": cat["_id"]},
        {"$set": cat},
        upsert=True,
    )


def upsert_categories(db, cats: list[dict]) -> None:
    """Upsert every category in ``cats``, one ``update_one`` call each."""
    for cat in cats:
        upsert_category(db, cat)


def _nested_get(mapping: dict, key: str, field: str):
    """Return ``mapping[key][field]``, or ``None`` if either level is absent.

    An explicit ``None`` stored under ``key`` is treated like a missing key,
    so a payload such as ``{"pricePerUnit": None}`` does not raise.
    """
    return (mapping.get(key) or {}).get(field)


def _build_sale(prices: dict):
    """Summarise the first entry of ``prices["sales"]``, or return ``None``.

    Only the first sale entry is used; any further entries are ignored.

    Raises:
        KeyError: if the first sale entry lacks ``price.amount``.
    """
    sales = prices.get("sales") or []
    if not sales:
        return None

    first = sales[0]
    return {
        "type": first.get("type"),
        "price": first["price"]["amount"],
        "pricePerUnit": _nested_get(first, "pricePerUnit", "amount"),
        # Only the first badge's title is kept; no badges yields None.
        "badge": (first.get("badges") or [{}])[0].get("title"),
        "validTill": first.get("validTill"),
    }


def upsert_product(
    db,
    base: dict,
    prices: dict,
    stock: dict,
    categories: list[dict],
) -> None:
    """Upsert one product and append a matching price-history record.

    The product document in ``db.products`` is keyed on ``base["id"]``.
    All fields are overwritten on every call except ``firstSeen``, which is
    written only when the document is first inserted. A new document is
    inserted into ``db.price_history`` on every call.

    Args:
        db: Database handle exposing ``products`` and ``price_history``.
        base: Product attributes; ``"id"`` and ``"name"`` are required.
        prices: Price payload; ``prices["price"]["amount"]`` is required.
            ``"pricePerUnit"`` and ``"sales"`` are optional.
        stock: Stock payload; every key is optional.
        categories: Category dicts with ``"id"`` and ``"name"``; may be
            empty or ``None``.

    Raises:
        KeyError: if a required key listed above is missing.
    """
    now = datetime.now(timezone.utc)
    product_id = base["id"]
    sale = _build_sale(prices)
    categories = categories or []

    price = prices["price"]
    amount = price["amount"]
    price_per_unit = _nested_get(prices, "pricePerUnit", "amount")
    in_stock = stock.get("inStock", False)

    doc = {
        "name": base["name"],
        "slug": base.get("slug"),
        "brand": base.get("brand"),
        "unit": base.get("unit"),
        "textualAmount": base.get("textualAmount"),
        "weightedItem": base.get("weightedItem", False),
        "mainCategoryId": base.get("mainCategoryId"),
        "categoryPath": [c["id"] for c in categories],
        "allCategories": [
            {"id": c["id"], "name": c["name"], "level": c.get("level", 0)}
            for c in categories
        ],
        # The source payload calls this field "flag".
        "countryCode": base.get("flag"),
        "images": base.get("images", []),
        "badges": base.get("badges", []),
        "archived": base.get("archived", False),
        "premiumOnly": base.get("premiumOnly", False),
        "currentPrice": amount,
        "currentPricePerUnit": price_per_unit,
        "currency": price.get("currency", "CZK"),
        "sale": sale,
        "inStock": in_stock,
        "maxBasketAmount": stock.get("maxBasketAmount", 0),
        "packageAmount": _nested_get(stock, "packageInfo", "amount"),
        "packageUnit": _nested_get(stock, "packageInfo", "unit"),
        "warehouseId": stock.get("warehouseId"),
        "lastSeen": now,
        "lastScrapedAt": now,
    }

    db.products.update_one(
        {"_id": product_id},
        {
            "$set": doc,
            # firstSeen must survive later updates, so set it on insert only.
            "$setOnInsert": {"firstSeen": now},
        },
        upsert=True,
    )

    db.price_history.insert_one({
        "productId": product_id,
        "scrapedAt": now,
        "price": amount,
        "pricePerUnit": price_per_unit,
        "inStock": in_stock,
        "sale": sale,
    })


def upsert_products(
    db,
    bases: list,
    prices_list: list,
    stocks: list,
    categories_list: list,
) -> None:
    """Upsert every product in ``bases``, joining the other lists by id.

    ``prices_list``, ``stocks`` and ``categories_list`` entries are matched
    to a product through their ``"productId"`` key.

    Raises:
        KeyError: if a base lacks ``"id"`` or a list entry lacks
            ``"productId"``.
    """
    prices_map = {p["productId"]: p for p in prices_list}
    stock_map = {s["productId"]: s for s in stocks}
    cats_map = {c["productId"]: c.get("categories", []) for c in categories_list}

    for base in bases:
        pid = base["id"]
        upsert_product(
            db,
            base,
            # A product with no price entry is still stored, with a
            # placeholder amount of 0 (also written to price_history).
            prices_map.get(pid, {"price": {"amount": 0}}),
            stock_map.get(pid, {}),
            cats_map.get(pid, []),
        )


def log_scrape_run(db, run_data: dict) -> None:
    """Insert ``run_data`` into ``db.scrape_runs``.

    ``run_data`` is modified in place: ``"startedAt"`` is filled with the
    current UTC time when the caller did not supply it.
    """
    run_data.setdefault("startedAt", datetime.now(timezone.utc))
    db.scrape_runs.insert_one(run_data)