How to Build a Multi-Source Financial Data Reconciliation Pipeline with DataForge API
Learn how to automate financial data reconciliation across ERP, bank feeds, and payment processors using DataForge API. Catch discrepancies before they become audit issues.

Finance teams lose hundreds of hours every quarter manually reconciling transactions across disconnected systems. An ERP shows one balance, the bank feed shows another, and the payment processor adds a third set of numbers. Tracking down every cent costs time that should go to analysis and strategy.
Automated financial data reconciliation solves this. With the right API pipeline, you can match transactions across sources in seconds, flag exceptions automatically, and produce audit-ready reconciliation reports without spreadsheet gymnastics.
This guide walks through building a production-grade reconciliation pipeline in Python using the DataForge API.
Why Financial Reconciliation Is Hard
Most reconciliation failures stem from three root causes:
- Schema inconsistencies: the ERP exports dates as DD/MM/YYYY, the bank feed uses ISO 8601, and the payment processor uses Unix timestamps.
- Amount formatting differences: one system uses 1,250.00, another uses 1250.0, and a third stores amounts in cents as integers.
- Reference field mismatches: the ERP writes INV-2026-00142, the bank shows INV2026142, and the card processor records only 142.
Manual reconciliation requires a human to normalize all three before comparing. DataForge handles normalization programmatically, letting you focus on the exceptions that actually matter.
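To see what one small slice of that normalization involves, here is a rough pure-Python sketch of reference cleanup. The `strip_reference` helper is illustrative only, not part of the DataForge API, and it handles just the punctuation-and-case variant; zero-padding and truncated references (INV-2026-00142 vs. 142) need fuzzier matching, which is exactly the work you are offloading to the service:

```python
import re

def strip_reference(ref: str) -> str:
    """Remove punctuation and uppercase a reference so that
    'inv-2026-00142' and 'INV202600142' compare equal."""
    return re.sub(r"[^A-Za-z0-9]", "", str(ref)).upper()

assert strip_reference("inv-2026-00142") == "INV202600142"
```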
Architecture Overview
  ERP Export        Bank Feed        Payment Processor
    (CSV)        (JSON via API)        (SFTP dump)
      │                │                    │
      └────────────────┼────────────────────┘
                       ▼
     DataForge API (normalize + validate + deduplicate)
                       │
                       ▼
          Reconciliation Engine (match + flag)
                       │
                       ▼
        Discrepancy Report (PDF or webhook alert)
Each source lands in a staging area. DataForge normalizes the data to a canonical format. The reconciliation engine then performs matching and flags unmatched records.
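Concretely, a single payment might arrive in three different shapes and converge on one canonical record. All values below are hypothetical and are only meant to show the field mapping (raw field names match the loaders in Step 2; the canonical shape matches the schema in Step 3):

```python
# One hypothetical transaction as each source reports it.
erp_row  = {"invoice_id": "INV-2026-00142", "amount": "1,250.00",
            "currency": "usd", "transaction_date": "15/03/2026"}
bank_row = {"transaction_id": "INV202600142", "amount": 1250.0,
            "currency": "USD", "date": "2026-03-15"}
pay_row  = {"ref": "INV202600142", "gross_amount": 125000,  # cents
            "currency_code": "USD", "settled_at": 1773532800}  # Unix ts

# The canonical record all three are normalized into:
canonical = {"transaction_id": "INV202600142", "amount": 1250.00,
             "currency": "USD", "date": "2026-03-15", "source": "erp"}
```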
Step 1: Install Dependencies
pip install requests pandas python-dotenv

Then wire up imports and configuration:

import os
import requests
import pandas as pd
from datetime import datetime
from dotenv import load_dotenv

load_dotenv()

DATAFORGE_API_KEY = os.getenv("DATAFORGE_API_KEY")
DATAFORGE_BASE = "https://apivult.com/api/dataforge"
Step 2: Load Data from Each Source
def load_erp_data(filepath: str) -> pd.DataFrame:
    """Load ERP export CSV."""
    df = pd.read_csv(filepath)
    return df[["invoice_id", "amount", "currency", "transaction_date", "vendor_id"]]

def load_bank_feed(api_url: str, token: str) -> pd.DataFrame:
    """Load bank feed transactions via API."""
    resp = requests.get(api_url, headers={"Authorization": f"Bearer {token}"})
    resp.raise_for_status()
    transactions = resp.json()["transactions"]
    return pd.DataFrame(transactions)

def load_payment_processor(filepath: str) -> pd.DataFrame:
    """Load payment processor SFTP dump."""
    df = pd.read_csv(filepath)
    return df[["ref", "gross_amount", "currency_code", "settled_at"]]
Step 3: Normalize with DataForge API
This is where DataForge does the heavy lifting — normalizing inconsistent field names, date formats, and amount representations into a standard schema.
def normalize_records(records: list[dict], schema: dict) -> list[dict]:
    """Send records to DataForge for normalization and validation."""
    payload = {
        "records": records,
        "schema": schema,
        "options": {
            "date_format": "ISO8601",
            "amount_format": "decimal_2",
            "normalize_ids": True,
            "strip_special_chars": True
        }
    }
    resp = requests.post(
        f"{DATAFORGE_BASE}/normalize",
        json=payload,
        headers={
            "X-RapidAPI-Key": DATAFORGE_API_KEY,
            "Content-Type": "application/json"
        }
    )
    resp.raise_for_status()
    result = resp.json()
    if not result["success"]:
        raise ValueError(f"Normalization failed: {result['error']}")
    return result["data"]["records"]

# Define canonical schema
CANONICAL_SCHEMA = {
    "fields": {
        "transaction_id": {"type": "string", "normalize": True},
        "amount": {"type": "decimal", "precision": 2},
        "currency": {"type": "string", "uppercase": True},
        "date": {"type": "date", "format": "ISO8601"},
        "source": {"type": "string"}
    }
}
Step 4: Validate Data Quality
Before matching, validate that each dataset meets minimum quality thresholds. DataForge can score data quality and surface issues before they contaminate the reconciliation.
def validate_dataset(records: list[dict], source_name: str) -> dict:
    """Validate dataset quality and return quality score."""
    payload = {
        "records": records,
        "checks": [
            {"field": "transaction_id", "rule": "not_null"},
            {"field": "amount", "rule": "positive"},
            {"field": "currency", "rule": "iso_4217"},
            {"field": "date", "rule": "not_future"}
        ],
        "source": source_name
    }
    resp = requests.post(
        f"{DATAFORGE_BASE}/validate",
        json=payload,
        headers={"X-RapidAPI-Key": DATAFORGE_API_KEY}
    )
    resp.raise_for_status()
    result = resp.json()
    quality_score = result["data"]["quality_score"]
    issues = result["data"]["issues"]
    print(f"[{source_name}] Quality score: {quality_score:.1%}")
    for issue in issues[:5]:
        print(f"  ⚠ Row {issue['row']}: {issue['field']} — {issue['message']}")
    return result["data"]
Step 5: Build the Reconciliation Engine
With normalized, validated data, run the matching logic:
def reconcile(erp_records: list[dict],
              bank_records: list[dict],
              payment_records: list[dict]) -> dict:
    """Three-way reconciliation across ERP, bank, and payment processor."""
    erp_df = pd.DataFrame(erp_records)
    bank_df = pd.DataFrame(bank_records)
    pay_df = pd.DataFrame(payment_records)

    # Merge ERP ↔ Bank on transaction_id
    erp_bank = pd.merge(
        erp_df, bank_df,
        on="transaction_id",
        how="outer",
        suffixes=("_erp", "_bank")
    )

    # Flag amount mismatches (a missing side compares as False)
    erp_bank["amount_match"] = (
        erp_bank["amount_erp"].round(2) == erp_bank["amount_bank"].round(2)
    )

    # Merge with payment processor
    three_way = pd.merge(
        erp_bank, pay_df,
        on="transaction_id",
        how="outer"
    )
    # Rows present only in the processor feed get NaN here; treat as no match
    three_way["amount_match"] = three_way["amount_match"].fillna(False).astype(bool)

    matched = three_way[
        three_way["amount_match"] &
        three_way["amount"].notna()
    ]
    unmatched = three_way[
        ~three_way["amount_match"] |
        three_way["amount"].isna()
    ]

    total = len(three_way)
    return {
        "total_records": total,
        "matched": len(matched),
        "unmatched": len(unmatched),
        "match_rate": len(matched) / total if total else 0.0,
        "discrepancies": unmatched.to_dict(orient="records")
    }
Step 6: Generate Discrepancy Report
def generate_report(reconciliation_result: dict, run_date: str) -> None:
    """Print a structured reconciliation summary."""
    result = reconciliation_result
    print("\n" + "═" * 50)
    print(f"RECONCILIATION REPORT — {run_date}")
    print("═" * 50)
    print(f"Total records processed : {result['total_records']:,}")
    print(f"Matched                 : {result['matched']:,}")
    print(f"Unmatched / exceptions  : {result['unmatched']:,}")
    print(f"Match rate              : {result['match_rate']:.1%}")
    print()
    if result["discrepancies"]:
        print("TOP DISCREPANCIES:")
        for i, disc in enumerate(result["discrepancies"][:10], 1):
            print(f"  {i}. ID={disc.get('transaction_id', 'MISSING')} "
                  f"ERP={disc.get('amount_erp')} "
                  f"Bank={disc.get('amount_bank')}")
    else:
        print("✅ No discrepancies found.")
    print("═" * 50)
Step 7: Run the Full Pipeline
def run_reconciliation_pipeline(
    erp_csv: str,
    bank_api_url: str,
    bank_token: str,
    payment_csv: str
):
    run_date = datetime.utcnow().strftime("%Y-%m-%d %H:%M UTC")
    print(f"Starting reconciliation pipeline: {run_date}")

    # 1. Load
    erp_raw = load_erp_data(erp_csv).to_dict(orient="records")
    bank_raw = load_bank_feed(bank_api_url, bank_token).to_dict(orient="records")
    pay_raw = load_payment_processor(payment_csv).to_dict(orient="records")

    # 2. Normalize
    erp_normalized = normalize_records(erp_raw, CANONICAL_SCHEMA)
    bank_normalized = normalize_records(bank_raw, CANONICAL_SCHEMA)
    pay_normalized = normalize_records(pay_raw, CANONICAL_SCHEMA)

    # 3. Validate
    validate_dataset(erp_normalized, "ERP")
    validate_dataset(bank_normalized, "Bank")
    validate_dataset(pay_normalized, "PaymentProcessor")

    # 4. Reconcile
    result = reconcile(erp_normalized, bank_normalized, pay_normalized)

    # 5. Report
    generate_report(result, run_date)
    return result

if __name__ == "__main__":
    run_reconciliation_pipeline(
        erp_csv="exports/erp_transactions_2026_q1.csv",
        bank_api_url="https://internal-bank-api/transactions",
        bank_token=os.getenv("BANK_API_TOKEN"),
        payment_csv="exports/payment_processor_march_2026.csv"
    )
Production Considerations
Schedule this pipeline to run daily (or after every import batch) using a cron job or workflow orchestrator like Prefect or Airflow.
Alert on match rate drops: Set a threshold — if match rate falls below 98%, trigger a Slack/Teams alert for immediate investigation.
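A minimal sketch of such a guardrail, assuming a Slack incoming-webhook URL in a SLACK_WEBHOOK_URL environment variable (the 98% threshold and the helper name are illustrative):

```python
import os
import requests

MATCH_RATE_THRESHOLD = 0.98  # illustrative threshold

def check_match_rate(result: dict, threshold: float = MATCH_RATE_THRESHOLD) -> bool:
    """Return True (and fire a Slack alert if a webhook is configured)
    when the match rate falls below the threshold."""
    if result["match_rate"] >= threshold:
        return False
    webhook = os.getenv("SLACK_WEBHOOK_URL")
    if webhook:
        requests.post(webhook, json={
            "text": (f"Reconciliation match rate dropped to "
                     f"{result['match_rate']:.1%} "
                     f"({result['unmatched']} unmatched records)")
        }, timeout=10)
    return True
```

Call it right after `reconcile()` in the pipeline so a bad batch surfaces the same run it happens.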
Audit trail: Persist reconciliation results to a database. Auditors expect a full history of reconciliation runs, not just the latest.
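As a sketch of that persistence step, the run summary from `reconcile()` can be appended to a local table. sqlite3 keeps the example self-contained; in production you would point this at your warehouse or an RDBMS, and the table name here is an assumption:

```python
import json
import sqlite3
from datetime import datetime, timezone

def persist_run(result: dict, db_path: str = "reconciliation_audit.db") -> None:
    """Append one reconciliation run to an audit table."""
    conn = sqlite3.connect(db_path)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS reconciliation_runs (
            run_at        TEXT NOT NULL,
            total_records INTEGER,
            matched       INTEGER,
            unmatched     INTEGER,
            match_rate    REAL,
            discrepancies TEXT  -- JSON blob of exception rows
        )""")
    conn.execute(
        "INSERT INTO reconciliation_runs VALUES (?, ?, ?, ?, ?, ?)",
        (datetime.now(timezone.utc).isoformat(),
         result["total_records"], result["matched"], result["unmatched"],
         result["match_rate"], json.dumps(result["discrepancies"], default=str)))
    conn.commit()
    conn.close()
```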
Exception workflow: Route discrepancies to a ticket queue (Jira, ServiceNow) rather than a spreadsheet. This keeps exception resolution trackable and auditable.
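For Jira, each discrepancy row from the reconciliation result can be shaped into a payload for the create-issue endpoint (POST /rest/api/2/issue). The project key, base URL, and helper names below are placeholders for your own setup:

```python
import requests

def build_jira_issue(disc: dict, project_key: str = "RECON") -> dict:
    """Shape one discrepancy row into a Jira create-issue payload."""
    tid = disc.get("transaction_id", "MISSING")
    return {
        "fields": {
            "project": {"key": project_key},
            "issuetype": {"name": "Task"},
            "summary": f"Reconciliation exception: {tid}",
            "description": (f"ERP={disc.get('amount_erp')} "
                            f"Bank={disc.get('amount_bank')} "
                            f"Processor={disc.get('amount')}"),
        }
    }

def file_ticket(disc: dict, base_url: str, auth: tuple) -> None:
    """POST one discrepancy to Jira; base_url and auth are assumptions."""
    resp = requests.post(f"{base_url}/rest/api/2/issue",
                         json=build_jira_issue(disc), auth=auth, timeout=10)
    resp.raise_for_status()
```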
Results You Can Expect
Teams that automate reconciliation with DataForge typically see:
- 85–90% reduction in reconciliation time per period
- Near-zero manual lookups for format-related mismatches
- Audit-ready reports available within minutes of data availability
- Earlier detection of fraud and posting errors — days earlier than month-end close
Ready to stop chasing mismatches manually? Try DataForge API free and run your first reconciliation in under 30 minutes.