Automate Data Quality in ETL Pipelines with the DataForge API

Learn how to catch schema violations, fix formatting inconsistencies, and validate business rules in your ETL pipelines using the DataForge API and Python.

The Hidden Cost of Dirty Data in ETL Pipelines

Data quality problems are expensive. According to Gartner, poor data quality costs organizations an average of $12.9 million per year. Yet most ETL pipelines treat data quality as an afterthought — a few hand-rolled validation scripts that fail silently or produce cryptic errors that take hours to debug.

The problem compounds at scale. When you're ingesting data from 10 sources with different schemas, date formats, currency conventions, and null value representations, a purely hand-coded validation layer becomes a full-time job to maintain.

The DataForge API brings standardized validation, formatting normalization, and business rule enforcement to your ETL pipelines as a service. Your pipeline code stays lean; the complexity lives in the API.

This guide covers:

  • Schema validation for multi-source ingestion
  • Normalization of dates, currencies, and phone numbers
  • Business rule validation with custom constraints
  • Error reporting and data quarantining
  • Integration with Pandas and Airflow

What DataForge Handles vs. What You Handle

| Concern | DataForge API | Your Pipeline Code |
| --- | --- | --- |
| Schema validation | ✅ Field types, required fields, enums | |
| Format normalization | ✅ Dates, phones, addresses, currencies | |
| Business rule validation | ✅ Custom constraints via JSON config | |
| Error classification | ✅ Critical / Warning / Info severity | |
| Deduplication | ✅ Fuzzy and exact matching | |
| Data routing (good/bad) | | ✅ Your logic |
| Database writes | | ✅ Your logic |
| Downstream transforms | | ✅ Your logic |

Setup

pip install requests pandas apache-airflow python-dotenv
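
Store the key in a .env file next to your pipeline code; the variable name must match what the loader below reads:

# .env: replace the placeholder with your actual API key
DATAFORGE_API_KEY=your_api_key_here
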
import os
import requests
from dotenv import load_dotenv
 
load_dotenv()
 
DATAFORGE_API_KEY = os.getenv("DATAFORGE_API_KEY")
BASE_URL = "https://apivult.com/dataforge/v1"
HEADERS = {
    "X-RapidAPI-Key": DATAFORGE_API_KEY,
    "Content-Type": "application/json",
}
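
A quick guard saves confusing 401 errors later if the key never loaded; a minimal sanity check:

# Fail fast if the key was not found in .env or the environment
if not DATAFORGE_API_KEY:
    raise RuntimeError("DATAFORGE_API_KEY is not set; add it to your .env file")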

Step 1: Define Your Validation Schema

DataForge uses a JSON schema to describe what valid data looks like. You define this once per data source:

CUSTOMER_SCHEMA = {
    "fields": {
        "customer_id": {
            "type": "string",
            "required": True,
            "pattern": "^CUST-[0-9]{6}$"
        },
        "full_name": {
            "type": "string",
            "required": True,
            "min_length": 2,
            "max_length": 100
        },
        "email": {
            "type": "email",
            "required": True
        },
        "phone": {
            "type": "phone",
            "normalize": True,       # auto-format to E.164
            "default_country": "US"
        },
        "date_of_birth": {
            "type": "date",
            "normalize": True,       # normalize to ISO 8601
            "max": "today-18y"       # must be 18+ years old
        },
        "annual_revenue": {
            "type": "currency",
            "normalize": True,       # strip symbols, convert to float
            "min": 0,
            "currency_code": "USD"
        },
        "tier": {
            "type": "enum",
            "values": ["basic", "pro", "enterprise"]
        },
    },
    "business_rules": [
        {
            "rule": "if tier == 'enterprise' then annual_revenue >= 1000000",
            "severity": "warning",
            "message": "Enterprise tier customers typically have revenue >= $1M"
        },
        {
            "rule": "customer_id must be unique",
            "severity": "critical",
            "message": "Duplicate customer_id detected"
        }
    ]
}
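
Because each source gets its own schema, it helps to keep them in a small registry keyed by source name so the pipeline can look up the right one at runtime. A minimal sketch; ORDER_SCHEMA is a hypothetical second schema you would define the same way as CUSTOMER_SCHEMA:

# Hypothetical registry of per-source schemas; add one entry per feed you ingest
SCHEMAS = {
    "customer_feed": CUSTOMER_SCHEMA,
    # "order_feed": ORDER_SCHEMA,   # hypothetical second source
}

def schema_for(source: str) -> dict:
    """Return the DataForge schema registered for a source, or fail loudly."""
    if source not in SCHEMAS:
        raise ValueError(f"No validation schema registered for source '{source}'")
    return SCHEMAS[source]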

Step 2: Validate and Normalize a Batch of Records

import pandas as pd
 
def validate_batch(records: list[dict], schema: dict) -> dict:
    """
    Send a batch of records to DataForge for validation and normalization.
    Returns: { valid_records, invalid_records, summary }
    """
    response = requests.post(
        f"{BASE_URL}/validate",
        headers=HEADERS,
        json={
            "records": records,
            "schema": schema,
            "options": {
                "normalize": True,
                "return_normalized": True,
                "max_records": 1000,
            }
        },
        timeout=30,
    )
    response.raise_for_status()
    return response.json()
 
 
def process_csv_file(file_path: str, schema: dict) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Load a CSV, validate all records, return (good_df, bad_df).
    """
    df = pd.read_csv(file_path)
    records = df.to_dict(orient="records")
 
    # Validate in batches of 500 for large files
    batch_size = 500
    all_valid = []
    all_invalid = []
 
    for i in range(0, len(records), batch_size):
        batch = records[i:i + batch_size]
        result = validate_batch(batch, schema)
 
        all_valid.extend(result["valid_records"])
        all_invalid.extend(result["invalid_records"])
 
        print(f"Batch {i // batch_size + 1}: "
              f"{len(result['valid_records'])} valid, "
              f"{len(result['invalid_records'])} invalid")
 
    good_df = pd.DataFrame(all_valid) if all_valid else pd.DataFrame()
    bad_df = pd.DataFrame(all_invalid) if all_invalid else pd.DataFrame()
 
    return good_df, bad_df
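
A minimal usage sketch, assuming a local customers.csv whose columns match the schema field names above:

good_df, bad_df = process_csv_file("customers.csv", CUSTOMER_SCHEMA)  # hypothetical file name
print(f"{len(good_df)} records ready to load, {len(bad_df)} flagged for review")

# Normalized, validated records can move straight on to the next pipeline stage
good_df.to_csv("customers_clean.csv", index=False)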

Step 3: Inspect Validation Errors

DataForge returns detailed error information for each invalid record:

def inspect_errors(invalid_records: list[dict]) -> None:
    """Print a human-readable error report."""
    from collections import Counter
 
    error_codes = []
    for record in invalid_records:
        for error in record.get("errors", []):
            error_codes.append(error["code"])
 
    print(f"\n=== Data Quality Report ===")
    print(f"Total invalid records: {len(invalid_records)}")
    print(f"\nError breakdown:")
    for code, count in Counter(error_codes).most_common():
        print(f"  {code}: {count}")
 
    print(f"\nSample errors:")
    for record in invalid_records[:3]:
        print(f"\n  Record ID: {record.get('customer_id', 'N/A')}")
        for error in record["errors"]:
            print(f"    [{error['severity'].upper()}] {error['field']}: {error['message']}")
            if "normalized_value" in error:
                print(f"    → Suggested: {error['normalized_value']}")

Sample output:

=== Data Quality Report ===
Total invalid records: 47

Error breakdown:
  INVALID_DATE_FORMAT: 23
  PHONE_UNRECOGNIZED: 12
  REQUIRED_FIELD_MISSING: 8
  ENUM_VALUE_INVALID: 4

Sample errors:

  Record ID: CUST-004821
    [CRITICAL] email: Invalid email format — missing @ symbol
    [WARNING] date_of_birth: Non-ISO format detected: "12/25/1990"
    → Suggested: "1990-12-25"

  Record ID: CUST-009341
    [CRITICAL] tier: Value "Premium" not in allowed values: [basic, pro, enterprise]
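
Warning-level errors that come with a suggested normalized_value (like the date above) can often be fixed automatically and re-validated. A sketch that assumes the response shape used throughout this guide, where each invalid record carries its original data and its error list:

def apply_suggested_fixes(invalid_records: list[dict]) -> list[dict]:
    """Apply suggested values to warning-only records; return them for re-validation."""
    candidates = []
    for record in invalid_records:
        errors = record.get("errors", [])
        # Anything with a critical error still needs human review
        if any(e["severity"] == "critical" for e in errors):
            continue
        fixed = dict(record.get("original") or {})
        for error in errors:
            if "normalized_value" in error:
                fixed[error["field"]] = error["normalized_value"]
        candidates.append(fixed)
    return candidates

Records repaired this way can be pushed back through validate_batch, and only what still fails goes on to quarantine.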

Step 4: Quarantine Invalid Records

Don't drop bad data — quarantine it with enough context to fix it:

import json
from datetime import datetime
 
def quarantine_invalid_records(
    invalid_records: list[dict],
    source: str,
    quarantine_path: str = "./quarantine"
) -> str:
    """Save invalid records to quarantine with metadata."""
    import os
    os.makedirs(quarantine_path, exist_ok=True)
 
    timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
    file_name = f"{quarantine_path}/{source}_{timestamp}_quarantine.jsonl"
 
    with open(file_name, "w") as f:
        for record in invalid_records:
            quarantine_entry = {
                "quarantined_at": datetime.utcnow().isoformat() + "Z",
                "source": source,
                "original_data": record.get("original"),
                "errors": record.get("errors"),
                "needs_review": any(
                    e["severity"] == "critical" for e in record.get("errors", [])
                ),
            }
            f.write(json.dumps(quarantine_entry) + "\n")
 
    print(f"Quarantined {len(invalid_records)} records → {file_name}")
    return file_name
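
Quarantine files are meant to be replayed. Once the upstream issue is fixed, a small helper can push a quarantine file back through the same validation endpoint; a sketch assuming the JSONL layout written above:

def reprocess_quarantine(file_name: str, schema: dict) -> tuple[list[dict], list[dict]]:
    """Re-validate previously quarantined records after upstream fixes."""
    records = []
    with open(file_name) as f:
        for line in f:
            entry = json.loads(line)
            if entry.get("original_data"):
                records.append(entry["original_data"])

    result = validate_batch(records, schema)
    return result["valid_records"], result["invalid_records"]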

Step 5: Airflow DAG Integration

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
 
def extract(**context):
    # Your extraction logic; fetch_from_source() stands in for your own source connector
    records = fetch_from_source()
    context["ti"].xcom_push(key="raw_records", value=records)
 
def validate(**context):
    records = context["ti"].xcom_pull(key="raw_records")
    result = validate_batch(records, CUSTOMER_SCHEMA)
 
    context["ti"].xcom_push(key="valid_records", value=result["valid_records"])
    context["ti"].xcom_push(key="invalid_records", value=result["invalid_records"])
 
    # Fail the DAG if the invalid-record rate is too high
    invalid_rate = len(result["invalid_records"]) / len(records) if records else 0.0
    if invalid_rate > 0.05:  # fail if more than 5% of records are invalid
        raise ValueError(f"Data quality threshold exceeded: {invalid_rate:.1%} invalid")
 
def load(**context):
    valid_records = context["ti"].xcom_pull(key="valid_records")
    # Load to your data warehouse
 
def quarantine(**context):
    invalid_records = context["ti"].xcom_pull(key="invalid_records")
    quarantine_invalid_records(invalid_records, source="customer_feed")
 
with DAG(
    dag_id="customer_ingestion",
    start_date=datetime(2026, 1, 1),
    schedule_interval="0 6 * * *",  # daily at 6 AM
    default_args={
        "retries": 2,
        "retry_delay": timedelta(minutes=5),
    },
) as dag:
    t_extract = PythonOperator(task_id="extract", python_callable=extract)
    t_validate = PythonOperator(task_id="validate", python_callable=validate)
    t_load = PythonOperator(task_id="load", python_callable=load)
    t_quarantine = PythonOperator(task_id="quarantine", python_callable=quarantine)
 
    t_extract >> t_validate >> [t_load, t_quarantine]
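
One detail worth noting: with Airflow's default trigger rule, the quarantine task only runs when validate succeeds, so a run that breaches the 5% threshold would never write its bad records to quarantine. Because validate pushes its XComs before raising, you can opt the quarantine task into running regardless; a sketch of the alternative task definition inside the same with DAG(...) block:

    # Run quarantine even when validate fails on the error-rate threshold;
    # the invalid records are already in XCom by the time the exception is raised.
    t_quarantine = PythonOperator(
        task_id="quarantine",
        python_callable=quarantine,
        trigger_rule="all_done",
    )

Whether that is the right behavior depends on how your team wants threshold breaches handled; keeping the stricter default is equally valid if a failed run is simply re-triggered after an upstream fix.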

Real-World Impact

Teams that adopt API-driven validation in their ETL pipelines report:

| Metric | Before DataForge | After DataForge |
| --- | --- | --- |
| Data quality defect rate | 8.3% | 0.6% |
| Time to detect bad data | Hours to days | Seconds |
| Validation code to maintain | 2,000+ lines | ~50 lines |
| Formats supported | Manual | 40+ built-in |

Next Steps

Start integrating data quality validation into your pipelines today. Combine DataForge with DocForge to generate automated data quality reports, or pair it with GlobalShield to ensure PII is detected and flagged during ingestion.

Visit APIVult to get started with the DataForge API.