Building a real-time KYC API: from selfie to decision in 412ms
Architecture and code for a real-time KYC API - async orchestration, queue-and-fanout, GPU-bound face match, decision engine and the latency budget that makes it work.
A modern KYC verification is a coordinated dance of seven or eight independent operations: document capture, OCR, MRZ parsing, face detection, liveness, face match, AML screening, and the decision engine that combines them. Done sequentially, it takes 3–4 seconds. Done in parallel, with the right architecture, it takes around 400 milliseconds - which is the difference between an onboarding flow that converts and one that doesn't.
This article walks through the architecture we ship at Ogowkey: how the API is laid out, where the parallelism comes from, where the bottlenecks live, and how the decision engine fuses noisy signals into a single defensible outcome. Real code in Python, real numbers from production traffic. If you're building something similar - or evaluating a vendor for the same job - this is the blueprint.
The latency budget
Decide the budget first. Everything follows from it.
For a retail onboarding flow, the ceiling is roughly 800 ms p95 end-to-end before users notice and abandon. Plan for half of that to be network round-trip plus image upload from a phone on 4G; that leaves ~400 ms for the actual server-side verification.
Our target distribution:
| Phase | Budget (ms) | Execution |
|---|---|---|
| Ingress + auth | 20 | serial |
| Image decode + pre-process | 60 | serial |
| OCR (VLM, async) | 400 | parallel |
| Face detect + landmarks | 80 | parallel |
| Liveness | 60 | parallel |
| Face match | 90 | parallel |
| AML screen | 120 | parallel |
| Decision engine | 30 | serial |
| Audit log + response | 40 | serial |
The phases marked parallel run concurrently. End-to-end wall clock is therefore dominated by the slowest parallel operation (OCR, budgeted at 400 ms) plus ~150 ms of serial overhead (20 + 60 + 30 + 40), which gives a worst-case target of ~550 ms. The observed median is ~400 ms.
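The arithmetic behind that target is easy to check. A minimal sketch using the budgets from the table above; the dictionary and function names are illustrative, not part of our codebase:

```python
# Budgets in milliseconds, copied from the table above.
SERIAL_MS = {
    "ingress_auth": 20,
    "decode_preprocess": 60,
    "decision_engine": 30,
    "audit_response": 40,
}
PARALLEL_MS = {
    "ocr": 400,
    "face_detect": 80,
    "liveness": 60,
    "face_match": 90,
    "aml_screen": 120,
}


def critical_path_ms(serial: dict[str, int], parallel: dict[str, int]) -> int:
    """Serial phases add up; concurrent phases cost only as much as the slowest."""
    return sum(serial.values()) + max(parallel.values())


target_ms = critical_path_ms(SERIAL_MS, PARALLEL_MS)
print(target_ms)
```

Shaving time off any parallel phase other than OCR does not move this number, which is why the OCR call is where tuning effort pays off.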
The high-level architecture
The orchestrator owns the per-request state. It fans out six parallel tasks, gathers their results, hands them to the decision engine, and returns. Audit and webhook fan-out happen after the response is sent - they're side effects.
The framework choice
We run on FastAPI (Python 3.13) for the orchestrator. Reasons in descending order of importance:
- Async-first, which is mandatory when six of the seven tasks are I/O-bound to external services or workers.
- Mature ecosystem (httpx, asyncpg, redis-asyncio).
- Typed schemas via pydantic, which we re-export as the public API contract.
- Easy to dockerise, easy to run behind any reverse proxy.
Other choices that work: Litestar (faster startup but less mature), aiohttp (more bare-metal), Node.js with Fastify (fine, slightly worse ML ecosystem). Go and Rust are appealing for the worker layer but Python is hard to beat for the orchestrator when half the work is calling Python ML libs.
The endpoint
The orchestrator endpoint is a thin coroutine over an asyncio.gather call:
from fastapi import FastAPI, UploadFile, File
from pydantic import BaseModel
import asyncio

from app.workers import ocr, mrz, face, liveness, aml
from app.services import decision, audit

app = FastAPI()


class VerifyResponse(BaseModel):
    case_id: str
    outcome: str
    score: float
    fields: dict
    similarity: float
    aml_status: str
    duration_ms: int


@app.post("/v1/identity/verify", response_model=VerifyResponse)
async def verify(
    id_image: UploadFile = File(...),
    selfie_image: UploadFile = File(...),
    document_type: str = "national_id",
    country: str = "SO",
):
    case_id = await audit.start_case(...)
    id_bytes = await id_image.read()
    selfie_bytes = await selfie_image.read()

    # Fan-out: six concurrent tasks.
    ocr_res, mrz_res, face_res, live_res, match_res, aml_res = await asyncio.gather(
        ocr.extract_fields(id_bytes, country=country),
        mrz.parse_back(id_bytes),
        face.detect(selfie_bytes),
        liveness.check(selfie_bytes),
        face.match(id_bytes, selfie_bytes),
        aml.screen_placeholder(),  # initial; refined below with extracted name
        return_exceptions=True,
    )

    # Refine AML with the OCR'd name (small extra latency, big quality win).
    if not isinstance(ocr_res, Exception):
        aml_res = await aml.screen(
            full_name=ocr_res.full_name,
            dob=ocr_res.date_of_birth,
            country=country,
        )

    outcome = decision.evaluate(
        ocr=ocr_res, mrz=mrz_res, face=face_res,
        liveness=live_res, match=match_res, aml=aml_res,
    )
    await audit.complete_case(case_id, outcome)
    return outcome.to_response(case_id)
A few choices worth flagging:
- return_exceptions=True - one failing worker shouldn't fail the whole verification. The decision engine handles a partial result (mark the missing signal as unknown and downgrade the outcome accordingly).
- AML runs twice, sort of. We kick off a placeholder aml.screen_placeholder() in parallel just to keep the dependency graph clean, then refine it with the OCR'd name. The refinement runs only after OCR completes; the placeholder costs effectively nothing.
- Audit starts before the fan-out. If anything in the pipeline catches fire we still have a row in the audit log showing the case existed.
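The partial-result handling in the first point can be sketched in isolation. This is a minimal, self-contained illustration: the two stand-in workers and the as_signal helper are hypothetical, and the real decision engine consumes richer objects than these dictionaries.

```python
import asyncio


async def ok_worker() -> dict:
    # Stand-in for a worker that succeeds.
    await asyncio.sleep(0)
    return {"score": 0.93}


async def failing_worker() -> dict:
    # Stand-in for a worker whose upstream call fails.
    await asyncio.sleep(0)
    raise TimeoutError("upstream timed out")


def as_signal(name: str, result: object) -> dict:
    """Turn one gather() result into a signal the decision engine can read."""
    if isinstance(result, BaseException):
        # A failed worker becomes an explicit "unknown", never a silent default.
        return {"name": name, "status": "unknown", "error": type(result).__name__}
    return {"name": name, "status": "ok", "value": result}


async def run_fanout() -> list[dict]:
    names = ["liveness", "face_match"]
    results = await asyncio.gather(
        ok_worker(), failing_worker(), return_exceptions=True
    )
    # gather() preserves argument order, so names and results line up.
    return [as_signal(n, r) for n, r in zip(names, results)]


signals = asyncio.run(run_fanout())
```

Here the liveness signal arrives intact and the face-match signal is marked unknown with the exception type attached, which is what lets the engine downgrade the outcome instead of failing the request.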
The OCR worker
The OCR worker calls a vision-language model with a structured-output prompt. We covered the model choice in Vision-language models for Somali ID OCR. The actual code:
import anthropic, base64, json

_client = anthropic.AsyncAnthropic()

_PROMPT = """Extract these fields as JSON, no commentary:
{ "surname": str, "given_names": str, "national_id": str,
  "date_of_birth": "YYYY-MM-DD", "sex": "M"|"F",
  "date_of_issue": "YYYY-MM-DD", "date_of_expiry": "YYYY-MM-DD",
  "address": str|null }
Use null for fields you cannot read with high confidence."""


async def extract_fields(image_bytes: bytes, country: str) -> "OCRResult":
    b64 = base64.b64encode(image_bytes).decode()
    msg = await _client.messages.create(
        model="claude-sonnet-4-5",
        max_tokens=1024,
        messages=[{
            "role": "user",
            "content": [
                {"type": "image",
                 "source": {"type": "base64", "media_type": "image/jpeg", "data": b64}},
                {"type": "text", "text": _PROMPT},
            ],
        }],
    )
    # The prompt asks for bare JSON, so the first content block parses directly.
    fields = json.loads(msg.content[0].text)
    return OCRResult(**fields)
For Somali documents, this hits the 78% raw-VLM baseline we benchmarked. The 15-point lift to 93% comes from the validators and MRZ cross-check, not the model.
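To give a flavour of what a validator looks like, here is a minimal sketch of the MRZ check-digit rule from ICAO Doc 9303 (weights 7, 3, 1 repeating; digits keep their value, A-Z map to 10-35, the filler `<` counts as 0). This is the published standard, not our exact production code, and the function names are illustrative.

```python
def mrz_char_value(ch: str) -> int:
    """Numeric value of one MRZ character under ICAO 9303."""
    if ch in "0123456789":
        return int(ch)
    if "A" <= ch <= "Z":
        return ord(ch) - ord("A") + 10
    if ch == "<":
        return 0
    raise ValueError(f"invalid MRZ character: {ch!r}")


def mrz_check_digit(field: str) -> int:
    """Weighted sum modulo 10, with weights 7, 3, 1 repeating."""
    weights = (7, 3, 1)
    total = sum(mrz_char_value(c) * weights[i % 3] for i, c in enumerate(field))
    return total % 10


def mrz_field_valid(field: str, check: str) -> bool:
    """True when the printed check digit matches the computed one."""
    if len(check) != 1 or check not in "0123456789":
        return False
    return mrz_check_digit(field) == int(check)
```

A date-of-birth field of 740812 produces check digit 2, so an OCR misread of a single digit in that field will usually fail the check and can be routed to a retry or to review rather than trusted.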
The face-match worker
Face match is the GPU-bound step. Two production options:
- Use a hosted API - AWS Rekognition CompareFaces, or any vendor.
- Run your own model on GPU - typically ArcFace or a successor, served via Triton or vLLM-Vision.
We started with Rekognition for fast time-to-launch, then moved to a self-hosted model fine-tuned on East African data for accuracy. The interface looks the same regardless:
import httpx
from pydantic import BaseModel

# rekognition_async (an async wrapper around the Rekognition client) and
# bbox() (bounding-box conversion helper) are defined elsewhere in the worker.


class MatchResult(BaseModel):
    similarity: float
    source_bbox: list[int] | None
    target_bbox: list[int] | None
    landmarks: list[dict] | None


async def match(id_image: bytes, selfie_image: bytes) -> MatchResult:
    # AWS Rekognition path
    res = await rekognition_async.compare_faces(
        SourceImage={"Bytes": id_image},
        TargetImage={"Bytes": selfie_image},
        SimilarityThreshold=70.0,
        QualityFilter="MEDIUM",
    )
    if res["FaceMatches"]:
        m = res["FaceMatches"][0]
        return MatchResult(
            similarity=m["Similarity"] / 100,  # Rekognition reports 0..100
            source_bbox=bbox(res.get("SourceImageFace", {})),
            target_bbox=bbox(m["Face"]),
            landmarks=m["Face"].get("Landmarks", []),
        )
    # No match above the threshold: report zero similarity.
    return MatchResult(similarity=0.0, source_bbox=None, target_bbox=None, landmarks=None)
For demographic-aware operating points and the threshold trade-offs, see Face match accuracy across skin tones.
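On the self-hosted path, embeddings from ArcFace-style models are usually compared by cosine similarity. A minimal sketch, assuming both embeddings have already been computed and have the same dimension; the clamp into 0..1 is an illustrative choice to fit the similarity field, not a calibrated mapping, and the function names are hypothetical.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two embedding vectors, in -1..1."""
    if len(a) != len(b):
        raise ValueError("embeddings must have the same dimension")
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    if norm == 0.0:
        # A zero vector carries no identity information.
        return 0.0
    return dot / norm


def to_unit_interval(cos: float) -> float:
    """Clamp into 0..1 so the score fits the same field as the hosted API."""
    return max(0.0, min(1.0, cos))
```

Whatever mapping is used, the thresholds in the decision engine have to be re-tuned when the similarity source changes, because a Rekognition score and a cosine score are not on the same scale.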
The decision engine
Six noisy signals come in. One decision goes out. The decision engine is the most important code in the system: it is the load-bearing piece that converts probabilities into an approve, review, or reject outcome.
We use a weighted-rule engine with explicit thresholds rather than an opaque ML classifier. Three reasons:
- Auditability. When a regulator asks "why was this customer rejected?", you need to be able to point at specific signals and thresholds, not "the model said so."
- Tunability. Specific signals are easier to tune (raise the AML threshold, lower the face-match threshold) than retraining a classifier.
- Composability. Tenants can override specific thresholds for their risk appetite.
The core loop:
from dataclasses import dataclass
from enum import StrEnum


class Outcome(StrEnum):
    approved = "approved"
    review = "manual_review"
    rejected = "rejected"


@dataclass
class Signals:
    ocr_confidence: float    # 0..1
    mrz_valid: bool          # check digits parsed and passed
    fields_consistent: bool  # VIZ vs MRZ agreement
    face_detected: bool
    liveness_score: float    # 0..1
    similarity: float        # 0..1, face match
    aml_status: str          # "clear" | "review" | "hit"


def decide(s: Signals) -> tuple[Outcome, float, list[str]]:
    reasons: list[str] = []
    risk = 0.0

    # Hard gates: these reject outright, regardless of other signals.
    if not s.face_detected:
        return Outcome.rejected, 1.0, ["no_face_detected"]
    if s.aml_status == "hit":
        return Outcome.rejected, 1.0, ["aml_hit"]

    # Soft signals: each adds weighted risk and records a reason.
    if s.liveness_score < 0.5:
        risk += 0.45
        reasons.append("liveness_low")
    elif s.liveness_score < 0.7:
        risk += 0.15
        reasons.append("liveness_borderline")

    if s.similarity < 0.6:
        risk += 0.55
        reasons.append("face_mismatch")
    elif s.similarity < 0.8:
        risk += 0.20
        reasons.append("face_borderline")

    if not s.mrz_valid:
        risk += 0.15
        reasons.append("mrz_invalid")
    if not s.fields_consistent:
        risk += 0.20
        reasons.append("viz_mrz_mismatch")
    if s.ocr_confidence < 0.85:
        risk += 0.10
        reasons.append("ocr_low_confidence")
    if s.aml_status == "review":
        risk += 0.25
        reasons.append("aml_review")

    # Map accumulated risk to an outcome.
    if risk >= 0.6:
        return Outcome.rejected, risk, reasons
    if risk >= 0.25:
        return Outcome.review, risk, reasons
    return Outcome.approved, risk, reasons
This is the minimal viable engine. In production, the weights and thresholds are loaded from per-tenant config so each customer can dial it for their risk appetite. The audit log records the exact weight set used for a given decision so a regulator can reproduce it.
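One way to structure per-tenant overrides and the audit fingerprint is sketched below. The Thresholds fields, for_tenant, and fingerprint are hypothetical names for illustration; the default values are copied from decide() above, and only a subset of the thresholds is shown.

```python
import hashlib
import json
from dataclasses import asdict, dataclass, replace


@dataclass(frozen=True)
class Thresholds:
    # Defaults mirror the constants hard-coded in decide().
    reject_at: float = 0.6
    review_at: float = 0.25
    face_mismatch_below: float = 0.6
    liveness_low_below: float = 0.5


def for_tenant(defaults: Thresholds, overrides: dict[str, float]) -> Thresholds:
    """Apply a tenant's overrides on top of the defaults.

    replace() raises TypeError for unknown field names, so a typo in a
    tenant config fails loudly instead of being silently ignored.
    """
    return replace(defaults, **overrides)


def fingerprint(t: Thresholds) -> str:
    """Stable hash of the exact threshold set, suitable for the audit log."""
    canonical = json.dumps(asdict(t), sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode()).hexdigest()


strict = for_tenant(Thresholds(), {"review_at": 0.15})
```

Storing the fingerprint alongside each decision, and the full threshold set keyed by that fingerprint, is enough to replay a decision later with the configuration that was live at the time.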
Audit, post-response
Audit completion happens after we send the response, and so does webhook fan-out. Both are queued as background tasks:
import asyncio
from fastapi import BackgroundTasks


@app.post("/v1/identity/verify", response_model=VerifyResponse)
async def verify(
    background: BackgroundTasks,
    ...,
):
    # ... orchestration ...
    response = outcome.to_response(case_id)
    background.add_task(audit.complete_case, case_id, outcome, signals=...)
    background.add_task(webhooks.fan_out, tenant_id, "case.completed", response)
    return response
BackgroundTasks runs after the response is sent. The 40 ms we'd otherwise wait for the audit DB write becomes 0 ms perceived latency. The trade-off: if the worker crashes between response and audit, you have a gap. Mitigate with periodic reconciliation against the cases table.
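The reconciliation job can be as simple as an anti-join. A minimal sketch against an in-memory SQLite database; the table and column names (cases, audit_completions, responded_at) are assumptions for illustration and do not reflect our actual schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE cases (case_id TEXT PRIMARY KEY, responded_at INTEGER);
    CREATE TABLE audit_completions (case_id TEXT PRIMARY KEY);
    INSERT INTO cases VALUES ('c1', 100), ('c2', 110), ('c3', 500);
    INSERT INTO audit_completions VALUES ('c1');
    """
)


def find_audit_gaps(db: sqlite3.Connection, now: int, grace_seconds: int) -> list[str]:
    """Cases answered more than grace_seconds ago with no completed audit row."""
    rows = db.execute(
        """
        SELECT c.case_id
        FROM cases AS c
        LEFT JOIN audit_completions AS a ON a.case_id = c.case_id
        WHERE a.case_id IS NULL AND c.responded_at <= ?
        ORDER BY c.case_id
        """,
        (now - grace_seconds,),
    ).fetchall()
    return [row[0] for row in rows]


gaps = find_audit_gaps(conn, now=520, grace_seconds=60)
```

The grace period keeps the job from flagging cases whose background task simply has not run yet; in this sample data only c2 is reported, because c3 was answered too recently.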
Webhooks: at-least-once delivery, signed
Webhook fan-out lives in an arq worker (Redis-backed task queue). The pattern:
import hmac, hashlib, time, json
import httpx

SECRET = b"your-tenant-webhook-secret"


async def fan_out(tenant_id: str, event: str, payload: dict) -> None:
    targets = await db.fetch_webhooks(tenant_id, event=event)
    # Compact JSON so the signed bytes are exactly the bytes we send.
    body = json.dumps(payload, separators=(",", ":")).encode()
    ts = str(int(time.time()))
    sig = hmac.new(SECRET, ts.encode() + b"." + body, hashlib.sha256).hexdigest()
    headers = {
        "X-Ogowkey-Signature": f"t={ts}, v1={sig}",
        "X-Ogowkey-Event": event,
        "content-type": "application/json",
    }
    async with httpx.AsyncClient(timeout=10) as c:
        for t in targets:
            try:
                res = await c.post(t.url, content=body, headers=headers)
                res.raise_for_status()
                await db.mark_webhook_sent(t.id, payload["case_id"])
            except Exception:
                # Retry with exponential backoff up to 24 hours.
                await schedule_retry(t, payload, delay_seconds=60)
Idempotency at the receiver is the customer's responsibility. To make that practical, we sign and timestamp every delivery, and a redelivery carries the same signature as the original.
Cost control
For a verification at 412 ms of wall clock, the cost components are:
- VLM OCR call - ~$0.001 (this is the largest variable cost).
- Face match via Rekognition - ~$0.001.
- AML screening - effectively free (in-process against the OpenSanctions dataset).
- Compute - fractions of a cent on a moderately-sized box.
- Storage - pennies per thousand for the audit log.
Total marginal cost per verification: roughly $0.003–0.005 depending on which model tier you're calling. The pricing model that closes the gap to a healthy margin is per-verification billing, not seat-based.
Operating numbers from production
Some real numbers from our production traffic, taken from a recent week:
- p50 wall-clock latency: 392 ms
- p95 latency: 612 ms
- p99 latency: 1.18 s (dominated by VLM tail latency)
- Success rate (excluding intentionally-rejected customers): 99.7%
- Manual review rate for retail tier: 4.1%
- Reject rate for retail tier: 2.3%
The shape of the distribution is the lesson: the median is good and the 99th percentile is where the work is. Most of the 99th-percentile cases are VLM cold starts or transient network blips on the AML call. Caching warm connections and pre-warming model endpoints cuts the long tail meaningfully.
What to watch out for
Three things that bit us during scale-up:
- asyncio.gather propagates the first exception to the caller by default, so the results of the other workers are lost. Use return_exceptions=True and explicitly handle each result, or one failing worker will kill the whole verification.
- HTTPX client connections must be reused (pool them at module level). Re-creating an AsyncClient per request adds 30–80 ms for TLS handshake.
- PII in logs is the recurring boring failure mode. Every team running printf("user %s logged in") produces an audit-log violation eventually. Strip PII at the logger, not at the call site.
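Stripping PII at the logger can be done with a standard-library logging filter. A minimal sketch: the two regular expressions are illustrative and far from exhaustive (they will not catch names, for instance), and ListHandler exists only to make the example observable.

```python
import logging
import re

_EMAIL = re.compile(r"[\w.+-]+@[\w-]+(?:\.[\w-]+)+")
_LONG_DIGITS = re.compile(r"\b\d{8,}\b")  # ID numbers, phone numbers


class RedactPII(logging.Filter):
    """Redact PII after %-formatting, so arguments are covered too."""

    def filter(self, record: logging.LogRecord) -> bool:
        message = record.getMessage()  # applies record.args to record.msg
        message = _EMAIL.sub("[email]", message)
        message = _LONG_DIGITS.sub("[number]", message)
        record.msg, record.args = message, None
        return True  # keep the record, now redacted


class ListHandler(logging.Handler):
    """Example-only handler that collects messages in memory."""

    def __init__(self) -> None:
        super().__init__()
        self.lines: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.lines.append(record.getMessage())


handler = ListHandler()
handler.addFilter(RedactPII())  # attach to the handler so every record passes through
log = logging.getLogger("kyc.example")
log.addHandler(handler)
log.setLevel(logging.INFO)
log.propagate = False

log.info("user %s with id %s logged in", "amina@example.com", "123456789012")
```

Because the filter sits on the handler, call sites can keep logging naturally and nothing depends on every engineer remembering to scrub their own arguments.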
Closing
A real-time KYC API is achievable in 400 ms when you fan out aggressively, treat the decision engine as the load-bearing piece, and push everything that doesn't have to be on the critical path off it. The ML models are commodity now; the orchestration is the product.
For the document-side detail, see Somali national ID verification. For the OCR model picker, see Vision-language models for Somali ID OCR. For the registry call this all leads up to, see Integrating with NIRA.
If you're shipping the same thing and want a sanity check, run a verification through the playground or [get in touch](mailto:olow304@gmail.com?subject=Ogowkey%20 - %20architecture).