Automate Data Quality in ETL Pipelines with the DataForge API
Learn how to catch schema violations, fix formatting inconsistencies, and validate business rules in your ETL pipelines using the DataForge API and Python.

The Hidden Cost of Dirty Data in ETL Pipelines
Data quality problems are expensive. According to Gartner, poor data quality costs organizations an average of $12.9 million per year. Yet most ETL pipelines treat data quality as an afterthought — a few hand-rolled validation scripts that fail silently or produce cryptic errors that take hours to debug.
The problem compounds at scale. When you're ingesting data from 10 sources with different schemas, date formats, currency conventions, and null value representations, a purely hand-coded validation layer becomes a full-time job to maintain.
The DataForge API brings standardized validation, formatting normalization, and business rule enforcement to your ETL pipelines as a service. Your pipeline code stays lean; the complexity lives in the API.
This guide covers:
- Schema validation for multi-source ingestion
- Normalization of dates, currencies, and phone numbers
- Business rule validation with custom constraints
- Error reporting and data quarantining
- Integration with Pandas and Airflow
What DataForge Handles vs. What You Handle
| Concern | DataForge API | Your Pipeline Code |
|---|---|---|
| Schema validation | ✅ Field types, required fields, enums | |
| Format normalization | ✅ Dates, phones, addresses, currencies | |
| Business rule validation | ✅ Custom constraints via JSON config | |
| Error classification | ✅ Critical / Warning / Info severity | |
| Deduplication | ✅ Fuzzy and exact matching | |
| Data routing (good/bad) | | ✅ Your logic |
| Database writes | | ✅ Your logic |
| Downstream transforms | | ✅ Your logic |
Setup
pip install requests pandas apache-airflow python-dotenvimport os
import requests
from dotenv import load_dotenv
load_dotenv()
DATAFORGE_API_KEY = os.getenv("DATAFORGE_API_KEY")
BASE_URL = "https://apivult.com/dataforge/v1"
HEADERS = {
"X-RapidAPI-Key": DATAFORGE_API_KEY,
"Content-Type": "application/json",
}Step 1: Define Your Validation Schema
DataForge uses a JSON schema to describe what valid data looks like. You define this once per data source:
CUSTOMER_SCHEMA = {
"fields": {
"customer_id": {
"type": "string",
"required": True,
"pattern": "^CUST-[0-9]{6}$"
},
"full_name": {
"type": "string",
"required": True,
"min_length": 2,
"max_length": 100
},
"email": {
"type": "email",
"required": True
},
"phone": {
"type": "phone",
"normalize": True, # auto-format to E.164
"default_country": "US"
},
"date_of_birth": {
"type": "date",
"normalize": True, # normalize to ISO 8601
"max": "today-18y" # must be 18+ years old
},
"annual_revenue": {
"type": "currency",
"normalize": True, # strip symbols, convert to float
"min": 0,
"currency_code": "USD"
},
"tier": {
"type": "enum",
"values": ["basic", "pro", "enterprise"]
},
},
"business_rules": [
{
"rule": "if tier == 'enterprise' then annual_revenue >= 1000000",
"severity": "warning",
"message": "Enterprise tier customers typically have revenue >= $1M"
},
{
"rule": "customer_id must be unique",
"severity": "critical",
"message": "Duplicate customer_id detected"
}
]
}Step 2: Validate and Normalize a Batch of Records
import pandas as pd
def validate_batch(records: list[dict], schema: dict) -> dict:
    """
    POST a batch of records to the DataForge /validate endpoint.

    Normalization is enabled so the response carries cleaned-up values.
    Returns the decoded JSON payload: { valid_records, invalid_records, summary }.
    """
    payload = {
        "records": records,
        "schema": schema,
        "options": {
            "normalize": True,
            "return_normalized": True,
            "max_records": 1000,
        },
    }
    response = requests.post(
        f"{BASE_URL}/validate",
        headers=HEADERS,
        json=payload,
        timeout=30,
    )
    # Surface HTTP-level failures (auth, rate limiting, 5xx) immediately.
    response.raise_for_status()
    return response.json()
def process_csv_file(file_path: str, schema: dict) -> tuple[pd.DataFrame, pd.DataFrame]:
"""
Load a CSV, validate all records, return (good_df, bad_df).
"""
df = pd.read_csv(file_path)
records = df.to_dict(orient="records")
# Validate in batches of 500 for large files
batch_size = 500
all_valid = []
all_invalid = []
for i in range(0, len(records), batch_size):
batch = records[i:i + batch_size]
result = validate_batch(batch, schema)
all_valid.extend(result["valid_records"])
all_invalid.extend(result["invalid_records"])
print(f"Batch {i // batch_size + 1}: "
f"{len(result['valid_records'])} valid, "
f"{len(result['invalid_records'])} invalid")
good_df = pd.DataFrame(all_valid) if all_valid else pd.DataFrame()
bad_df = pd.DataFrame(all_invalid) if all_invalid else pd.DataFrame()
return good_df, bad_dfStep 3: Inspect Validation Errors
DataForge returns detailed error information for each invalid record:
def inspect_errors(invalid_records: list[dict]) -> None:
"""Print a human-readable error report."""
from collections import Counter
error_codes = []
for record in invalid_records:
for error in record.get("errors", []):
error_codes.append(error["code"])
print(f"\n=== Data Quality Report ===")
print(f"Total invalid records: {len(invalid_records)}")
print(f"\nError breakdown:")
for code, count in Counter(error_codes).most_common():
print(f" {code}: {count}")
print(f"\nSample errors:")
for record in invalid_records[:3]:
print(f"\n Record ID: {record.get('customer_id', 'N/A')}")
for error in record["errors"]:
print(f" [{error['severity'].upper()}] {error['field']}: {error['message']}")
if "normalized_value" in error:
print(f" → Suggested: {error['normalized_value']}")Sample output:
=== Data Quality Report ===
Total invalid records: 47
Error breakdown:
INVALID_DATE_FORMAT: 23
PHONE_UNRECOGNIZED: 12
REQUIRED_FIELD_MISSING: 8
ENUM_VALUE_INVALID: 4
Sample errors:
Record ID: CUST-004821
[CRITICAL] email: Invalid email format — missing @ symbol
[WARNING] date_of_birth: Non-ISO format detected: "12/25/1990"
→ Suggested: "1990-12-25"
Record ID: CUST-009341
[CRITICAL] tier: Value "Premium" not in allowed values: [basic, pro, enterprise]
Step 4: Quarantine Invalid Records
Don't drop bad data — quarantine it with enough context to fix it:
import json
from datetime import datetime
def quarantine_invalid_records(
invalid_records: list[dict],
source: str,
quarantine_path: str = "./quarantine"
) -> str:
"""Save invalid records to quarantine with metadata."""
import os
os.makedirs(quarantine_path, exist_ok=True)
timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
file_name = f"{quarantine_path}/{source}_{timestamp}_quarantine.jsonl"
with open(file_name, "w") as f:
for record in invalid_records:
quarantine_entry = {
"quarantined_at": datetime.utcnow().isoformat() + "Z",
"source": source,
"original_data": record.get("original"),
"errors": record.get("errors"),
"needs_review": any(
e["severity"] == "critical" for e in record.get("errors", [])
),
}
f.write(json.dumps(quarantine_entry) + "\n")
print(f"Quarantined {len(invalid_records)} records → {file_name}")
return file_nameStep 5: Airflow DAG Integration
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
def extract(**context):
    """Pull raw records from the upstream source and stash them in XCom."""
    raw = fetch_from_source()  # your extraction logic
    context["ti"].xcom_push(key="raw_records", value=raw)
def validate(**context):
    """Validate the extracted batch and push valid/invalid splits to XCom.

    Raises ValueError when more than 5% of the records fail validation,
    failing the DAG run so bad data never reaches the warehouse.
    """
    records = context["ti"].xcom_pull(key="raw_records") or []
    result = validate_batch(records, CUSTOMER_SCHEMA)
    context["ti"].xcom_push(key="valid_records", value=result["valid_records"])
    context["ti"].xcom_push(key="invalid_records", value=result["invalid_records"])
    # Guard the empty-extract case: without it, len(records) == 0 raised
    # ZeroDivisionError here instead of a meaningful data-quality signal.
    invalid_rate = len(result["invalid_records"]) / len(records) if records else 0.0
    if invalid_rate > 0.05:  # fail if >5% invalid
        raise ValueError(f"Data quality threshold exceeded: {invalid_rate:.1%} invalid")
def load(**context):
    """Write the validated records to the data warehouse (placeholder)."""
    cleaned = context["ti"].xcom_pull(key="valid_records")
    # Load to your data warehouse
def quarantine(**context):
    """Route records that failed validation into the quarantine area."""
    rejected = context["ti"].xcom_pull(key="invalid_records")
    quarantine_invalid_records(rejected, source="customer_feed")
with DAG(
dag_id="customer_ingestion",
start_date=datetime(2026, 1, 1),
schedule_interval="0 6 * * *", # daily at 6 AM
default_args={
"retries": 2,
"retry_delay": timedelta(minutes=5),
},
) as dag:
t_extract = PythonOperator(task_id="extract", python_callable=extract)
t_validate = PythonOperator(task_id="validate", python_callable=validate)
t_load = PythonOperator(task_id="load", python_callable=load)
t_quarantine = PythonOperator(task_id="quarantine", python_callable=quarantine)
t_extract >> t_validate >> [t_load, t_quarantine]Real-World Impact
Teams that adopt API-driven validation in their ETL pipelines report:
| Metric | Before DataForge | After DataForge |
|---|---|---|
| Data quality defect rate | 8.3% | 0.6% |
| Time to detect bad data | Hours to days | Seconds |
| Validation code to maintain | 2,000+ lines | ~50 lines |
| Formats supported | Manual | 40+ built-in |
Next Steps
Start integrating data quality validation into your pipelines today. Combine DataForge with DocForge to generate automated data quality reports, or pair it with GlobalShield to ensure PII is detected and flagged during ingestion.
Visit APIVult to get started with the DataForge API.