Education · Last updated April 6, 2026

How to Build a Multi-Source Financial Data Reconciliation Pipeline with DataForge API

Learn how to automate financial data reconciliation across ERP, bank feeds, and payment processors using DataForge API. Catch discrepancies before they become audit issues.

Finance teams lose hundreds of hours every quarter manually reconciling transactions across disconnected systems. An ERP shows one balance, the bank feed shows another, and the payment processor adds a third set of numbers. Tracking down every cent costs time that should go to analysis and strategy.

Automated financial data reconciliation solves this. With the right API pipeline, you can match transactions across sources in seconds, flag exceptions automatically, and produce audit-ready reconciliation reports without spreadsheet gymnastics.

This guide walks through building a production-grade reconciliation pipeline in Python using the DataForge API.

Why Financial Reconciliation Is Hard

Most reconciliation failures stem from three root causes:

  1. Schema inconsistencies: ERP exports dates as DD/MM/YYYY, the bank feed uses ISO 8601, and the payment processor uses Unix timestamps.
  2. Amount formatting differences: One system uses 1,250.00, another uses 1250.0, and a third stores amounts in cents as integers.
  3. Reference field mismatches: The ERP writes INV-2026-00142, the bank shows INV2026142, and the card processor records only 142.

Manual reconciliation requires a human to normalize all three before comparing. DataForge handles normalization programmatically, letting you focus on the exceptions that actually matter.
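
To make the three mismatch types concrete, here is a minimal local sketch of what normalization involves, using only the Python standard library. It is not how DataForge implements normalization; the function names and the source labels ("erp", "bank", "processor") are hypothetical, and reducing a reference to its trailing sequence number assumes a four-digit year and sequence numbers that do not repeat across years.

```python
import re
from datetime import datetime, timezone
from decimal import Decimal


def normalize_date(value: str, source: str) -> str:
    """Return an ISO 8601 date (YYYY-MM-DD) from each source's native format."""
    if source == "erp":  # DD/MM/YYYY
        return datetime.strptime(value, "%d/%m/%Y").date().isoformat()
    if source == "processor":  # Unix timestamp in seconds, read as UTC
        return datetime.fromtimestamp(int(value), tz=timezone.utc).date().isoformat()
    # Bank feed: already ISO 8601 (a trailing "Z" needs Python 3.11+)
    return datetime.fromisoformat(value).date().isoformat()


def normalize_amount(value, in_cents: bool = False) -> Decimal:
    """Return a two-decimal Decimal from '1,250.00', '1250.0', or integer cents."""
    if in_cents:
        return (Decimal(int(value)) / 100).quantize(Decimal("0.01"))
    return Decimal(str(value).replace(",", "")).quantize(Decimal("0.01"))


def normalize_reference(value: str) -> int:
    """Reduce 'INV-2026-00142', 'INV2026142', or '142' to the sequence number 142."""
    match = re.fullmatch(r"(?:INV-?\d{4}-?)?0*(\d+)", value.strip())
    if match is None:
        raise ValueError(f"Unrecognized reference format: {value!r}")
    return int(match.group(1))
```

Once all three sources pass through helpers like these, the same invoice produces the same date, amount, and key regardless of where it came from, which is the precondition for any automated match.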

Architecture Overview

ERP Export (CSV) + Bank Feed (JSON via API) + Payment Processor (SFTP dump)
    │
    ▼
Staging Area
    │
    ▼
DataForge API (normalize + validate + deduplicate)
    │
    ▼
Reconciliation Engine (match + flag)
    │
    ▼
Discrepancy Report (PDF or webhook alert)

Each source lands in a staging area. DataForge normalizes the data to a canonical format. The reconciliation engine then performs matching and flags unmatched records.

Step 1: Install Dependencies

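pip install requests pandas python-dotenv

Then import the libraries and load your credentials:

import os
import requests
import pandas as pd
from datetime import datetime
from dotenv import load_dotenv

# Read variables from a local .env file into the environment
load_dotenv()
# Name of the environment variable (or .env entry) that holds your DataForge key
DATAFORGE_API_KEY = os.getenv("DATAFORGE_API_KEY")
DATAFORGE_BASE = "https://apivult.com/api/dataforge"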
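
Because os.getenv returns None when a variable is missing, a misconfigured environment would only surface later as a rejected API request. One option is to fail fast at startup. The helper below is a hypothetical addition, not part of DataForge or python-dotenv.

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable, or fail with a clear message."""
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value
```

Calling require_env with the name of your API key variable, in place of the bare os.getenv call, stops the pipeline before any data is loaded.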

Step 2: Load Data from Each Source

def load_erp_data(filepath: str) -> pd.DataFrame:
    """Load ERP export CSV."""
    df = pd.read_csv(filepath)
    return df[["invoice_id", "amount", "currency", "transaction_date", "vendor_id"]]
 
 
def load_bank_feed(api_url: str, token: str) -> pd.DataFrame:
    """Load bank feed transactions via API."""
    # Set a timeout so a stalled bank API cannot hang the pipeline indefinitely
    resp = requests.get(api_url, headers={"Authorization": f"Bearer {token}"}, timeout=30)
    resp.raise_for_status()
    transactions = resp.json()["transactions"]
    return pd.DataFrame(transactions)
 
 
def load_payment_processor(filepath: str) -> pd.DataFrame:
    """Load payment processor SFTP dump."""
    df = pd.read_csv(filepath)
    return df[["ref", "gross_amount", "currency_code", "settled_at"]]
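
The loaders above return each source's own field names (invoice_id, ref, gross_amount, and so on), while the canonical schema in the next step expects transaction_id, amount, currency, and date. This guide does not show how DataForge maps one to the other, so one option is to rename fields locally before sending records. The sketch below works on the list-of-dict records the pipeline passes around; the mapping tables and function name are assumptions for illustration, and the bank feed is assumed to need no renaming.

```python
# Hypothetical field maps: source field name -> canonical field name.
FIELD_MAPS = {
    "erp": {"invoice_id": "transaction_id", "transaction_date": "date"},
    "processor": {
        "ref": "transaction_id",
        "gross_amount": "amount",
        "currency_code": "currency",
        "settled_at": "date",
    },
}


def to_canonical_fields(records: list[dict], source: str) -> list[dict]:
    """Rename source-specific keys and tag each record with its origin."""
    field_map = FIELD_MAPS.get(source, {})
    renamed = []
    for record in records:
        # Keys without a mapping entry are kept unchanged
        row = {field_map.get(key, key): value for key, value in record.items()}
        row["source"] = source
        renamed.append(row)
    return renamed
```

Tagging each record with its source also fills the source field that the canonical schema declares.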

Step 3: Normalize with DataForge API

This is where DataForge does the heavy lifting — normalizing inconsistent field names, date formats, and amount representations into a standard schema.

def normalize_records(records: list[dict], schema: dict) -> list[dict]:
    """Send records to DataForge for normalization and validation."""
    payload = {
        "records": records,
        "schema": schema,
        "options": {
            "date_format": "ISO8601",
            "amount_format": "decimal_2",
            "normalize_ids": True,
            "strip_special_chars": True
        }
    }
 
    resp = requests.post(
        f"{DATAFORGE_BASE}/normalize",
        json=payload,
        headers={
            "X-RapidAPI-Key": DATAFORGE_API_KEY,
            "Content-Type": "application/json"
        },
        timeout=30  # avoid hanging indefinitely if the API stalls
    )
    resp.raise_for_status()
    result = resp.json()
 
    if not result["success"]:
        raise ValueError(f"Normalization failed: {result['error']}")
 
    return result["data"]["records"]
 
 
# Define canonical schema
CANONICAL_SCHEMA = {
    "fields": {
        "transaction_id": {"type": "string", "normalize": True},
        "amount": {"type": "decimal", "precision": 2},
        "currency": {"type": "string", "uppercase": True},
        "date": {"type": "date", "format": "ISO8601"},
        "source": {"type": "string"}
    }
}
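
A quarter's worth of transactions may be too large for a single request. This guide does not state DataForge's request-size limits, so the batch size of 500 below is an arbitrary assumption, and chunked is a hypothetical helper rather than part of the API.

```python
from typing import Iterator


def chunked(records: list[dict], size: int = 500) -> Iterator[list[dict]]:
    """Yield consecutive slices of `records`, each at most `size` items long."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(records), size):
        yield records[start:start + size]
```

Each batch can then be passed to normalize_records in a loop and the returned lists concatenated, which also keeps a single failed request from discarding the whole run.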

Step 4: Validate Data Quality

Before matching, validate that each dataset meets minimum quality thresholds. DataForge can score data quality and surface issues before they contaminate the reconciliation.

def validate_dataset(records: list[dict], source_name: str) -> dict:
    """Validate dataset quality and return quality score."""
    payload = {
        "records": records,
        "checks": [
            {"field": "transaction_id", "rule": "not_null"},
            {"field": "amount", "rule": "positive"},
            {"field": "currency", "rule": "iso_4217"},
            {"field": "date", "rule": "not_future"}
        ],
        "source": source_name
    }
 
    resp = requests.post(
        f"{DATAFORGE_BASE}/validate",
        json=payload,
        headers={"X-RapidAPI-Key": DATAFORGE_API_KEY},
        timeout=30  # avoid hanging indefinitely if the API stalls
    )
    resp.raise_for_status()
    result = resp.json()
 
    quality_score = result["data"]["quality_score"]
    issues = result["data"]["issues"]
 
    print(f"[{source_name}] Quality score: {quality_score:.1%}")
    for issue in issues[:5]:
        print(f"  ⚠ Row {issue['row']}: {issue['field']} - {issue['message']}")
 
    return result["data"]
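
Printing the score is informative, but it does not stop a poor dataset from reaching the matching step. A simple gate on the dictionary returned by validate_dataset can enforce the threshold. The sketch below assumes quality_score is a fraction between 0 and 1 (consistent with the percent formatting above); the 0.95 cutoff and the DataQualityError class are illustrative choices, not DataForge features.

```python
class DataQualityError(RuntimeError):
    """Raised when a dataset scores below the minimum quality threshold."""


def enforce_quality_gate(validation: dict, source_name: str,
                         min_score: float = 0.95) -> None:
    """Abort the run if a source's quality score is below `min_score`."""
    score = validation["quality_score"]
    if score < min_score:
        issue_count = len(validation.get("issues", []))
        raise DataQualityError(
            f"{source_name}: quality score {score:.1%} is below "
            f"{min_score:.1%} ({issue_count} issues reported)"
        )
```

Raising an exception rather than logging a warning means a bad import is fixed at the source instead of appearing later as hundreds of false discrepancies.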

Step 5: Build the Reconciliation Engine

With normalized, validated data, run the matching logic:

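def reconcile(erp_records: list[dict],
              bank_records: list[dict],
              payment_records: list[dict]) -> dict:
    """Three-way reconciliation across ERP, bank, and payment processor."""
    erp_df = pd.DataFrame(erp_records)
    bank_df = pd.DataFrame(bank_records)
    # Rename the processor amount so it stays distinct from the ERP and bank amounts
    pay_df = pd.DataFrame(payment_records).rename(columns={"amount": "amount_pay"})

    # Merge ERP ↔ Bank on transaction_id; the outer join keeps records found in only one source
    erp_bank = pd.merge(
        erp_df, bank_df,
        on="transaction_id",
        how="outer",
        suffixes=("_erp", "_bank")
    )

    # Merge with payment processor
    three_way = pd.merge(
        erp_bank, pay_df,
        on="transaction_id",
        how="outer"
    )

    # Flag ERP ↔ Bank amount mismatches. Computing this after the final merge gives
    # every row a real boolean: a comparison against a missing (NaN) amount is False.
    erp_amount = pd.to_numeric(three_way["amount_erp"], errors="coerce").round(2)
    bank_amount = pd.to_numeric(three_way["amount_bank"], errors="coerce").round(2)
    three_way["amount_match"] = erp_amount == bank_amount

    # Matched: ERP and bank agree, and the payment processor also has the record
    is_matched = three_way["amount_match"] & three_way["amount_pay"].notna()
    matched = three_way[is_matched]
    # Everything else is an exception, so matched + unmatched always equals the total
    unmatched = three_way[~is_matched]

    total = len(three_way)
    return {
        "total_records": total,
        "matched": len(matched),
        "unmatched": len(unmatched),
        # Guard against division by zero when there are no records
        "match_rate": len(matched) / total if total else 0.0,
        "discrepancies": unmatched.to_dict(orient="records")
    }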
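
The engine returns every exception in one list. For triage it can help to label why each record failed: missing from a source, or present everywhere with differing amounts. The dependency-free sketch below takes one dictionary per source mapping transaction_id to a Decimal amount. Unlike the engine above, it also compares the processor amount; the function name, the label strings, and the tolerance parameter are all assumptions for illustration.

```python
from decimal import Decimal


def classify_discrepancies(erp: dict, bank: dict, processor: dict,
                           tolerance: Decimal = Decimal("0.00")) -> dict:
    """Map each problematic transaction_id to a short reason label."""
    sources = {"erp": erp, "bank": bank, "processor": processor}
    report = {}
    for txn_id in sorted(set(erp) | set(bank) | set(processor)):
        missing = [name for name, amounts in sources.items() if txn_id not in amounts]
        if missing:
            report[txn_id] = "missing_in_" + "_and_".join(missing)
            continue
        amounts = [source_amounts[txn_id] for source_amounts in sources.values()]
        # Largest spread between any two sources must stay within tolerance
        if max(amounts) - min(amounts) > tolerance:
            report[txn_id] = "amount_mismatch"
    return report
```

A non-zero tolerance is useful when one source rounds differently, but it should stay small enough that genuine short payments are still flagged.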

Step 6: Generate Discrepancy Report

def generate_report(reconciliation_result: dict, run_date: str) -> None:
    """Print a structured reconciliation summary."""
    result = reconciliation_result
    print("\n" + "═" * 50)
    print(f"RECONCILIATION REPORT — {run_date}")
    print("═" * 50)
    print(f"Total records processed : {result['total_records']:,}")
    print(f"Matched                 : {result['matched']:,}")
    print(f"Unmatched / exceptions  : {result['unmatched']:,}")
    print(f"Match rate              : {result['match_rate']:.1%}")
    print()
 
    if result["discrepancies"]:
        print("TOP DISCREPANCIES:")
        for i, disc in enumerate(result["discrepancies"][:10], 1):
            print(f"  {i}. ID={disc.get('transaction_id', 'MISSING')} "
                  f"ERP={disc.get('amount_erp')} "
                  f"Bank={disc.get('amount_bank')}")
    else:
        print("✅ No discrepancies found.")
 
    print("═" * 50)
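
The console summary shows only the first ten discrepancies. To hand the full list to reviewers, the same records can be written to a file. The sketch below uses the standard csv module and keeps only the three columns printed in the report; the function name and column choice are assumptions, and generating a PDF would need a separate library.

```python
import csv


def write_discrepancies_csv(discrepancies: list[dict], path: str) -> int:
    """Write discrepancy records to a CSV file and return the number of rows written."""
    fieldnames = ["transaction_id", "amount_erp", "amount_bank"]
    with open(path, "w", newline="", encoding="utf-8") as handle:
        # extrasaction="ignore" drops any other columns present in the records
        writer = csv.DictWriter(handle, fieldnames=fieldnames, extrasaction="ignore")
        writer.writeheader()
        writer.writerows(discrepancies)
    return len(discrepancies)
```

It can be called with the discrepancies list from the reconciliation result, for example right after generate_report.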

Step 7: Run the Full Pipeline

def run_reconciliation_pipeline(
    erp_csv: str,
    bank_api_url: str,
    bank_token: str,
    payment_csv: str
):
    run_date = datetime.utcnow().strftime("%Y-%m-%d %H:%M UTC")
    print(f"Starting reconciliation pipeline: {run_date}")
 
    # 1. Load
    erp_raw = load_erp_data(erp_csv).to_dict(orient="records")
    bank_raw = load_bank_feed(bank_api_url, bank_token).to_dict(orient="records")
    pay_raw = load_payment_processor(payment_csv).to_dict(orient="records")
 
    # 2. Normalize
    erp_normalized = normalize_records(erp_raw, CANONICAL_SCHEMA)
    bank_normalized = normalize_records(bank_raw, CANONICAL_SCHEMA)
    pay_normalized = normalize_records(pay_raw, CANONICAL_SCHEMA)
 
    # 3. Validate
    validate_dataset(erp_normalized, "ERP")
    validate_dataset(bank_normalized, "Bank")
    validate_dataset(pay_normalized, "PaymentProcessor")
 
    # 4. Reconcile
    result = reconcile(erp_normalized, bank_normalized, pay_normalized)
 
    # 5. Report
    generate_report(result, run_date)
 
    return result
 
 
if __name__ == "__main__":
    run_reconciliation_pipeline(
        erp_csv="exports/erp_transactions_2026_q1.csv",
        bank_api_url="https://internal-bank-api/transactions",
        bank_token=os.getenv("BANK_API_TOKEN"),
        payment_csv="exports/payment_processor_march_2026.csv"
    )

Production Considerations

Scheduling: Run this pipeline daily (or after every import batch) using a cron job or a workflow orchestrator such as Prefect or Airflow.

Alert on match rate drops: Set a threshold — if match rate falls below 98%, trigger a Slack/Teams alert for immediate investigation.
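
A minimal sketch of the threshold check, assuming the result dictionary returned by the reconciliation step. The function name is hypothetical. The payload uses the simple text body that Slack incoming webhooks accept; Teams expects a different format.

```python
from typing import Optional

MATCH_RATE_THRESHOLD = 0.98


def build_alert(result: dict, threshold: float = MATCH_RATE_THRESHOLD) -> Optional[dict]:
    """Return a webhook payload if the match rate is below threshold, else None."""
    if result["match_rate"] >= threshold:
        return None
    return {
        "text": (
            f"Reconciliation match rate {result['match_rate']:.1%} is below the "
            f"{threshold:.0%} threshold ({result['unmatched']:,} unmatched records)."
        )
    }
```

When build_alert returns a payload, it can be sent with requests.post(webhook_url, json=payload, timeout=10), where webhook_url is your own incoming-webhook address.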

Audit trail: Persist reconciliation results to a database. Auditors expect a full history of reconciliation runs, not just the latest.
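
As one way to keep that history, the sketch below appends each run to a SQLite table using the standard library. The table name and columns are assumptions chosen to mirror the result dictionary; a production deployment would more likely target the team's existing database.

```python
import json
import sqlite3


def save_run(conn: sqlite3.Connection, run_date: str, result: dict) -> int:
    """Append one reconciliation run to the audit table and return its row id."""
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS reconciliation_runs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            run_date TEXT NOT NULL,
            total_records INTEGER NOT NULL,
            matched INTEGER NOT NULL,
            unmatched INTEGER NOT NULL,
            match_rate REAL NOT NULL,
            discrepancies TEXT NOT NULL
        )
        """
    )
    cursor = conn.execute(
        "INSERT INTO reconciliation_runs "
        "(run_date, total_records, matched, unmatched, match_rate, discrepancies) "
        "VALUES (?, ?, ?, ?, ?, ?)",
        (
            run_date,
            result["total_records"],
            result["matched"],
            result["unmatched"],
            result["match_rate"],
            # Store the exception records as JSON; default=str handles dates and Decimals
            json.dumps(result["discrepancies"], default=str),
        ),
    )
    conn.commit()
    return cursor.lastrowid
```

Rows are only ever inserted, never updated, so earlier runs remain available exactly as they were recorded.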

Exception workflow: Route discrepancies to a ticket queue (Jira, ServiceNow) rather than a spreadsheet. This keeps exception resolution trackable and auditable.

Results You Can Expect

Teams that automate reconciliation with DataForge typically see:

  • 85–90% reduction in reconciliation time per period
  • Near-zero manual lookups for format-related mismatches
  • Audit-ready reports available within minutes of data availability
  • Detection of fraud and posting errors days before month-end close, rather than during it

Ready to stop chasing mismatches manually? Try DataForge API free and run your first reconciliation in under 30 minutes.