Detect Invoice and Payment Fraud
Step-by-step tutorial for building an accounts payable screening tool that verifies invoices using multi-channel ScamVerify™ API lookups before processing payment.
This tutorial walks you through building an accounts payable screening tool that verifies invoices before your organization processes payment. By the end, you will have a working pipeline that uploads an invoice image, extracts entities (addresses, phone numbers, URLs), cross-references them across multiple ScamVerify™ channels, and returns a payment decision.
Invoice fraud losses are accelerating. In Texas alone, average fake invoice losses jumped from $33,000 to $567,000 per incident, roughly a 17x increase year over year. Scoular Company, a Nebraska grain trader, lost $17.2 million to a single invoice fraud scheme. Most of these losses are preventable with automated verification at the point of payment approval.
Prerequisites
- Python 3.9 or later (primary) or Node.js 18+ (secondary)
- A ScamVerify™ API key (get one at scamverify.ai/settings/api)
- A test invoice image (JPG, PNG, WebP, HEIC, or single-page PDF, max 4.5 MB)
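Before spending quota on an upload, it can help to check a file against the limits listed above. The following is a minimal sketch, not part of the ScamVerify™ API: `check_invoice_file` is a hypothetical helper, the `.heif` extension is included only because the tutorial's client maps it, and the byte limit assumes that "4.5 MB" means 4.5 × 1024 × 1024 bytes. It does not check that a PDF has a single page.

```python
import os

# Extensions from the prerequisites list (".heif" is an assumption, see above).
ALLOWED_EXTENSIONS = {".jpg", ".jpeg", ".png", ".webp", ".heic", ".heif", ".pdf"}
# Assumption: "4.5 MB" is interpreted as 4.5 * 1024 * 1024 bytes.
MAX_BYTES = int(4.5 * 1024 * 1024)


def check_invoice_file(filename: str, size_bytes: int) -> list:
    """Return a list of problems; an empty list means the file looks uploadable."""
    problems = []
    ext = os.path.splitext(filename)[1].lower()
    if ext not in ALLOWED_EXTENSIONS:
        problems.append(f"Unsupported file type: {ext or '(none)'}")
    if size_bytes > MAX_BYTES:
        problems.append(f"File is {size_bytes} bytes; limit is {MAX_BYTES}")
    return problems
```

In practice you might call it as `check_invoice_file(path, os.path.getsize(path))` and skip the upload when the returned list is non-empty.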
What You Will Build
An invoice screening pipeline that:
- Uploads an invoice image to the ScamVerify™ document analysis API
- Extracts phone numbers, URLs, and addresses from the invoice
- Cross-references extracted phone numbers via the phone lookup API
- Cross-references extracted URLs via the URL lookup API
- Combines all signals into a risk decision (auto-approve, manual review, or auto-reject)
- Handles rate limiting, quota management, and error recovery
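For the rate-limiting item, one common approach is exponential backoff that also respects a `Retry-After` header when the server sends one. The sketch below is an illustration under assumptions: `retry_delay` is a hypothetical helper, the 60-second cap is arbitrary, and only the numeric form of `Retry-After` is handled (an HTTP-date value falls back to plain backoff).

```python
from typing import Optional


def retry_delay(attempt: int, retry_after: Optional[str] = None, cap: float = 60.0) -> float:
    """Seconds to wait before retrying; `attempt` is 0 for the first retry."""
    delay = float(2 ** attempt)  # 1s, 2s, 4s, ...
    if retry_after is not None:
        try:
            # Wait at least as long as the server asked for.
            delay = max(delay, float(retry_after))
        except ValueError:
            # Non-numeric value (for example an HTTP date): keep plain backoff.
            pass
    return min(delay, cap)
```

A retry loop would then call `time.sleep(retry_delay(attempt, response.headers.get("Retry-After")))` between attempts.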
Set up the project
mkdir invoice-screener && cd invoice-screener
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
pip install requests python-dotenv flask

Create a .env file:

SCAMVERIFY_API_KEY=sv_live_your_api_key_here

Create the multi-channel ScamVerify™ client
Build a client that can call the document, phone, and URL endpoints. The document endpoint uses multipart/form-data while phone and URL use JSON.
# scamverify.py
import os
import time
import requests


class ScamVerifyError(Exception):
    def __init__(self, message: str, status_code: int = 0, retryable: bool = False):
        super().__init__(message)
        self.status_code = status_code
        self.retryable = retryable


class ScamVerifyClient:
    BASE_URL = "https://scamverify.ai/api/v1"
    MAX_RETRIES = 2
    TIMEOUT = 60  # Seconds per request (an arbitrary default; tune as needed)

    def __init__(self, api_key: str):
        if not api_key or not api_key.startswith("sv_"):
            raise ValueError("Invalid API key. Keys must start with sv_live_ or sv_test_")
        self.api_key = api_key
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {api_key}",
        })

    def _handle_error(self, response: requests.Response) -> None:
        """Translate a non-2xx response into a ScamVerifyError."""
        if response.status_code == 401:
            raise ScamVerifyError("Invalid or revoked API key", 401)
        if response.status_code == 402:
            raise ScamVerifyError("Quota exhausted", 402)
        if response.status_code == 429:
            retry_after = response.headers.get("Retry-After", "60")
            raise ScamVerifyError(
                f"Rate limited. Retry after {retry_after}s.", 429, retryable=True
            )
        if response.status_code >= 500:
            raise ScamVerifyError(
                f"Server error: {response.status_code}", response.status_code, retryable=True
            )
        if not response.ok:
            error_data = {}
            try:
                error_data = response.json().get("error", {})
            except Exception:
                pass
            raise ScamVerifyError(
                error_data.get("message", f"API error: {response.status_code}"),
                response.status_code,
            )

    def _request_with_retry(self, method: str, url: str, **kwargs) -> requests.Response:
        """Make a request with exponential backoff for retryable errors."""
        # Without a timeout, requests can block indefinitely on a stalled connection.
        kwargs.setdefault("timeout", self.TIMEOUT)
        last_error = None
        for attempt in range(self.MAX_RETRIES + 1):
            try:
                response = self.session.request(method, url, **kwargs)
                if response.ok:
                    return response
                self._handle_error(response)
            except ScamVerifyError as e:
                last_error = e
                if not e.retryable or attempt == self.MAX_RETRIES:
                    raise
                delay = 2 ** attempt  # 1s, then 2s
                time.sleep(delay)
        raise last_error

    def analyze_document(self, file_path: str) -> dict:
        """Upload a document image for analysis (multipart/form-data)."""
        mime_types = {
            ".jpg": "image/jpeg", ".jpeg": "image/jpeg",
            ".png": "image/png", ".webp": "image/webp",
            ".heic": "image/heic", ".heif": "image/heif",
            ".pdf": "application/pdf",
        }
        ext = os.path.splitext(file_path)[1].lower()
        mime_type = mime_types.get(ext, "application/octet-stream")
        # Read the bytes up front. Passing the open file handle would leave it
        # at end-of-file after the first attempt, so a retry would upload nothing.
        with open(file_path, "rb") as f:
            file_bytes = f.read()
        files = {"file": (os.path.basename(file_path), file_bytes, mime_type)}
        response = self._request_with_retry(
            "POST", f"{self.BASE_URL}/document/analyze", files=files
        )
        return response.json()

    def lookup_phone(self, phone_number: str) -> dict:
        """Look up a phone number (JSON)."""
        response = self._request_with_retry(
            "POST",
            f"{self.BASE_URL}/phone/lookup",
            json={"phone_number": phone_number},
            headers={"Content-Type": "application/json"},
        )
        return response.json()

    def lookup_url(self, url: str) -> dict:
        """Look up a URL (JSON)."""
        response = self._request_with_retry(
            "POST",
            f"{self.BASE_URL}/url/lookup",
            json={"url": url},
            headers={"Content-Type": "application/json"},
        )
        return response.json()

Build the invoice screening pipeline
Create the core screening logic that chains document analysis with phone and URL cross-references.
# screener.py
from dataclasses import dataclass, field
from typing import Optional

# Risk thresholds for payment decisions
AUTO_APPROVE_MAX = 25   # Score 0 to 25: auto-approve
MANUAL_REVIEW_MAX = 60  # Score 26 to 60: manual review
                        # Score 61+: auto-reject


@dataclass
class InvoiceScreeningResult:
    decision: str  # "auto_approve", "manual_review", "auto_reject"
    combined_risk_score: int
    document_risk_score: int
    document_verdict: str
    document_type: str
    claimed_issuer: Optional[str]
    recommended_action: str
    red_flags: list
    entity_findings: list = field(default_factory=list)
    phone_cross_references: list = field(default_factory=list)
    url_cross_references: list = field(default_factory=list)
    address_findings: list = field(default_factory=list)


def screen_invoice(client, file_path: str) -> InvoiceScreeningResult:
    """Full invoice screening pipeline with multi-channel cross-references.

    Flow:
    1. Upload the invoice to document analysis and read the extracted entities
    2. Cross-reference each phone number via phone lookup
    3. Cross-reference each URL via URL lookup
    4. Review the address verifications
    5. Combine all signals into a single risk score
    6. Map the score to a payment decision
    """
    # Step 1: Document analysis
    doc_result = client.analyze_document(file_path)
    extracted = doc_result.get("extracted_entities", {})
    verifications = doc_result.get("entity_verifications", {})

    # Step 2: Cross-reference extracted phone numbers
    phone_results = []
    for phone in extracted.get("phone_numbers", [])[:3]:  # Cap at 3 to manage quota
        try:
            phone_data = client.lookup_phone(phone)
            signals = phone_data.get("signals", {})
            phone_results.append({
                "number": phone,
                "risk_score": phone_data.get("risk_score", 0),
                "verdict": phone_data.get("verdict", "unknown"),
                "carrier": signals.get("carrier"),
                "line_type": signals.get("line_type"),
                "ftc_complaints": signals.get("ftc_complaints", 0),
                "robocall_flagged": signals.get("robocall_flagged", False),
            })
        except Exception as e:
            # A failed lookup is recorded but excluded from the score average.
            phone_results.append({
                "number": phone,
                "error": str(e),
                "risk_score": None,
            })

    # Step 3: Cross-reference extracted URLs
    url_results = []
    for url in extracted.get("urls", [])[:3]:  # Cap at 3 to manage quota
        try:
            url_data = client.lookup_url(url)
            signals = url_data.get("signals", {})
            url_results.append({
                "url": url,
                "risk_score": url_data.get("risk_score", 0),
                "verdict": url_data.get("verdict", "unknown"),
                "domain_age_days": signals.get("domain_age_days"),
                "brand_impersonation": signals.get("brand_impersonation"),
                "urlhaus_listed": signals.get("urlhaus_listed", False),
            })
        except Exception as e:
            url_results.append({
                "url": url,
                "error": str(e),
                "risk_score": None,
            })

    # Step 4: Analyze address verifications
    address_findings = []
    for addr in verifications.get("addresses", []):
        finding = {"address": addr.get("address", "")}
        if addr.get("is_cmra"):
            finding["warning"] = "CMRA address (commercial mail receiving agency). Legitimate businesses rarely use a mailbox service as their primary address."
            finding["severity"] = "high"
        elif not addr.get("address_valid"):
            finding["warning"] = "Address could not be validated."
            finding["severity"] = "medium"
        elif addr.get("institution_found") and addr.get("institution_matches"):
            finding["status"] = "verified"
            finding["severity"] = "positive"
        else:
            finding["status"] = "unverified"
            finding["severity"] = "low"
        address_findings.append(finding)

    # Step 5: Compute combined risk score
    doc_score = doc_result.get("risk_score", 50)
    combined_score = compute_combined_score(
        doc_score, phone_results, url_results, address_findings
    )

    # Step 6: Make payment decision
    if combined_score <= AUTO_APPROVE_MAX:
        decision = "auto_approve"
    elif combined_score <= MANUAL_REVIEW_MAX:
        decision = "manual_review"
    else:
        decision = "auto_reject"

    return InvoiceScreeningResult(
        decision=decision,
        combined_risk_score=combined_score,
        document_risk_score=doc_score,
        document_verdict=doc_result.get("verdict", "unknown"),
        document_type=doc_result.get("document_type", "unknown"),
        claimed_issuer=doc_result.get("claimed_issuer"),
        recommended_action=doc_result.get("recommended_action", ""),
        red_flags=doc_result.get("red_flags", []),
        entity_findings=doc_result.get("evidence_summary", []),
        phone_cross_references=phone_results,
        url_cross_references=url_results,
        address_findings=address_findings,
    )


def compute_combined_score(
    doc_score: int,
    phone_results: list,
    url_results: list,
    address_findings: list,
) -> int:
    """Combine signals from all channels into a single risk score.

    Weights:
    - Document analysis: 50% (base)
    - Phone cross-references: 20%
    - URL cross-references: 20%
    - Address findings: 10%
    """
    # Phone score: average of successfully checked numbers
    phone_scores = [p["risk_score"] for p in phone_results if p.get("risk_score") is not None]
    avg_phone_score = sum(phone_scores) / len(phone_scores) if phone_scores else 0

    # URL score: average of successfully checked URLs
    url_scores = [u["risk_score"] for u in url_results if u.get("risk_score") is not None]
    avg_url_score = sum(url_scores) / len(url_scores) if url_scores else 0

    # Address score: CMRA = 80, invalid = 50, unverified = 30, verified = 0
    address_severity_scores = {
        "high": 80,
        "medium": 50,
        "low": 30,
        "positive": 0,
    }
    address_scores = [
        address_severity_scores.get(a.get("severity", "low"), 30)
        for a in address_findings
    ]
    avg_address_score = sum(address_scores) / len(address_scores) if address_scores else 0

    # Weighted combination
    combined = (
        doc_score * 0.50
        + avg_phone_score * 0.20
        + avg_url_score * 0.20
        + avg_address_score * 0.10
    )

    # Boost for critical signals regardless of weight
    for phone in phone_results:
        # "or 0" guards against a null complaint count in the response
        if (phone.get("ftc_complaints") or 0) > 10:
            combined += 15  # Known scam number
        if phone.get("robocall_flagged"):
            combined += 10
    for url in url_results:
        if url.get("urlhaus_listed"):
            combined += 20  # Known malware URL
        bi = url.get("brand_impersonation")
        if bi and bi.get("detected"):
            combined += 15  # Impersonating a brand
    for addr in address_findings:
        if addr.get("severity") == "high":
            combined += 10  # CMRA address

    # Clamp to the 0-100 range; int() truncates any fractional part
    return min(100, max(0, int(combined)))

Create the Flask endpoint
Wire the screening pipeline into a web endpoint.
# app.py
import os
from flask import Flask, request, jsonify
from dotenv import load_dotenv
from werkzeug.utils import secure_filename
from scamverify import ScamVerifyClient, ScamVerifyError
from screener import screen_invoice

load_dotenv()

app = Flask(__name__)
client = ScamVerifyClient(os.environ["SCAMVERIFY_API_KEY"])

UPLOAD_DIR = "/tmp/invoice-screener"
os.makedirs(UPLOAD_DIR, exist_ok=True)

# Decision display metadata
DECISION_LABELS = {
    "auto_approve": {
        "label": "Approved",
        "color": "green",
        "action": "Invoice passed automated screening. Safe to process payment.",
    },
    "manual_review": {
        "label": "Review Required",
        "color": "yellow",
        "action": "Invoice flagged for manual review before payment processing.",
    },
    "auto_reject": {
        "label": "Rejected",
        "color": "red",
        "action": "Invoice failed screening. Do not process payment without independent verification.",
    },
}


@app.route("/screen", methods=["POST"])
def screen():
    if "file" not in request.files:
        return jsonify({
            "error": "No file uploaded. Send an invoice image in the 'file' field.",
        }), 400

    uploaded_file = request.files["file"]
    # secure_filename strips path separators so a crafted name such as
    # "../../etc/passwd" cannot write outside UPLOAD_DIR.
    safe_name = secure_filename(uploaded_file.filename or "")
    if safe_name == "":
        return jsonify({"error": "Empty or invalid filename."}), 400

    temp_path = os.path.join(UPLOAD_DIR, safe_name)
    uploaded_file.save(temp_path)

    try:
        result = screen_invoice(client, temp_path)
        decision_info = DECISION_LABELS[result.decision]
        return jsonify({
            "success": True,
            "decision": result.decision,
            "decision_label": decision_info["label"],
            "decision_action": decision_info["action"],
            "combined_risk_score": result.combined_risk_score,
            "document": {
                "risk_score": result.document_risk_score,
                "verdict": result.document_verdict,
                "type": result.document_type,
                "claimed_issuer": result.claimed_issuer,
                "red_flags": result.red_flags,
                "recommended_action": result.recommended_action,
            },
            "cross_references": {
                "phones": result.phone_cross_references,
                "urls": result.url_cross_references,
            },
            "address_findings": result.address_findings,
            "evidence": result.entity_findings,
        })
    except ScamVerifyError as e:
        return jsonify({"error": str(e)}), e.status_code or 500
    except ValueError as e:
        return jsonify({"error": str(e)}), 400
    except Exception:
        return jsonify({"error": "Screening failed. Please try again."}), 500
    finally:
        # Always remove the temporary upload, even when screening fails
        if os.path.exists(temp_path):
            os.remove(temp_path)


@app.route("/health")
def health():
    return jsonify({"status": "ok"})


if __name__ == "__main__":
    # debug=True is for local development only
    app.run(debug=True, port=3000)

Test the screening pipeline
Start the server and submit an invoice.
python app.py

In another terminal:

# Screen an invoice image
curl -X POST http://localhost:3000/screen \
  -F "file=@suspicious-invoice.jpg"

# Screen a PDF invoice
curl -X POST http://localhost:3000/screen \
  -F "file=@vendor-invoice.pdf"

Add the Node.js alternative
Here is the multi-channel screening flow in Node.js.
// screener.js
const fs = require('fs');
const path = require('path');

const BASE_URL = 'https://scamverify.ai/api/v1';

async function screenInvoice(apiKey, filePath) {
  const headers = { 'Authorization': `Bearer ${apiKey}` };

  // Step 1: Upload invoice to document analysis (multipart/form-data)
  const fileBuffer = fs.readFileSync(filePath);
  const fileName = path.basename(filePath);
  const formData = new FormData();
  const blob = new Blob([fileBuffer]);
  formData.append('file', blob, fileName);

  const docResponse = await fetch(`${BASE_URL}/document/analyze`, {
    method: 'POST',
    headers,
    body: formData,
  });
  if (!docResponse.ok) throw new Error(`Document analysis failed: ${docResponse.status}`);
  const docResult = await docResponse.json();
  const extracted = docResult.extracted_entities || {};

  // Step 2: Cross-reference phone numbers
  const phoneChecks = await Promise.allSettled(
    (extracted.phone_numbers || []).slice(0, 3).map(async (phone) => {
      const res = await fetch(`${BASE_URL}/phone/lookup`, {
        method: 'POST',
        headers: { ...headers, 'Content-Type': 'application/json' },
        body: JSON.stringify({ phone_number: phone }),
      });
      if (!res.ok) throw new Error(`Phone lookup failed: ${res.status}`);
      const data = await res.json();
      return { number: phone, ...data };
    })
  );

  // Step 3: Cross-reference URLs
  const urlChecks = await Promise.allSettled(
    (extracted.urls || []).slice(0, 3).map(async (url) => {
      const res = await fetch(`${BASE_URL}/url/lookup`, {
        method: 'POST',
        headers: { ...headers, 'Content-Type': 'application/json' },
        body: JSON.stringify({ url }),
      });
      if (!res.ok) throw new Error(`URL lookup failed: ${res.status}`);
      const data = await res.json();
      return { url, ...data };
    })
  );

  // Step 4: Combine results (failed lookups are dropped)
  const phoneResults = phoneChecks
    .filter(r => r.status === 'fulfilled')
    .map(r => r.value);
  const urlResults = urlChecks
    .filter(r => r.status === 'fulfilled')
    .map(r => r.value);

  // Step 5: Compute decision
  const combinedScore = computeScore(docResult, phoneResults, urlResults);
  let decision;
  if (combinedScore <= 25) decision = 'auto_approve';
  else if (combinedScore <= 60) decision = 'manual_review';
  else decision = 'auto_reject';

  return {
    decision,
    combined_risk_score: combinedScore,
    document: docResult,
    phone_cross_references: phoneResults,
    url_cross_references: urlResults,
  };
}

function computeScore(docResult, phoneResults, urlResults) {
  // Use ?? rather than || so a legitimate score of 0 is not replaced by 50
  const docScore = docResult.risk_score ?? 50;
  const avgPhone = phoneResults.length > 0
    ? phoneResults.reduce((sum, p) => sum + (p.risk_score || 0), 0) / phoneResults.length
    : 0;
  const avgUrl = urlResults.length > 0
    ? urlResults.reduce((sum, u) => sum + (u.risk_score || 0), 0) / urlResults.length
    : 0;

  // This port has no address channel, so phone and URL each take 25%
  // (the Python version uses 20% / 20% / 10%).
  let score = docScore * 0.5 + avgPhone * 0.25 + avgUrl * 0.25;

  // Critical signal boosts
  for (const p of phoneResults) {
    if ((p.signals?.ftc_complaints || 0) > 10) score += 15;
    if (p.signals?.robocall_flagged) score += 10;
  }
  for (const u of urlResults) {
    if (u.signals?.urlhaus_listed) score += 20;
    if (u.signals?.brand_impersonation?.detected) score += 15;
  }

  return Math.min(100, Math.max(0, Math.round(score)));
}

module.exports = { screenInvoice };

Understand the risk decision matrix
The screening pipeline maps combined risk scores to payment actions:
| Combined Score | Decision | Action | Typical Scenario |
|---|---|---|---|
| 0 to 25 | Auto-approve | Process payment normally. | Verified business address, clean phone number, legitimate domain. |
| 26 to 60 | Manual review | Route to AP team for human review before payment. | New vendor, unverifiable address, or phone number with a few complaints. |
| 61 to 100 | Auto-reject | Block payment. Require independent vendor verification. | CMRA address, phone flagged for robocalls, URL on threat feeds. |
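The band edges in the table are inclusive on the upper side, which is easy to get wrong when porting the logic. As a small sketch, assuming the same thresholds the tutorial's screener.py defines (25 and 60), the mapping can be isolated into a standalone function and checked at the boundaries; `payment_decision` is an illustrative name, not part of the tutorial code.

```python
AUTO_APPROVE_MAX = 25   # 0 to 25 inclusive
MANUAL_REVIEW_MAX = 60  # 26 to 60 inclusive


def payment_decision(score: int) -> str:
    """Map a combined risk score (0 to 100) to a payment decision."""
    if score <= AUTO_APPROVE_MAX:
        return "auto_approve"
    if score <= MANUAL_REVIEW_MAX:
        return "manual_review"
    return "auto_reject"


# Boundary scores fall into the lower band
for s in (25, 26, 60, 61):
    print(s, payment_decision(s))
```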
The combined score is computed from four channels:
| Channel | Weight | What It Catches |
|---|---|---|
| Document analysis | 50% | Fake logos, fabricated case numbers, payment pressure language, CMRA addresses. |
| Phone cross-reference | 20% | FTC/FCC complaints on contact numbers, robocall flags, VoIP carriers. |
| URL cross-reference | 20% | Payment URLs on threat feeds, brand impersonation, newly registered domains. |
| Address verification | 10% | Mailbox service addresses, vacant addresses, unverifiable institutions. |
Critical signals (more than 10 FTC complaints, robocall flags, URLhaus listings, brand impersonation, and CMRA addresses) add fixed bonus points on top of the weighted score, so a single high-severity finding can push an invoice from "review" to "reject."
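A worked example may make the effect of the bonus points concrete. The sketch below uses the channel weights from the table with made-up channel scores (40, 30, 20, 30); `combined_score` is a simplified stand-in for the tutorial's `compute_combined_score` that takes pre-averaged channel scores and a single `boosts` total. Weights are written as integer percentages to keep the arithmetic exact.

```python
def combined_score(doc: float, phone: float, url: float, address: float, boosts: int = 0) -> int:
    """Weighted score (50/20/20/10) plus fixed boosts, clamped to 0-100."""
    weighted = (doc * 50 + phone * 20 + url * 20 + address * 10) / 100
    return min(100, max(0, int(weighted + boosts)))


# Hypothetical invoice: moderate document score, nothing alarming elsewhere.
# 40*0.5 + 30*0.2 + 20*0.2 + 30*0.1 = 20 + 6 + 4 + 3 = 33 -> manual review
base = combined_score(40, 30, 20, 30)

# Same invoice, but its payment URL is URLhaus-listed (+20) and
# impersonates a brand (+15): 33 + 35 = 68 -> auto-reject
boosted = combined_score(40, 30, 20, 30, boosts=20 + 15)

print(base, boosted)
```

Because the boosts bypass the 20% URL weight, the two URL findings alone move this invoice across the 60-point threshold.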
Complete Project Structure
invoice-screener/
  .env               # API key
  scamverify.py      # Multi-channel API client (document, phone, URL)
  screener.py        # Screening pipeline with risk scoring and decision logic
  app.py             # Flask server with /screen endpoint
  requirements.txt   # requests, flask, python-dotenv

Quota usage for invoice screening. Each invoice screen consumes 1 document lookup plus up to 3 phone lookups and 3 URL lookups (depending on how many entities the invoice contains). Plan your quota accordingly. Document and phone/URL quotas are tracked separately. Check your current usage at the usage endpoint.
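To plan quota, the per-invoice figures above can be turned into a rough monthly estimate. This is a back-of-the-envelope sketch: `estimate_lookups` is a hypothetical helper, the per-invoice averages are inputs you supply, and the counts are reported per channel because the source does not say whether phone and URL lookups share one quota.

```python
MAX_PER_CHANNEL = 3  # The pipeline caps phone and URL lookups at 3 each


def estimate_lookups(invoices: int, phones_per_invoice: int = 3, urls_per_invoice: int = 3) -> dict:
    """Estimate lookups consumed by screening `invoices` invoices."""
    phones = min(phones_per_invoice, MAX_PER_CHANNEL)
    urls = min(urls_per_invoice, MAX_PER_CHANNEL)
    return {
        "document": invoices,        # 1 document lookup per invoice
        "phone": invoices * phones,  # up to 3 per invoice
        "url": invoices * urls,      # up to 3 per invoice
    }


# Worst case for 500 invoices a month
print(estimate_lookups(500))
```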
Next Steps
- Document Analysis API Reference for full request and response schemas
- Phone Lookup API Reference for phone cross-reference details
- Verify Suspicious Mail and Documents for interpreting entity verifications in depth