Education · Last updated April 8, 2026

Build a FinCEN-Ready AML Compliance Dashboard with SanctionShield AI API

FinCEN's new AML whistleblower NPRM raises the compliance bar for financial institutions. Build a comprehensive AML monitoring dashboard using the SanctionShield AI API in Python.

On April 1, 2026, FinCEN published a proposed rule in the Federal Register implementing the anti-money laundering whistleblower provisions of the Anti-Money Laundering Act of 2020. The rule provides whistleblower awards of 10-30% of monetary penalties collected in actions where penalties exceed $1 million, covering BSA violations, sanctions evasion, foreign investment security, and data security failures.

The practical implication for compliance teams: every gap in your AML program is now a potential whistleblower trigger. Incomplete sanctions screening, undocumented risk assessments, and screening exceptions without proper justification are exactly the kind of deficiencies that whistleblowers — including current employees — can report for significant financial rewards.

This guide shows you how to build a comprehensive AML compliance dashboard using the SanctionShield AI API, providing the audit-ready documentation, screening coverage metrics, and risk tiering that regulators and whistleblowers evaluate when assessing a program's adequacy.

What "FinCEN-Ready" AML Means in 2026

A FinCEN-ready AML program has three technical properties:

  1. Coverage completeness: Every counterparty, beneficiary, and beneficial owner is screened — not just account holders.
  2. Audit traceability: Every screening decision is logged with timestamp, match details, disposition, and reviewer identity.
  3. Risk-tiered disposition: High-risk matches are escalated, medium-risk matches are reviewed, and clean matches are archived with evidence.

The SanctionShield AI API provides the screening engine. This guide adds the coverage tracking, audit logging, and risk-tiered workflows on top.

Architecture

Incoming Entity (customer, vendor, beneficiary)
           │
           ▼
┌────────────────────────┐
│ Entity Normalizer      │  Standardize names, aliases, transliterations
└────────────┬───────────┘
             │
             ▼
┌────────────────────────┐
│ SanctionShield AI API  │  Screen against OFAC, UN, EU, FATF lists
└────────────┬───────────┘
             │
             ▼
┌────────────────────────┐
│ Risk Tier Engine       │  BLOCKED / HIGH / MEDIUM / LOW / CLEAN
└────────────┬───────────┘
             │
             ▼
┌────────────────────────┐
│ Audit Logger           │  Immutable log with reviewer trail
└────────────┬───────────┘
             │
             ▼
┌────────────────────────┐
│ Compliance Dashboard   │  Coverage metrics, exception queue
└────────────────────────┘

Step 1: Entity Normalizer

AML screening fails most often when entity names are inconsistently formatted. Build a normalizer first:

# normalization/entity_normalizer.py
import re
from dataclasses import dataclass
from typing import Optional
 
@dataclass
class NormalizedEntity:
    primary_name: str
    aliases: list[str]
    entity_type: str          # INDIVIDUAL, ORGANIZATION, VESSEL, AIRCRAFT
    country_code: Optional[str]
    id_numbers: list[dict]    # {"type": "passport", "value": "AB123456"}
    date_of_birth: Optional[str]
    risk_tier_hint: Optional[str]  # Pre-screen risk signal from onboarding data
 
def normalize_person_name(name: str) -> tuple[str, list[str]]:
    """
    Normalize a person name and generate screening aliases.
    Critical: Most AML false negatives come from name format mismatches.
    """
    # Remove excessive whitespace
    name = " ".join(name.strip().split())
    
    # Generate order variations
    parts = name.split()
    aliases = []
    
    if len(parts) >= 2:
        # First Last
        aliases.append(f"{parts[0]} {parts[-1]}")
        # Last First
        aliases.append(f"{parts[-1]} {parts[0]}")
        # Last, First
        aliases.append(f"{parts[-1]}, {parts[0]}")
        # Handle a middle name by also screening without it
        if len(parts) == 3:
            aliases.append(f"{parts[0]} {parts[2]}")  # Drop middle name
 
    # Remove duplicates while preserving order
    seen = {name}
    unique_aliases = []
    for alias in aliases:
        if alias not in seen:
            seen.add(alias)
            unique_aliases.append(alias)
    
    return name, unique_aliases
 
def normalize_organization_name(name: str) -> tuple[str, list[str]]:
    """
    Normalize organization name with common abbreviation expansions.
    """
    EXPANSIONS = {
        "LLC": ["Limited Liability Company", "L.L.C."],
        "LTD": ["Limited", "Ltd.", "Limited Company"],
        "CO": ["Company", "Co.", "Corporation"],
        "CORP": ["Corporation", "Corp."],
        "INC": ["Incorporated", "Inc."]
    }
 
    name_upper = name.upper()
    aliases = []

    for abbrev, expansions in EXPANSIONS.items():
        # Match whole words only, so "CO" does not fire inside "CORP" or "COMPANY"
        pattern = rf"\b{abbrev}\b"
        if re.search(pattern, name_upper):
            for expansion in expansions:
                aliases.append(re.sub(pattern, expansion, name, flags=re.IGNORECASE))

    return name, aliases[:5]  # Limit to 5 aliases per entity
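The architecture diagram also calls out transliteration handling, which the two normalizers above don't cover. Below is a minimal sketch using the standard library's `unicodedata` module (the module path and function name are my own; production systems typically pair this with a dedicated transliteration library):

```python
# normalization/ascii_fold.py (hypothetical module)
import unicodedata

def ascii_fold(name: str) -> str:
    """Strip diacritics so screening aliases match Latin-script list entries.

    Decomposes accented characters (NFKD) and drops combining marks,
    e.g. "Müller" -> "Muller". Non-Latin scripts pass through unchanged,
    so this complements fuzzy matching rather than replacing it.
    """
    decomposed = unicodedata.normalize("NFKD", name)
    return "".join(ch for ch in decomposed if not unicodedata.combining(ch))
```

The folded form can be appended to the alias list before screening, alongside the name-order variations generated above.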

Step 2: Screening Engine with Risk Tiering

# screening/sanctionshield_screener.py
import httpx
import asyncio
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional
from normalization.entity_normalizer import NormalizedEntity
 
SANCTIONSHIELD_API_URL = "https://apivult.com/api/sanctionshield/v1/screen"
API_KEY = "YOUR_API_KEY"  # In production, load from an environment variable or secret store
 
@dataclass
class ScreeningResult:
    entity_id: str
    screened_name: str
    timestamp: str
    match_found: bool
    risk_tier: str          # BLOCKED / HIGH / MEDIUM / LOW / CLEAN
    match_score: float      # 0.0 - 1.0
    matched_lists: list[str]
    match_details: list[dict]
    required_action: str
    auto_approved: bool
    screening_id: str       # Unique ID for audit trail
 
async def screen_entity(
    entity: NormalizedEntity,
    entity_id: str,
    lists: Optional[list[str]] = None
) -> ScreeningResult:
    """
    Screen a normalized entity against sanctions lists.
    Returns a risk-tiered screening result.
    """
    lists = lists or ["OFAC_SDN", "OFAC_CONSOLIDATED", "UN_CONSOLIDATED", "EU_CONSOLIDATED", "FATF_HIGH_RISK"]
 
    payload = {
        "name": entity.primary_name,
        "aliases": entity.aliases,
        "entity_type": entity.entity_type,
        "country_code": entity.country_code,
        "date_of_birth": entity.date_of_birth,
        "id_numbers": entity.id_numbers,
        "lists": lists,
        "fuzzy_matching": True,
        "fuzzy_threshold": 0.82,          # Catch transliteration variants
        "include_match_details": True
    }
 
    headers = {
        "X-RapidAPI-Key": API_KEY,
        "Content-Type": "application/json"
    }
 
    async with httpx.AsyncClient(timeout=15.0) as client:
        response = await client.post(SANCTIONSHIELD_API_URL, json=payload, headers=headers)
        response.raise_for_status()
        result = response.json()
 
    # Apply risk tiering
    match_score = result.get("highest_match_score", 0.0)
    matched_lists = result.get("matched_lists", [])
 
    if match_score >= 0.95 or any(l in matched_lists for l in ["OFAC_SDN", "UN_CONSOLIDATED"]):
        risk_tier = "BLOCKED"
        required_action = "REJECT — Do not process. File SAR within 30 days if transaction was attempted."
        auto_approved = False
    elif match_score >= 0.85:
        risk_tier = "HIGH"
        required_action = "HOLD — Senior compliance officer review required within 24 hours."
        auto_approved = False
    elif match_score >= 0.70:
        risk_tier = "MEDIUM"
        required_action = "REVIEW — Compliance analyst review required within 72 hours."
        auto_approved = False
    elif match_score >= 0.50:
        risk_tier = "LOW"
        required_action = "ENHANCED_DUE_DILIGENCE — Document enhanced due diligence basis."
        auto_approved = True  # With enhanced monitoring
    else:
        risk_tier = "CLEAN"
        required_action = "APPROVE — No match found. Document and archive."
        auto_approved = True
 
    return ScreeningResult(
        entity_id=entity_id,
        screened_name=entity.primary_name,
        timestamp=datetime.now(timezone.utc).isoformat(),
        match_found=match_score >= 0.50,
        risk_tier=risk_tier,
        match_score=match_score,
        matched_lists=matched_lists,
        match_details=result.get("matches", []),
        required_action=required_action,
        auto_approved=auto_approved,
        screening_id=result.get("screening_id", "")
    )
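The tiering branch above is easier to unit-test when factored into a pure function. A sketch mirroring the same thresholds (`assign_risk_tier` is a name introduced here, not part of the API):

```python
def assign_risk_tier(match_score: float, matched_lists: list[str]) -> tuple[str, bool]:
    """Return (risk_tier, auto_approved) using the thresholds from screen_entity."""
    # Any SDN or UN consolidated hit is blocking regardless of score
    if match_score >= 0.95 or any(l in matched_lists for l in ("OFAC_SDN", "UN_CONSOLIDATED")):
        return "BLOCKED", False
    if match_score >= 0.85:
        return "HIGH", False
    if match_score >= 0.70:
        return "MEDIUM", False
    if match_score >= 0.50:
        return "LOW", True   # Auto-approved with enhanced monitoring
    return "CLEAN", True
```

Keeping the thresholds in one pure function means your test suite pins them down, and any change to the tiering policy shows up as a reviewable diff.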

Step 3: Immutable Audit Logger

The audit log is your most important compliance artifact. Every screening must be logged with sufficient detail to reconstruct the decision:

# audit/immutable_audit_logger.py
import json
import hashlib
from datetime import datetime, timezone
from pathlib import Path
from screening.sanctionshield_screener import ScreeningResult
 
class ImmutableAuditLogger:
    """
    Append-only audit log for AML screening decisions.
    
    Each entry includes a hash chain to detect tampering —
    critical for demonstrating log integrity to examiners.
    """
    
    def __init__(self, log_path: str = "audit/aml_screening_log.jsonl"):
        self.log_path = Path(log_path)
        self.log_path.parent.mkdir(parents=True, exist_ok=True)
        self._last_hash = self._compute_chain_root()
 
    def _compute_chain_root(self) -> str:
        """Get the hash of the last log entry (or genesis hash)."""
        if not self.log_path.exists():
            return hashlib.sha256(b"GENESIS").hexdigest()
        
        last_line = None
        with open(self.log_path, "r") as f:
            for line in f:
                last_line = line.strip()
        
        if not last_line:
            return hashlib.sha256(b"GENESIS").hexdigest()
        
        return hashlib.sha256(last_line.encode()).hexdigest()
 
    def log_screening(
        self,
        result: ScreeningResult,
        operator_id: str,
        business_justification: str = ""
    ) -> str:
        """
        Append a screening result to the audit log.
        Returns the log entry hash for reference.
        """
        entry = {
            "log_timestamp": datetime.now(timezone.utc).isoformat(),
            "screening_id": result.screening_id,
            "entity_id": result.entity_id,
            "screened_name": result.screened_name,
            "screening_timestamp": result.timestamp,
            "risk_tier": result.risk_tier,
            "match_score": result.match_score,
            "matched_lists": result.matched_lists,
            "required_action": result.required_action,
            "auto_approved": result.auto_approved,
            "operator_id": operator_id,
            "business_justification": business_justification,
            "previous_entry_hash": self._last_hash
        }
 
        entry_json = json.dumps(entry, sort_keys=True)
        entry_hash = hashlib.sha256(entry_json.encode()).hexdigest()
        
        # Append with hash — never overwrite
        with open(self.log_path, "a") as f:
            f.write(entry_json + "\n")
 
        self._last_hash = entry_hash
        return entry_hash
 
    def generate_coverage_report(self, date_from: str, date_to: str) -> dict:
        """
        Generate a compliance coverage report for a date range.
        Used for board reporting and examiner requests.
        """
        entries = []
        with open(self.log_path, "r") as f:
            for line in f:
                entry = json.loads(line.strip())
                if date_from <= entry["log_timestamp"] <= date_to:
                    entries.append(entry)
 
        if not entries:
            return {"error": "No entries found for date range"}
 
        tier_counts = {}
        for entry in entries:
            tier = entry["risk_tier"]
            tier_counts[tier] = tier_counts.get(tier, 0) + 1
 
        blocked = tier_counts.get("BLOCKED", 0)
        high = tier_counts.get("HIGH", 0)
        total = len(entries)
 
        return {
            "period": f"{date_from} to {date_to}",
            "total_screenings": total,
            "tier_breakdown": tier_counts,
            "match_rate": round((blocked + high) / total * 100, 2) if total > 0 else 0,
            "auto_approved_rate": round(
                sum(1 for e in entries if e["auto_approved"]) / total * 100, 2
            ) if total > 0 else 0,
            "pending_reviews": sum(1 for e in entries if not e["auto_approved"] and e["risk_tier"] in ("HIGH", "MEDIUM"))
        }
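The hash chain is only useful if someone actually verifies it. Here is a sketch of a standalone verifier for the JSONL format written above (`verify_chain` is a name introduced here, not part of the logger; run it on a schedule or before exporting the log to examiners):

```python
# audit/verify_chain.py
import hashlib
import json
from pathlib import Path

def verify_chain(log_path: str) -> bool:
    """Recompute the hash chain over the append-only JSONL audit log.

    Each entry must carry a previous_entry_hash equal to the SHA-256 of the
    preceding raw line (or the GENESIS hash for the first entry). Any edit,
    deletion, or reordering breaks the chain from that point onward.
    """
    expected = hashlib.sha256(b"GENESIS").hexdigest()
    for line in Path(log_path).read_text().splitlines():
        entry = json.loads(line)
        if entry.get("previous_entry_hash") != expected:
            return False
        expected = hashlib.sha256(line.encode()).hexdigest()
    return True
```

Note that this detects tampering but cannot prevent it; for stronger guarantees, ship each entry hash to a write-once store the application cannot modify.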

Step 4: Compliance Dashboard with Coverage Metrics

# dashboard/aml_compliance_dashboard.py
from fastapi import FastAPI, Depends, HTTPException
from pydantic import BaseModel
from screening.sanctionshield_screener import screen_entity, ScreeningResult
from normalization.entity_normalizer import NormalizedEntity, normalize_person_name
from audit.immutable_audit_logger import ImmutableAuditLogger
 
app = FastAPI(title="AML Compliance Dashboard")
audit_logger = ImmutableAuditLogger()
 
class EntityScreenRequest(BaseModel):
    entity_id: str
    name: str
    entity_type: str = "INDIVIDUAL"
    country_code: str | None = None
    operator_id: str
    business_justification: str = ""
 
@app.post("/screen")
async def screen_and_log(request: EntityScreenRequest):
    """Screen an entity and log the result to the audit trail."""
    primary_name, aliases = normalize_person_name(request.name)
    
    entity = NormalizedEntity(
        primary_name=primary_name,
        aliases=aliases,
        entity_type=request.entity_type,
        country_code=request.country_code,
        id_numbers=[],
        date_of_birth=None,
        risk_tier_hint=None
    )
 
    result = await screen_entity(entity, request.entity_id)
    
    log_hash = audit_logger.log_screening(
        result=result,
        operator_id=request.operator_id,
        business_justification=request.business_justification
    )
 
    return {
        "screening_id": result.screening_id,
        "risk_tier": result.risk_tier,
        "match_score": result.match_score,
        "required_action": result.required_action,
        "auto_approved": result.auto_approved,
        "log_hash": log_hash
    }
 
@app.get("/coverage-report")
async def coverage_report(date_from: str, date_to: str):
    """Generate AML screening coverage report for the specified period."""
    return audit_logger.generate_coverage_report(date_from, date_to)
 
@app.get("/exception-queue")
async def exception_queue():
    """Return pending HIGH and MEDIUM risk screenings requiring review."""
    # In production, query from database — not from flat log file
    return {"message": "Returns open screening exceptions requiring human review"}

What Regulators Will Look For

When FinCEN examiners review your AML program in the context of a whistleblower complaint, they evaluate:

| Examiner Question | Your Evidence |
| --- | --- |
| Are all counterparties screened? | Coverage metrics: total_screenings / total_counterparties |
| Are high-risk matches reviewed? | Audit log showing BLOCKED/HIGH dispositions with reviewer ID |
| Are decisions documented? | business_justification field in each log entry |
| Is the log tamper-evident? | Hash chain in the audit log |
| Are SARs filed within 30 days? | SAR filing records linked to BLOCKED screening IDs |
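The coverage metric in the first row needs only two sets of IDs: entities that appear in the audit log, and the full counterparty population from your systems of record. Where that population comes from is institution-specific; the function below is a sketch with a name introduced here:

```python
def screening_coverage(screened_entity_ids: set[str], all_counterparty_ids: set[str]) -> float:
    """Percent of known counterparties with at least one screening on record."""
    if not all_counterparty_ids:
        return 100.0  # Vacuously complete: nothing to screen
    covered = len(all_counterparty_ids & screened_entity_ids)
    return round(covered / len(all_counterparty_ids) * 100, 2)
```

Anything below 100% is a coverage gap, and under the whistleblower rule, a documented gap is exactly what a reportable deficiency looks like.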

The dashboard provides all of this in an exportable, examiner-ready format.

Start building your FinCEN-ready AML program with the SanctionShield AI API on APIVult.