Education · Last updated April 19, 2026

How to Build a Real-Time Energy Price Monitoring System in Python with Energy Volatility API

Step-by-step Python guide to building an automated energy price monitoring and alert system. Covers Brent crude, natural gas, and power price tracking with threshold alerts and historical trend analysis.

In April 2026, physical Brent crude prices are trading above $150 per barrel and natural gas prices have hit multi-year highs. For procurement teams, CFOs, and operations managers with significant energy exposure, the difference between checking prices at 9 AM and having an automated alert fire at 6 AM can be a 5-10% swing in procurement cost.

In this guide, we will build a complete energy price monitoring system in Python using the Energy Volatility API. By the end, you will have a production-ready system that:

  • Fetches real-time prices for Brent crude, WTI, natural gas, and power
  • Calculates price change percentages against configurable baselines
  • Triggers email and Slack alerts when prices cross defined thresholds
  • Stores price history in a local SQLite database for trend analysis
  • Generates weekly price reports as formatted summaries

The full system runs as a scheduled job — check prices every 15 minutes, alert immediately on threshold breaches, report weekly. Total implementation: a few hundred lines of Python.

Prerequisites

pip install requests pandas matplotlib schedule python-dotenv

(smtplib and sqlite3 ship with the Python standard library, so they need no installation.)

Create a .env file in your project directory:

ENERGY_API_KEY=YOUR_API_KEY
[email protected]
SMTP_SERVER=smtp.gmail.com
SMTP_PORT=587
[email protected]
SMTP_PASSWORD=your-app-password
SLACK_WEBHOOK_URL=https://hooks.slack.com/your-webhook-url
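The monitor reads all of these variables at startup, so it pays to fail fast if any are missing rather than discover a typo when the first alert fails to send. A minimal sketch — the helper name `missing_config` and the sample dict are ours, not part of the system below (SLACK_WEBHOOK_URL is left out because the Slack sender treats it as optional):

```python
REQUIRED_VARS = [
    "ENERGY_API_KEY", "ALERT_EMAIL", "SMTP_SERVER",
    "SMTP_PORT", "SMTP_USER", "SMTP_PASSWORD",
]

def missing_config(env: dict) -> list[str]:
    """Return the names of required variables that are unset or empty."""
    return [name for name in REQUIRED_VARS if not env.get(name)]

# Example: a config dict with an empty SMTP password
sample_env = {
    "ENERGY_API_KEY": "demo-key",
    "ALERT_EMAIL": "alerts@yourcompany.com",
    "SMTP_SERVER": "smtp.gmail.com",
    "SMTP_PORT": "587",
    "SMTP_USER": "monitor@yourcompany.com",
    "SMTP_PASSWORD": "",
}
missing = missing_config(sample_env)
print(missing)  # ['SMTP_PASSWORD']
```

In `main()` you could call this against `os.environ` and raise before any jobs are scheduled.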

Step 1: API Client and Price Fetching

Start with a clean API client that handles authentication, rate limiting, and error handling:

import os
import time
import logging
from dataclasses import dataclass
from typing import Optional
import requests
from dotenv import load_dotenv
 
load_dotenv()
 
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
 
@dataclass(frozen=True)
class EnergyPrice:
    commodity: str
    price: float
    currency: str
    unit: str
    timestamp: str
    change_24h: float
    change_pct_24h: float
 
class EnergyVolatilityClient:
    BASE_URL = "https://apivult.com/api/energy-volatility"
    
    def __init__(self, api_key: str):
        self._session = requests.Session()
        self._session.headers.update({
            "X-RapidAPI-Key": api_key,
            "Content-Type": "application/json"
        })
        self._last_request_time = 0.0
        self._min_interval = 1.0  # rate limit: 1 request per second
    
    def _throttle(self) -> None:
        elapsed = time.time() - self._last_request_time
        if elapsed < self._min_interval:
            time.sleep(self._min_interval - elapsed)
        self._last_request_time = time.time()
    
    def get_price(self, commodity: str) -> Optional[EnergyPrice]:
        """Fetch current price for a commodity. Returns None on failure."""
        self._throttle()
        try:
            response = self._session.get(
                f"{self.BASE_URL}/price",
                params={"commodity": commodity},
                timeout=10
            )
            response.raise_for_status()
            data = response.json()
            return EnergyPrice(
                commodity=commodity,
                price=data["price"],
                currency=data["currency"],
                unit=data["unit"],
                timestamp=data["timestamp"],
                change_24h=data["change_24h"],
                change_pct_24h=data["change_pct_24h"]
            )
        except requests.RequestException as e:
            logger.error(f"Failed to fetch {commodity} price: {e}")
            return None
    
    def get_multiple_prices(self, commodities: list[str]) -> dict[str, EnergyPrice]:
        """Fetch prices for multiple commodities. Returns dict of commodity -> price."""
        results = {}
        for commodity in commodities:
            price = self.get_price(commodity)
            if price is not None:
                results[commodity] = price
        return results
 
# Commodities to monitor
MONITORED_COMMODITIES = [
    "brent_crude",    # Brent crude oil $/bbl
    "wti_crude",      # WTI crude oil $/bbl
    "natural_gas",    # Henry Hub natural gas $/MMBtu
    "eu_power",       # European power price €/MWh
    "us_power",       # US power price $/MWh
]
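Because `EnergyPrice` is a frozen dataclass, a reading cannot be mutated after it is fetched, which keeps downstream alerting and storage honest. Here is how a response payload maps onto it — the sample dict is illustrative, with the same keys `get_price()` reads, not a captured API response:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class EnergyPrice:
    commodity: str
    price: float
    currency: str
    unit: str
    timestamp: str
    change_24h: float
    change_pct_24h: float

# Hypothetical payload with the keys get_price() expects
payload = {
    "price": 151.40,
    "currency": "$",
    "unit": "bbl",
    "timestamp": "2026-04-19T06:00:00Z",
    "change_24h": 3.41,
    "change_pct_24h": 2.30,
}

price = EnergyPrice(commodity="brent_crude", **payload)
print(f"{price.currency}{price.price:.2f}/{price.unit}")  # $151.40/bbl
```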

Step 2: Price Storage with SQLite

Store price history locally for trend analysis and report generation:

import sqlite3
from contextlib import contextmanager
from datetime import datetime
 
DB_PATH = "energy_prices.db"
 
def initialize_database() -> None:
    """Create tables if they don't exist."""
    with sqlite3.connect(DB_PATH) as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS price_history (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                commodity TEXT NOT NULL,
                price REAL NOT NULL,
                currency TEXT NOT NULL,
                unit TEXT NOT NULL,
                change_24h REAL,
                change_pct_24h REAL,
                fetched_at TEXT NOT NULL
            )
        """)
        conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_commodity_time 
            ON price_history (commodity, fetched_at)
        """)
        conn.commit()
 
@contextmanager
def get_db_connection():
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    try:
        yield conn
    finally:
        conn.close()
 
def store_prices(prices: dict[str, EnergyPrice]) -> None:
    """Store fetched prices in the database."""
    fetched_at = datetime.utcnow().isoformat()
    with get_db_connection() as conn:
        conn.executemany(
            """INSERT INTO price_history 
               (commodity, price, currency, unit, change_24h, change_pct_24h, fetched_at)
               VALUES (?, ?, ?, ?, ?, ?, ?)""",
            [
                (p.commodity, p.price, p.currency, p.unit,
                 p.change_24h, p.change_pct_24h, fetched_at)
                for p in prices.values()
            ]
        )
        conn.commit()
    logger.info(f"Stored {len(prices)} price records at {fetched_at}")
 
def get_price_history(commodity: str, days: int = 7) -> list[dict]:
    """Retrieve price history for a commodity over the past N days."""
    from datetime import timedelta
    cutoff = (datetime.utcnow() - timedelta(days=days)).isoformat()
    with get_db_connection() as conn:
        rows = conn.execute(
            """SELECT * FROM price_history 
               WHERE commodity = ? AND fetched_at >= ?
               ORDER BY fetched_at ASC""",
            (commodity, cutoff)
        ).fetchall()
    return [dict(row) for row in rows]
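The string comparison on `fetched_at` works because ISO-8601 timestamps sort lexicographically in chronological order. A quick in-memory check of the windowing query, with the table trimmed to three columns for brevity:

```python
import sqlite3
from datetime import datetime, timedelta

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.execute("""
    CREATE TABLE price_history (
        commodity TEXT NOT NULL,
        price REAL NOT NULL,
        fetched_at TEXT NOT NULL
    )
""")

now = datetime(2026, 4, 19, 6, 0, 0)
conn.executemany(
    "INSERT INTO price_history VALUES (?, ?, ?)",
    [
        ("brent_crude", 140.0, (now - timedelta(days=10)).isoformat()),  # outside 7-day window
        ("brent_crude", 148.0, (now - timedelta(days=3)).isoformat()),
        ("brent_crude", 151.4, now.isoformat()),
    ],
)

cutoff = (now - timedelta(days=7)).isoformat()
rows = conn.execute(
    """SELECT price FROM price_history
       WHERE commodity = ? AND fetched_at >= ?
       ORDER BY fetched_at ASC""",
    ("brent_crude", cutoff),
).fetchall()
print([r["price"] for r in rows])  # [148.0, 151.4]
```

The 10-day-old reading is filtered out by the string comparison alone, with no date parsing in the query.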

Step 3: Alert Configuration and Threshold Logic

Define threshold-based alerts with cooldown periods to prevent alert fatigue:

from dataclasses import dataclass, field
 
@dataclass
class AlertThreshold:
    commodity: str
    condition: str          # "above" or "below"
    price_level: float
    cooldown_minutes: int = 60
    last_triggered: Optional[datetime] = field(default=None, repr=False)
    
    def should_trigger(self, current_price: float) -> bool:
        """Check if this threshold should fire given the current price."""
        if self.condition == "above" and current_price <= self.price_level:
            return False
        if self.condition == "below" and current_price >= self.price_level:
            return False
        
        if self.last_triggered is not None:
            elapsed = (datetime.utcnow() - self.last_triggered).total_seconds() / 60
            if elapsed < self.cooldown_minutes:
                return False
        
        return True
    
    def mark_triggered(self) -> "AlertThreshold":
        """Return new threshold with updated last_triggered timestamp."""
        return AlertThreshold(
            commodity=self.commodity,
            condition=self.condition,
            price_level=self.price_level,
            cooldown_minutes=self.cooldown_minutes,
            last_triggered=datetime.utcnow()
        )
 
# Configure your alert thresholds
DEFAULT_THRESHOLDS = [
    AlertThreshold("brent_crude", "above", 155.0, cooldown_minutes=120),
    AlertThreshold("brent_crude", "below", 130.0, cooldown_minutes=120),
    AlertThreshold("wti_crude", "above", 150.0, cooldown_minutes=120),
    AlertThreshold("natural_gas", "above", 8.0, cooldown_minutes=60),
    AlertThreshold("eu_power", "above", 200.0, cooldown_minutes=60),
    AlertThreshold("us_power", "above", 120.0, cooldown_minutes=60),
]
 
def check_thresholds(
    prices: dict[str, EnergyPrice],
    thresholds: list[AlertThreshold]
) -> tuple[list[tuple[AlertThreshold, EnergyPrice]], list[AlertThreshold]]:
    """
    Returns (triggered_alerts, updated_thresholds).
    Triggered alerts is a list of (threshold, price) pairs.
    Updated thresholds has last_triggered timestamps updated for fired alerts.
    """
    triggered = []
    updated_thresholds = []
    
    for threshold in thresholds:
        price = prices.get(threshold.commodity)
        if price is not None and threshold.should_trigger(price.price):
            triggered.append((threshold, price))
            updated_thresholds.append(threshold.mark_triggered())
        else:
            updated_thresholds.append(threshold)
    
    return triggered, updated_thresholds
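The interaction between the price condition and the cooldown is the part worth testing. A trimmed-down copy of `AlertThreshold` (same logic as above, minus `mark_triggered`) shows the three cases:

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional

@dataclass
class AlertThreshold:
    commodity: str
    condition: str          # "above" or "below"
    price_level: float
    cooldown_minutes: int = 60
    last_triggered: Optional[datetime] = field(default=None, repr=False)

    def should_trigger(self, current_price: float) -> bool:
        if self.condition == "above" and current_price <= self.price_level:
            return False
        if self.condition == "below" and current_price >= self.price_level:
            return False
        if self.last_triggered is not None:
            elapsed = (datetime.utcnow() - self.last_triggered).total_seconds() / 60
            if elapsed < self.cooldown_minutes:
                return False
        return True

t = AlertThreshold("brent_crude", "above", 155.0, cooldown_minutes=120)
fires_low = t.should_trigger(151.40)    # price below the level: no alert
fires_high = t.should_trigger(156.10)   # price crossed the level: alert

# Simulate an alert that fired 30 minutes ago: still inside the 120-minute cooldown
t.last_triggered = datetime.utcnow() - timedelta(minutes=30)
fires_again = t.should_trigger(156.10)  # suppressed by cooldown
```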

Step 4: Alert Delivery (Email and Slack)

import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import json
 
def format_alert_message(
    threshold: AlertThreshold,
    price: EnergyPrice
) -> str:
    direction = "ABOVE" if threshold.condition == "above" else "BELOW"
    return (
        f"⚠️ ENERGY PRICE ALERT\n"
        f"Commodity: {price.commodity.replace('_', ' ').upper()}\n"
        f"Current Price: {price.currency}{price.price:.2f}/{price.unit}\n"
        f"Alert Level: {direction} {price.currency}{threshold.price_level:.2f}\n"
        f"24h Change: {price.change_pct_24h:+.2f}%\n"
        f"Timestamp: {price.timestamp}"
    )
 
def send_email_alert(
    subject: str,
    body: str,
    to_address: str
) -> None:
    msg = MIMEMultipart()
    msg["From"] = os.getenv("SMTP_USER")
    msg["To"] = to_address
    msg["Subject"] = subject
    msg.attach(MIMEText(body, "plain"))
    
    try:
        with smtplib.SMTP(
            os.getenv("SMTP_SERVER"),
            int(os.getenv("SMTP_PORT"))
        ) as server:
            server.starttls()
            server.login(os.getenv("SMTP_USER"), os.getenv("SMTP_PASSWORD"))
            server.send_message(msg)
        logger.info(f"Email alert sent: {subject}")
    except smtplib.SMTPException as e:
        logger.error(f"Failed to send email alert: {e}")
 
def send_slack_alert(message: str) -> None:
    webhook_url = os.getenv("SLACK_WEBHOOK_URL")
    if not webhook_url:
        return
    
    try:
        response = requests.post(
            webhook_url,
            json={"text": message},
            timeout=5
        )
        response.raise_for_status()
        logger.info("Slack alert sent")
    except requests.RequestException as e:
        logger.error(f"Failed to send Slack alert: {e}")
 
def dispatch_alerts(
    triggered: list[tuple[AlertThreshold, EnergyPrice]]
) -> None:
    """Send all triggered alerts via email and Slack."""
    for threshold, price in triggered:
        message = format_alert_message(threshold, price)
        direction = threshold.condition.upper()
        subject = (
            f"Energy Alert: {price.commodity.replace('_', ' ').upper()} "
            f"{direction} ${threshold.price_level:.0f}"
        )
        send_email_alert(subject, message, os.getenv("ALERT_EMAIL"))
        send_slack_alert(message)
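To see what actually lands in an inbox or Slack channel, the formatter can be exercised with a hand-made reading — the Brent figures below are illustrative, chosen to sit just above the $155 threshold configured earlier:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class EnergyPrice:
    commodity: str
    price: float
    currency: str
    unit: str
    timestamp: str
    change_24h: float
    change_pct_24h: float

@dataclass
class AlertThreshold:
    commodity: str
    condition: str
    price_level: float

def format_alert_message(threshold: AlertThreshold, price: EnergyPrice) -> str:
    direction = "ABOVE" if threshold.condition == "above" else "BELOW"
    return (
        f"⚠️ ENERGY PRICE ALERT\n"
        f"Commodity: {price.commodity.replace('_', ' ').upper()}\n"
        f"Current Price: {price.currency}{price.price:.2f}/{price.unit}\n"
        f"Alert Level: {direction} {price.currency}{threshold.price_level:.2f}\n"
        f"24h Change: {price.change_pct_24h:+.2f}%\n"
        f"Timestamp: {price.timestamp}"
    )

msg = format_alert_message(
    AlertThreshold("brent_crude", "above", 155.0),
    EnergyPrice("brent_crude", 156.10, "$", "bbl",
                "2026-04-19T06:00:00Z", 3.50, 2.30),
)
print(msg)
```

The message includes the commodity, current price, breached level, and 24-hour change, so a recipient can act without opening a dashboard.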

Step 5: Weekly Report Generation

import pandas as pd
 
def generate_weekly_report(commodities: list[str]) -> str:
    """Generate a markdown-formatted weekly price summary."""
    lines = [
        "# Weekly Energy Price Report",
        f"Generated: {datetime.utcnow().strftime('%Y-%m-%d %H:%M UTC')}",
        ""
    ]
    
    for commodity in commodities:
        history = get_price_history(commodity, days=7)
        if not history:
            continue
        
        df = pd.DataFrame(history)
        df["fetched_at"] = pd.to_datetime(df["fetched_at"])
        df = df.set_index("fetched_at")
        
        latest_price = df["price"].iloc[-1]
        week_open = df["price"].iloc[0]
        week_high = df["price"].max()
        week_low = df["price"].min()
        week_change_pct = (latest_price - week_open) / week_open * 100
        
        currency = history[0]["currency"]
        unit = history[0]["unit"]
        
        lines.extend([
            f"## {commodity.replace('_', ' ').upper()}",
            f"- Current: {currency}{latest_price:.2f}/{unit}",
            f"- Week Open: {currency}{week_open:.2f}",
            f"- Week High: {currency}{week_high:.2f}",
            f"- Week Low: {currency}{week_low:.2f}",
            f"- Week Change: {week_change_pct:+.2f}%",
            ""
        ])
    
    return "\n".join(lines)
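The per-commodity statistics in `generate_weekly_report` reduce to a handful of pandas operations. Here they are on a small hand-made sample, so you can verify the arithmetic before pointing the report at real history:

```python
import pandas as pd

# Illustrative 7-day sample, not real market data
history = [
    {"fetched_at": "2026-04-13T06:00:00", "price": 148.0},
    {"fetched_at": "2026-04-15T06:00:00", "price": 153.2},
    {"fetched_at": "2026-04-17T06:00:00", "price": 146.5},
    {"fetched_at": "2026-04-19T06:00:00", "price": 151.4},
]
df = pd.DataFrame(history)
df["fetched_at"] = pd.to_datetime(df["fetched_at"])
df = df.set_index("fetched_at")

latest_price = df["price"].iloc[-1]   # 151.4
week_open = df["price"].iloc[0]       # 148.0
week_high = df["price"].max()         # 153.2
week_low = df["price"].min()          # 146.5
week_change_pct = (latest_price - week_open) / week_open * 100
print(f"{week_change_pct:+.2f}%")  # +2.30%
```

Because the rows are sorted ascending by `fetched_at` (the query in Step 2 guarantees this), `iloc[0]` and `iloc[-1]` are the week's open and latest readings.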

Step 6: Main Monitoring Loop

import schedule
 
def monitoring_cycle(
    client: EnergyVolatilityClient,
    thresholds: list[AlertThreshold]
) -> list[AlertThreshold]:
    """Run one monitoring cycle. Returns updated thresholds."""
    logger.info("Starting monitoring cycle")
    
    prices = client.get_multiple_prices(MONITORED_COMMODITIES)
    
    if not prices:
        logger.warning("No prices fetched in this cycle")
        return thresholds
    
    store_prices(prices)
    
    for commodity, price in prices.items():
        logger.info(
            f"{commodity}: {price.currency}{price.price:.2f}/{price.unit} "
            f"({price.change_pct_24h:+.2f}% 24h)"
        )
    
    triggered, updated_thresholds = check_thresholds(prices, thresholds)
    
    if triggered:
        logger.info(f"{len(triggered)} alert(s) triggered")
        dispatch_alerts(triggered)
    
    return updated_thresholds
 
def main():
    initialize_database()
    
    api_key = os.getenv("ENERGY_API_KEY")
    if not api_key:
        raise ValueError("ENERGY_API_KEY environment variable not set")
    
    client = EnergyVolatilityClient(api_key)
    thresholds = DEFAULT_THRESHOLDS.copy()
    
    # Mutable container for thresholds (needed for closure in scheduled job)
    state = {"thresholds": thresholds}
    
    def run_cycle():
        state["thresholds"] = monitoring_cycle(client, state["thresholds"])
    
    def run_weekly_report():
        report = generate_weekly_report(MONITORED_COMMODITIES)
        send_email_alert(
            subject="Weekly Energy Price Report",
            body=report,
            to_address=os.getenv("ALERT_EMAIL")
        )
        logger.info("Weekly report sent")
    
    # Run immediately on startup
    run_cycle()
    
    # Schedule recurring jobs
    schedule.every(15).minutes.do(run_cycle)
    schedule.every().monday.at("08:00").do(run_weekly_report)
    
    logger.info("Energy price monitor running. Checking every 15 minutes.")
    
    while True:
        schedule.run_pending()
        time.sleep(30)
 
if __name__ == "__main__":
    main()
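The `state` dict in `main()` exists because `schedule` calls `run_cycle` with no arguments and ignores its return value, so the updated threshold list has to flow out through an enclosing mutable object (plain rebinding of a local name would need `nonlocal`). Stripped of the monitoring details, the pattern looks like this:

```python
def make_cycle(initial_thresholds):
    # The dict outlives each call; rebinding state["thresholds"]
    # makes the new value visible to every future invocation.
    state = {"thresholds": initial_thresholds}

    def run_cycle(update):
        state["thresholds"] = update(state["thresholds"])

    return run_cycle, state

run_cycle, state = make_cycle(["t1", "t2"])
run_cycle(lambda ts: ts + ["t3"])
run_cycle(lambda ts: ts + ["t4"])
print(state["thresholds"])  # ['t1', 't2', 't3', 't4']
```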

Deploying as a Background Service

For production deployment, run this as a system service using systemd (Linux) or as a Windows Service:

# /etc/systemd/system/energy-monitor.service
[Unit]
Description=Energy Price Monitor
After=network.target
 
[Service]
Type=simple
User=appuser
WorkingDirectory=/opt/energy-monitor
EnvironmentFile=/opt/energy-monitor/.env
ExecStart=/opt/energy-monitor/venv/bin/python main.py
Restart=always
RestartSec=30
 
[Install]
WantedBy=multi-user.target

Enable and start: systemctl enable energy-monitor && systemctl start energy-monitor

Adding a Price Dashboard

Extend the system with a simple price dashboard using Flask:

from flask import Flask, jsonify
 
app = Flask(__name__)
 
@app.route("/api/latest")
def get_latest_prices():
    """Return the most recent price for each monitored commodity."""
    with get_db_connection() as conn:
        rows = conn.execute("""
            SELECT p.* FROM price_history p
            INNER JOIN (
                SELECT commodity, MAX(fetched_at) as max_time
                FROM price_history
                GROUP BY commodity
            ) latest ON p.commodity = latest.commodity 
                     AND p.fetched_at = latest.max_time
        """).fetchall()
    return jsonify([dict(row) for row in rows])
 
@app.route("/api/history/<commodity>")
def get_history(commodity: str):
    history = get_price_history(commodity, days=7)
    return jsonify(history)
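The `GROUP BY`/`JOIN` in `/api/latest` picks the newest row per commodity. It is worth verifying against a tiny in-memory table (columns trimmed for brevity):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.execute("CREATE TABLE price_history (commodity TEXT, price REAL, fetched_at TEXT)")
conn.executemany(
    "INSERT INTO price_history VALUES (?, ?, ?)",
    [
        ("brent_crude", 149.8, "2026-04-19T05:45:00"),  # older reading, should be skipped
        ("brent_crude", 151.4, "2026-04-19T06:00:00"),
        ("natural_gas", 7.85, "2026-04-19T06:00:00"),
    ],
)

rows = conn.execute("""
    SELECT p.* FROM price_history p
    INNER JOIN (
        SELECT commodity, MAX(fetched_at) AS max_time
        FROM price_history
        GROUP BY commodity
    ) latest ON p.commodity = latest.commodity
             AND p.fetched_at = latest.max_time
""").fetchall()

latest_prices = {r["commodity"]: r["price"] for r in rows}
# brent_crude -> 151.4, natural_gas -> 7.85
```

Only the most recent brent reading survives the join; the 05:45 row is filtered out by the `MAX(fetched_at)` subquery.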

Expected Output

When running, the system produces structured logs like:

2026-04-19 06:00:01 - INFO - Starting monitoring cycle
2026-04-19 06:00:02 - INFO - brent_crude: $156.40/bbl (+2.30% 24h)
2026-04-19 06:00:03 - INFO - wti_crude: $148.20/bbl (+2.10% 24h)
2026-04-19 06:00:04 - INFO - natural_gas: $7.85/MMBtu (+0.90% 24h)
2026-04-19 06:00:05 - INFO - eu_power: €198.50/MWh (-0.50% 24h)
2026-04-19 06:00:06 - INFO - us_power: $115.30/MWh (+1.20% 24h)
2026-04-19 06:00:06 - INFO - Stored 5 price records at 2026-04-19T06:00:06
2026-04-19 06:00:06 - INFO - 1 alert(s) triggered
2026-04-19 06:00:07 - INFO - Email alert sent: Energy Alert: BRENT CRUDE ABOVE $155
2026-04-19 06:00:07 - INFO - Slack alert sent

What This System Enables

With this monitoring system running, your procurement and risk management teams gain:

Early warning on price spikes — a 15-minute alert window before your competitors see the same data in their morning reports can translate to locking in contracts at materially better prices.

Historical trend analysis — the SQLite database accumulates months of intraday price data, enabling you to model correlations between energy prices and your operational costs with precision.

Threshold-based decision support — configurable alert levels mean you define your own risk tolerance, not a vendor's default. At $130/bbl you might increase inventory; at $155/bbl you might activate hedging protocols.

Audit trail for procurement decisions — timestamped price records provide the documentation trail needed to explain procurement decisions to finance and audit stakeholders.

Get your Energy Volatility API key and deploy this monitoring system today. In the current energy price environment, automated monitoring is not optional — it is the infrastructure for every procurement decision your team makes.