
Detect Invoice and Payment Fraud

Step-by-step tutorial for building an accounts payable screening tool that verifies invoices using multi-channel ScamVerify™ API lookups before processing payment.

This tutorial walks you through building an accounts payable screening tool that verifies invoices before your organization processes payment. By the end, you will have a working pipeline that uploads an invoice image, extracts entities (addresses, phone numbers, URLs), cross-references them across multiple ScamVerify™ channels, and returns a payment decision.

Invoice fraud losses are accelerating. In Texas alone, average fake invoice losses jumped from $33,000 to $567,000 per incident, roughly a 17x increase year over year. Scoular Company, a Nebraska grain trader, lost $17.2 million to a single invoice fraud scheme. Most of these losses are preventable with automated verification at the point of payment approval.

Prerequisites

  • Python 3.9 or later (primary) or Node.js 18+ (secondary)
  • A ScamVerify™ API key (get one at scamverify.ai/settings/api)
  • A test invoice image (JPG, PNG, WebP, HEIC, or single-page PDF, max 4.5 MB)

What You Will Build

An invoice screening pipeline that:

  1. Uploads an invoice image to the ScamVerify™ document analysis API
  2. Extracts phone numbers, URLs, and addresses from the invoice
  3. Cross-references extracted phone numbers via the phone lookup API
  4. Cross-references extracted URLs via the URL lookup API
  5. Combines all signals into a risk decision (auto-approve, manual review, or auto-reject)
  6. Handles rate limiting, quota management, and error recovery

Set up the project

mkdir invoice-screener && cd invoice-screener
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate
pip install requests python-dotenv flask
pip freeze > requirements.txt  # Record the installed packages for the project's requirements.txt

Create a .env file:

SCAMVERIFY_API_KEY=sv_live_your_api_key_here

Create the multi-channel ScamVerify™ client

Build a client that can call the document, phone, and URL endpoints. The document endpoint uses multipart/form-data while phone and URL use JSON.

# scamverify.py
import os
import time
import requests
from typing import Optional


class ScamVerifyError(Exception):
    def __init__(
        self,
        message: str,
        status_code: int = 0,
        retryable: bool = False,
        retry_after: Optional[float] = None,
    ):
        super().__init__(message)
        self.status_code = status_code
        self.retryable = retryable
        self.retry_after = retry_after  # Seconds to wait before retrying (set on 429 only)


class ScamVerifyClient:
    BASE_URL = "https://scamverify.ai/api/v1"
    MAX_RETRIES = 2
    TIMEOUT_SECONDS = 30  # Per-request timeout; an assumed value, tune for large uploads

    def __init__(self, api_key: str):
        if not api_key or not api_key.startswith("sv_"):
            raise ValueError("Invalid API key. Keys must start with sv_live_ or sv_test_")
        self.api_key = api_key
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {api_key}",
        })

    def _handle_error(self, response: requests.Response) -> None:
        """Raise a ScamVerifyError describing a non-2xx response."""
        if response.status_code == 401:
            raise ScamVerifyError("Invalid or revoked API key", 401)
        if response.status_code == 402:
            raise ScamVerifyError("Quota exhausted", 402)
        if response.status_code == 429:
            try:
                retry_after = float(response.headers.get("Retry-After", "60"))
            except ValueError:
                retry_after = 60.0  # Header was not a plain number of seconds
            raise ScamVerifyError(
                f"Rate limited. Retry after {retry_after:g}s.", 429,
                retryable=True, retry_after=retry_after,
            )
        if response.status_code >= 500:
            raise ScamVerifyError(
                f"Server error: {response.status_code}", response.status_code, retryable=True
            )
        if not response.ok:
            error_data = {}
            try:
                error_data = response.json().get("error", {})
            except Exception:
                pass
            raise ScamVerifyError(
                error_data.get("message", f"API error: {response.status_code}"),
                response.status_code,
            )

    def _request_with_retry(self, method: str, url: str, **kwargs) -> requests.Response:
        """Make a request, retrying network errors, 429s, and 5xx responses.

        A 429 waits for the server's Retry-After interval; other retryable
        errors back off exponentially (1s, then 2s).
        """
        kwargs.setdefault("timeout", self.TIMEOUT_SECONDS)
        for attempt in range(self.MAX_RETRIES + 1):
            try:
                response = self.session.request(method, url, **kwargs)
                if response.ok:
                    return response
                self._handle_error(response)  # Always raises for a non-2xx response
            except requests.RequestException as e:
                # Timeouts and dropped connections are worth retrying
                error = ScamVerifyError(f"Network error: {e}", retryable=True)
            except ScamVerifyError as e:
                error = e
            if not error.retryable or attempt == self.MAX_RETRIES:
                raise error
            delay = error.retry_after if error.retry_after is not None else 2 ** attempt
            time.sleep(delay)
        raise error  # Not reached: the final attempt always returns or raises

    def analyze_document(self, file_path: str) -> dict:
        """Upload a document image for analysis (multipart/form-data)."""
        mime_types = {
            ".jpg": "image/jpeg", ".jpeg": "image/jpeg",
            ".png": "image/png", ".webp": "image/webp",
            ".heic": "image/heic", ".heif": "image/heif",
            ".pdf": "application/pdf",
        }
        ext = os.path.splitext(file_path)[1].lower()
        mime_type = mime_types.get(ext, "application/octet-stream")

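        # Read the whole file into memory so a retry re-sends the full body;
        # an open file handle would already be at EOF on the second attempt.
        with open(file_path, "rb") as f:
            file_bytes = f.read()

        files = {"file": (os.path.basename(file_path), file_bytes, mime_type)}
        response = self._request_with_retry(
            "POST", f"{self.BASE_URL}/document/analyze", files=files
        )
        return response.json()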

    def lookup_phone(self, phone_number: str) -> dict:
        """Look up a phone number (JSON)."""
        response = self._request_with_retry(
            "POST",
            f"{self.BASE_URL}/phone/lookup",
            json={"phone_number": phone_number},
            headers={"Content-Type": "application/json"},
        )
        return response.json()

    def lookup_url(self, url: str) -> dict:
        """Look up a URL (JSON)."""
        response = self._request_with_retry(
            "POST",
            f"{self.BASE_URL}/url/lookup",
            json={"url": url},
            headers={"Content-Type": "application/json"},
        )
        return response.json()
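On a 429 response, the Retry-After header can carry either a number of seconds or an HTTP date; the HTTP specification (RFC 9110) allows both. If you want the retry loop to wait for the interval the server suggests, a defensive parser could accept both forms and fall back to a default for anything else. This is a minimal sketch: parse_retry_after is a hypothetical helper that is not part of the client, the 60-second default simply mirrors the client's fallback, and nothing here assumes which form the ScamVerify™ API actually sends.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def parse_retry_after(value: Optional[str], default: float = 60.0) -> float:
    """Return how many seconds to wait, given a Retry-After header value.

    Hypothetical helper. Accepts delta-seconds ("120") or an HTTP date
    ("Wed, 21 Oct 2015 07:28:00 GMT"); falls back to `default` when the
    header is missing or unparseable.
    """
    if not value:
        return default
    value = value.strip()
    # Form 1: delta-seconds (a non-negative ASCII integer)
    if value.isascii() and value.isdigit():
        return float(value)
    # Form 2: HTTP date. Python 3.9 raises TypeError on bad input, 3.10+ ValueError.
    try:
        retry_at = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        return default
    if retry_at.tzinfo is None:
        retry_at = retry_at.replace(tzinfo=timezone.utc)
    remaining = (retry_at - datetime.now(timezone.utc)).total_seconds()
    return max(0.0, remaining)  # A date in the past means "retry now"
```

Inside _handle_error, the value would feed the retry delay for 429 responses in place of a plain float() conversion.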

Build the invoice screening pipeline

Create the core screening logic that chains document analysis with phone and URL cross-references.

# screener.py
from dataclasses import dataclass, field
from typing import Optional


# Risk thresholds for payment decisions
AUTO_APPROVE_MAX = 25     # Score 0 to 25: auto-approve
MANUAL_REVIEW_MAX = 60    # Score 26 to 60: manual review
# Score 61+: auto-reject


@dataclass
class InvoiceScreeningResult:
    decision: str  # "auto_approve", "manual_review", "auto_reject"
    combined_risk_score: int
    document_risk_score: int
    document_verdict: str
    document_type: str
    claimed_issuer: Optional[str]
    recommended_action: str
    red_flags: list
    entity_findings: list = field(default_factory=list)
    phone_cross_references: list = field(default_factory=list)
    url_cross_references: list = field(default_factory=list)
    address_findings: list = field(default_factory=list)


def screen_invoice(client, file_path: str) -> InvoiceScreeningResult:
    """Full invoice screening pipeline with multi-channel cross-references.

    Flow:
    1. Upload invoice to document analysis
    2. Extract phone numbers and URLs from the response
    3. Cross-reference each phone number via phone lookup
    4. Cross-reference each URL via URL lookup
    5. Combine all signals into a single risk decision
    """

    # Step 1: Document analysis
    doc_result = client.analyze_document(file_path)

    extracted = doc_result.get("extracted_entities", {})
    verifications = doc_result.get("entity_verifications", {})

    # Step 2: Cross-reference extracted phone numbers
    phone_results = []
    for phone in extracted.get("phone_numbers", [])[:3]:  # Cap at 3 to manage quota
        try:
            phone_data = client.lookup_phone(phone)
            phone_results.append({
                "number": phone,
                "risk_score": phone_data.get("risk_score", 0),
                "verdict": phone_data.get("verdict", "unknown"),
                "carrier": phone_data.get("signals", {}).get("carrier"),
                "line_type": phone_data.get("signals", {}).get("line_type"),
                "ftc_complaints": phone_data.get("signals", {}).get("ftc_complaints", 0),
                "robocall_flagged": phone_data.get("signals", {}).get("robocall_flagged", False),
            })
        except Exception as e:
            phone_results.append({
                "number": phone,
                "error": str(e),
                "risk_score": None,
            })

    # Step 3: Cross-reference extracted URLs
    url_results = []
    for url in extracted.get("urls", [])[:3]:  # Cap at 3 to manage quota
        try:
            url_data = client.lookup_url(url)
            url_results.append({
                "url": url,
                "risk_score": url_data.get("risk_score", 0),
                "verdict": url_data.get("verdict", "unknown"),
                "domain_age_days": url_data.get("signals", {}).get("domain_age_days"),
                "brand_impersonation": url_data.get("signals", {}).get("brand_impersonation"),
                "urlhaus_listed": url_data.get("signals", {}).get("urlhaus_listed", False),
            })
        except Exception as e:
            url_results.append({
                "url": url,
                "error": str(e),
                "risk_score": None,
            })

    # Step 4: Analyze address verifications
    address_findings = []
    for addr in verifications.get("addresses", []):
        finding = {"address": addr.get("address", "")}
        if addr.get("is_cmra"):
            finding["warning"] = "CMRA address (commercial mail receiving agency, a private mailbox service). Legitimate vendors rarely list a mailbox service as their primary business address."
            finding["severity"] = "high"
        elif not addr.get("address_valid"):
            finding["warning"] = "Address could not be validated."
            finding["severity"] = "medium"
        elif addr.get("institution_found") and addr.get("institution_matches"):
            finding["status"] = "verified"
            finding["severity"] = "positive"
        else:
            finding["status"] = "unverified"
            finding["severity"] = "low"
        address_findings.append(finding)

    # Step 5: Compute combined risk score
    doc_score = doc_result.get("risk_score", 50)
    combined_score = compute_combined_score(
        doc_score, phone_results, url_results, address_findings
    )

    # Step 6: Make payment decision
    if combined_score <= AUTO_APPROVE_MAX:
        decision = "auto_approve"
    elif combined_score <= MANUAL_REVIEW_MAX:
        decision = "manual_review"
    else:
        decision = "auto_reject"

    return InvoiceScreeningResult(
        decision=decision,
        combined_risk_score=combined_score,
        document_risk_score=doc_score,
        document_verdict=doc_result.get("verdict", "unknown"),
        document_type=doc_result.get("document_type", "unknown"),
        claimed_issuer=doc_result.get("claimed_issuer"),
        recommended_action=doc_result.get("recommended_action", ""),
        red_flags=doc_result.get("red_flags", []),
        entity_findings=doc_result.get("evidence_summary", []),
        phone_cross_references=phone_results,
        url_cross_references=url_results,
        address_findings=address_findings,
    )


def compute_combined_score(
    doc_score: int,
    phone_results: list,
    url_results: list,
    address_findings: list,
) -> int:
    """Combine signals from all channels into a single risk score.

    Weights:
    - Document analysis: 50% (base)
    - Phone cross-references: 20%
    - URL cross-references: 20%
    - Address findings: 10%
    """
    # Phone score: average of successfully checked numbers
    phone_scores = [p["risk_score"] for p in phone_results if p.get("risk_score") is not None]
    avg_phone_score = sum(phone_scores) / len(phone_scores) if phone_scores else 0

    # URL score: average of successfully checked URLs
    url_scores = [u["risk_score"] for u in url_results if u.get("risk_score") is not None]
    avg_url_score = sum(url_scores) / len(url_scores) if url_scores else 0

    # Address score: CMRA = 80, invalid = 50, unverified = 30, verified = 0
    address_severity_scores = {
        "high": 80,
        "medium": 50,
        "low": 30,
        "positive": 0,
    }
    address_scores = [
        address_severity_scores.get(a.get("severity", "low"), 30)
        for a in address_findings
    ]
    avg_address_score = sum(address_scores) / len(address_scores) if address_scores else 0

    # Weighted combination
    combined = (
        doc_score * 0.50
        + avg_phone_score * 0.20
        + avg_url_score * 0.20
        + avg_address_score * 0.10
    )

    # Boost for critical signals regardless of weight
    for phone in phone_results:
        if phone.get("ftc_complaints", 0) > 10:
            combined += 15  # Known scam number
        if phone.get("robocall_flagged"):
            combined += 10

    for url in url_results:
        if url.get("urlhaus_listed"):
            combined += 20  # Known malware URL
        bi = url.get("brand_impersonation")
        if bi and bi.get("detected"):
            combined += 15  # Impersonating a brand

    for addr in address_findings:
        if addr.get("severity") == "high":
            combined += 10  # CMRA address

    return min(100, max(0, int(combined)))
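One consequence of the fixed weights: a channel with no data contributes 0, which the formula treats the same as a clean result. An invoice whose document score is 100 but which has no phone numbers, URLs, or address verifications therefore scores 50 and lands in manual review rather than auto-reject. If you would rather let the available evidence speak for itself, one alternative (a swapped-in technique, not the weighting used above) is to renormalize the weights over the channels that actually returned scores. A minimal sketch; weighted_base_score is a hypothetical function that covers only the weighted part, so the critical-signal bonuses would still be added afterwards.

```python
from typing import Optional, Sequence

# Channel weights taken from compute_combined_score's docstring
WEIGHTS = {"document": 0.50, "phone": 0.20, "url": 0.20, "address": 0.10}


def _average(scores: Sequence[float]) -> Optional[float]:
    """Mean of the scores, or None when the channel produced no data."""
    return sum(scores) / len(scores) if scores else None


def weighted_base_score(
    doc_score: float,
    phone_scores: Sequence[float],
    url_scores: Sequence[float],
    address_scores: Sequence[float],
) -> float:
    """Weighted average over the channels that produced at least one score.

    Hypothetical variant: empty channels are dropped and the remaining
    weights are rescaled to sum to 1, so missing data is not scored as clean.
    """
    channel_values = {
        "document": doc_score,
        "phone": _average(phone_scores),
        "url": _average(url_scores),
        "address": _average(address_scores),
    }
    available = {name: v for name, v in channel_values.items() if v is not None}
    total_weight = sum(WEIGHTS[name] for name in available)  # >= 0.5: document is always present
    return sum(WEIGHTS[name] * v for name, v in available.items()) / total_weight
```

With every channel present, this matches the fixed-weight formula; with only the document channel, a document score of 100 stays 100. Keeping the original weights and letting such invoices fall to manual review is an equally defensible choice if you trust the document channel less on its own.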

Create the Flask endpoint

Wire the screening pipeline into a web endpoint.

# app.py
import os
from flask import Flask, request, jsonify
from dotenv import load_dotenv
from scamverify import ScamVerifyClient, ScamVerifyError
from screener import screen_invoice

load_dotenv()

app = Flask(__name__)
client = ScamVerifyClient(os.environ["SCAMVERIFY_API_KEY"])

UPLOAD_DIR = "/tmp/invoice-screener"
os.makedirs(UPLOAD_DIR, exist_ok=True)

# Decision display metadata
DECISION_LABELS = {
    "auto_approve": {
        "label": "Approved",
        "color": "green",
        "action": "Invoice passed automated screening. Safe to process payment.",
    },
    "manual_review": {
        "label": "Review Required",
        "color": "yellow",
        "action": "Invoice flagged for manual review before payment processing.",
    },
    "auto_reject": {
        "label": "Rejected",
        "color": "red",
        "action": "Invoice failed screening. Do not process payment without independent verification.",
    },
}


@app.route("/screen", methods=["POST"])
def screen():
    if "file" not in request.files:
        return jsonify({
            "error": "No file uploaded. Send an invoice image in the 'file' field.",
        }), 400

    uploaded_file = request.files["file"]
    if uploaded_file.filename == "":
        return jsonify({"error": "Empty filename."}), 400

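    # Never trust the client-supplied filename: keep only its final path
    # component and add a random prefix so concurrent uploads cannot collide.
    safe_name = os.path.basename(uploaded_file.filename)
    temp_path = os.path.join(UPLOAD_DIR, f"{os.urandom(8).hex()}-{safe_name}")
    uploaded_file.save(temp_path)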

    try:
        result = screen_invoice(client, temp_path)
        decision_info = DECISION_LABELS[result.decision]

        return jsonify({
            "success": True,
            "decision": result.decision,
            "decision_label": decision_info["label"],
            "decision_action": decision_info["action"],
            "combined_risk_score": result.combined_risk_score,
            "document": {
                "risk_score": result.document_risk_score,
                "verdict": result.document_verdict,
                "type": result.document_type,
                "claimed_issuer": result.claimed_issuer,
                "red_flags": result.red_flags,
                "recommended_action": result.recommended_action,
            },
            "cross_references": {
                "phones": result.phone_cross_references,
                "urls": result.url_cross_references,
            },
            "address_findings": result.address_findings,
            "evidence": result.entity_findings,
        })

    except ScamVerifyError as e:
        return jsonify({"error": str(e)}), e.status_code or 500
    except ValueError as e:
        return jsonify({"error": str(e)}), 400
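    except Exception:
        # Log the full traceback server-side; return only a generic message to the caller
        app.logger.exception("Invoice screening failed")
        return jsonify({"error": "Screening failed. Please try again."}), 500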
    finally:
        if os.path.exists(temp_path):
            os.remove(temp_path)


@app.route("/health")
def health():
    return jsonify({"status": "ok"})


if __name__ == "__main__":
    # debug=True enables the reloader and interactive debugger; use it only for local development
    app.run(debug=True, port=3000)
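The endpoint forwards whatever the caller uploads, so an unsupported or oversized file only fails after a round trip to the API. Checking the file against the prerequisites (JPG, PNG, WebP, HEIC, or PDF, max 4.5 MB) before screening lets you return a clear 400 right away. A minimal sketch; validate_invoice_file is a hypothetical helper, and reading "4.5 MB" as 4,500,000 bytes is an assumption (if the API measures the limit in MiB, this check is merely slightly stricter than necessary). It does not verify that a PDF has a single page.

```python
import os

# Extensions from the prerequisites; .heif is included because the
# client's MIME map accepts it alongside .heic.
ALLOWED_EXTENSIONS = {".jpg", ".jpeg", ".png", ".webp", ".heic", ".heif", ".pdf"}
# Assumption: "4.5 MB" read as 4,500,000 bytes (the stricter interpretation)
MAX_UPLOAD_BYTES = 4_500_000


def validate_invoice_file(path: str) -> None:
    """Raise ValueError if the file is empty, too large, or an unsupported type.

    Hypothetical pre-check; it inspects only the extension and size.
    """
    ext = os.path.splitext(path)[1].lower()
    if ext not in ALLOWED_EXTENSIONS:
        raise ValueError(f"Unsupported file type '{ext or 'none'}'.")
    size = os.path.getsize(path)
    if size == 0:
        raise ValueError("Uploaded file is empty.")
    if size > MAX_UPLOAD_BYTES:
        raise ValueError(
            f"File is {size:,} bytes; the limit is {MAX_UPLOAD_BYTES:,} bytes."
        )
```

Calling validate_invoice_file(temp_path) just before screen_invoice inside the endpoint's try block would route these failures through the existing except ValueError handler, which already returns a 400, while the finally clause still removes the temporary file.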

Test the screening pipeline

Start the server and submit an invoice.

python app.py

In another terminal:

# Screen an invoice image
curl -X POST http://localhost:3000/screen \
  -F "file=@suspicious-invoice.jpg"

# Screen a PDF invoice
curl -X POST http://localhost:3000/screen \
  -F "file=@vendor-invoice.pdf"

Add the Node.js alternative

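Here is the same screening flow in Node.js 18+, which provides fetch, FormData, and Blob as globals. This condensed version omits address verification and retry handling, so the phone and URL cross-references each carry 25% of the weight instead of 20%.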

// screener.js
const fs = require('fs');
const path = require('path');

const BASE_URL = 'https://scamverify.ai/api/v1';

async function screenInvoice(apiKey, filePath) {
  const headers = { 'Authorization': `Bearer ${apiKey}` };

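  // Step 1: Upload invoice to document analysis (multipart/form-data)
  const mimeTypes = {
    '.jpg': 'image/jpeg', '.jpeg': 'image/jpeg',
    '.png': 'image/png', '.webp': 'image/webp',
    '.heic': 'image/heic', '.heif': 'image/heif',
    '.pdf': 'application/pdf',
  };
  const fileBuffer = fs.readFileSync(filePath);
  const fileName = path.basename(filePath);
  // Give the file part an explicit Content-Type, matching the Python client's MIME map
  const mimeType = mimeTypes[path.extname(filePath).toLowerCase()] || 'application/octet-stream';
  const formData = new FormData();
  const blob = new Blob([fileBuffer], { type: mimeType });
  formData.append('file', blob, fileName);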

  const docResponse = await fetch(`${BASE_URL}/document/analyze`, {
    method: 'POST',
    headers,
    body: formData,
  });
  if (!docResponse.ok) throw new Error(`Document analysis failed: ${docResponse.status}`);
  const docResult = await docResponse.json();

  const extracted = docResult.extracted_entities || {};

  // Step 2: Cross-reference phone numbers
  const phoneChecks = await Promise.allSettled(
    (extracted.phone_numbers || []).slice(0, 3).map(async (phone) => {
      const res = await fetch(`${BASE_URL}/phone/lookup`, {
        method: 'POST',
        headers: { ...headers, 'Content-Type': 'application/json' },
        body: JSON.stringify({ phone_number: phone }),
      });
      if (!res.ok) throw new Error(`Phone lookup failed: ${res.status}`);
      const data = await res.json();
      return { number: phone, ...data };
    })
  );

  // Step 3: Cross-reference URLs
  const urlChecks = await Promise.allSettled(
    (extracted.urls || []).slice(0, 3).map(async (url) => {
      const res = await fetch(`${BASE_URL}/url/lookup`, {
        method: 'POST',
        headers: { ...headers, 'Content-Type': 'application/json' },
        body: JSON.stringify({ url }),
      });
      if (!res.ok) throw new Error(`URL lookup failed: ${res.status}`);
      const data = await res.json();
      return { url, ...data };
    })
  );

  // Step 4: Combine results
  const phoneResults = phoneChecks
    .filter(r => r.status === 'fulfilled')
    .map(r => r.value);

  const urlResults = urlChecks
    .filter(r => r.status === 'fulfilled')
    .map(r => r.value);

  // Step 5: Compute decision
  const combinedScore = computeScore(docResult, phoneResults, urlResults);

  let decision;
  if (combinedScore <= 25) decision = 'auto_approve';
  else if (combinedScore <= 60) decision = 'manual_review';
  else decision = 'auto_reject';

  return {
    decision,
    combined_risk_score: combinedScore,
    document: docResult,
    phone_cross_references: phoneResults,
    url_cross_references: urlResults,
  };
}

function computeScore(docResult, phoneResults, urlResults) {
  const docScore = docResult.risk_score ?? 50;  // ?? keeps a legitimate score of 0 (|| would turn it into 50)
  const avgPhone = phoneResults.length > 0
    ? phoneResults.reduce((sum, p) => sum + (p.risk_score || 0), 0) / phoneResults.length
    : 0;
  const avgUrl = urlResults.length > 0
    ? urlResults.reduce((sum, u) => sum + (u.risk_score || 0), 0) / urlResults.length
    : 0;

  let score = docScore * 0.5 + avgPhone * 0.25 + avgUrl * 0.25;

  // Critical signal boosts
  for (const p of phoneResults) {
    if ((p.signals?.ftc_complaints || 0) > 10) score += 15;
    if (p.signals?.robocall_flagged) score += 10;
  }
  for (const u of urlResults) {
    if (u.signals?.urlhaus_listed) score += 20;
    if (u.signals?.brand_impersonation?.detected) score += 15;
  }

  return Math.min(100, Math.max(0, Math.round(score)));
}

module.exports = { screenInvoice };

Understand the risk decision matrix

The screening pipeline maps combined risk scores to payment actions:

| Combined score | Decision | Action | Typical scenario |
|---|---|---|---|
| 0 to 25 | Auto-approve | Process payment normally. | Verified business address, clean phone number, legitimate domain. |
| 26 to 60 | Manual review | Route to AP team for human review before payment. | New vendor, unverifiable address, or phone number with a few complaints. |
| 61 to 100 | Auto-reject | Block payment. Require independent vendor verification. | CMRA address, phone flagged for robocalls, URL on threat feeds. |
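These thresholds assume every cross-reference completed. In the Python pipeline, a lookup that raises is stored with an error key and a risk_score of None and then left out of the averages, so an API outage or an exhausted quota can make an invoice look cleaner than it is. If you want auto-approval to require complete evidence, one option is a guard that downgrades auto_approve to manual_review whenever a lookup failed. A minimal sketch; apply_fail_safe is a hypothetical helper, not part of screener.py, and it reads the phone_cross_references and url_cross_references lists the pipeline already returns.

```python
from typing import Dict, List


def apply_fail_safe(
    decision: str,
    phone_results: List[Dict],
    url_results: List[Dict],
) -> str:
    """Downgrade auto_approve to manual_review if any lookup failed.

    Hypothetical guard. A failed lookup is an entry with an "error" key,
    which is how screen_invoice records exceptions from lookup_phone and
    lookup_url. Manual review and auto-reject decisions pass through unchanged.
    """
    lookup_failed = any("error" in r for r in [*phone_results, *url_results])
    if decision == "auto_approve" and lookup_failed:
        return "manual_review"
    return decision
```

You would apply it to result.decision after screen_invoice returns and before looking up DECISION_LABELS, so a partial screen never reaches the auto-approve tier.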

The combined score is computed from four channels:

| Channel | Weight | What it catches |
|---|---|---|
| Document analysis | 50% | Fake logos, fabricated case numbers, payment pressure language, CMRA addresses. |
| Phone cross-reference | 20% | FTC/FCC complaints on contact numbers, robocall flags, VoIP carriers. |
| URL cross-reference | 20% | Payment URLs on threat feeds, brand impersonation, newly registered domains. |
| Address verification | 10% | Mailbox service addresses, vacant addresses, unverifiable institutions. |

Critical signals (more than 10 FTC complaints, a robocall flag, a URLhaus listing, detected brand impersonation, or a CMRA address) add fixed bonus points on top of the weighted score, so a single high-severity finding can push an invoice from "review" to "reject."
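A worked example makes the interaction concrete. Take a hypothetical invoice with a document score of 60, one phone number scoring 40, one URL scoring 30, and one unverified address (30 points under the severity mapping). The sketch below reproduces only the arithmetic of compute_combined_score for one entity per channel; the inputs are invented for illustration, and combined and decide are hypothetical names.

```python
def combined(doc: int, phone: int, url: int, address: int, bonus: int = 0) -> int:
    """Weighted sum (50/20/20/10) plus fixed bonuses, clamped to 0..100.

    Mirrors compute_combined_score's arithmetic for one entity per channel.
    """
    score = doc * 0.50 + phone * 0.20 + url * 0.20 + address * 0.10 + bonus
    return min(100, max(0, int(score)))


def decide(score: int) -> str:
    """Map a combined score to the thresholds in the decision table."""
    if score <= 25:
        return "auto_approve"
    if score <= 60:
        return "manual_review"
    return "auto_reject"


base = combined(60, 40, 30, 30)                     # 30 + 8 + 6 + 3 = 47
with_urlhaus = combined(60, 40, 30, 30, bonus=20)   # A URLhaus listing adds 20

print(base, decide(base))                  # 47 manual_review
print(with_urlhaus, decide(with_urlhaus))  # 67 auto_reject
```

The URLhaus listing moves the same invoice from manual review to auto-reject, even though the URL channel's weighted contribution is only 6 points.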

Complete Project Structure

invoice-screener/
  .env                  # API key
  scamverify.py         # Multi-channel API client (document, phone, URL)
  screener.py           # Screening pipeline with risk scoring and decision logic
  app.py                # Flask server with /screen endpoint
  requirements.txt      # requests, flask, python-dotenv

Quota usage for invoice screening: each invoice screen consumes one document lookup plus up to three phone lookups and up to three URL lookups, depending on how many entities the invoice contains. Document and phone/URL quotas are tracked separately, so plan each accordingly. Check your current usage at the usage endpoint.
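Because screen_invoice takes the first three phone numbers and URLs exactly as extracted, an invoice that repeats the same number in its header and footer spends two lookups on one number. Deduplicating before applying the cap stretches quota further. A minimal sketch, assuming that comparing digits only is an acceptable way to spot duplicate phone numbers: it treats "(555) 123-4567" and "555-123-4567" as the same, but not "+1 555 123 4567". unique_capped and phone_key are hypothetical helpers.

```python
import re
from typing import Callable, Iterable, List


def phone_key(number: str) -> str:
    """Digits only, so formatting differences do not create duplicates (assumption)."""
    return re.sub(r"\D", "", number)


def unique_capped(
    items: Iterable[str],
    key: Callable[[str], str] = str.strip,
    limit: int = 3,
) -> List[str]:
    """Return up to `limit` items, skipping empty ones and any whose key was already seen.

    Order is preserved, so the first occurrence of each entity is the one kept.
    """
    seen = set()
    result = []
    for item in items:
        k = key(item)
        if not k or k in seen:
            continue
        seen.add(k)
        result.append(item)
        if len(result) == limit:
            break
    return result
```

In screen_invoice, unique_capped(extracted.get("phone_numbers", []), key=phone_key) would replace the [:3] slice for phones, and unique_capped(extracted.get("urls", [])) would do the same for URLs.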
