How to Build a Real-Time Energy Price Monitoring System in Python with Energy Volatility API
Step-by-step Python guide to building an automated energy price monitoring and alert system. Covers Brent crude, natural gas, and power price tracking with threshold alerts and historical trend analysis.

In April 2026, physical Brent crude prices are trading above $150 per barrel and natural gas prices have hit multi-year highs. For procurement teams, CFOs, and operations managers with significant energy exposure, the difference between checking prices at 9 AM and having an automated alert fire at 6 AM can be a 5-10% swing in procurement cost.
In this guide, we will build a complete energy price monitoring system in Python using the Energy Volatility API. By the end, you will have a production-ready system that:
- Fetches real-time prices for Brent crude, WTI, natural gas, and power
- Calculates price change percentages against configurable baselines
- Triggers email and Slack alerts when prices cross defined thresholds
- Stores price history in a local SQLite database for trend analysis
- Generates weekly price reports as formatted summaries
The full system runs as a scheduled job — check prices every 15 minutes, alert immediately on threshold breaches, report weekly. Total implementation: under 200 lines of Python.
Prerequisites
pip install requests pandas matplotlib schedule python-dotenv smtplibCreate a .env file in your project directory:
ENERGY_API_KEY=YOUR_API_KEY
[email protected]
SMTP_SERVER=smtp.gmail.com
SMTP_PORT=587
[email protected]
SMTP_PASSWORD=your-app-password
SLACK_WEBHOOK_URL=https://hooks.slack.com/your-webhook-urlStep 1: API Client and Price Fetching
Start with a clean API client that handles authentication, rate limiting, and error handling:
import os
import time
import logging
from dataclasses import dataclass
from typing import Optional
import requests
from dotenv import load_dotenv
load_dotenv()
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class EnergyPrice:
commodity: str
price: float
currency: str
unit: str
timestamp: str
change_24h: float
change_pct_24h: float
class EnergyVolatilityClient:
BASE_URL = "https://apivult.com/api/energy-volatility"
def __init__(self, api_key: str):
self._session = requests.Session()
self._session.headers.update({
"X-RapidAPI-Key": api_key,
"Content-Type": "application/json"
})
self._last_request_time = 0.0
self._min_interval = 1.0 # rate limit: 1 request per second
def _throttle(self) -> None:
elapsed = time.time() - self._last_request_time
if elapsed < self._min_interval:
time.sleep(self._min_interval - elapsed)
self._last_request_time = time.time()
def get_price(self, commodity: str) -> Optional[EnergyPrice]:
"""Fetch current price for a commodity. Returns None on failure."""
self._throttle()
try:
response = self._session.get(
f"{self.BASE_URL}/price",
params={"commodity": commodity},
timeout=10
)
response.raise_for_status()
data = response.json()
return EnergyPrice(
commodity=commodity,
price=data["price"],
currency=data["currency"],
unit=data["unit"],
timestamp=data["timestamp"],
change_24h=data["change_24h"],
change_pct_24h=data["change_pct_24h"]
)
except requests.RequestException as e:
logger.error(f"Failed to fetch {commodity} price: {e}")
return None
def get_multiple_prices(self, commodities: list[str]) -> dict[str, EnergyPrice]:
"""Fetch prices for multiple commodities. Returns dict of commodity -> price."""
results = {}
for commodity in commodities:
price = self.get_price(commodity)
if price is not None:
results[commodity] = price
return results
# Commodities to monitor
MONITORED_COMMODITIES = [
"brent_crude", # Brent crude oil $/bbl
"wti_crude", # WTI crude oil $/bbl
"natural_gas", # Henry Hub natural gas $/MMBtu
"eu_power", # European power price €/MWh
"us_power", # US power price $/MWh
]Step 2: Price Storage with SQLite
Store price history locally for trend analysis and report generation:
import sqlite3
from contextlib import contextmanager
from datetime import datetime
DB_PATH = "energy_prices.db"
def initialize_database() -> None:
"""Create tables if they don't exist."""
with sqlite3.connect(DB_PATH) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS price_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
commodity TEXT NOT NULL,
price REAL NOT NULL,
currency TEXT NOT NULL,
unit TEXT NOT NULL,
change_24h REAL,
change_pct_24h REAL,
fetched_at TEXT NOT NULL
)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_commodity_time
ON price_history (commodity, fetched_at)
""")
conn.commit()
@contextmanager
def get_db_connection():
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
try:
yield conn
finally:
conn.close()
def store_prices(prices: dict[str, EnergyPrice]) -> None:
"""Store fetched prices in the database."""
fetched_at = datetime.utcnow().isoformat()
with get_db_connection() as conn:
conn.executemany(
"""INSERT INTO price_history
(commodity, price, currency, unit, change_24h, change_pct_24h, fetched_at)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
[
(p.commodity, p.price, p.currency, p.unit,
p.change_24h, p.change_pct_24h, fetched_at)
for p in prices.values()
]
)
conn.commit()
logger.info(f"Stored {len(prices)} price records at {fetched_at}")
def get_price_history(commodity: str, days: int = 7) -> list[dict]:
"""Retrieve price history for a commodity over the past N days."""
cutoff = datetime.utcnow().replace(
hour=0, minute=0, second=0
).isoformat()
with get_db_connection() as conn:
rows = conn.execute(
"""SELECT * FROM price_history
WHERE commodity = ? AND fetched_at >= ?
ORDER BY fetched_at ASC""",
(commodity, cutoff)
).fetchall()
return [dict(row) for row in rows]Step 3: Alert Configuration and Threshold Logic
Define threshold-based alerts with cooldown periods to prevent alert fatigue:
from dataclasses import dataclass, field
from typing import Callable
@dataclass
class AlertThreshold:
commodity: str
condition: str # "above" or "below"
price_level: float
cooldown_minutes: int = 60
last_triggered: Optional[datetime] = field(default=None, repr=False)
def should_trigger(self, current_price: float) -> bool:
"""Check if this threshold should fire given the current price."""
if self.condition == "above" and current_price <= self.price_level:
return False
if self.condition == "below" and current_price >= self.price_level:
return False
if self.last_triggered is not None:
elapsed = (datetime.utcnow() - self.last_triggered).total_seconds() / 60
if elapsed < self.cooldown_minutes:
return False
return True
def mark_triggered(self) -> "AlertThreshold":
"""Return new threshold with updated last_triggered timestamp."""
return AlertThreshold(
commodity=self.commodity,
condition=self.condition,
price_level=self.price_level,
cooldown_minutes=self.cooldown_minutes,
last_triggered=datetime.utcnow()
)
# Configure your alert thresholds
DEFAULT_THRESHOLDS = [
AlertThreshold("brent_crude", "above", 155.0, cooldown_minutes=120),
AlertThreshold("brent_crude", "below", 130.0, cooldown_minutes=120),
AlertThreshold("wti_crude", "above", 150.0, cooldown_minutes=120),
AlertThreshold("natural_gas", "above", 8.0, cooldown_minutes=60),
AlertThreshold("eu_power", "above", 200.0, cooldown_minutes=60),
AlertThreshold("us_power", "above", 120.0, cooldown_minutes=60),
]
def check_thresholds(
prices: dict[str, EnergyPrice],
thresholds: list[AlertThreshold]
) -> tuple[list[tuple[AlertThreshold, EnergyPrice]], list[AlertThreshold]]:
"""
Returns (triggered_alerts, updated_thresholds).
Triggered alerts is a list of (threshold, price) pairs.
Updated thresholds has last_triggered timestamps updated for fired alerts.
"""
triggered = []
updated_thresholds = []
for threshold in thresholds:
price = prices.get(threshold.commodity)
if price is not None and threshold.should_trigger(price.price):
triggered.append((threshold, price))
updated_thresholds.append(threshold.mark_triggered())
else:
updated_thresholds.append(threshold)
return triggered, updated_thresholdsStep 4: Alert Delivery (Email and Slack)
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import json
def format_alert_message(
threshold: AlertThreshold,
price: EnergyPrice
) -> str:
direction = "ABOVE" if threshold.condition == "above" else "BELOW"
return (
f"⚠️ ENERGY PRICE ALERT\n"
f"Commodity: {price.commodity.replace('_', ' ').upper()}\n"
f"Current Price: {price.currency}{price.price:.2f}/{price.unit}\n"
f"Alert Level: {direction} {price.currency}{threshold.price_level:.2f}\n"
f"24h Change: {price.change_pct_24h:+.2f}%\n"
f"Timestamp: {price.timestamp}"
)
def send_email_alert(
subject: str,
body: str,
to_address: str
) -> None:
msg = MIMEMultipart()
msg["From"] = os.getenv("SMTP_USER")
msg["To"] = to_address
msg["Subject"] = subject
msg.attach(MIMEText(body, "plain"))
try:
with smtplib.SMTP(
os.getenv("SMTP_SERVER"),
int(os.getenv("SMTP_PORT"))
) as server:
server.starttls()
server.login(os.getenv("SMTP_USER"), os.getenv("SMTP_PASSWORD"))
server.send_message(msg)
logger.info(f"Email alert sent: {subject}")
except smtplib.SMTPException as e:
logger.error(f"Failed to send email alert: {e}")
def send_slack_alert(message: str) -> None:
webhook_url = os.getenv("SLACK_WEBHOOK_URL")
if not webhook_url:
return
try:
response = requests.post(
webhook_url,
json={"text": message},
timeout=5
)
response.raise_for_status()
logger.info("Slack alert sent")
except requests.RequestException as e:
logger.error(f"Failed to send Slack alert: {e}")
def dispatch_alerts(
triggered: list[tuple[AlertThreshold, EnergyPrice]]
) -> None:
"""Send all triggered alerts via email and Slack."""
for threshold, price in triggered:
message = format_alert_message(threshold, price)
direction = threshold.condition.upper()
subject = (
f"Energy Alert: {price.commodity.replace('_', ' ').upper()} "
f"{direction} ${threshold.price_level:.0f}"
)
send_email_alert(subject, message, os.getenv("ALERT_EMAIL"))
send_slack_alert(message)Step 5: Weekly Report Generation
import pandas as pd
from io import StringIO
def generate_weekly_report(commodities: list[str]) -> str:
"""Generate a markdown-formatted weekly price summary."""
lines = [
"# Weekly Energy Price Report",
f"Generated: {datetime.utcnow().strftime('%Y-%m-%d %H:%M UTC')}",
""
]
for commodity in commodities:
history = get_price_history(commodity, days=7)
if not history:
continue
df = pd.DataFrame(history)
df["fetched_at"] = pd.to_datetime(df["fetched_at"])
df = df.set_index("fetched_at")
latest_price = df["price"].iloc[-1]
week_open = df["price"].iloc[0]
week_high = df["price"].max()
week_low = df["price"].min()
week_change_pct = (latest_price - week_open) / week_open * 100
currency = history[0]["currency"]
unit = history[0]["unit"]
lines.extend([
f"## {commodity.replace('_', ' ').upper()}",
f"- Current: {currency}{latest_price:.2f}/{unit}",
f"- Week Open: {currency}{week_open:.2f}",
f"- Week High: {currency}{week_high:.2f}",
f"- Week Low: {currency}{week_low:.2f}",
f"- Week Change: {week_change_pct:+.2f}%",
""
])
return "\n".join(lines)Step 6: Main Monitoring Loop
import schedule
def monitoring_cycle(
client: EnergyVolatilityClient,
thresholds: list[AlertThreshold]
) -> list[AlertThreshold]:
"""Run one monitoring cycle. Returns updated thresholds."""
logger.info("Starting monitoring cycle")
prices = client.get_multiple_prices(MONITORED_COMMODITIES)
if not prices:
logger.warning("No prices fetched in this cycle")
return thresholds
store_prices(prices)
for commodity, price in prices.items():
logger.info(
f"{commodity}: {price.currency}{price.price:.2f}/{price.unit} "
f"({price.change_pct_24h:+.2f}% 24h)"
)
triggered, updated_thresholds = check_thresholds(prices, thresholds)
if triggered:
logger.info(f"{len(triggered)} alert(s) triggered")
dispatch_alerts(triggered)
return updated_thresholds
def main():
initialize_database()
api_key = os.getenv("ENERGY_API_KEY")
if not api_key:
raise ValueError("ENERGY_API_KEY environment variable not set")
client = EnergyVolatilityClient(api_key)
thresholds = DEFAULT_THRESHOLDS.copy()
# Mutable container for thresholds (needed for closure in scheduled job)
state = {"thresholds": thresholds}
def run_cycle():
state["thresholds"] = monitoring_cycle(client, state["thresholds"])
def run_weekly_report():
report = generate_weekly_report(MONITORED_COMMODITIES)
send_email_alert(
subject="Weekly Energy Price Report",
body=report,
to_address=os.getenv("ALERT_EMAIL")
)
logger.info("Weekly report sent")
# Run immediately on startup
run_cycle()
# Schedule recurring jobs
schedule.every(15).minutes.do(run_cycle)
schedule.every().monday.at("08:00").do(run_weekly_report)
logger.info("Energy price monitor running. Checking every 15 minutes.")
while True:
schedule.run_pending()
time.sleep(30)
if __name__ == "__main__":
main()Deploying as a Background Service
For production deployment, run this as a system service using systemd (Linux) or as a Windows Service:
# /etc/systemd/system/energy-monitor.service
[Unit]
Description=Energy Price Monitor
After=network.target
[Service]
Type=simple
User=appuser
WorkingDirectory=/opt/energy-monitor
EnvironmentFile=/opt/energy-monitor/.env
ExecStart=/opt/energy-monitor/venv/bin/python main.py
Restart=always
RestartSec=30
[Install]
WantedBy=multi-user.targetEnable and start: systemctl enable energy-monitor && systemctl start energy-monitor
Adding a Price Dashboard
Extend the system with a simple price dashboard using Flask:
from flask import Flask, jsonify, render_template_string
import sqlite3
app = Flask(__name__)
@app.route("/api/latest")
def get_latest_prices():
"""Return the most recent price for each monitored commodity."""
with get_db_connection() as conn:
rows = conn.execute("""
SELECT p.* FROM price_history p
INNER JOIN (
SELECT commodity, MAX(fetched_at) as max_time
FROM price_history
GROUP BY commodity
) latest ON p.commodity = latest.commodity
AND p.fetched_at = latest.max_time
""").fetchall()
return jsonify([dict(row) for row in rows])
@app.route("/api/history/<commodity>")
def get_history(commodity: str):
history = get_price_history(commodity, days=7)
return jsonify(history)Expected Output
When running, the system produces structured logs like:
2026-04-19 06:00:01 - INFO - Starting monitoring cycle
2026-04-19 06:00:02 - INFO - brent_crude: $151.40/bbl (+2.30% 24h)
2026-04-19 06:00:03 - INFO - wti_crude: $148.20/bbl (+2.10% 24h)
2026-04-19 06:00:04 - INFO - natural_gas: $7.85/MMBtu (+0.90% 24h)
2026-04-19 06:00:05 - INFO - eu_power: $198.50/MWh (-0.50% 24h)
2026-04-19 06:00:06 - INFO - us_power: $115.30/MWh (+1.20% 24h)
2026-04-19 06:00:06 - INFO - Stored 5 price records at 2026-04-19T06:00:06
2026-04-19 06:00:06 - INFO - 1 alert(s) triggered
2026-04-19 06:00:07 - INFO - Email alert sent: Energy Alert: BRENT CRUDE ABOVE $150
2026-04-19 06:00:07 - INFO - Slack alert sent
What This System Enables
With this monitoring system running, your procurement and risk management teams gain:
Early warning on price spikes — a 15-minute alert window before your competitors see the same data in their morning reports can translate to locking in contracts at materially better prices.
Historical trend analysis — the SQLite database accumulates months of intraday price data, enabling you to model correlations between energy prices and your operational costs with precision.
Threshold-based decision support — configurable alert levels mean you define your own risk tolerance, not a vendor's default. At $130/bbl you might increase inventory; at $155/bbl you might activate hedging protocols.
Audit trail for procurement decisions — timestamped price records provide the documentation trail needed to explain procurement decisions to finance and audit stakeholders.
Get your Energy Volatility API key and deploy this monitoring system today. In the current energy price environment, automated monitoring is not optional — it is the infrastructure for every procurement decision your team makes.
More Articles
Best Energy Market Data APIs in 2026: Real-Time Prices, Forecasts & Volatility Feeds Compared
Compare the top energy market data APIs for 2026. Brent crude, WTI, natural gas, and power price feeds — reviewed for latency, coverage, pricing, and developer experience.
April 18, 2026
AI Data Centers Are Pushing Electricity Prices Higher — Energy Teams Are Responding
AI data centers push US electricity prices to $51/MWh in 2026. How energy teams adapt to surging demand and market volatility.
March 30, 2026