How to Automate Data Validation and Cleaning in Python (2026 Guide)

Stop wasting hours on manual data cleaning. Learn how to automate validation, deduplication, and formatting with the DataForge API using Python in under 30 minutes.

Poor data quality costs organizations an average of $12.9 million per year, according to Gartner. Yet most teams still clean data manually — writing one-off scripts, fixing the same format inconsistencies over and over, and discovering broken pipelines only after they've already polluted production databases.

The good news: automated data validation is no longer a luxury for enterprise teams with dedicated data engineering staff. In 2026, it's accessible via API in minutes.

This guide walks you through automating data validation and cleaning using the DataForge API — covering the most common use cases: schema validation, type coercion, deduplication, and format normalization.

Why Manual Data Cleaning Doesn't Scale

Here's a familiar scenario: you receive a CSV from a partner, a form submission from your frontend, or a webhook payload from a third-party service. Before you can use the data, you need to:

  • Validate that required fields are present
  • Coerce strings like "2026-03-30" to proper date objects
  • Standardize phone numbers from (555) 123-4567 to +15551234567
  • Remove duplicates introduced by retry logic
  • Flag rows with invalid email addresses, negative amounts, or out-of-range values

Writing this logic yourself is error-prone and doesn't travel well between projects. A validation rule you wrote for one User model will miss edge cases that only surface six months later. API-based validation gives you a maintained, versioned ruleset you can call from any language or platform.
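
To make that concrete, here is a minimal, hypothetical sketch of what hand-rolled validation tends to look like for just two of the checks above (the regex, helper name, and field names are illustrative, not part of DataForge):

import re
from datetime import datetime

# A naive email pattern; it already misses plenty of real-world edge cases
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")

def validate_manually(record: dict) -> list[str]:
    """Hand-rolled checks for just two fields; every new field means more of this."""
    errors = []
    if not EMAIL_RE.match(record.get("email", "")):
        errors.append("invalid email")
    try:
        datetime.strptime(record.get("signup_date", ""), "%Y-%m-%d")
    except ValueError:
        errors.append("signup_date is not YYYY-MM-DD")
    return errors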

Setting Up DataForge

First, get your API key from the APIVult dashboard. Then install the requests library if you haven't already:

pip install requests

Set your API key as an environment variable:

export APIVULT_API_KEY="YOUR_API_KEY"

Use Case 1: Schema Validation

The most common need: ensure incoming data matches an expected structure before processing it.

import json
import os
import requests
 
API_KEY = os.environ["APIVULT_API_KEY"]
BASE_URL = "https://apivult.com/api/v1/dataforge"
 
def validate_record(record: dict, schema: dict) -> dict:
    """Validate a single record against a schema definition."""
    response = requests.post(
        f"{BASE_URL}/validate",
        headers={"X-API-Key": API_KEY},
        json={"data": record, "schema": schema},
    )
    response.raise_for_status()
    return response.json()
 
# Define your schema
user_schema = {
    "fields": {
        "email": {"type": "email", "required": True},
        "age": {"type": "integer", "min": 0, "max": 150},
        "signup_date": {"type": "date", "format": "YYYY-MM-DD"},
        "plan": {"type": "string", "enum": ["free", "basic", "pro"]},
    }
}
 
# Test with a problematic record
bad_record = {
    "email": "not-an-email",
    "age": -5,
    "signup_date": "30/03/2026",  # Wrong format
    "plan": "enterprise",          # Not in enum
}
 
result = validate_record(bad_record, user_schema)
print(json.dumps(result, indent=2))
# {
#   "valid": false,
#   "errors": [
#     {"field": "email", "message": "Invalid email format"},
#     {"field": "age", "message": "Value must be >= 0"},
#     {"field": "signup_date", "message": "Expected format YYYY-MM-DD"},
#     {"field": "plan", "message": "Must be one of: free, basic, pro"}
#   ]
# }
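
In practice you rarely validate one record at a time. A small loop over validate_record splits an incoming batch into records that are safe to process and records that need attention (the second record below is made up for illustration):

# Partition incoming records into valid and invalid buckets
incoming = [
    bad_record,
    {"email": "ok@example.com", "age": 30, "signup_date": "2026-03-30", "plan": "pro"},
]

valid_records, invalid_records = [], []
for rec in incoming:
    outcome = validate_record(rec, user_schema)
    if outcome["valid"]:
        valid_records.append(rec)
    else:
        invalid_records.append({"record": rec, "errors": outcome["errors"]})

print(f"{len(valid_records)} valid, {len(invalid_records)} rejected")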

Use Case 2: Bulk Cleaning and Normalization

For batch operations — processing CSV uploads, ETL pipelines, or nightly sync jobs — use the bulk endpoint:

import csv
import json
 
def clean_batch(records: list[dict], rules: dict) -> dict:
    """Clean and normalize a batch of records."""
    response = requests.post(
        f"{BASE_URL}/clean",
        headers={"X-API-Key": API_KEY},
        json={"records": records, "rules": rules},
    )
    response.raise_for_status()
    return response.json()
 
# Load your messy data
with open("customer_data.csv") as f:
    reader = csv.DictReader(f)
    records = list(reader)
 
# Define cleaning rules
cleaning_rules = {
    "trim_whitespace": True,
    "normalize_phone": {"target_format": "E164", "default_country": "US"},
    "normalize_email": {"lowercase": True},
    "date_fields": {
        "created_at": {"output_format": "ISO8601"},
        "dob": {"output_format": "YYYY-MM-DD"},
    },
    "remove_duplicates": {"key_fields": ["email"]},
    "drop_nulls": ["email", "name"],  # Remove rows where these are null
}
 
result = clean_batch(records, cleaning_rules)
 
print(f"Input: {result['stats']['input_count']} records")
print(f"Output: {result['stats']['output_count']} records")
print(f"Removed: {result['stats']['removed_count']} duplicates/nulls")
print(f"Modified: {result['stats']['modified_count']} records normalized")
 
# Save cleaned data
with open("customer_data_clean.csv", "w") as f:
    writer = csv.DictWriter(f, fieldnames=result["records"][0].keys())
    writer.writeheader()
    writer.writerows(result["records"])

Use Case 3: Real-Time Validation at Form Submission

For web applications, validate data server-side before writing to the database:

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
 
app = FastAPI()
 
class SignupForm(BaseModel):
    email: str
    name: str
    company: str
    phone: str
 
@app.post("/signup")
async def signup(form: SignupForm):
    # Validate and clean with DataForge
    result = validate_record(
        record=form.model_dump(),  # Pydantic v2; use form.dict() on v1
        schema={
            "fields": {
                "email": {"type": "email", "required": True},
                "name": {"type": "string", "min_length": 2, "max_length": 100},
                "company": {"type": "string", "required": True},
                "phone": {"type": "phone", "normalize": True},
            }
        }
    )
 
    if not result["valid"]:
        raise HTTPException(
            status_code=422,
            detail=result["errors"]
        )
 
    # Use the normalized version (phone formatted, email lowercased, etc.)
    clean_data = result["normalized"]
 
    # Now safe to write to DB (create_user is your own persistence helper)
    await create_user(clean_data)
    return {"status": "ok"}
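
To exercise the route without a frontend, FastAPI's TestClient can call it directly. This assumes create_user is implemented, APIVULT_API_KEY is set, and the payload values below are made up:

from fastapi.testclient import TestClient

client = TestClient(app)

resp = client.post("/signup", json={
    "email": "Ada@Example.COM",
    "name": "Ada Lovelace",
    "company": "Analytical Engines Ltd",
    "phone": "(555) 123-4567",
})
print(resp.status_code)  # 200 if DataForge accepts the record, 422 with field errors otherwise
print(resp.json())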

Use Case 4: Data Quality Reports

Before loading a new dataset, run a quality audit to understand what you're dealing with:

def audit_dataset(records: list[dict]) -> dict:
    """Generate a data quality report for a dataset."""
    response = requests.post(
        f"{BASE_URL}/audit",
        headers={"X-API-Key": API_KEY},
        json={"records": records},
    )
    response.raise_for_status()
    return response.json()
 
report = audit_dataset(records)
 
print(json.dumps(report["summary"], indent=2))
# {
#   "total_records": 15000,
#   "completeness": 0.94,
#   "duplicate_rate": 0.023,
#   "field_stats": {
#     "email": {"null_rate": 0.02, "invalid_rate": 0.008, "unique_rate": 0.97},
#     "phone": {"null_rate": 0.12, "format_inconsistency": 0.45},
#     "created_at": {"null_rate": 0.0, "format_issues": 0.003}
#   },
#   "recommendations": [
#     "Normalize phone field — 45% have inconsistent formatting",
#     "High null rate on phone (12%) — consider making optional or prompting users",
#     "Deduplicate on email — 2.3% duplicate rate detected"
#   ]
# }
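
A natural follow-up is to let the audit drive your cleaning rules instead of hard-coding them. The thresholds below are arbitrary examples, and the field names match the report shown above:

# Build cleaning rules from the audit findings
summary = report["summary"]
rules = {"trim_whitespace": True}

if summary["field_stats"]["phone"]["format_inconsistency"] > 0.10:
    rules["normalize_phone"] = {"target_format": "E164", "default_country": "US"}

if summary["duplicate_rate"] > 0.01:
    rules["remove_duplicates"] = {"key_fields": ["email"]}

cleaned = clean_batch(records, rules)
print(f"Applied {len(rules)} rules, kept {cleaned['stats']['output_count']} records")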

Integrating Into a CI/CD Pipeline

Run data quality gates as part of your deployment or ETL pipeline:

import sys
 
def check_quality_gate(records: list[dict], min_completeness: float = 0.95) -> bool:
    report = audit_dataset(records)
    completeness = report["summary"]["completeness"]
    duplicate_rate = report["summary"]["duplicate_rate"]
 
    print(f"Completeness: {completeness:.1%} (threshold: {min_completeness:.1%})")
    print(f"Duplicate rate: {duplicate_rate:.1%}")
 
    if completeness < min_completeness:
        print(f"FAIL: Completeness {completeness:.1%} < {min_completeness:.1%}")
        return False
 
    if duplicate_rate > 0.05:
        print("FAIL: Duplicate rate exceeds 5%")
        return False
 
    print("PASS: Data quality gate passed")
    return True
 
# In your pipeline
records = load_from_source()
if not check_quality_gate(records):
    sys.exit(1)  # Halt the pipeline
 
# Proceed with clean data

Performance and Pricing

DataForge is built for production workloads:

  • Latency: ~120ms median for single-record validation, ~2-8s for batches up to 10,000 records
  • Throughput: Up to 500 records/second with batch endpoints
  • Rate limits: Depend on your plan — see pricing (a simple retry pattern for rate-limited requests is sketched just below)
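
If a batch job does hit its rate limit, the usual pattern is to back off and retry. The sketch below assumes the API signals a rate limit with HTTP 429; check your plan's documentation for the actual behavior:

import time

def post_with_retry(url: str, payload: dict, max_retries: int = 5) -> dict:
    """POST with exponential backoff when the API responds with a rate-limit status."""
    for attempt in range(max_retries):
        response = requests.post(url, headers={"X-API-Key": API_KEY}, json=payload)
        if response.status_code == 429:  # assumed rate-limit status code
            time.sleep(2 ** attempt)     # back off: 1s, 2s, 4s, ...
            continue
        response.raise_for_status()
        return response.json()
    raise RuntimeError(f"Still rate-limited after {max_retries} attempts")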

For high-volume pipelines (millions of records/day), consider streaming validation rather than loading entire datasets into memory.
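
One way to do that without any new endpoints is to read the CSV in fixed-size chunks and send each chunk to the bulk /clean endpoint from Use Case 2. The chunk size of 5,000 and the file names are arbitrary examples, and note that remove_duplicates only deduplicates within a chunk, so global deduplication still needs a separate pass:

from itertools import islice

def iter_chunks(reader, size: int = 5000):
    """Yield lists of up to `size` rows from a csv.DictReader."""
    while True:
        chunk = list(islice(reader, size))
        if not chunk:
            return
        yield chunk

with open("huge_export.csv") as src, open("huge_export_clean.csv", "w", newline="") as dst:
    reader = csv.DictReader(src)
    writer = None
    for chunk in iter_chunks(reader):
        result = clean_batch(chunk, cleaning_rules)
        rows = result["records"]
        if rows and writer is None:
            writer = csv.DictWriter(dst, fieldnames=rows[0].keys())
            writer.writeheader()
        if rows:
            writer.writerows(rows)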

Summary

Manual data cleaning is a solved problem in 2026. With the DataForge API, you get:

  • Schema validation with detailed field-level error messages
  • Automatic normalization of phone numbers, emails, and dates
  • Duplicate detection and removal
  • Data quality audits with actionable recommendations
  • Language-agnostic — works from Python, Node.js, Go, or any HTTP client

Stop writing one-off cleaning scripts. Define your rules once, call them anywhere.

Get started: Sign up for a free APIVult account and make your first DataForge API call in under 5 minutes.