How to Build an Automated KYC Onboarding Pipeline in Python with SanctionShield AI
Step-by-step Python tutorial: build a production-ready KYC onboarding pipeline with sanctions screening, PEP checks, risk scoring, and automated approval routing.

What You'll Build
This tutorial walks through building a complete automated KYC (Know Your Customer) onboarding pipeline in Python. By the end, you'll have a working system that:
- Accepts new customer registrations
- Screens each applicant against global sanctions lists and PEP databases
- Assigns a risk score and compliance tier
- Routes low-risk applicants to auto-approval
- Routes medium-risk applicants to enhanced due diligence queue
- Blocks high-risk and sanctioned applicants with SAR documentation
- Logs all screening decisions for audit purposes
This is the same pattern used by fintech platforms, crypto exchanges, and lending companies to automate what would otherwise require hours of manual compliance review per application.
Why Automated KYC Is Now Essential
The compliance landscape in 2026 makes manual KYC onboarding unsustainable at growth-stage volumes. Consider the regulatory requirements now in effect or taking effect:
- GENIUS Act (US, effective Jan 2027): Stablecoin issuers must screen all customers against OFAC lists with documented procedures
- FinCEN AML Reform (proposed April 2026): Risk-based programs must demonstrate documented, systematic screening — not ad hoc manual review
- EU AML Package: Payment service providers must screen against EU Consolidated List at onboarding and on an ongoing basis
- FATF Recommendation 10: Financial institutions must perform CDD (Customer Due Diligence) that includes beneficial ownership verification and PEP checks
Manual review of a growing customer base doesn't scale to meet these requirements. An automated pipeline that screens every applicant, documents every decision, and routes exceptions to human review provides both the operational efficiency and the audit trail that regulators expect.
Prerequisites
pip install requests python-dotenv fastapi uvicorn pydanticYou'll need:
- A SanctionShield AI API key from RapidAPI (search "SanctionShield AI")
- Python 3.9+
Step 1: Define the Customer Data Model
Start with a clean data model for incoming KYC applications:
# models.py
from pydantic import BaseModel, EmailStr
from typing import Optional, Literal
from datetime import date
from enum import Enum
class EntityType(str, Enum):
individual = "individual"
organization = "organization"
class KYCApplication(BaseModel):
"""Incoming KYC application from onboarding form."""
application_id: str
entity_type: EntityType
# Individual fields
full_name: Optional[str] = None
date_of_birth: Optional[date] = None
nationality: Optional[str] = None # ISO 3166-1 alpha-2
# Organization fields
company_name: Optional[str] = None
registration_country: Optional[str] = None
registration_number: Optional[str] = None
# Contact
email: EmailStr
class RiskTier(str, Enum):
low = "low" # Auto-approve
medium = "medium" # Enhanced due diligence queue
high = "high" # Manual review required
blocked = "blocked" # Sanctions match — block and SAR
class KYCDecision(BaseModel):
"""Output of the KYC pipeline for each application."""
application_id: str
entity_name: str
risk_tier: RiskTier
risk_score: int # 0-100
sanctions_match: bool
pep_match: bool
adverse_media_found: bool
decision_reason: str
screening_id: str # For audit trail
next_action: strStep 2: Build the Screening Client
# screening_client.py
import requests
import os
from typing import Optional
from models import KYCApplication, RiskTier, KYCDecision
from datetime import datetime
SANCTIONSHIELD_API_URL = "https://apivult.com/sanctionshield/v1/screen"
SANCTIONSHIELD_KEY = os.environ.get("SANCTIONSHIELD_API_KEY")
SCREENING_LISTS = [
"ofac_sdn",
"ofac_consolidated",
"eu_consolidated",
"un_sanctions",
"uk_hm_treasury",
"pep_global",
"fatf_high_risk_jurisdictions"
]
def build_screening_payload(application: KYCApplication) -> dict:
"""Build the API payload from the KYC application."""
payload = {
"screening_lists": SCREENING_LISTS,
"include_adverse_media": True,
"fuzzy_matching": True,
"fuzzy_threshold": 0.85
}
if application.entity_type == "individual":
payload.update({
"entity_name": application.full_name,
"entity_type": "individual",
"date_of_birth": str(application.date_of_birth) if application.date_of_birth else None,
"nationality": application.nationality
})
else:
payload.update({
"entity_name": application.company_name,
"entity_type": "organization",
"country": application.registration_country,
"registration_number": application.registration_number,
"include_beneficial_ownership_check": True
})
return {k: v for k, v in payload.items() if v is not None}
def screen_applicant(application: KYCApplication) -> KYCDecision:
"""
Screen a KYC applicant against sanctions, PEP, and adverse media databases.
Returns a KYCDecision with risk tier and routing instructions.
"""
entity_name = application.full_name or application.company_name or "Unknown"
payload = build_screening_payload(application)
response = requests.post(
SANCTIONSHIELD_API_URL,
headers={
"X-RapidAPI-Key": SANCTIONSHIELD_KEY,
"Content-Type": "application/json"
},
json=payload,
timeout=10
)
response.raise_for_status()
result = response.json()
# Extract key screening outcomes
sanctions_match = result.get("match_found", False)
pep_match = result.get("pep_match", False)
adverse_media = result.get("adverse_media_found", False)
risk_score = result.get("risk_score", 0)
screening_id = result.get("screening_id", f"SCR-{application.application_id}")
# Apply risk tiering logic
risk_tier, decision_reason, next_action = classify_risk(
sanctions_match=sanctions_match,
pep_match=pep_match,
adverse_media=adverse_media,
risk_score=risk_score,
nationality=application.nationality or application.registration_country
)
return KYCDecision(
application_id=application.application_id,
entity_name=entity_name,
risk_tier=risk_tier,
risk_score=risk_score,
sanctions_match=sanctions_match,
pep_match=pep_match,
adverse_media_found=adverse_media,
decision_reason=decision_reason,
screening_id=screening_id,
next_action=next_action
)
HIGH_RISK_COUNTRIES = {
"IR", "KP", "SY", "CU", "RU", "BY", # OFAC comprehensive sanctions
"AF", "YE", "LY", "SO", # FATF high-risk
"MM", "VE" # Additional high-risk
}
def classify_risk(
sanctions_match: bool,
pep_match: bool,
adverse_media: bool,
risk_score: int,
nationality: Optional[str]
) -> tuple[RiskTier, str, str]:
"""Apply risk-based classification rules to screening outcomes."""
# Tier 1: Immediate block — sanctions match
if sanctions_match:
return (
RiskTier.blocked,
"Applicant matches entry on OFAC/international sanctions list",
"Block account creation. Generate SAR documentation for compliance review."
)
# Tier 2: High risk — PEP + other risk factors, or very high score
if pep_match and (adverse_media or risk_score > 75):
return (
RiskTier.high,
"PEP match combined with adverse media or elevated risk score",
"Route to senior compliance officer for Enhanced Due Diligence review."
)
# Tier 3: Medium risk — PEP alone, adverse media, high-risk country, or moderate score
if pep_match or adverse_media or (nationality in HIGH_RISK_COUNTRIES) or risk_score > 50:
reasons = []
if pep_match:
reasons.append("PEP match")
if adverse_media:
reasons.append("adverse media found")
if nationality in HIGH_RISK_COUNTRIES:
reasons.append(f"high-risk country ({nationality})")
if risk_score > 50:
reasons.append(f"elevated risk score ({risk_score})")
return (
RiskTier.medium,
f"Enhanced due diligence required: {', '.join(reasons)}",
"Collect additional documentation: source of funds, purpose of relationship. Route to EDD queue."
)
# Tier 4: Low risk — clear screening, low risk score
return (
RiskTier.low,
"No matches found. Risk score within acceptable range.",
"Auto-approve account creation. Schedule 12-month periodic review."
)Step 3: Build the Pipeline Orchestrator
# pipeline.py
import json
import logging
from datetime import datetime
from pathlib import Path
from models import KYCApplication, KYCDecision, RiskTier
from screening_client import screen_applicant
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Audit log path — in production, use a database or SIEM
AUDIT_LOG_PATH = Path("kyc_audit_log.jsonl")
def log_decision(application: KYCApplication, decision: KYCDecision) -> None:
"""Write immutable audit log entry for compliance record."""
log_entry = {
"timestamp": datetime.utcnow().isoformat() + "Z",
"application_id": decision.application_id,
"entity_name": decision.entity_name,
"entity_type": application.entity_type,
"risk_tier": decision.risk_tier,
"risk_score": decision.risk_score,
"sanctions_match": decision.sanctions_match,
"pep_match": decision.pep_match,
"adverse_media_found": decision.adverse_media_found,
"decision_reason": decision.decision_reason,
"screening_id": decision.screening_id,
"next_action": decision.next_action
}
with open(AUDIT_LOG_PATH, "a") as f:
f.write(json.dumps(log_entry) + "\n")
logger.info(f"KYC decision logged: {decision.application_id} → {decision.risk_tier}")
def generate_sar_record(application: KYCApplication, decision: KYCDecision) -> dict:
"""Generate Suspicious Activity Report data for blocked applicants."""
return {
"sar_type": "kyc_block",
"subject_name": decision.entity_name,
"application_id": decision.application_id,
"screening_id": decision.screening_id,
"block_reason": decision.decision_reason,
"timestamp": datetime.utcnow().isoformat() + "Z",
"status": "pending_compliance_review",
"instructions": "Submit SAR to FinCEN within 30 days of suspicious activity confirmation"
}
def process_kyc_application(application: KYCApplication) -> dict:
"""
Main pipeline entry point. Screens applicant and routes based on risk tier.
Returns action dict consumed by the onboarding system.
"""
logger.info(f"Processing KYC for application {application.application_id}")
# Run screening
try:
decision = screen_applicant(application)
except Exception as e:
logger.error(f"Screening failed for {application.application_id}: {e}")
# Fail safe: route to manual review on API errors
return {
"application_id": application.application_id,
"action": "manual_review",
"reason": f"Screening service error: {str(e)}",
"priority": "high"
}
# Log for audit trail
log_decision(application, decision)
# Route based on risk tier
if decision.risk_tier == RiskTier.blocked:
sar = generate_sar_record(application, decision)
logger.warning(f"BLOCKED: {decision.entity_name} — sanctions match")
return {
"application_id": decision.application_id,
"action": "block",
"reason": decision.decision_reason,
"sar_record": sar
}
elif decision.risk_tier == RiskTier.high:
logger.info(f"HIGH RISK: {decision.entity_name} → senior review queue")
return {
"application_id": decision.application_id,
"action": "senior_review_queue",
"reason": decision.decision_reason,
"risk_score": decision.risk_score,
"edd_required": True
}
elif decision.risk_tier == RiskTier.medium:
logger.info(f"MEDIUM RISK: {decision.entity_name} → EDD queue")
return {
"application_id": decision.application_id,
"action": "edd_queue",
"reason": decision.decision_reason,
"risk_score": decision.risk_score,
"documents_required": [
"source_of_funds",
"purpose_of_relationship",
"beneficial_ownership_declaration"
]
}
else: # low risk
logger.info(f"APPROVED: {decision.entity_name} — auto-approval")
return {
"application_id": decision.application_id,
"action": "approve",
"risk_score": decision.risk_score,
"next_review_date": "12_months"
}Step 4: Expose as a FastAPI Endpoint
# main.py
from fastapi import FastAPI, HTTPException, Header
from models import KYCApplication
from pipeline import process_kyc_application
import os
app = FastAPI(title="KYC Onboarding API", version="1.0.0")
INTERNAL_API_KEY = os.environ.get("INTERNAL_API_KEY")
@app.post("/kyc/screen")
async def screen_kyc_application(
application: KYCApplication,
x_api_key: str = Header(...)
):
"""
Screen a new KYC application and return routing decision.
Internal use only — requires valid API key.
"""
if x_api_key != INTERNAL_API_KEY:
raise HTTPException(status_code=401, detail="Invalid API key")
result = process_kyc_application(application)
return result
@app.get("/health")
async def health():
return {"status": "ok"}Start the server:
uvicorn main:app --host 0.0.0.0 --port 8000Step 5: Test the Pipeline
# test_pipeline.py
import requests
BASE_URL = "http://localhost:8000"
API_KEY = "your-internal-api-key"
test_cases = [
# Low risk individual
{
"application_id": "APP-001",
"entity_type": "individual",
"full_name": "John Smith",
"date_of_birth": "1985-03-15",
"nationality": "CA",
"email": "[email protected]"
},
# High-risk country individual
{
"application_id": "APP-002",
"entity_type": "individual",
"full_name": "Test User",
"date_of_birth": "1990-07-20",
"nationality": "IR", # Iran — OFAC comprehensive sanctions
"email": "[email protected]"
}
]
for test in test_cases:
response = requests.post(
f"{BASE_URL}/kyc/screen",
json=test,
headers={"x-api-key": API_KEY}
)
result = response.json()
print(f"\nApp {test['application_id']}: {result['action'].upper()}")
print(f" Reason: {result.get('reason', 'N/A')}")Production Considerations
Database for audit logs: Replace the JSONL file with PostgreSQL or a SIEM-compatible log store. Audit logs must be tamper-evident and retained for the required period (7 years under BSA).
Async processing: For high-volume onboarding, process screening asynchronously using a task queue (Celery + Redis or AWS SQS). Return a pending status immediately and webhook the result when screening completes.
Ongoing monitoring: Schedule daily re-screening of your active customer base using SanctionShield AI's batch endpoint. Configure webhooks to trigger an EDD workflow if a previously-approved customer appears on a newly-updated sanctions list.
Failure handling: Never auto-approve on screening API errors. Route to manual review on any exception, and alert your compliance team.
Rate limits: SanctionShield AI supports high-throughput screening — check your plan's rate limits and implement exponential backoff for burst traffic.
Results: What Automated KYC Delivers
Organizations implementing automated KYC pipelines similar to this architecture typically see:
- 90%+ of applications auto-approved within seconds (assuming clean customer base)
- ~8% routed to EDD for additional documentation
- ~2% blocked for sanctions matches or very high risk scores
- Manual review reduced by 85%+ compared to fully manual KYC programs
- Audit-ready documentation for every screening decision from day one
The pipeline described here takes under a day to implement and provides the documented, systematic screening process that FinCEN's reform proposal and the GENIUS Act require — without building ML models from scratch.
Next Steps
- Get your SanctionShield AI API key at RapidAPI.com
- Add identity document verification to the pipeline using Stripe Identity or Jumio
- Implement the ongoing monitoring webhook to re-screen customers when sanctions lists update
- Review the full KYC API comparison to choose the right document verification vendor for your use case
More Articles
Real-Time AML Sanctions Screening in Python: A Complete Integration Guide
Real-time sanctions screening against OFAC, UN, EU lists. Integrate SanctionShield API for AML/KYC in Python.
March 31, 2026
Best KYC API Solutions in 2026: Know Your Customer Tools Compared
Compare the top KYC API solutions for identity verification, sanctions screening, and AML compliance in 2026. Includes pricing, features, and use case recommendations.
April 12, 2026