Education · Last updated April 7, 2026

Build an E-Commerce Price Monitoring System with WebShot API in Python

Learn how to build a competitive price intelligence dashboard using WebShot API to automate screenshots of competitor product pages at scale.

Competitive pricing is one of the highest-leverage levers in e-commerce. A 1% price optimization across a large catalog can move revenue by millions. Yet most pricing teams still spend hours each week manually checking competitor sites, copying prices into spreadsheets, and making decisions based on data that was accurate 48 hours ago.

Automated price monitoring changes this completely. Instead of weekly manual checks, you get continuous price intelligence — knowing within hours when a competitor drops or raises prices, launches a promotion, or goes out of stock on a key SKU.

This guide shows you how to build a production-ready competitor price monitoring system using the WebShot API. You'll capture full-page screenshots of competitor product listings, extract price data, detect changes, and push alerts when meaningful shifts occur.

Why Screenshots Instead of HTML Scraping?

You might wonder: why use a screenshot API when you could scrape the HTML directly?

Modern e-commerce sites are JavaScript-rendered. Prices, inventory status, and promotional banners are injected by JavaScript after the initial HTML loads. A raw HTTP request gets you the skeleton HTML — not the prices you see in the browser.

Screenshot APIs execute JavaScript like a real browser. The WebShot API loads the page fully, waits for JavaScript to render, then captures the visual result. You get exactly what a human shopper would see — dynamic prices, countdown timers, and "limited stock" indicators included.

Screenshot APIs bypass some anti-scraping measures. Many e-commerce sites aggressively block automated HTTP requests. The WebShot API uses real browser rendering with proper user agent headers, significantly improving success rates.

The tradeoff is that you need an extraction layer (parsing prices from the returned HTML snapshot, or running OCR on the image), but for competitive intelligence at scale that cost is worth paying.

System Architecture

Product URL List → WebShot API → Screenshot + HTML Snapshot
                                         ↓
                               Price Extraction Layer
                                         ↓
                               Database (price history)
                                         ↓
                               Change Detection → Alert System
                                         ↓
                               Dashboard / Reports

Setting Up the Client

pip install httpx aiofiles beautifulsoup4 schedule
import httpx
import asyncio
import base64
from typing import Optional
from dataclasses import dataclass
 
@dataclass
class WebshotResult:
    url: str
    screenshot_base64: str
    html_content: Optional[str]
    success: bool
    error: Optional[str] = None
 
class WebShotClient:
    BASE_URL = "https://apivult.com/api/webshot"
 
    def __init__(self, api_key: str):
        self.client = httpx.AsyncClient(
            headers={"X-RapidAPI-Key": api_key},
            timeout=45.0  # Screenshots need time to load JS
        )
 
    async def capture(
        self,
        url: str,
        full_page: bool = True,
        width: int = 1440,
        wait_for_selector: Optional[str] = None,
        include_html: bool = True
    ) -> WebshotResult:
        payload = {
            "url": url,
            "full_page": full_page,
            "width": width,
            "format": "png",
            "include_html": include_html
        }
 
        if wait_for_selector:
            payload["wait_for_selector"] = wait_for_selector
 
        try:
            response = await self.client.post(
                f"{self.BASE_URL}/capture",
                json=payload
            )
            response.raise_for_status()
            data = response.json()
 
            return WebshotResult(
                url=url,
                screenshot_base64=data["screenshot"],
                html_content=data.get("html"),
                success=True
            )
        except Exception as e:
            return WebshotResult(
                url=url,
                screenshot_base64="",
                html_content=None,
                success=False,
                error=str(e)
            )
 
    async def close(self):
        await self.client.aclose()

Product Catalog Configuration

Define your competitor products with site-specific CSS selectors for price extraction:

from dataclasses import dataclass
from typing import Optional
 
@dataclass
class CompetitorProduct:
    product_id: str       # Your internal SKU
    competitor_name: str
    url: str
    price_selector: str   # CSS selector for the price element
    stock_selector: Optional[str] = None
    name_selector: Optional[str] = None
 
# Example product catalog
MONITORED_PRODUCTS = [
    CompetitorProduct(
        product_id="SKU-001",
        competitor_name="CompetitorA",
        url="https://competitor-a.com/products/blue-widget",
        price_selector=".price-current",
        stock_selector=".stock-status",
        name_selector="h1.product-title"
    ),
    CompetitorProduct(
        product_id="SKU-001",
        competitor_name="CompetitorB",
        url="https://competitor-b.com/p/blue-widget-pro",
        price_selector="[data-price]",
        stock_selector=".availability",
        name_selector=".pdp-title"
    ),
    CompetitorProduct(
        product_id="SKU-002",
        competitor_name="CompetitorA",
        url="https://competitor-a.com/products/red-gadget",
        price_selector=".price-current",
        stock_selector=".stock-status"
    ),
]

Price Extraction from HTML Snapshots

The WebShot API returns both the screenshot and the rendered HTML. Extract prices programmatically from the HTML — it's faster and more accurate than OCR on the image:

import re
from bs4 import BeautifulSoup
 
def extract_price_from_html(html: str, price_selector: str) -> Optional[float]:
    """
    Extract numeric price from rendered HTML using a CSS selector.
    Handles common price formats: $1,299.99 | £89.00 | €1.299,00
    """
    soup = BeautifulSoup(html, "html.parser")
    element = soup.select_one(price_selector)
 
    if not element:
        return None
 
    price_text = element.get_text(strip=True)
 
    # Remove currency symbols and normalize separators
    # Handle both 1,299.99 (US) and 1.299,99 (EU) formats
    cleaned = re.sub(r"[^\d.,]", "", price_text)
 
    # Detect format: whichever separator appears last is the decimal point
    last_dot = cleaned.rfind(".")
    last_comma = cleaned.rfind(",")

    if last_comma > last_dot:
        # EU format: 1.299,99 → strip dots, comma becomes the decimal point
        cleaned = cleaned.replace(".", "").replace(",", ".")
    else:
        # US format (or no comma at all): 1,299.99 → strip commas
        cleaned = cleaned.replace(",", "")
 
    try:
        return float(cleaned)
    except ValueError:
        return None
 
def extract_stock_status(html: str, stock_selector: str) -> str:
    """Returns 'in_stock', 'out_of_stock', or 'unknown'"""
    soup = BeautifulSoup(html, "html.parser")
    element = soup.select_one(stock_selector)
 
    if not element:
        return "unknown"
 
    text = element.get_text(strip=True).lower()
 
    out_of_stock_signals = ["out of stock", "unavailable", "sold out", "not available"]
    in_stock_signals = ["in stock", "available", "add to cart", "buy now"]
 
    if any(signal in text for signal in out_of_stock_signals):
        return "out_of_stock"
    elif any(signal in text for signal in in_stock_signals):
        return "in_stock"
 
    return "unknown"
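The separator handling is the fiddly part of the extractor. The same idea, choosing the decimal separator by whichever of `.` or `,` appears last in the string, can be sketched and sanity-checked in isolation against the formats listed in the docstring:

```python
import re

def normalize_price(text: str) -> float:
    """Strip currency symbols, then normalize thousands/decimal separators.
    The decimal separator is whichever of '.' or ',' appears last."""
    cleaned = re.sub(r"[^\d.,]", "", text)
    if cleaned.rfind(",") > cleaned.rfind("."):
        # Comma is the decimal separator (EU style): 1.299,00 → 1299.00
        cleaned = cleaned.replace(".", "").replace(",", ".")
    else:
        # Dot is the decimal separator (US style): 1,299.99 → 1299.99
        cleaned = cleaned.replace(",", "")
    return float(cleaned)

print(normalize_price("$1,299.99"))  # 1299.99
print(normalize_price("€1.299,00"))  # 1299.0
print(normalize_price("£89.00"))     # 89.0
```

Running the check whenever a competitor changes locale formatting catches extraction regressions before they pollute your price history.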

Database Layer for Price History

import sqlite3
from datetime import datetime, timezone
 
def init_database(db_path: str = "price_monitor.db"):
    conn = sqlite3.connect(db_path)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS price_snapshots (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            product_id TEXT NOT NULL,
            competitor_name TEXT NOT NULL,
            url TEXT NOT NULL,
            price REAL,
            stock_status TEXT,
            screenshot_path TEXT,
            captured_at TEXT NOT NULL,
            capture_success INTEGER DEFAULT 1
        )
    """)
    conn.execute("""
        CREATE INDEX IF NOT EXISTS idx_product_competitor_time
        ON price_snapshots (product_id, competitor_name, captured_at)
    """)
    conn.commit()
    return conn
 
def save_price_snapshot(
    conn: sqlite3.Connection,
    product: CompetitorProduct,
    price: Optional[float],
    stock_status: str,
    screenshot_path: str,
    success: bool = True
):
    conn.execute("""
        INSERT INTO price_snapshots
        (product_id, competitor_name, url, price, stock_status, screenshot_path, captured_at, capture_success)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    """, (
        product.product_id,
        product.competitor_name,
        product.url,
        price,
        stock_status,
        screenshot_path,
        datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),  # matches SQLite's datetime() output, so string comparisons work
        1 if success else 0
    ))
    conn.commit()
 
def get_previous_price(
    conn: sqlite3.Connection,
    product_id: str,
    competitor_name: str
) -> Optional[float]:
    """
    Price from the snapshot before the most recent one. OFFSET 1 skips
    the row just written by save_price_snapshot, so call this after
    saving the current capture.
    """
    cursor = conn.execute("""
        SELECT price FROM price_snapshots
        WHERE product_id = ? AND competitor_name = ? AND price IS NOT NULL
        ORDER BY captured_at DESC
        LIMIT 1 OFFSET 1
    """, (product_id, competitor_name))
    row = cursor.fetchone()
    return row[0] if row else None

Change Detection and Alert System

import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
 
PRICE_CHANGE_THRESHOLD = 0.02   # Alert on 2%+ price changes
 
def detect_price_change(
    current_price: Optional[float],
    previous_price: Optional[float]
) -> Optional[dict]:
    """Returns change details if significant, None otherwise"""
    if current_price is None or previous_price is None:
        return None
 
    if previous_price == 0:
        return None
 
    change_pct = (current_price - previous_price) / previous_price
 
    if abs(change_pct) < PRICE_CHANGE_THRESHOLD:
        return None
 
    return {
        "previous_price": previous_price,
        "current_price": current_price,
        "change_pct": change_pct,
        "direction": "decrease" if change_pct < 0 else "increase",
        "absolute_change": abs(current_price - previous_price)
    }
 
def send_price_alert(
    product: CompetitorProduct,
    change: dict,
    smtp_config: dict
):
    direction = "⬇️ DROPPED" if change["direction"] == "decrease" else "⬆️ INCREASED"
    pct = abs(change["change_pct"] * 100)
 
    subject = f"Price Alert: {product.competitor_name} {direction} {pct:.1f}% on {product.product_id}"
 
    body = f"""
    Competitor Price Change Detected
 
    Product ID: {product.product_id}
    Competitor: {product.competitor_name}
    URL: {product.url}
 
    Previous Price: ${change['previous_price']:.2f}
    Current Price:  ${change['current_price']:.2f}
    Change: {'+' if change['direction'] == 'increase' else '-'}${change['absolute_change']:.2f} ({pct:.1f}%)
 
    Direction: {direction}
    """
 
    msg = MIMEMultipart()
    msg["From"] = smtp_config["from"]
    msg["To"] = smtp_config["to"]
    msg["Subject"] = subject
    msg.attach(MIMEText(body, "plain"))
 
    with smtplib.SMTP(smtp_config["host"], smtp_config["port"]) as server:
        server.starttls()
        server.login(smtp_config["username"], smtp_config["password"])
        server.send_message(msg)
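To make the 2% threshold concrete, a competitor dropping from $104.99 to $97.99 works out to roughly a 6.7% decrease, comfortably past the alert line:

```python
# Worked example of the change calculation used in detect_price_change
previous, current = 104.99, 97.99
change_pct = (current - previous) / previous
print(f"{change_pct:+.1%}")  # -6.7%, well beyond the 2% threshold
```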

Main Monitoring Loop

import aiofiles
import os
 
async def monitor_product(
    webshot_client: WebShotClient,
    db_conn: sqlite3.Connection,
    product: CompetitorProduct,
    screenshot_dir: str,
    smtp_config: Optional[dict] = None
):
    result = await webshot_client.capture(
        url=product.url,
        full_page=False,        # Just viewport for faster capture
        wait_for_selector=product.price_selector
    )
 
    screenshot_path = ""
    if result.success and result.screenshot_base64:
        # Save screenshot to disk
        filename = f"{product.product_id}_{product.competitor_name}_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.png"
        screenshot_path = os.path.join(screenshot_dir, filename)
        image_data = base64.b64decode(result.screenshot_base64)
        async with aiofiles.open(screenshot_path, "wb") as f:
            await f.write(image_data)
 
    # Extract price and stock status
    price = None
    stock_status = "unknown"
 
    if result.success and result.html_content:
        price = extract_price_from_html(result.html_content, product.price_selector)
        if product.stock_selector:
            stock_status = extract_stock_status(result.html_content, product.stock_selector)
 
    # Save to database
    save_price_snapshot(db_conn, product, price, stock_status, screenshot_path, result.success)
 
    # Check for price changes and send alerts
    if price is not None and smtp_config:
        previous_price = get_previous_price(db_conn, product.product_id, product.competitor_name)
        change = detect_price_change(price, previous_price)
 
        if change:
            print(f"PRICE CHANGE: {product.competitor_name} | {product.product_id} | {change['direction']} {abs(change['change_pct']*100):.1f}%")
            send_price_alert(product, change, smtp_config)
 
    return {"product": product.product_id, "competitor": product.competitor_name, "price": price, "success": result.success}
 
async def run_monitoring_cycle(
    webshot_client: WebShotClient,
    db_conn: sqlite3.Connection,
    products: list[CompetitorProduct],
    screenshot_dir: str = "screenshots",
    smtp_config: Optional[dict] = None,
    concurrent_limit: int = 5
):
    os.makedirs(screenshot_dir, exist_ok=True)
    semaphore = asyncio.Semaphore(concurrent_limit)
 
    async def monitor_with_limit(product: CompetitorProduct):
        async with semaphore:
            return await monitor_product(
                webshot_client, db_conn, product, screenshot_dir, smtp_config
            )
 
    tasks = [monitor_with_limit(p) for p in products]
    results = await asyncio.gather(*tasks, return_exceptions=True)
 
    successes = sum(1 for r in results if isinstance(r, dict) and r.get("success"))
    print(f"Monitoring cycle complete: {successes}/{len(products)} succeeded")
    return results
 
# Main entrypoint
async def main():
    db_conn = init_database("price_monitor.db")
    client = WebShotClient(api_key="YOUR_API_KEY")
 
    try:
        await run_monitoring_cycle(
            webshot_client=client,
            db_conn=db_conn,
            products=MONITORED_PRODUCTS,
            screenshot_dir="screenshots",
            concurrent_limit=5
        )
    finally:
        await client.close()
        db_conn.close()
 
if __name__ == "__main__":
    asyncio.run(main())

Setting Up Scheduled Monitoring

Run the monitoring cycle every 4 hours using a cron job or scheduler:

import schedule
import time
 
def run_monitoring_job():
    asyncio.run(main())
 
schedule.every(4).hours.do(run_monitoring_job)
schedule.every().day.at("09:00").do(run_monitoring_job)  # Morning baseline
 
while True:
    schedule.run_pending()
    time.sleep(60)
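If you'd rather use cron than a long-running Python process, an equivalent crontab sketch looks like this (the interpreter, script path, and log location are illustrative):

```shell
# m h dom mon dow  — run the monitoring script every 4 hours, appending output to a log
0 */4 * * * /usr/bin/python3 /opt/price-monitor/monitor.py >> /var/log/price_monitor.log 2>&1
```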

Analytics: Price Intelligence Dashboard

Query your price history for competitive intelligence:

def get_price_history(db_conn: sqlite3.Connection, product_id: str, days: int = 30) -> list:
    cursor = db_conn.execute("""
        SELECT competitor_name, price, stock_status, captured_at
        FROM price_snapshots
        WHERE product_id = ?
          AND captured_at >= datetime('now', '-' || ? || ' days')
          AND capture_success = 1
        ORDER BY captured_at ASC
    """, (product_id, days))
    return cursor.fetchall()
 
def get_lowest_competitor_price(db_conn: sqlite3.Connection, product_id: str) -> dict:
    cursor = db_conn.execute("""
        SELECT competitor_name, price, captured_at
        FROM price_snapshots
        WHERE product_id = ?
          AND price IS NOT NULL
          AND stock_status = 'in_stock'
          AND captured_at >= datetime('now', '-1 days')
        ORDER BY price ASC
        LIMIT 1
    """, (product_id,))
    row = cursor.fetchone()
    return {"competitor": row[0], "price": row[1], "as_of": row[2]} if row else {}
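A typical downstream calculation is your price gap against the cheapest in-stock competitor returned by get_lowest_competitor_price. A small illustrative helper (price_gap is not part of the schema above):

```python
def price_gap(our_price: float, competitor_price: float) -> dict:
    """Absolute and percentage gap between our price and a competitor's."""
    gap = our_price - competitor_price
    return {
        "gap": round(gap, 2),
        "gap_pct": round(gap / competitor_price * 100, 1),
        "position": "above" if gap > 0 else ("below" if gap < 0 else "matched"),
    }

print(price_gap(99.99, 97.99))  # we sit $2.00 (2.0%) above the cheapest competitor
```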

Production Considerations

Rate limiting: Space requests across competitors to avoid IP blocks. The 5-concurrent-request cap in run_monitoring_cycle is a good starting point; adding a short randomized delay before each capture spreads the load further.
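One way to add that spacing is a randomized delay taken inside the semaphore, so workers holding a slot don't fire in lockstep. A sketch (polite_capture and the 1-4 second range are illustrative, not part of the client above):

```python
import asyncio
import random

async def polite_capture(semaphore: asyncio.Semaphore, capture):
    """Take a concurrency slot, pause 1-4s, then run the capture coroutine."""
    async with semaphore:
        # Randomized delay so requests to one competitor arrive spread out
        await asyncio.sleep(random.uniform(1.0, 4.0))
        return await capture()

# Inside run_monitoring_cycle, monitor_with_limit could become:
#   return await polite_capture(semaphore, lambda: monitor_product(...))
```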

Screenshot archiving: Screenshots accumulate quickly. Implement a retention policy — keep the last 30 days of daily screenshots, weekly snapshots for 3 months, then discard.

Failure handling: Some product pages will return 404, redirect, or render incorrectly. Log failures and alert your team when a product page consistently fails — it may indicate a URL change or product discontinuation.
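Consistent failures can be detected straight from the price_snapshots table. A sketch that counts the failure streak among the most recent captures (consecutive_failures and the three-capture window are illustrative choices):

```python
import sqlite3

def consecutive_failures(
    conn: sqlite3.Connection,
    product_id: str,
    competitor_name: str,
    window: int = 3,
) -> int:
    """Length of the failure streak among the most recent `window` snapshots."""
    rows = conn.execute("""
        SELECT capture_success FROM price_snapshots
        WHERE product_id = ? AND competitor_name = ?
        ORDER BY captured_at DESC
        LIMIT ?
    """, (product_id, competitor_name, window)).fetchall()
    streak = 0
    for (ok,) in rows:
        if ok:  # most recent success ends the streak
            break
        streak += 1
    return streak

# Alert when the streak reaches the window: the URL may have changed
# or the product may have been discontinued.
```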

Legal compliance: Ensure your use of price monitoring complies with each site's terms of service. Many sites permit price comparison for competitive analysis; some restrict automated access explicitly.

Get Started

The WebShot API is available through apivult.com on RapidAPI. The free tier provides enough captures to prototype and validate your monitoring setup before scaling to production volumes.

For e-commerce teams managing competitive pricing across hundreds of SKUs, automated price monitoring pays for itself within weeks — both in saved analyst time and in revenue protected by faster pricing responses.