Build a FinCEN-Ready AML Compliance Dashboard with SanctionShield AI API
FinCEN's new AML whistleblower NPRM raises the compliance bar for financial institutions. Build a comprehensive AML monitoring dashboard using SanctionShield AI API in Python.

On April 1, 2026, FinCEN published a proposed rule in the Federal Register implementing the anti-money laundering whistleblower provisions of the Anti-Money Laundering Act of 2020. The rule creates whistleblower incentives of 10-30% of penalties exceeding $1 million across BSA violations, sanctions, foreign investment security, and data security failures.
The practical implication for compliance teams: every gap in your AML program is now a potential whistleblower trigger. Incomplete sanctions screening, undocumented risk assessments, and screening exceptions without proper justification are exactly the kind of deficiencies that whistleblowers — including current employees — can report for significant financial rewards.
This guide shows you how to build a comprehensive AML compliance dashboard using SanctionShield AI API that provides the audit-ready documentation, screening coverage metrics, and risk tiering that regulators and whistleblowers evaluate when assessing a program's adequacy.
What "FinCEN-Ready" AML Means in 2026
A FinCEN-ready AML program has three technical properties:
- Coverage completeness: Every counterparty, beneficiary, and beneficial owner is screened — not just account holders.
- Audit traceability: Every screening decision is logged with timestamp, match details, disposition, and reviewer identity.
- Risk-tiered disposition: High-risk matches are escalated, medium-risk matches are reviewed, and clean matches are archived with evidence.
The SanctionShield AI API provides the screening engine. This guide adds the coverage tracking, audit logging, and risk-tiered workflows on top.
Architecture
Incoming Entity (customer, vendor, beneficiary)
│
▼
┌────────────────────────┐
│ Entity Normalizer │ Standardize names, aliases, transliterations
└────────────┬───────────┘
│
▼
┌────────────────────────┐
│ SanctionShield AI API │ Screen against OFAC, UN, EU, FATF lists
└────────────┬───────────┘
│
▼
┌────────────────────────┐
│ Risk Tier Engine │ BLOCKED / HIGH / MEDIUM / LOW / CLEAN
└────────────┬───────────┘
│
▼
┌────────────────────────┐
│ Audit Logger │ Immutable log with reviewer trail
└────────────┬───────────┘
│
▼
┌────────────────────────┐
│ Compliance Dashboard │ Coverage metrics, exception queue
└────────────────────────┘
Step 1: Entity Normalizer
AML screening fails most often when entity names are inconsistently formatted. Build a normalizer first:
# normalization/entity_normalizer.py
import re
from dataclasses import dataclass
from typing import Optional
@dataclass
class NormalizedEntity:
primary_name: str
aliases: list[str]
entity_type: str # INDIVIDUAL, ORGANIZATION, VESSEL, AIRCRAFT
country_code: Optional[str]
id_numbers: list[dict] # {"type": "passport", "value": "AB123456"}
date_of_birth: Optional[str]
risk_tier_hint: Optional[str] # Pre-screen risk signal from onboarding data
def normalize_person_name(name: str) -> tuple[str, list[str]]:
"""
Normalize a person name and generate screening aliases.
Critical: Most AML false negatives come from name format mismatches.
"""
# Remove excessive whitespace
name = " ".join(name.strip().split())
# Generate order variations
parts = name.split()
aliases = []
if len(parts) >= 2:
# First Last
aliases.append(f"{parts[0]} {parts[-1]}")
# Last First
aliases.append(f"{parts[-1]} {parts[0]}")
# Last, First
aliases.append(f"{parts[-1]}, {parts[0]}")
# First Middle Last / Last First Middle (handle middle names)
if len(parts) == 3:
aliases.append(f"{parts[0]} {parts[2]}") # Drop middle name
# Remove duplicates while preserving order
seen = {name}
unique_aliases = [a for a in aliases if a not in seen and not seen.add(a)]
return name, unique_aliases
def normalize_organization_name(name: str) -> tuple[str, list[str]]:
"""
Normalize organization name with common abbreviation expansions.
"""
EXPANSIONS = {
"LLC": ["Limited Liability Company", "L.L.C."],
"LTD": ["Limited", "Ltd.", "Limited Company"],
"CO": ["Company", "Co.", "Corporation"],
"CORP": ["Corporation", "Corp."],
"INC": ["Incorporated", "Inc."]
}
name_upper = name.upper()
aliases = []
for abbrev, expansions in EXPANSIONS.items():
if abbrev in name_upper:
for expansion in expansions:
aliases.append(re.sub(abbrev, expansion, name, flags=re.IGNORECASE))
return name, aliases[:5] # Limit to 5 aliases per entityStep 2: Screening Engine with Risk Tiering
# screening/sanctionshield_screener.py
import httpx
import asyncio
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional
from normalization.entity_normalizer import NormalizedEntity
SANCTIONSHIELD_API_URL = "https://apivult.com/api/sanctionshield/v1/screen"
API_KEY = "YOUR_API_KEY"
@dataclass
class ScreeningResult:
entity_id: str
screened_name: str
timestamp: str
match_found: bool
risk_tier: str # BLOCKED / HIGH / MEDIUM / LOW / CLEAN
match_score: float # 0.0 - 1.0
matched_lists: list[str]
match_details: list[dict]
required_action: str
auto_approved: bool
screening_id: str # Unique ID for audit trail
async def screen_entity(
entity: NormalizedEntity,
entity_id: str,
lists: list[str] = None
) -> ScreeningResult:
"""
Screen a normalized entity against sanctions lists.
Returns a risk-tiered screening result.
"""
lists = lists or ["OFAC_SDN", "OFAC_CONSOLIDATED", "UN_CONSOLIDATED", "EU_CONSOLIDATED", "FATF_HIGH_RISK"]
payload = {
"name": entity.primary_name,
"aliases": entity.aliases,
"entity_type": entity.entity_type,
"country_code": entity.country_code,
"date_of_birth": entity.date_of_birth,
"id_numbers": entity.id_numbers,
"lists": lists,
"fuzzy_matching": True,
"fuzzy_threshold": 0.82, # Catch transliteration variants
"include_match_details": True
}
headers = {
"X-RapidAPI-Key": API_KEY,
"Content-Type": "application/json"
}
async with httpx.AsyncClient(timeout=15.0) as client:
response = await client.post(SANCTIONSHIELD_API_URL, json=payload, headers=headers)
response.raise_for_status()
result = response.json()
# Apply risk tiering
match_score = result.get("highest_match_score", 0.0)
matched_lists = result.get("matched_lists", [])
if match_score >= 0.95 or any(l in matched_lists for l in ["OFAC_SDN", "UN_CONSOLIDATED"]):
risk_tier = "BLOCKED"
required_action = "REJECT — Do not process. File SAR within 30 days if transaction was attempted."
auto_approved = False
elif match_score >= 0.85:
risk_tier = "HIGH"
required_action = "HOLD — Senior compliance officer review required within 24 hours."
auto_approved = False
elif match_score >= 0.70:
risk_tier = "MEDIUM"
required_action = "REVIEW — Compliance analyst review required within 72 hours."
auto_approved = False
elif match_score >= 0.50:
risk_tier = "LOW"
required_action = "ENHANCED_DUE_DILIGENCE — Document enhanced due diligence basis."
auto_approved = True # With enhanced monitoring
else:
risk_tier = "CLEAN"
required_action = "APPROVE — No match found. Document and archive."
auto_approved = True
return ScreeningResult(
entity_id=entity_id,
screened_name=entity.primary_name,
timestamp=datetime.now(timezone.utc).isoformat(),
match_found=match_score >= 0.50,
risk_tier=risk_tier,
match_score=match_score,
matched_lists=matched_lists,
match_details=result.get("matches", []),
required_action=required_action,
auto_approved=auto_approved,
screening_id=result.get("screening_id", "")
)Step 3: Immutable Audit Logger
The audit log is your most important compliance artifact. Every screening must be logged with sufficient detail to reconstruct the decision:
# audit/immutable_audit_logger.py
import json
import hashlib
from datetime import datetime, timezone
from pathlib import Path
from screening.sanctionshield_screener import ScreeningResult
class ImmutableAuditLogger:
"""
Append-only audit log for AML screening decisions.
Each entry includes a hash chain to detect tampering —
critical for demonstrating log integrity to examiners.
"""
def __init__(self, log_path: str = "audit/aml_screening_log.jsonl"):
self.log_path = Path(log_path)
self.log_path.parent.mkdir(parents=True, exist_ok=True)
self._last_hash = self._compute_chain_root()
def _compute_chain_root(self) -> str:
"""Get the hash of the last log entry (or genesis hash)."""
if not self.log_path.exists():
return hashlib.sha256(b"GENESIS").hexdigest()
last_line = None
with open(self.log_path, "r") as f:
for line in f:
last_line = line.strip()
if not last_line:
return hashlib.sha256(b"GENESIS").hexdigest()
return hashlib.sha256(last_line.encode()).hexdigest()
def log_screening(
self,
result: ScreeningResult,
operator_id: str,
business_justification: str = ""
) -> str:
"""
Append a screening result to the audit log.
Returns the log entry hash for reference.
"""
entry = {
"log_timestamp": datetime.now(timezone.utc).isoformat(),
"screening_id": result.screening_id,
"entity_id": result.entity_id,
"screened_name": result.screened_name,
"screening_timestamp": result.timestamp,
"risk_tier": result.risk_tier,
"match_score": result.match_score,
"matched_lists": result.matched_lists,
"required_action": result.required_action,
"auto_approved": result.auto_approved,
"operator_id": operator_id,
"business_justification": business_justification,
"previous_entry_hash": self._last_hash
}
entry_json = json.dumps(entry, sort_keys=True)
entry_hash = hashlib.sha256(entry_json.encode()).hexdigest()
# Append with hash — never overwrite
with open(self.log_path, "a") as f:
f.write(entry_json + "\n")
self._last_hash = entry_hash
return entry_hash
def generate_coverage_report(self, date_from: str, date_to: str) -> dict:
"""
Generate a compliance coverage report for a date range.
Used for board reporting and examiner requests.
"""
entries = []
with open(self.log_path, "r") as f:
for line in f:
entry = json.loads(line.strip())
if date_from <= entry["log_timestamp"] <= date_to:
entries.append(entry)
if not entries:
return {"error": "No entries found for date range"}
tier_counts = {}
for entry in entries:
tier = entry["risk_tier"]
tier_counts[tier] = tier_counts.get(tier, 0) + 1
blocked = tier_counts.get("BLOCKED", 0)
high = tier_counts.get("HIGH", 0)
total = len(entries)
return {
"period": f"{date_from} to {date_to}",
"total_screenings": total,
"tier_breakdown": tier_counts,
"match_rate": round((blocked + high) / total * 100, 2) if total > 0 else 0,
"auto_approved_rate": round(
sum(1 for e in entries if e["auto_approved"]) / total * 100, 2
) if total > 0 else 0,
"pending_reviews": sum(1 for e in entries if not e["auto_approved"] and e["risk_tier"] in ("HIGH", "MEDIUM"))
}Step 4: Compliance Dashboard with Coverage Metrics
# dashboard/aml_compliance_dashboard.py
from fastapi import FastAPI, Depends, HTTPException
from pydantic import BaseModel
from screening.sanctionshield_screener import screen_entity, ScreeningResult
from normalization.entity_normalizer import NormalizedEntity, normalize_person_name
from audit.immutable_audit_logger import ImmutableAuditLogger
app = FastAPI(title="AML Compliance Dashboard")
audit_logger = ImmutableAuditLogger()
class EntityScreenRequest(BaseModel):
entity_id: str
name: str
entity_type: str = "INDIVIDUAL"
country_code: str = None
operator_id: str
business_justification: str = ""
@app.post("/screen")
async def screen_and_log(request: EntityScreenRequest):
"""Screen an entity and log the result to the audit trail."""
primary_name, aliases = normalize_person_name(request.name)
entity = NormalizedEntity(
primary_name=primary_name,
aliases=aliases,
entity_type=request.entity_type,
country_code=request.country_code,
id_numbers=[],
date_of_birth=None,
risk_tier_hint=None
)
result = await screen_entity(entity, request.entity_id)
log_hash = audit_logger.log_screening(
result=result,
operator_id=request.operator_id,
business_justification=request.business_justification
)
return {
"screening_id": result.screening_id,
"risk_tier": result.risk_tier,
"match_score": result.match_score,
"required_action": result.required_action,
"auto_approved": result.auto_approved,
"log_hash": log_hash
}
@app.get("/coverage-report")
async def coverage_report(date_from: str, date_to: str):
"""Generate AML screening coverage report for the specified period."""
return audit_logger.generate_coverage_report(date_from, date_to)
@app.get("/exception-queue")
async def exception_queue():
"""Return pending HIGH and MEDIUM risk screenings requiring review."""
# In production, query from database — not from flat log file
return {"message": "Returns open screening exceptions requiring human review"}What Regulators Will Look For
When FinCEN examiners review your AML program in the context of a whistleblower complaint, they evaluate:
| Examiner Question | Your Evidence |
|---|---|
| Are all counterparties screened? | Coverage metrics: total_screenings / total_counterparties |
| Are high-risk matches reviewed? | Audit log showing BLOCKED/HIGH dispositions with reviewer ID |
| Are decisions documented? | business_justification field in each log entry |
| Is the log tamper-evident? | Hash chain in the audit log |
| Are SARs filed within 30 days? | SAR filing records linked to BLOCKED screening IDs |
The dashboard provides all of this in an exportable, examiner-ready format.
Start building your FinCEN-ready AML program at SanctionShield AI API on APIVult.
More Articles
Real-Time AML Sanctions Screening in Python: A Complete Integration Guide
Real-time sanctions screening against OFAC, UN, EU lists. Integrate SanctionShield API for AML/KYC in Python.
March 31, 2026
How to Automate Vendor Due Diligence Screening with SanctionShield AI API
Build automated vendor due diligence pipelines that screen suppliers against OFAC, UN, and EU watchlists in real time using SanctionShield AI API in Python.
April 5, 2026