Education · Last updated April 20, 2026

How to Build a Real-Time OFAC Sanctions Monitoring System in Python

Step-by-step guide to building an automated OFAC sanctions screening and alert system in Python using the SanctionShield API, including batch processing, Slack webhook alerts, and audit logging.

OFAC issued 18 new designations in four days earlier this month. Then, on April 14-15, it added 25+ entities in the Shamkhani Iranian oil smuggling network — including vessels, front companies, and financiers across multiple jurisdictions. Each new designation creates an obligation: if any of those entities matches an existing counterparty in your database, you need to know immediately.

This guide walks through building a production-grade OFAC sanctions monitoring system in Python that handles both real-time screening at transaction time and continuous monitoring of your existing counterparty database.

What We're Building

By the end of this guide, you'll have a complete Python system that:

  1. Screens new counterparties at onboarding or transaction time
  2. Monitors your existing database against new OFAC designations via scheduled batch jobs
  3. Sends alerts when matches are detected, with full audit trail
  4. Handles fuzzy matching for name variations and transliterations
  5. Stores screening records for OFAC compliance documentation requirements

Prerequisites: Python 3.10+; the requests, schedule, and python-dotenv packages; sqlite3 (stdlib). All code in this guide is synchronous.

Architecture Overview

┌─────────────────────────────────────────────────────────┐
│                  Your Application                        │
├─────────────────┬───────────────────────────────────────┤
│  New Entity     │  Existing Portfolio                   │
│  Screening      │  Monitoring (Scheduled)                │
│  (Real-time)    │                                        │
└────────┬────────┴─────────────────┬─────────────────────┘
         │                          │
         ▼                          ▼
┌─────────────────────────────────────────────────────────┐
│             SanctionShield AI API                        │
│  OFAC SDN | UN Consolidated | EU Sanctions | PEP         │
└──────────────────────────────┬──────────────────────────┘
                               │
         ┌─────────────────────┼──────────────────────┐
         ▼                     ▼                       ▼
    Match Found           No Match              Error/Timeout
    → Alert + Block       → Approve             → Queue Retry
    → Audit Log           → Audit Log           → Alert Ops
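
The three outcome branches in the diagram can be sketched as a small routing table. This is only an illustration of the flow, not part of the SanctionShield API: the `Action` enum and `route_result` function are hypothetical names, and the status strings are the ones used later in this guide.

```python
from enum import Enum


class Action(Enum):
    ALERT = "alert"
    BLOCK = "block"
    APPROVE = "approve"
    AUDIT_LOG = "audit_log"
    QUEUE_RETRY = "queue_retry"
    ALERT_OPS = "alert_ops"


# Hypothetical mapping from screening status to follow-up actions,
# mirroring the three branches of the architecture diagram.
# (Step 6 refines this by sending potential matches to review.)
ROUTES = {
    "CONFIRMED_MATCH": [Action.ALERT, Action.BLOCK, Action.AUDIT_LOG],
    "POTENTIAL_MATCH": [Action.ALERT, Action.BLOCK, Action.AUDIT_LOG],
    "CLEAR": [Action.APPROVE, Action.AUDIT_LOG],
}


def route_result(status: str) -> list[Action]:
    """Return follow-up actions; any unrecognized status is handled as an error."""
    return ROUTES.get(status, [Action.QUEUE_RETRY, Action.ALERT_OPS])


if __name__ == "__main__":
    for status in ("CLEAR", "CONFIRMED_MATCH", "ERROR"):
        print(status, [action.value for action in route_result(status)])
```

Treating every unknown status as an error keeps the default path conservative: nothing is approved unless the API explicitly reports it as clear.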

Step 1: Project Setup

pip install requests schedule python-dotenv

sanctions_monitor/
├── config.py
├── screener.py
├── monitor.py
├── alerts.py
├── database.py
└── main.py

Step 2: Configuration

# config.py
import os
from dotenv import load_dotenv
 
load_dotenv()
 
SANCTIONSHIELD_API_KEY = os.getenv("SANCTIONSHIELD_API_KEY", "YOUR_API_KEY")
API_BASE_URL = "https://apivult.com/sanctionshield/v1"
 
WATCHLISTS = [
    "OFAC_SDN",
    "OFAC_CONS",
    "UN_CONSOLIDATED",
    "EU_SANCTIONS",
    "PEP_GLOBAL"
]
 
# Match sensitivity — 0.80 is recommended for compliance use
# Higher threshold = fewer false positives but more false negatives
MATCH_THRESHOLD = 0.80
 
# Alert channels
SLACK_WEBHOOK_URL = os.getenv("SLACK_WEBHOOK_URL")
ALERT_EMAIL = os.getenv("ALERT_EMAIL")
 
# How often to re-screen existing counterparties (hours)
MONITORING_INTERVAL_HOURS = 24
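
To get a feel for what MATCH_THRESHOLD trades off, here is a rough local illustration using the standard library's difflib. It is not the matching algorithm SanctionShield uses (that is not documented here); `normalize`, `similarity`, and `is_potential_match` are hypothetical helpers that only show how the same name pair can pass at 0.80 and fail at a stricter threshold.

```python
from difflib import SequenceMatcher


def normalize(name: str) -> str:
    """Lowercase, treat hyphens as spaces, and collapse repeated whitespace."""
    return " ".join(name.lower().replace("-", " ").split())


def similarity(a: str, b: str) -> float:
    """Similarity ratio in [0, 1] between two normalized names."""
    return SequenceMatcher(None, normalize(a), normalize(b)).ratio()


def is_potential_match(candidate: str, listed: str, threshold: float = 0.80) -> bool:
    """True when the two names are at least `threshold` similar."""
    return similarity(candidate, listed) >= threshold


if __name__ == "__main__":
    # One-letter spelling variant: flagged at 0.80, missed at 0.95
    for threshold in (0.80, 0.95):
        hit = is_potential_match("Victor Petrov", "Viktor Petrov", threshold)
        print(f"threshold={threshold:.2f} -> match={hit}")
```

Raising the threshold to cut review workload therefore also drops close spelling variants, which is why a lower value is the safer default for compliance screening.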

Step 3: Core Screening Client

# screener.py
import requests
import time
import logging
from typing import Optional
from config import SANCTIONSHIELD_API_KEY, API_BASE_URL, WATCHLISTS, MATCH_THRESHOLD
 
logger = logging.getLogger(__name__)
 
 
class SanctionShieldClient:
    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update({
            "X-RapidAPI-Key": SANCTIONSHIELD_API_KEY,
            "Content-Type": "application/json"
        })
        self.max_retries = 3
        self.retry_delay = 2.0
 
    def screen_entity(
        self,
        name: str,
        entity_type: str = "individual",
        date_of_birth: Optional[str] = None,
        nationality: Optional[str] = None,
        registration_country: Optional[str] = None,
        identification_number: Optional[str] = None
    ) -> dict:
        """
        Screen a single entity against OFAC and global sanctions lists.
        
        Returns structured result with status: CLEAR | POTENTIAL_MATCH | CONFIRMED_MATCH
        """
        payload = {
            "name": name,
            "entity_type": entity_type,
            "watchlists": WATCHLISTS,
            "match_threshold": MATCH_THRESHOLD,
            "enable_fuzzy_match": True,
            "enable_transliteration": True  # handles Arabic, Chinese, Cyrillic names
        }
 
        if date_of_birth:
            payload["date_of_birth"] = date_of_birth
        if nationality:
            payload["nationality"] = nationality
        if registration_country:
            payload["registration_country"] = registration_country
        if identification_number:
            payload["identification_number"] = identification_number
 
        for attempt in range(self.max_retries):
            try:
                response = self.session.post(
                    f"{API_BASE_URL}/screen",
                    json=payload,
                    timeout=10
                )
                response.raise_for_status()
                return response.json()
 
            except (requests.exceptions.Timeout,
                    requests.exceptions.ConnectionError) as e:
                # Retry on timeouts and dropped connections alike
                logger.warning(
                    f"Screening request failed for '{name}' (attempt {attempt + 1}): {e}"
                )
                if attempt < self.max_retries - 1:
                    time.sleep(self.retry_delay * (attempt + 1))
                else:
                    # Fail-safe: return error status for manual review
                    return {
                        "status": "ERROR",
                        "risk_level": "MANUAL_REVIEW",
                        "error": "Screening service unreachable after retries; manual review required"
                    }
 
            except requests.exceptions.HTTPError as e:
                logger.error(f"API error screening '{name}': {e}")
                raise
 
    def batch_screen(self, entities: list[dict]) -> list[dict]:
        """
        Screen multiple entities in a single API call.
        More efficient than individual calls for portfolio monitoring.
        """
        response = self.session.post(
            f"{API_BASE_URL}/batch-screen",
            json={
                "entities": entities,
                "watchlists": WATCHLISTS,
                "match_threshold": MATCH_THRESHOLD,
                "enable_fuzzy_match": True
            },
            timeout=120  # batch calls take longer; a 10K-entity batch can run 45-90 seconds
        )
        response.raise_for_status()
        return response.json()["results"]
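
The retry loop in screen_entity waits retry_delay * (attempt + 1) seconds, a linear backoff. When many workers retry at once, exponential backoff with random jitter is a common alternative because it spreads the retries out. The sketch below shows that pattern in isolation; `backoff_delay` and `retry_with_backoff` are hypothetical helpers, and the base and cap values are assumptions rather than SanctionShield recommendations.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def backoff_delay(attempt: int, base: float = 2.0, cap: float = 30.0) -> float:
    """Full-jitter delay: uniform in [0, min(cap, base * 2**attempt)] seconds."""
    return random.uniform(0.0, min(cap, base * (2 ** attempt)))


def retry_with_backoff(
    call: Callable[[], T],
    retriable: tuple[type[BaseException], ...],
    max_retries: int = 3,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Run call(), retrying on the given exception types.

    The last failure is re-raised so the caller can decide how to fail.
    """
    for attempt in range(max_retries):
        try:
            return call()
        except retriable:
            if attempt == max_retries - 1:
                raise
            sleep(backoff_delay(attempt))
    raise RuntimeError("max_retries must be at least 1")
```

With requests, the retriable tuple would typically be (requests.exceptions.Timeout, requests.exceptions.ConnectionError). The `sleep` parameter is injectable so the helper can be tested without real waiting.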

Step 4: Audit Database

# database.py
import sqlite3
import json
from datetime import datetime
from contextlib import contextmanager
 
DB_PATH = "sanctions_audit.db"
 
 
def initialize_db():
    with get_connection() as conn:
        conn.executescript("""
            CREATE TABLE IF NOT EXISTS screening_records (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                entity_id TEXT NOT NULL,
                entity_name TEXT NOT NULL,
                entity_type TEXT,
                status TEXT NOT NULL,
                risk_level TEXT NOT NULL,
                matches_json TEXT,
                screened_at TEXT NOT NULL,
                api_screening_id TEXT,
                triggered_by TEXT,
                disposition TEXT,
                disposition_notes TEXT,
                disposition_by TEXT,
                disposition_at TEXT
            );
 
            CREATE TABLE IF NOT EXISTS alert_log (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                screening_record_id INTEGER,
                alert_channel TEXT,
                alert_sent_at TEXT,
                acknowledged_at TEXT,
                FOREIGN KEY (screening_record_id) REFERENCES screening_records(id)
            );
 
            CREATE INDEX IF NOT EXISTS idx_entity_id ON screening_records(entity_id);
            CREATE INDEX IF NOT EXISTS idx_status ON screening_records(status);
            CREATE INDEX IF NOT EXISTS idx_screened_at ON screening_records(screened_at);
        """)
 
 
@contextmanager
def get_connection():
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()
 
 
def save_screening_record(
    entity_id: str,
    entity_name: str,
    entity_type: str,
    result: dict,
    triggered_by: str = "automated"
) -> int:
    with get_connection() as conn:
        cursor = conn.execute("""
            INSERT INTO screening_records 
            (entity_id, entity_name, entity_type, status, risk_level,
             matches_json, screened_at, api_screening_id, triggered_by)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            entity_id,
            entity_name,
            entity_type,
            result.get("status", "ERROR"),
            result.get("risk_level", "UNKNOWN"),
            json.dumps(result.get("matches", [])),
            datetime.utcnow().isoformat(),
            result.get("screening_id"),
            triggered_by
        ))
        return cursor.lastrowid
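
The schema above includes disposition columns, but nothing in this guide writes to them. The following is a minimal sketch of how an analyst's decision could be recorded. `record_disposition`, `create_demo_table`, and the disposition vocabulary are assumptions for illustration, and the demo table is a cut-down version of screening_records so the example runs on its own.

```python
import sqlite3
from datetime import datetime, timezone

# Assumed vocabulary for analyst decisions; adjust to your own policy.
ALLOWED_DISPOSITIONS = {"FALSE_POSITIVE", "TRUE_MATCH", "ESCALATED"}


def create_demo_table(conn: sqlite3.Connection) -> None:
    """Create a reduced screening_records table for this standalone example."""
    conn.execute("""
        CREATE TABLE IF NOT EXISTS screening_records (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            entity_name TEXT NOT NULL,
            status TEXT NOT NULL,
            disposition TEXT,
            disposition_notes TEXT,
            disposition_by TEXT,
            disposition_at TEXT
        )
    """)


def record_disposition(
    conn: sqlite3.Connection,
    record_id: int,
    disposition: str,
    notes: str,
    reviewer: str,
) -> bool:
    """Store a reviewer's decision once; return False if one already exists."""
    if disposition not in ALLOWED_DISPOSITIONS:
        raise ValueError(f"Unknown disposition: {disposition}")
    cursor = conn.execute(
        """
        UPDATE screening_records
        SET disposition = ?, disposition_notes = ?,
            disposition_by = ?, disposition_at = ?
        WHERE id = ? AND disposition IS NULL
        """,
        (disposition, notes, reviewer,
         datetime.now(timezone.utc).isoformat(), record_id),
    )
    conn.commit()
    return cursor.rowcount == 1
```

The `disposition IS NULL` condition makes the decision write-once, so a later reviewer cannot silently overwrite an earlier one. A changed decision would need its own new record.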

Step 5: Alert System

# alerts.py
import requests
import logging
from datetime import datetime
from config import SLACK_WEBHOOK_URL
 
logger = logging.getLogger(__name__)
 
 
def send_match_alert(entity_name: str, entity_id: str, matches: list, risk_level: str):
    """
    Send an alert when a sanctions match is detected.
    """
    message = build_alert_message(entity_name, entity_id, matches, risk_level)
 
    # Slack alert
    if SLACK_WEBHOOK_URL:
        try:
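            response = requests.post(
                SLACK_WEBHOOK_URL,
                json={"text": message},
                timeout=5
            )
            # Treat non-2xx responses as failures so they are logged below
            response.raise_for_status()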
        except Exception as e:
            logger.error(f"Failed to send Slack alert: {e}")
 
    # Always log to file as backup
    logger.critical(f"SANCTIONS MATCH DETECTED: {message}")
 
 
def build_alert_message(
    entity_name: str,
    entity_id: str,
    matches: list,
    risk_level: str
) -> str:
    lines = [
        "🚨 *SANCTIONS SCREENING ALERT*",
        f"*Entity*: {entity_name} (ID: {entity_id})",
        f"*Risk Level*: {risk_level}",
        f"*Timestamp*: {datetime.utcnow().isoformat()} UTC",
        f"*Matches Found*: {len(matches)}",
        ""
    ]
    for match in matches[:3]:  # show top 3 matches
        lines.append(
            f"• {match.get('matched_name')} "
            f"({match.get('confidence', 0):.0%} confidence) — "
            f"{match.get('watchlist')} / {match.get('sanctions_program')}"
        )
    lines.append("\n*Action Required*: Review in compliance dashboard immediately.")
    return "\n".join(lines)
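
Because the portfolio is re-screened on a schedule, an unresolved match will trigger the same alert on every run. One way to limit the noise is to fingerprint each alert and skip repeats. The sketch below is one possible approach, not part of the original design: `alert_fingerprint` and `AlertDeduplicator` are hypothetical names, and the in-memory set stands in for persistent storage such as the alert_log table from Step 4.

```python
import hashlib
import json


def alert_fingerprint(entity_id: str, matches: list[dict]) -> str:
    """Stable hash of the entity and its set of (watchlist, matched_name) pairs."""
    keys = sorted(
        (str(m.get("watchlist")), str(m.get("matched_name"))) for m in matches
    )
    payload = json.dumps({"entity_id": entity_id, "matches": keys})
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


class AlertDeduplicator:
    """Remembers which alerts were already sent during this process lifetime."""

    def __init__(self) -> None:
        self._seen: set[str] = set()

    def should_send(self, entity_id: str, matches: list[dict]) -> bool:
        """Return True the first time a given entity/match set is seen."""
        fingerprint = alert_fingerprint(entity_id, matches)
        if fingerprint in self._seen:
            return False
        self._seen.add(fingerprint)
        return True
```

Sorting the match keys makes the fingerprint independent of result order, while any new matched name produces a new fingerprint and therefore a fresh alert.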

Step 6: Main Screening Workflow

# main.py
import logging
import schedule
import time
from screener import SanctionShieldClient
from database import initialize_db, save_screening_record
from alerts import send_match_alert
 
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger(__name__)
 
client = SanctionShieldClient()
 
 
def screen_new_counterparty(
    entity_id: str,
    entity_name: str,
    entity_type: str = "individual",
    **kwargs
) -> dict:
    """
    Screen a new counterparty at onboarding time.
    Returns decision: APPROVE | REVIEW | BLOCK
    """
    logger.info(f"Screening new counterparty: {entity_name}")
    result = client.screen_entity(entity_name, entity_type, **kwargs)
 
    record_id = save_screening_record(
        entity_id, entity_name, entity_type, result, triggered_by="onboarding"
    )
 
    if result.get("status") in ("POTENTIAL_MATCH", "CONFIRMED_MATCH"):
        send_match_alert(
            entity_name,
            entity_id,
            result.get("matches", []),
            result.get("risk_level", "HIGH")
        )
        return {
            "decision": "BLOCK" if result["status"] == "CONFIRMED_MATCH" else "REVIEW",
            "record_id": record_id,
            "screening_result": result
        }
 
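    if result.get("status") != "CLEAR":
        # Fail closed: an ERROR (e.g. a screening timeout) or any
        # unrecognized status goes to manual review, never to approval
        return {
            "decision": "REVIEW",
            "record_id": record_id,
            "screening_result": result
        }

    return {"decision": "APPROVE", "record_id": record_id}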
 
 
def run_portfolio_monitoring():
    """
    Re-screen all active counterparties against current sanctions lists.
    Runs on a schedule to catch newly designated entities.
    """
    logger.info("Starting portfolio monitoring scan...")
    
    # In production: load from your counterparty database
    # This example shows the pattern
    counterparties = load_active_counterparties()
    
    if not counterparties:
        logger.info("No counterparties to screen")
        return
 
    # Batch API call for efficiency
    entities_payload = [
        {
            "id": cp["id"],
            "name": cp["name"],
            "entity_type": cp.get("type", "individual")
        }
        for cp in counterparties
    ]
 
    results = client.batch_screen(entities_payload)
    
    hits = 0
    for entity, result in zip(counterparties, results):
        save_screening_record(
            entity["id"], entity["name"], entity.get("type", "individual"),
            result, triggered_by="scheduled_monitoring"
        )
        if result.get("status") in ("POTENTIAL_MATCH", "CONFIRMED_MATCH"):
            hits += 1
            send_match_alert(
                entity["name"], entity["id"],
                result.get("matches", []),
                result.get("risk_level", "HIGH")
            )
 
    logger.info(f"Portfolio scan complete: {len(counterparties)} screened, {hits} hits")
 
 
def load_active_counterparties() -> list[dict]:
    """
    Load active counterparties from your database.
    Replace with your actual data source.
    """
    return [
        {"id": "cp_001", "name": "Acme Trading LLC", "type": "organization"},
        {"id": "cp_002", "name": "John Smith", "type": "individual"},
    ]
 
 
if __name__ == "__main__":
    initialize_db()
    
    # Screen a new counterparty
    result = screen_new_counterparty(
        entity_id="cp_new_001",
        entity_name="Viktor Petrov",
        entity_type="individual",
        nationality="RU"
    )
    print(f"Onboarding decision: {result['decision']}")
 
    # Schedule portfolio monitoring every 24 hours
    schedule.every(24).hours.do(run_portfolio_monitoring)
    
    # Run once immediately at startup
    run_portfolio_monitoring()
    
    logger.info("Sanctions monitor running. Portfolio re-screen every 24 hours.")
    while True:
        schedule.run_pending()
        time.sleep(60)
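
One caveat with the loop above: the schedule library does not catch exceptions raised inside jobs, so an HTTP error from batch_screen would propagate out of run_pending() and stop the monitor. A small decorator can keep the loop alive. This is a sketch using only the standard library, and `catch_exceptions` is a name chosen here for illustration.

```python
import functools
import logging
from typing import Callable, Optional, TypeVar

logger = logging.getLogger(__name__)
T = TypeVar("T")


def catch_exceptions(job: Callable[..., T]) -> Callable[..., Optional[T]]:
    """Wrap a scheduled job so a failure is logged instead of ending the loop."""

    @functools.wraps(job)
    def wrapper(*args, **kwargs):
        try:
            return job(*args, **kwargs)
        except Exception:
            # logger.exception records the full traceback for later diagnosis
            logger.exception("Scheduled job %s failed", job.__name__)
            return None

    return wrapper
```

It would be applied when registering the job, for example schedule.every(24).hours.do(catch_exceptions(run_portfolio_monitoring)). A missed re-screen is itself a compliance gap, so in practice the except branch should also notify operations rather than only logging.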

Compliance Documentation: What to Retain

OFAC expects documented evidence that your screening program is operating effectively, and the records in sanctions_audit.db are the starting point. Since March 12, 2025, OFAC's recordkeeping rule (31 CFR 501.601) has required a 10-year retention period, up from five years:

Record Type                    Retention Requirement
Screening results (CLEAR)      10 years
Potential/Confirmed matches    10 years from resolution
Disposition decisions          10 years from decision
Voluntary self-disclosures     10 years

Export screening records monthly to immutable storage (S3 with Object Lock, or equivalent). This ensures the audit trail cannot be modified after the fact, which matters if the records are later submitted in support of an OFAC voluntary self-disclosure.
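
A monthly export could look roughly like the sketch below, which writes one month of records to a JSON Lines file and returns a SHA-256 digest that can be stored alongside it to detect later changes. `export_records` is a hypothetical helper. It assumes screened_at holds ISO 8601 timestamps, as in Step 4, and leaves out the upload step because that depends on your storage provider.

```python
import hashlib
import json
import sqlite3
from pathlib import Path


def export_records(
    conn: sqlite3.Connection, month: str, out_dir: Path
) -> tuple[Path, str]:
    """Write one month of records (month as 'YYYY-MM') to a JSONL file.

    Returns the file path and the SHA-256 hex digest of its contents.
    """
    cursor = conn.execute(
        "SELECT * FROM screening_records WHERE screened_at LIKE ? ORDER BY id",
        (f"{month}-%",),
    )
    columns = [col[0] for col in cursor.description]
    out_path = out_dir / f"screening_records_{month}.jsonl"
    digest = hashlib.sha256()
    with out_path.open("wb") as fh:
        for row in cursor:
            # sort_keys keeps the output byte-stable across runs
            line = json.dumps(dict(zip(columns, row)), sort_keys=True) + "\n"
            encoded = line.encode("utf-8")
            fh.write(encoded)
            digest.update(encoded)
    return out_path, digest.hexdigest()
```

Recomputing the digest over the stored file at audit time and comparing it with the recorded value gives a simple integrity check, independent of the storage layer's own protections.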

Expected Performance Benchmarks

Running this system against a counterparty database of 10,000 entities:

  • Individual screening (real-time): ~150-200ms per call
  • Batch monitoring (10K entities): ~45-90 seconds via batch endpoint
  • Alert delivery (Slack): under 2 seconds from match detection
  • Database write throughput: >1,000 records/minute on standard SQLite

For portfolios exceeding 100K entities, migrate from SQLite to PostgreSQL and consider parallel batch processing.
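
Parallel batch processing could be sketched as follows: split the portfolio into chunks and send them through a thread pool. This is an outline under stated assumptions, not a tuned implementation. `chunked` and `screen_in_parallel` are hypothetical helpers, `screen_batch` stands in for a function like batch_screen from Step 3, and the chunk size and worker count are placeholders that should respect whatever rate limits your API plan imposes.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable


def chunked(items: list, size: int) -> list[list]:
    """Split a list into consecutive chunks of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    return [items[i:i + size] for i in range(0, len(items), size)]


def screen_in_parallel(
    entities: list[dict],
    screen_batch: Callable[[list[dict]], list[dict]],
    chunk_size: int = 500,
    max_workers: int = 4,
) -> list[dict]:
    """Screen chunks concurrently and return results in the original order."""
    chunks = chunked(entities, chunk_size)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Executor.map yields results in input order, so the flattened
        # output lines up with `entities` for a later zip()
        chunk_results = list(pool.map(screen_batch, chunks))
    return [result for chunk in chunk_results for result in chunk]
```

Order preservation matters here because run_portfolio_monitoring pairs counterparties with results by position. If the screening function shares a single requests.Session, consider giving each worker its own client, since the thread safety of a shared session is not guaranteed.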

What This System Cannot Do

This guide covers the sanctions screening component of KYC. A production compliance program also requires:

  • Identity verification (document + biometric) — use Onfido or Persona APIs
  • Adverse media screening — use ComplyAdvantage or Dow Jones APIs
  • Transaction monitoring for AML — use Sardine, Unit21, or equivalent
  • Legal entity verification (corporate beneficial ownership) — use OpenCorporates or Dun & Bradstreet APIs

Sanctions screening is necessary but not sufficient. Build the other layers progressively, starting with the highest-risk components for your specific business model.

Get your API key at apivult.com and run your first screening in under 5 minutes.