Education · Last updated April 12, 2026

How to Build an Automated KYC Onboarding Pipeline in Python with SanctionShield AI

Step-by-step Python tutorial: build a production-ready KYC onboarding pipeline with sanctions screening, PEP checks, risk scoring, and automated approval routing.

What You'll Build

This tutorial walks through building a complete automated KYC (Know Your Customer) onboarding pipeline in Python. By the end, you'll have a working system that:

  • Accepts new customer registrations
  • Screens each applicant against global sanctions lists and PEP databases
  • Assigns a risk score and compliance tier
  • Routes low-risk applicants to auto-approval
  • Routes medium-risk applicants to an enhanced due diligence (EDD) queue
  • Escalates high-risk applicants to senior compliance review
  • Blocks sanctioned applicants and generates SAR documentation
  • Logs all screening decisions for audit purposes

This is the same pattern used by fintech platforms, crypto exchanges, and lending companies to automate what would otherwise require hours of manual compliance review per application.


Why Automated KYC Is Now Essential

The compliance landscape in 2026 makes manual KYC onboarding unsustainable at growth-stage volumes. Consider the regulatory requirements now in effect or taking effect:

  • GENIUS Act (US, effective Jan 2027): Stablecoin issuers must screen all customers against OFAC lists with documented procedures
  • FinCEN AML Reform (proposed April 2026): Risk-based programs must demonstrate documented, systematic screening — not ad hoc manual review
  • EU AML Package: Payment service providers must screen against the EU Consolidated List at onboarding and on an ongoing basis
  • FATF Recommendations 10 and 12: Financial institutions must perform CDD (Customer Due Diligence) that includes beneficial ownership verification (Recommendation 10) and apply additional measures to PEPs (Recommendation 12)

Manual review of a growing customer base doesn't scale to meet these requirements. An automated pipeline that screens every applicant, documents every decision, and routes exceptions to human review provides both the operational efficiency and the audit trail that regulators expect.


Prerequisites

pip install requests python-dotenv fastapi uvicorn "pydantic[email]"

You'll need:

  • A SanctionShield AI API key from RapidAPI (search "SanctionShield AI")
  • Python 3.9+
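
Before running any of the code in this tutorial, it can help to fail fast when a required setting is missing. The following is a minimal sketch, assuming the two environment variable names the later steps read (`SANCTIONSHIELD_API_KEY` and `INTERNAL_API_KEY`); the helper name `require_env` is hypothetical and not part of any library.

```python
import os
from typing import Iterable, Mapping

# Variable names read by screening_client.py and main.py in this tutorial
REQUIRED_VARS = ("SANCTIONSHIELD_API_KEY", "INTERNAL_API_KEY")

def require_env(names: Iterable[str], env: Mapping[str, str] = os.environ) -> dict:
    """Return the requested settings, or raise if any are missing or empty."""
    names = list(names)
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {name: env[name] for name in names}
```

Calling `require_env(REQUIRED_VARS)` once at startup turns a silently unset key into an immediate, readable error rather than a failed API call later.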

Step 1: Define the Customer Data Model

Start with a clean data model for incoming KYC applications:

# models.py
from pydantic import BaseModel, EmailStr
from typing import Optional
from datetime import date
from enum import Enum
 
class EntityType(str, Enum):
    individual = "individual"
    organization = "organization"
 
class KYCApplication(BaseModel):
    """Incoming KYC application from onboarding form."""
    application_id: str
    entity_type: EntityType
    
    # Individual fields
    full_name: Optional[str] = None
    date_of_birth: Optional[date] = None
    nationality: Optional[str] = None  # ISO 3166-1 alpha-2
    
    # Organization fields
    company_name: Optional[str] = None
    registration_country: Optional[str] = None
    registration_number: Optional[str] = None
    
    # Contact
    email: EmailStr
    
class RiskTier(str, Enum):
    low = "low"          # Auto-approve
    medium = "medium"    # Enhanced due diligence queue
    high = "high"        # Manual review required
    blocked = "blocked"  # Sanctions match — block and SAR
 
class KYCDecision(BaseModel):
    """Output of the KYC pipeline for each application."""
    application_id: str
    entity_name: str
    risk_tier: RiskTier
    risk_score: int        # 0-100
    sanctions_match: bool
    pep_match: bool
    adverse_media_found: bool
    decision_reason: str
    screening_id: str      # For audit trail
    next_action: str
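
Because every identity field on `KYCApplication` is `Optional`, an individual application with no `full_name` still passes validation. A minimal sketch of a pre-screening completeness check follows. It uses plain dictionaries so it runs without pydantic; the required-field sets and the name `missing_fields` are assumptions for illustration, not a regulatory minimum.

```python
from typing import Mapping, Optional

# Assumed minimum fields per entity type; adjust to your own onboarding policy.
REQUIRED_FIELDS = {
    "individual": ("full_name", "date_of_birth", "nationality"),
    "organization": ("company_name", "registration_country"),
}

def missing_fields(entity_type: str, data: Mapping[str, Optional[object]]) -> list:
    """List required fields that are absent or empty for the given entity type."""
    required = REQUIRED_FIELDS[entity_type]
    return [name for name in required if not data.get(name)]

# An individual application that supplies only a name
print(missing_fields("individual", {"full_name": "John Smith"}))
# prints ['date_of_birth', 'nationality']
```

The same rule could be enforced inside the model with a pydantic validator; rejecting incomplete applications before screening avoids spending an API call on a request that cannot be matched reliably.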

Step 2: Build the Screening Client

# screening_client.py
import os
from typing import Optional

import requests
from dotenv import load_dotenv

from models import KYCApplication, RiskTier, KYCDecision

# Load SANCTIONSHIELD_API_KEY from a local .env file if one exists
load_dotenv()

SANCTIONSHIELD_API_URL = "https://apivult.com/sanctionshield/v1/screen"
SANCTIONSHIELD_KEY = os.environ.get("SANCTIONSHIELD_API_KEY")
 
SCREENING_LISTS = [
    "ofac_sdn",
    "ofac_consolidated",
    "eu_consolidated",
    "un_sanctions",
    "uk_hm_treasury",
    "pep_global",
    "fatf_high_risk_jurisdictions"
]
 
def build_screening_payload(application: KYCApplication) -> dict:
    """Build the API payload from the KYC application."""
    payload = {
        "screening_lists": SCREENING_LISTS,
        "include_adverse_media": True,
        "fuzzy_matching": True,
        "fuzzy_threshold": 0.85
    }
    
    if application.entity_type == "individual":
        payload.update({
            "entity_name": application.full_name,
            "entity_type": "individual",
            "date_of_birth": str(application.date_of_birth) if application.date_of_birth else None,
            "nationality": application.nationality
        })
    else:
        payload.update({
            "entity_name": application.company_name,
            "entity_type": "organization",
            "country": application.registration_country,
            "registration_number": application.registration_number,
            "include_beneficial_ownership_check": True
        })
    
    return {k: v for k, v in payload.items() if v is not None}
 
def screen_applicant(application: KYCApplication) -> KYCDecision:
    """
    Screen a KYC applicant against sanctions, PEP, and adverse media databases.
    Returns a KYCDecision with risk tier and routing instructions.
    """
    entity_name = application.full_name or application.company_name or "Unknown"
    
    payload = build_screening_payload(application)
    
    response = requests.post(
        SANCTIONSHIELD_API_URL,
        headers={
            "X-RapidAPI-Key": SANCTIONSHIELD_KEY,
            "Content-Type": "application/json"
        },
        json=payload,
        timeout=10
    )
    response.raise_for_status()
    result = response.json()
    
    # Extract key screening outcomes
    sanctions_match = result.get("match_found", False)
    pep_match = result.get("pep_match", False)
    adverse_media = result.get("adverse_media_found", False)
    risk_score = result.get("risk_score", 0)
    screening_id = result.get("screening_id", f"SCR-{application.application_id}")
    
    # Apply risk tiering logic
    risk_tier, decision_reason, next_action = classify_risk(
        sanctions_match=sanctions_match,
        pep_match=pep_match,
        adverse_media=adverse_media,
        risk_score=risk_score,
        nationality=application.nationality or application.registration_country
    )
    
    return KYCDecision(
        application_id=application.application_id,
        entity_name=entity_name,
        risk_tier=risk_tier,
        risk_score=risk_score,
        sanctions_match=sanctions_match,
        pep_match=pep_match,
        adverse_media_found=adverse_media,
        decision_reason=decision_reason,
        screening_id=screening_id,
        next_action=next_action
    )
 
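# Illustrative list only: verify against current OFAC and FATF publications before use
HIGH_RISK_COUNTRIES = {
    "IR", "KP", "SY", "CU", "RU", "BY",  # Comprehensive or extensive sanctions programs
    "AF", "YE", "LY", "SO",               # Elevated-risk jurisdictions
    "MM", "VE"                             # Additional high-risk
}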
 
def classify_risk(
    sanctions_match: bool,
    pep_match: bool,
    adverse_media: bool,
    risk_score: int,
    nationality: Optional[str]
) -> tuple[RiskTier, str, str]:
    """Apply risk-based classification rules to screening outcomes."""
    
    # Tier 1: Immediate block — sanctions match
    if sanctions_match:
        return (
            RiskTier.blocked,
            "Applicant matches entry on OFAC/international sanctions list",
            "Block account creation. Generate SAR documentation for compliance review."
        )
    
    # Tier 2: High risk (PEP match plus adverse media or a risk score above 75)
    if pep_match and (adverse_media or risk_score > 75):
        return (
            RiskTier.high,
            "PEP match combined with adverse media or elevated risk score",
            "Route to senior compliance officer for Enhanced Due Diligence review."
        )
    
    # Tier 3: Medium risk — PEP alone, adverse media, high-risk country, or moderate score
    if pep_match or adverse_media or (nationality in HIGH_RISK_COUNTRIES) or risk_score > 50:
        reasons = []
        if pep_match:
            reasons.append("PEP match")
        if adverse_media:
            reasons.append("adverse media found")
        if nationality in HIGH_RISK_COUNTRIES:
            reasons.append(f"high-risk country ({nationality})")
        if risk_score > 50:
            reasons.append(f"elevated risk score ({risk_score})")
        
        return (
            RiskTier.medium,
            f"Enhanced due diligence required: {', '.join(reasons)}",
            "Collect additional documentation: source of funds, purpose of relationship. Route to EDD queue."
        )
    
    # Tier 4: Low risk — clear screening, low risk score
    return (
        RiskTier.low,
        "No matches found. Risk score within acceptable range.",
        "Auto-approve account creation. Schedule 12-month periodic review."
    )
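
The thresholds in `classify_risk` use strict comparisons, so the boundary cases are worth checking explicitly. The sketch below is a condensed restatement of the same rules that returns only the tier name (the function `tier_for` and its `high_risk_country` flag are introduced here for illustration), followed by a small table of inputs and the tier each one lands in.

```python
def tier_for(sanctions_match, pep_match, adverse_media, risk_score, high_risk_country=False):
    """Condensed restatement of classify_risk that returns only the tier name."""
    if sanctions_match:
        return "blocked"
    if pep_match and (adverse_media or risk_score > 75):
        return "high"
    if pep_match or adverse_media or high_risk_country or risk_score > 50:
        return "medium"
    return "low"

examples = [
    # (sanctions, pep, adverse_media, score, high_risk_country) -> expected tier
    ((False, False, False, 50, False), "low"),     # 50 is not greater than 50
    ((False, False, False, 51, False), "medium"),
    ((False, False, False, 90, False), "medium"),  # a high score alone never reaches "high"
    ((False, True, False, 75, False), "medium"),   # PEP with a score of exactly 75
    ((False, True, False, 76, False), "high"),
    ((False, False, False, 10, True), "medium"),   # high-risk country alone
    ((True, True, True, 100, True), "blocked"),    # a sanctions match overrides everything
]

for args, expected in examples:
    assert tier_for(*args) == expected
```

Note the third row: under these rules an applicant with a score of 90 but no PEP match is routed to the EDD queue, not to senior review. If that is not the intended policy, the Tier 2 condition is the place to change it.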

Step 3: Build the Pipeline Orchestrator

# pipeline.py
import json
import logging
from datetime import datetime, timezone
from pathlib import Path
from models import KYCApplication, KYCDecision, RiskTier
from screening_client import screen_applicant

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Audit log path (in production, use a database or SIEM)
AUDIT_LOG_PATH = Path("kyc_audit_log.jsonl")

def utc_timestamp() -> str:
    """Current UTC time in ISO 8601 with a trailing Z.

    Uses datetime.now(timezone.utc) because datetime.utcnow() is deprecated
    as of Python 3.12.
    """
    return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

def log_decision(application: KYCApplication, decision: KYCDecision) -> None:
    """Append an audit log entry for the compliance record."""
    log_entry = {
        "timestamp": utc_timestamp(),
        "application_id": decision.application_id,
        "entity_name": decision.entity_name,
        "entity_type": application.entity_type,
        "risk_tier": decision.risk_tier,
        "risk_score": decision.risk_score,
        "sanctions_match": decision.sanctions_match,
        "pep_match": decision.pep_match,
        "adverse_media_found": decision.adverse_media_found,
        "decision_reason": decision.decision_reason,
        "screening_id": decision.screening_id,
        "next_action": decision.next_action
    }

    # Append-only write: one JSON object per line
    with open(AUDIT_LOG_PATH, "a") as f:
        f.write(json.dumps(log_entry) + "\n")

    logger.info(f"KYC decision logged: {decision.application_id} → {decision.risk_tier.value}")

def generate_sar_record(application: KYCApplication, decision: KYCDecision) -> dict:
    """Generate Suspicious Activity Report data for blocked applicants."""
    return {
        "sar_type": "kyc_block",
        "subject_name": decision.entity_name,
        "application_id": decision.application_id,
        "screening_id": decision.screening_id,
        "block_reason": decision.decision_reason,
        "timestamp": utc_timestamp(),
        "status": "pending_compliance_review",
        "instructions": "If compliance review confirms suspicious activity, file a SAR with FinCEN within 30 calendar days of initial detection"
    }
 
def process_kyc_application(application: KYCApplication) -> dict:
    """
    Main pipeline entry point. Screens applicant and routes based on risk tier.
    Returns action dict consumed by the onboarding system.
    """
    logger.info(f"Processing KYC for application {application.application_id}")
    
    # Run screening
    try:
        decision = screen_applicant(application)
    except Exception as e:
        logger.error(f"Screening failed for {application.application_id}: {e}")
        # Fail safe: route to manual review on API errors
        return {
            "application_id": application.application_id,
            "action": "manual_review",
            "reason": f"Screening service error: {str(e)}",
            "priority": "high"
        }
    
    # Log for audit trail
    log_decision(application, decision)
    
    # Route based on risk tier
    if decision.risk_tier == RiskTier.blocked:
        sar = generate_sar_record(application, decision)
        logger.warning(f"BLOCKED: {decision.entity_name} — sanctions match")
        return {
            "application_id": decision.application_id,
            "action": "block",
            "reason": decision.decision_reason,
            "sar_record": sar
        }
    
    elif decision.risk_tier == RiskTier.high:
        logger.info(f"HIGH RISK: {decision.entity_name} → senior review queue")
        return {
            "application_id": decision.application_id,
            "action": "senior_review_queue",
            "reason": decision.decision_reason,
            "risk_score": decision.risk_score,
            "edd_required": True
        }
    
    elif decision.risk_tier == RiskTier.medium:
        logger.info(f"MEDIUM RISK: {decision.entity_name} → EDD queue")
        return {
            "application_id": decision.application_id,
            "action": "edd_queue",
            "reason": decision.decision_reason,
            "risk_score": decision.risk_score,
            "documents_required": [
                "source_of_funds",
                "purpose_of_relationship",
                "beneficial_ownership_declaration"
            ]
        }
    
    else:  # low risk
        logger.info(f"APPROVED: {decision.entity_name} — auto-approval")
        return {
            "application_id": decision.application_id,
            "action": "approve",
            "risk_score": decision.risk_score,
            "next_review_date": "12_months"
        }
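
Once the pipeline has processed some applications, the JSONL audit log can be summarized with a few lines of standard-library code. This is a minimal sketch that assumes the `risk_tier` key written by `log_decision`; the function name `summarize_audit_log` is hypothetical.

```python
import json
from collections import Counter
from typing import Iterable

def summarize_audit_log(lines: Iterable[str]) -> Counter:
    """Count logged decisions per risk tier, skipping blank lines."""
    counts = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue
        counts[json.loads(line)["risk_tier"]] += 1
    return counts

# Sample entries trimmed to the fields this summary needs
sample = [
    '{"application_id": "APP-001", "risk_tier": "low"}',
    '{"application_id": "APP-002", "risk_tier": "medium"}',
    '',
    '{"application_id": "APP-003", "risk_tier": "low"}',
]
print(summarize_audit_log(sample))
```

Passing an open file works the same way, for example `summarize_audit_log(open("kyc_audit_log.jsonl"))`. Screening failures return from `process_kyc_application` before `log_decision` is called, so applications routed to manual review because of an API error do not appear in this log or in the summary.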

Step 4: Expose as a FastAPI Endpoint

# main.py
import os
import secrets

from fastapi import FastAPI, HTTPException, Header
from models import KYCApplication
from pipeline import process_kyc_application

app = FastAPI(title="KYC Onboarding API", version="1.0.0")

INTERNAL_API_KEY = os.environ.get("INTERNAL_API_KEY")

# Declared with plain `def` so FastAPI runs it in a worker thread:
# process_kyc_application makes a blocking HTTP call via requests.
@app.post("/kyc/screen")
def screen_kyc_application(
    application: KYCApplication,
    x_api_key: str = Header(...)
):
    """
    Screen a new KYC application and return routing decision.
    Internal use only: requires a valid API key.
    """
    # Reject everything if the key is unset; otherwise compare in constant time
    if not INTERNAL_API_KEY or not secrets.compare_digest(
        x_api_key.encode(), INTERNAL_API_KEY.encode()
    ):
        raise HTTPException(status_code=401, detail="Invalid API key")

    result = process_kyc_application(application)
    return result
 
@app.get("/health")
async def health():
    return {"status": "ok"}

Start the server:

uvicorn main:app --host 0.0.0.0 --port 8000

Step 5: Test the Pipeline

# test_pipeline.py
import requests
 
BASE_URL = "http://localhost:8000"
API_KEY = "your-internal-api-key"
 
test_cases = [
    # Low risk individual
    {
        "application_id": "APP-001",
        "entity_type": "individual",
        "full_name": "John Smith",
        "date_of_birth": "1985-03-15",
        "nationality": "CA",
        "email": "john.smith@example.com"  # Placeholder address
    },
    # High-risk country individual
    {
        "application_id": "APP-002",
        "entity_type": "individual",
        "full_name": "Test User",
        "date_of_birth": "1990-07-20",
        "nationality": "IR",  # Iran (OFAC comprehensive sanctions)
        "email": "test.user@example.com"  # Placeholder address
    }
]
 
for test in test_cases:
    response = requests.post(
        f"{BASE_URL}/kyc/screen",
        json=test,
        headers={"x-api-key": API_KEY}
    )
    response.raise_for_status()  # Surface 401/422 responses instead of a KeyError below
    result = response.json()
    print(f"\nApp {test['application_id']}: {result['action'].upper()}")
    print(f"  Reason: {result.get('reason', 'N/A')}")

Production Considerations

Database for audit logs: Replace the JSONL file with PostgreSQL or a SIEM-compatible log store. Audit logs must be tamper-evident and retained for the required period (generally 5 years under the BSA; confirm the period that applies to your institution and jurisdiction).
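
One common way to make a log tamper-evident is to chain entries by hash, so that editing or deleting any earlier entry invalidates everything after it. The following is a minimal sketch of that idea using only the standard library; the field names `prev_hash` and `entry_hash` and both function names are assumptions for illustration, not part of the pipeline above.

```python
import hashlib
import json

GENESIS_HASH = "0" * 64  # Placeholder "previous hash" for the first entry

def _digest(body: dict) -> str:
    """SHA-256 over a canonical (sorted-key) JSON encoding of the entry body."""
    return hashlib.sha256(json.dumps(body, sort_keys=True).encode("utf-8")).hexdigest()

def chain_entry(entry: dict, prev_hash: str) -> dict:
    """Return a copy of entry that records prev_hash and its own entry_hash."""
    body = dict(entry, prev_hash=prev_hash)
    return dict(body, entry_hash=_digest(body))

def verify_chain(entries: list) -> bool:
    """Check that every entry hashes correctly and links to its predecessor."""
    prev_hash = GENESIS_HASH
    for stored in entries:
        body = {k: v for k, v in stored.items() if k != "entry_hash"}
        if body.get("prev_hash") != prev_hash:
            return False
        if stored.get("entry_hash") != _digest(body):
            return False
        prev_hash = stored["entry_hash"]
    return True

first = chain_entry({"application_id": "APP-001", "risk_tier": "low"}, GENESIS_HASH)
second = chain_entry({"application_id": "APP-002", "risk_tier": "medium"}, first["entry_hash"])
assert verify_chain([first, second])
```

A hash chain only makes changes detectable; someone with write access could still rebuild the whole chain. Storing the latest `entry_hash` somewhere the application cannot modify (a separate system or write-once storage) closes that gap.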

Async processing: For high-volume onboarding, process screening asynchronously using a task queue (Celery + Redis or AWS SQS). Return a pending status immediately and webhook the result when screening completes.
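
The submit-then-poll shape of that flow can be sketched without any infrastructure. The example below uses a thread pool and an in-memory dictionary as stand-ins for the task queue and result store; the class `ScreeningJobs` and its method names are hypothetical, and a real deployment would use a durable queue such as the ones named above and deliver results by webhook instead of polling.

```python
import uuid
from concurrent.futures import ThreadPoolExecutor
from typing import Callable

class ScreeningJobs:
    """In-memory stand-in for a task queue plus result store."""

    def __init__(self, screen: Callable[[dict], dict], max_workers: int = 4):
        self._screen = screen
        self._pool = ThreadPoolExecutor(max_workers=max_workers)
        self._futures = {}

    def submit(self, application: dict) -> dict:
        """Queue the application and return a pending ticket immediately."""
        job_id = str(uuid.uuid4())
        self._futures[job_id] = self._pool.submit(self._screen, application)
        return {"job_id": job_id, "status": "pending"}

    def wait(self, job_id: str, timeout: float = 10.0) -> None:
        """Block until the job finishes (useful in tests and scripts)."""
        self._futures[job_id].exception(timeout=timeout)

    def status(self, job_id: str) -> dict:
        """Report pending, or the finished result."""
        future = self._futures[job_id]
        if not future.done():
            return {"job_id": job_id, "status": "pending"}
        if future.exception() is not None:
            # Same fail-safe as the synchronous pipeline: errors go to manual review
            return {"job_id": job_id, "status": "complete",
                    "result": {"action": "manual_review"}}
        return {"job_id": job_id, "status": "complete", "result": future.result()}
```

In the pipeline above, `screen` would be a function that wraps `process_kyc_application`; the onboarding system would hold the `job_id` and either poll `status` or receive a callback when the job completes.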

Ongoing monitoring: Schedule daily re-screening of your active customer base using SanctionShield AI's batch endpoint. Configure webhooks to trigger an EDD workflow if a previously-approved customer appears on a newly-updated sanctions list.
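
Whatever mechanism delivers the re-screening results, the core of ongoing monitoring is a comparison between what you recorded at onboarding and what the latest screening returned. The sketch below makes no assumption about the batch endpoint's request or response format: both inputs are plain mappings from your own customer ID to a sanctions-match flag, and the function name `newly_flagged` is hypothetical.

```python
from typing import Dict, List

def newly_flagged(previous: Dict[str, bool], current: Dict[str, bool]) -> List[str]:
    """Return customer IDs that match now but did not match before.

    Customers missing from `previous` are treated as previously clear,
    so a first-time match is also reported.
    """
    return sorted(
        customer_id
        for customer_id, matched in current.items()
        if matched and not previous.get(customer_id, False)
    )

previous = {"C1": False, "C2": True, "C3": False}
current = {"C1": True, "C2": True, "C3": False, "C4": True}
print(newly_flagged(previous, current))
# prints ['C1', 'C4']
```

Each ID returned here is a candidate for the EDD workflow; customers who were already flagged (`C2` in the example) are left out because they should already be in review.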

Failure handling: Never auto-approve on screening API errors. Route to manual review on any exception, and alert your compliance team.

Rate limits: SanctionShield AI supports high-throughput screening — check your plan's rate limits and implement exponential backoff for burst traffic.
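
Exponential backoff can be sketched as a small wrapper around any callable. This version uses "full jitter" (a random delay between zero and the current cap) and retries on every exception for brevity; that is an assumption of the sketch, and in practice you would likely retry only on rate-limit responses, server errors, and timeouts. The function name and default values are illustrative.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")

def call_with_backoff(
    fn: Callable[[], T],
    max_attempts: int = 5,
    base_delay: float = 0.5,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying on any exception with exponential backoff and full jitter."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # Out of attempts: let the caller route to manual review
            # Cap doubles each attempt: base_delay, 2x, 4x, ... up to max_delay
            cap = min(max_delay, base_delay * (2 ** attempt))
            sleep(random.uniform(0, cap))
    raise AssertionError("unreachable")
```

In the screening client, the wrapped callable would be the `requests.post(...)` call followed by `raise_for_status()`. Because the final failure is re-raised, the existing `except` block in `process_kyc_application` still routes the application to manual review.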


Results: What Automated KYC Delivers

Organizations implementing automated KYC pipelines similar to this architecture typically see:

  • 90%+ of applications auto-approved within seconds (assuming clean customer base)
  • ~8% routed to EDD for additional documentation
  • ~2% blocked for sanctions matches or escalated to senior review as high risk
  • Manual review reduced by 85%+ compared to fully manual KYC programs
  • Audit-ready documentation for every screening decision from day one

The pipeline described here takes under a day to implement and provides the documented, systematic screening process that FinCEN's reform proposal and the GENIUS Act require — without building ML models from scratch.


Next Steps

  • Get your SanctionShield AI API key at RapidAPI.com
  • Add identity document verification to the pipeline using Stripe Identity or Jumio
  • Implement the ongoing monitoring webhook to re-screen customers when sanctions lists update
  • Review the full KYC API comparison to choose the right document verification vendor for your use case