Files
2026-06-01 07:24:46 +02:00

126 lines
3.9 KiB
Python

from datetime import datetime, timezone
from pymongo import MongoClient, ASCENDING, DESCENDING, TEXT
from config import MONGO_URI, MONGO_DB
def get_db():
    """Return a handle to the configured MongoDB database.

    Builds a ``MongoClient`` from ``MONGO_URI`` and indexes it with
    ``MONGO_DB``. A new client object is constructed on every call.
    """
    # NOTE(review): each call creates its own client; callers that invoke
    # this repeatedly may want to hold on to the returned handle instead.
    return MongoClient(MONGO_URI)[MONGO_DB]
def ensure_indexes(db):
    """Create the indexes used by the category, product, history and run collections.

    Args:
        db: Database handle exposing ``categories``, ``products``,
            ``price_history`` and ``scrape_runs`` collections.
    """
    # categories: single-field indexes.
    for field in ("parentId", "isLeaf"):
        db.categories.create_index(field)

    # products: category lookup, archived/lastSeen compound, text index on name.
    db.products.create_index("mainCategoryId")
    archived_then_recent = [("archived", ASCENDING), ("lastSeen", DESCENDING)]
    db.products.create_index(archived_then_recent)
    name_text = [("name", TEXT)]
    db.products.create_index(name_text)

    # price_history: per-product newest-first, then global newest-first.
    per_product_recent = [("productId", ASCENDING), ("scrapedAt", DESCENDING)]
    db.price_history.create_index(per_product_recent)
    db.price_history.create_index([("scrapedAt", DESCENDING)])

    # scrape_runs: newest-first by start time.
    db.scrape_runs.create_index([("startedAt", DESCENDING)])
def upsert_category(db, cat: dict):
    """Insert or update one category document, matched on its ``_id``.

    Every key of ``cat`` is written with ``$set``; the update is issued
    with ``upsert=True``.

    Args:
        db: Database handle exposing a ``categories`` collection.
        cat: Category document; must contain ``"_id"``.

    Raises:
        KeyError: If ``cat`` has no ``"_id"`` key.
    """
    selector = {"_id": cat["_id"]}
    change = {"$set": cat}
    db.categories.update_one(selector, change, upsert=True)
def upsert_categories(db, cats: list[dict]):
    """Upsert each category in ``cats``, one ``upsert_category`` call per item.

    Args:
        db: Database handle passed through to ``upsert_category``.
        cats: Category documents, processed in list order.
    """
    for category in cats:
        upsert_category(db, category)
def _nested_amount(container: dict, key: str):
    """Return ``container[key]["amount"]``, or None if ``key`` is missing or null."""
    # ``or {}`` (rather than a ``.get`` default) also covers an explicit null,
    # where ``.get(key, {})`` would return None and the chained ``.get`` would
    # raise AttributeError.
    return (container.get(key) or {}).get("amount")


def _extract_sale(prices: dict):
    """Build the stored sale sub-document from the first ``prices["sales"]`` entry.

    Returns None when there are no sales. Only the first sale is used.
    """
    sales = prices.get("sales") or []
    if not sales:
        return None
    first = sales[0]
    badges = first.get("badges") or [{}]
    return {
        "type": first.get("type"),
        "price": first["price"]["amount"],
        "pricePerUnit": _nested_amount(first, "pricePerUnit"),
        "badge": badges[0].get("title"),
        "validTill": first.get("validTill"),
    }


def upsert_product(db, base: dict, prices: dict, stock: dict, categories: list[dict]):
    """Upsert one product document and append a matching price-history row.

    The product is matched on ``_id == base["id"]``. All fields of the
    assembled document are written with ``$set``; ``firstSeen`` is written
    only on insert. A ``price_history`` document is inserted on every call,
    stamped with the same timestamp as the product update.

    Optional nested objects (``pricePerUnit`` on the prices and on the sale,
    ``packageInfo`` on the stock) may be absent or null; the derived fields
    are then stored as None.

    Args:
        db: Database handle exposing ``products`` and ``price_history``.
        base: Product core data; ``"id"`` and ``"name"`` are required.
        prices: Price data; ``prices["price"]["amount"]`` is required.
        stock: Stock data; every key is optional.
        categories: Category dicts with ``"id"`` and ``"name"`` (``"level"``
            optional); may be empty or None.

    Raises:
        KeyError: If a required key listed above is missing, or the first
            sale entry has no ``price.amount``.
    """
    now = datetime.now(timezone.utc)
    product_id = base["id"]

    sale = _extract_sale(prices)
    price = prices["price"]
    current_price = price["amount"]
    price_per_unit = _nested_amount(prices, "pricePerUnit")
    in_stock = stock.get("inStock", False)
    package_info = stock.get("packageInfo") or {}
    categories = categories or []

    doc = {
        "name": base["name"],
        "slug": base.get("slug"),
        "brand": base.get("brand"),
        "unit": base.get("unit"),
        "textualAmount": base.get("textualAmount"),
        "weightedItem": base.get("weightedItem", False),
        "mainCategoryId": base.get("mainCategoryId"),
        "categoryPath": [c["id"] for c in categories],
        "allCategories": [
            {"id": c["id"], "name": c["name"], "level": c.get("level", 0)}
            for c in categories
        ],
        # The source field is called "flag"; it is stored as countryCode.
        "countryCode": base.get("flag"),
        "images": base.get("images", []),
        "badges": base.get("badges", []),
        "archived": base.get("archived", False),
        "premiumOnly": base.get("premiumOnly", False),
        "currentPrice": current_price,
        "currentPricePerUnit": price_per_unit,
        "currency": price.get("currency", "CZK"),
        "sale": sale,
        "inStock": in_stock,
        "maxBasketAmount": stock.get("maxBasketAmount", 0),
        "packageAmount": package_info.get("amount"),
        "packageUnit": package_info.get("unit"),
        "warehouseId": stock.get("warehouseId"),
        "lastSeen": now,
        "lastScrapedAt": now,
    }
    db.products.update_one(
        {"_id": product_id},
        {
            "$set": doc,
            "$setOnInsert": {"firstSeen": now},
        },
        upsert=True,
    )
    db.price_history.insert_one({
        "productId": product_id,
        "scrapedAt": now,
        "price": current_price,
        "pricePerUnit": price_per_unit,
        "inStock": in_stock,
        "sale": sale,
    })
def upsert_products(db, bases: list, prices_list: list, stocks: list, categories_list: list):
    """Join per-product price, stock and category records and upsert every product.

    The three auxiliary lists are keyed by their ``"productId"`` field; when
    an id appears more than once, the last entry wins. Each item of ``bases``
    is then passed to ``upsert_product`` together with its matching records.

    A product without a price entry is stored with a placeholder price
    amount of 0; missing stock and category entries fall back to ``{}`` and
    ``[]`` respectively.

    Args:
        db: Database handle passed through to ``upsert_product``.
        bases: Product core dicts, each with an ``"id"`` key.
        prices_list: Price dicts, each with a ``"productId"`` key.
        stocks: Stock dicts, each with a ``"productId"`` key.
        categories_list: Dicts with ``"productId"`` and, optionally,
            ``"categories"``.
    """
    prices_by_id = {}
    for entry in prices_list:
        prices_by_id[entry["productId"]] = entry

    stock_by_id = {}
    for entry in stocks:
        stock_by_id[entry["productId"]] = entry

    categories_by_id = {}
    for entry in categories_list:
        owner = entry["productId"]
        categories_by_id[owner] = entry.get("categories", [])

    for base in bases:
        product_id = base["id"]
        product_prices = prices_by_id.get(product_id, {"price": {"amount": 0}})
        product_stock = stock_by_id.get(product_id, {})
        product_categories = categories_by_id.get(product_id, [])
        upsert_product(db, base, product_prices, product_stock, product_categories)
def log_scrape_run(db, run_data: dict):
    """Insert a scrape-run record into ``db.scrape_runs``.

    If ``run_data`` has no ``"startedAt"`` key, the current UTC time is
    filled in first. ``run_data`` itself is modified, not a copy.

    Args:
        db: Database handle exposing a ``scrape_runs`` collection.
        run_data: Run record to store.
    """
    if "startedAt" not in run_data:
        run_data["startedAt"] = datetime.now(timezone.utc)
    db.scrape_runs.insert_one(run_data)