Build a Real-Time Data Quality Dashboard with the DataForge API
Learn how to monitor, validate, and visualize data quality metrics in real time using the DataForge API and Python. Includes a complete dashboard example.

Bad data is expensive. Gartner research has estimated that poor data quality costs organizations an average of $12.9 million per year. Yet most teams only discover data quality issues after they've already damaged reports, broken pipelines, or triggered incorrect business decisions.
The fix isn't just validation at ingestion — it's continuous, real-time monitoring that surfaces anomalies as they happen. This guide shows you how to build a data quality dashboard using the DataForge API that tracks validation scores, flags anomalies, and gives your team instant visibility into the health of your data.
What We're Building
By the end of this tutorial, you'll have:
- A Python data quality monitoring service that validates data in real time
- A FastAPI-based dashboard endpoint serving quality metrics
- Automated alerting when data quality drops below a threshold
- A simple HTML dashboard that displays live quality scores
Why Use an API for Data Quality?
The DataForge API handles the heavy lifting of data validation, cleaning, and formatting. Instead of writing custom validation rules for every data type, you send raw data and get back:
- Validation scores — how clean the data is, on a 0-100 scale
- Anomaly flags — which fields have unusual values, missing data, or format violations
- Cleaned output — corrected, normalized data ready for downstream use
- Field-level stats — completeness, uniqueness, format compliance per column
This means your monitoring service stays lean and focused on orchestration rather than reinventing validation logic.
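To make those four outputs concrete, here is an illustrative response shape. The field names match what the client code in this tutorial reads (`quality_score`, `field_scores`, `anomalies`, `cleaned`), but the exact payload is an assumption for illustration, not official DataForge documentation:

```python
# Illustrative validation response (assumed shape, not official API docs).
sample_response = {
    "quality_score": 87.5,
    "field_scores": {"email": 100.0, "phone": 62.0, "name": 98.0},
    "anomalies": [
        {"field": "phone", "issue": "format_violation", "value": "555-01"}
    ],
    "cleaned": {"email": "ada@example.com", "phone": "+15555550100", "name": "Ada"},
}

# A monitoring service mostly cares about the score and which fields misbehave:
score = sample_response["quality_score"]
bad_fields = sorted({a["field"] for a in sample_response["anomalies"]})
print(score, bad_fields)
```

Everything that follows is built around consuming exactly these four keys.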
Step 1: Set Up the DataForge Client
```python
import httpx
from dataclasses import dataclass

DATAFORGE_API_KEY = "YOUR_API_KEY"
DATAFORGE_BASE_URL = "https://apivult.com/dataforge/v1"


@dataclass
class ValidationResult:
    overall_score: float
    field_scores: dict[str, float]
    anomalies: list[dict]
    cleaned_data: dict
    raw_response: dict


def validate_record(record: dict, schema: dict | None = None) -> ValidationResult:
    """
    Validate a single data record using the DataForge API.

    Args:
        record: The data record to validate (as a dict)
        schema: Optional schema definition for stricter validation

    Returns:
        ValidationResult with scores, anomalies, and cleaned data
    """
    payload = {"data": record}
    if schema:
        payload["schema"] = schema

    response = httpx.post(
        f"{DATAFORGE_BASE_URL}/validate",
        headers={
            "X-RapidAPI-Key": DATAFORGE_API_KEY,
            "Content-Type": "application/json"
        },
        json=payload,
        timeout=10
    )
    response.raise_for_status()
    data = response.json()

    return ValidationResult(
        overall_score=data.get("quality_score", 0),
        field_scores=data.get("field_scores", {}),
        anomalies=data.get("anomalies", []),
        cleaned_data=data.get("cleaned", record),
        raw_response=data
    )


def validate_batch(records: list[dict], schema: dict | None = None) -> list[ValidationResult]:
    """Validate a batch of records and return per-record results."""
    payload = {"records": records}
    if schema:
        payload["schema"] = schema

    response = httpx.post(
        f"{DATAFORGE_BASE_URL}/validate/batch",
        headers={
            "X-RapidAPI-Key": DATAFORGE_API_KEY,
            "Content-Type": "application/json"
        },
        json=payload,
        timeout=30
    )
    response.raise_for_status()
    results = response.json().get("results", [])

    return [
        ValidationResult(
            overall_score=r.get("quality_score", 0),
            field_scores=r.get("field_scores", {}),
            anomalies=r.get("anomalies", []),
            cleaned_data=r.get("cleaned", {}),
            raw_response=r
        )
        for r in results
    ]
```

Step 2: Build the Metrics Collector
The metrics collector aggregates validation results over time, computing rolling averages and trend data for the dashboard:
```python
import time
from collections import deque
from statistics import mean
from threading import Lock


class DataQualityMetrics:
    """Thread-safe metrics store for data quality monitoring."""

    def __init__(self, window_size: int = 1000):
        self.window_size = window_size
        self._lock = Lock()
        self._scores: deque[float] = deque(maxlen=window_size)
        self._anomalies: deque[dict] = deque(maxlen=window_size)
        self._field_stats: dict[str, deque] = {}
        self._timestamps: deque[float] = deque(maxlen=window_size)
        self.total_validated = 0
        self.total_anomalies = 0

    def record(self, result: ValidationResult):
        """Add a validation result to the metrics store."""
        with self._lock:
            self._scores.append(result.overall_score)
            self._timestamps.append(time.time())
            self.total_validated += 1

            for anomaly in result.anomalies:
                self._anomalies.append({
                    **anomaly,
                    "timestamp": time.time()
                })
                self.total_anomalies += 1

            for field, score in result.field_scores.items():
                if field not in self._field_stats:
                    self._field_stats[field] = deque(maxlen=self.window_size)
                self._field_stats[field].append(score)

    def get_summary(self) -> dict:
        """Return a summary of current data quality metrics."""
        with self._lock:
            if not self._scores:
                return {"status": "no_data"}

            scores = list(self._scores)
            recent_scores = scores[-50:]  # slicing handles lists shorter than 50

            return {
                "overall_score": round(mean(scores), 2),
                "recent_score": round(mean(recent_scores), 2),
                "min_score": round(min(scores), 2),
                "max_score": round(max(scores), 2),
                "total_validated": self.total_validated,
                "total_anomalies": self.total_anomalies,
                "anomaly_rate": round(self.total_anomalies / max(self.total_validated, 1), 4),
                "field_scores": {
                    field: round(mean(list(vals)), 2)
                    for field, vals in self._field_stats.items()
                },
                "recent_anomalies": list(self._anomalies)[-10:]
            }

    def is_healthy(self, threshold: float = 80.0) -> bool:
        """Check if recent data quality is above the threshold."""
        with self._lock:
            if not self._scores:
                return True
            recent = list(self._scores)[-20:]
            return mean(recent) >= threshold


# Global metrics instance
metrics = DataQualityMetrics()
```

Step 3: Create the Monitoring Service
The monitoring service validates incoming data and records metrics continuously:
```python
import logging

logger = logging.getLogger("data_quality")

# Define your data schema for strict validation
CUSTOMER_SCHEMA = {
    "fields": {
        "email": {"type": "email", "required": True},
        "phone": {"type": "phone", "required": False},
        "name": {"type": "string", "min_length": 2, "max_length": 100},
        "country_code": {"type": "iso_country", "required": True},
        "signup_date": {"type": "date", "format": "ISO8601"}
    }
}

QUALITY_THRESHOLD = 75.0  # Alert if rolling average drops below this


class DataQualityMonitor:
    def __init__(
        self,
        schema: dict | None = None,
        alert_threshold: float = QUALITY_THRESHOLD,
        alert_webhook: str | None = None
    ):
        self.schema = schema
        self.alert_threshold = alert_threshold
        self.alert_webhook = alert_webhook
        self._consecutive_failures = 0

    def process_record(self, record: dict) -> ValidationResult:
        """Validate a record and update metrics."""
        result = validate_record(record, self.schema)
        metrics.record(result)

        if not metrics.is_healthy(self.alert_threshold):
            self._consecutive_failures += 1
            if self._consecutive_failures >= 5:
                self._send_alert()
                self._consecutive_failures = 0  # Reset so we don't re-alert on every record
        else:
            self._consecutive_failures = 0

        if result.overall_score < self.alert_threshold:
            logger.warning(
                f"Low quality record: score={result.overall_score:.1f}, "
                f"anomalies={len(result.anomalies)}"
            )
        return result

    def process_batch(self, records: list[dict]) -> list[ValidationResult]:
        """Validate a batch of records."""
        results = validate_batch(records, self.schema)
        for result in results:
            metrics.record(result)
        return results

    def _send_alert(self):
        """Send an alert when data quality drops."""
        summary = metrics.get_summary()
        message = (
            f"DATA QUALITY ALERT\n"
            f"Rolling average: {summary['recent_score']}\n"
            f"Threshold: {self.alert_threshold}\n"
            f"Anomaly rate: {summary['anomaly_rate']:.2%}\n"
            f"Recent anomalies: {summary['recent_anomalies']}"
        )
        logger.error(message)
        if self.alert_webhook:
            try:
                httpx.post(self.alert_webhook, json={"text": message}, timeout=5)
            except Exception as e:
                logger.error(f"Failed to send alert: {e}")
```

Step 4: Expose a Dashboard API with FastAPI
```python
import uvicorn
from fastapi import FastAPI
from fastapi.responses import HTMLResponse

app = FastAPI(title="Data Quality Dashboard")
monitor = DataQualityMonitor(schema=CUSTOMER_SCHEMA)


@app.get("/metrics")
def get_metrics():
    """Return current data quality metrics as JSON."""
    return metrics.get_summary()


@app.post("/validate")
def validate_data(record: dict):
    """Validate a single record and return the result."""
    result = monitor.process_record(record)
    return {
        "score": result.overall_score,
        "anomalies": result.anomalies,
        "field_scores": result.field_scores,
        "cleaned": result.cleaned_data
    }


@app.get("/dashboard", response_class=HTMLResponse)
def dashboard():
    """Simple HTML dashboard for data quality monitoring."""
    return """
    <!DOCTYPE html>
    <html>
    <head>
      <title>Data Quality Dashboard</title>
      <style>
        body { font-family: monospace; background: #111; color: #0f0; padding: 20px; }
        .metric { background: #1a1a1a; padding: 15px; margin: 10px 0; border-radius: 5px; }
        .good { color: #0f0; } .warn { color: #ff0; } .bad { color: #f00; }
        h1 { color: #fff; }
      </style>
      <script>
        async function refresh() {
          const res = await fetch('/metrics');
          const data = await res.json();
          const score = data.recent_score || 0;
          const cls = score >= 80 ? 'good' : score >= 60 ? 'warn' : 'bad';
          document.getElementById('content').innerHTML = `
            <div class="metric">
              <b>Rolling Score (last 50):</b>
              <span class="${cls}">${score}/100</span>
            </div>
            <div class="metric">
              <b>Overall Score:</b> ${data.overall_score}/100
            </div>
            <div class="metric">
              <b>Total Validated:</b> ${data.total_validated}
            </div>
            <div class="metric">
              <b>Anomaly Rate:</b> ${(data.anomaly_rate * 100).toFixed(2)}%
            </div>
            <div class="metric">
              <b>Field Scores:</b><pre>${JSON.stringify(data.field_scores, null, 2)}</pre>
            </div>
          `;
        }
        setInterval(refresh, 5000);
        refresh();
      </script>
    </head>
    <body>
      <h1>Data Quality Monitor</h1>
      <div id="content">Loading...</div>
    </body>
    </html>
    """


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
```

Step 5: Feed Real Data into the Monitor
Integrate the monitor into your data pipeline. Here's an example processing records from a CSV file:
```python
import csv
from statistics import mean


def process_csv_file(filepath: str, batch_size: int = 50):
    """Process a CSV file and validate all records."""
    with open(filepath, newline="") as f:
        reader = csv.DictReader(f)
        batch = []
        for i, row in enumerate(reader):
            batch.append(dict(row))
            if len(batch) >= batch_size:
                results = monitor.process_batch(batch)
                avg_score = mean(r.overall_score for r in results)
                print(f"Batch {i // batch_size + 1}: avg score = {avg_score:.1f}")
                batch = []

        # Process remaining records
        if batch:
            monitor.process_batch(batch)

    print("\nFinal metrics:")
    summary = metrics.get_summary()
    print(f"  Overall score: {summary['overall_score']}")
    print(f"  Anomaly rate: {summary['anomaly_rate']:.2%}")
    print(f"  Records validated: {summary['total_validated']}")
```

What the Dashboard Tells You
Once running, the dashboard gives you instant answers to questions like:
- Is our data quality trending up or down? The rolling 50-record score shows momentum, not just current state.
- Which fields are the problem? Field-level scores pinpoint exactly where data quality breaks down — is it the email field? The phone number format? The country code?
- How often are anomalies occurring? Anomaly rate trends reveal whether a data quality issue is growing or stabilizing.
- What do recent anomalies look like? The last 10 anomalies give your team concrete examples to investigate.
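The traffic-light logic the dashboard's JavaScript applies (80 and above is good, 60 and above is warn, anything lower is bad) is easy to reuse in any other consumer of the /metrics JSON. A minimal Python sketch of the same rolling-score classification:

```python
from statistics import mean


def classify(score: float) -> str:
    """Mirror the dashboard's color thresholds: >=80 good, >=60 warn, else bad."""
    if score >= 80:
        return "good"
    if score >= 60:
        return "warn"
    return "bad"


def rolling_score(scores: list[float], window: int = 50) -> float:
    """Average of the most recent `window` scores, like the dashboard's rolling metric."""
    recent = scores[-window:]
    return round(mean(recent), 2) if recent else 0.0


scores = [92, 88, 71, 64, 55]
print(rolling_score(scores), classify(rolling_score(scores)))  # 74.0 warn
```

The same thresholds could drive a Slack bot or a CLI status check without touching the dashboard itself.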
Expected Results
In a typical production deployment:
- Processing latency: 50-150ms per record via the DataForge API
- Batch validation: up to 500 records per request
- Dashboard refresh interval: 5 seconds (configurable)
- Alert lag: under 30 seconds from quality drop to notification
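These numbers are workload-dependent, but a back-of-envelope calculation shows why batch validation matters for throughput. The latencies below are assumptions chosen within the ranges quoted above, not measured figures:

```python
def max_throughput(latency_ms: float, batch_size: int = 1) -> float:
    """Upper bound on records/second for sequential requests at a given latency."""
    return batch_size / (latency_ms / 1000)


# Single-record calls at ~100 ms each: about 10 records/s.
single = max_throughput(100)
# 500-record batches at an assumed ~1500 ms per call: ~333 records/s.
batched = max_throughput(1500, 500)
print(round(single, 1), round(batched, 1))
```

Even with a generous per-batch latency assumption, batching wins by more than an order of magnitude for bulk backfills.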
Teams using continuous data quality monitoring report catching data pipeline issues 4-6x faster than teams relying on daily batch reports.
Next Steps
Get started with the DataForge API at apivult.com. The free tier is sufficient to validate this setup against your data. For production deployments, the Pro tier adds higher rate limits, extended history, and dedicated support.
Once your monitoring is in place, consider extending it with:
- Historical trend charts (store metrics in PostgreSQL or InfluxDB)
- Per-source quality tracking (different scores per data source)
- Automated data cleaning pipelines using DataForge's correction output
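For the first extension, a minimal sketch of persisting metric snapshots for historical trend charts, using sqlite3 for brevity (the table name and columns are this sketch's own choices; swap in PostgreSQL or InfluxDB for production):

```python
import sqlite3
import time

# In-memory database for illustration; use a file path or a real server in practice.
conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE IF NOT EXISTS quality_snapshots (
        ts REAL,
        overall_score REAL,
        anomaly_rate REAL,
        total_validated INTEGER
    )"""
)


def save_snapshot(summary: dict) -> None:
    """Store one metrics.get_summary() result as a time-series row."""
    conn.execute(
        "INSERT INTO quality_snapshots VALUES (?, ?, ?, ?)",
        (time.time(), summary["overall_score"],
         summary["anomaly_rate"], summary["total_validated"]),
    )
    conn.commit()


save_snapshot({"overall_score": 91.3, "anomaly_rate": 0.02, "total_validated": 1200})
rows = conn.execute("SELECT overall_score FROM quality_snapshots").fetchall()
print(rows)
```

Call `save_snapshot(metrics.get_summary())` on a timer (for example every 60 seconds) and any charting tool can plot quality over time from the table.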