OAuth 2.0 and mTLS in Financial APIs
OAuth 2.0 and mTLS in Financial APIs
Standard OAuth 2.0 was designed for APIs where the worst-case breach is someone reading your social media posts. Financial APIs have a different threat model: a breach means unauthorized fund transfers, identity theft, or regulatory fines measured in percentage of global revenue. The Financial-grade API (FAPI) security profile closes the gaps.
Why Standard OAuth 2.0 Falls Short
Consider the authorization code flow: the bank redirects the user to redirect_uri?code=xyz&state=abc. An attacker with access to the browser history, a malicious browser extension, or a compromised redirect endpoint can steal the authorization code. In social media, this gives them read access to your posts. In banking, it gives them access to your account data and potentially payment initiation.
FAPI 2.0 addresses this with a combination of mandatory controls:
from dataclasses import dataclass
from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec
import json
import time
import base64
import hashlib
@dataclass
class FAPISecurityProfile:
"""
FAPI 2.0 Security Profile requirements.
This is a configuration checklist for a FAPI-compliant API.
Each requirement addresses a specific attack vector.
"""
# Mandatory: mTLS for all token endpoint requests
require_mtls: bool = True
# Mandatory: Certificate-bound access tokens (RFC 8705)
# Prevents token theft — a stolen token is useless without the certificate
require_certificate_bound_tokens: bool = True
# Mandatory: PKCE with S256 challenge method
# Prevents authorization code interception
require_pkce: bool = True
pkce_method: str = "S256"
# Mandatory: Pushed Authorization Requests (PAR)
# Authorization parameters sent server-to-server, not via browser
require_par: bool = True
# Mandatory: Signed request objects (JAR)
# Prevents parameter tampering
require_signed_requests: bool = True
# Recommended: sender-constrained refresh tokens
require_sender_constrained_refresh: bool = True
# Token lifetimes (seconds)
access_token_lifetime: int = 300 # 5 minutes
refresh_token_lifetime: int = 86400 # 24 hours
class MTLSTokenBinder:
"""
Certificate-bound access token implementation (RFC 8705).
When a TPP requests an access token, the bank's token endpoint
computes the SHA-256 thumbprint of the TPP's client certificate
(presented during the mTLS handshake). This thumbprint is embedded
in the access token.
When the TPP later presents the access token to a resource endpoint,
the bank re-computes the certificate thumbprint and verifies it
matches the one embedded in the token. A stolen token presented
from a different TLS connection (with a different certificate) is
rejected.
"""
@staticmethod
def compute_certificate_thumbprint(cert_pem: bytes) -> str:
"""
Compute the x5t#S256 thumbprint of a client certificate.
This is the SHA-256 hash of the DER-encoded certificate,
base64url-encoded.
"""
cert = x509.load_pem_x509_certificate(cert_pem)
der_bytes = cert.public_bytes(serialization.Encoding.DER)
digest = hashlib.sha256(der_bytes).digest()
return base64.urlsafe_b64encode(digest).rstrip(b'=').decode()
@staticmethod
def create_bound_token(
payload: dict, certificate_thumbprint: str
) -> dict:
"""
Create an access token bound to a client certificate.
The 'cnf' (confirmation) claim contains the certificate
thumbprint. Resource servers MUST verify this.
"""
payload["cnf"] = {
"x5t#S256": certificate_thumbprint
}
return payload
@staticmethod
def verify_token_binding(
token_payload: dict, presented_cert_pem: bytes
) -> bool:
"""
Verify that the presented certificate matches the token binding.
This check happens on EVERY API request. It must be fast.
In production, the thumbprint is cached per TLS session.
"""
expected = token_payload.get("cnf", {}).get("x5t#S256")
if not expected:
return False # Token is not certificate-bound
actual = MTLSTokenBinder.compute_certificate_thumbprint(
presented_cert_pem
)
# Constant-time comparison to prevent timing attacks
return hashlib.sha256(expected.encode()).digest() == \
hashlib.sha256(actual.encode()).digest()
Pushed Authorization Requests (PAR)
In standard OAuth, the authorization parameters (scope, redirect_uri, etc.) are sent as URL query parameters through the browser. This exposes them to browser history, referrer headers, and malicious browser extensions. PAR sends these parameters directly from the TPP’s server to the bank’s server:
import secrets
from datetime import datetime, timedelta
class PushedAuthorizationRequestHandler:
"""
PAR endpoint (RFC 9126): receives authorization parameters
server-to-server and returns a request_uri.
Flow:
1. TPP sends authorization parameters to the PAR endpoint (via mTLS)
2. Bank validates parameters and stores them server-side
3. Bank returns a request_uri (opaque reference)
4. TPP redirects the user to:
/authorize?client_id=X&request_uri=urn:ietf:params:oauth:request_uri:ABC
5. Bank loads the stored parameters using the request_uri
Benefits:
- Authorization parameters never pass through the browser
- Request_uri is single-use and short-lived (60 seconds)
- Bank can validate parameters before user interaction begins
"""
def __init__(self):
self._stored_requests: dict[str, dict] = {}
def handle_par_request(
self, client_id: str, request_params: dict,
client_certificate_thumbprint: str
) -> dict:
"""
Process a PAR request and return a request_uri.
"""
# Validate client identity via mTLS certificate
if not self._validate_client(client_id, client_certificate_thumbprint):
return {"error": "invalid_client"}
# Validate required FAPI parameters
required = {"scope", "redirect_uri", "code_challenge", "code_challenge_method"}
missing = required - set(request_params.keys())
if missing:
return {"error": "invalid_request", "missing": list(missing)}
if request_params.get("code_challenge_method") != "S256":
return {"error": "invalid_request", "detail": "Must use S256"}
# Generate request_uri
request_uri = f"urn:ietf:params:oauth:request_uri:{secrets.token_urlsafe(32)}"
self._stored_requests[request_uri] = {
"client_id": client_id,
"params": request_params,
"created_at": datetime.utcnow(),
"expires_at": datetime.utcnow() + timedelta(seconds=60),
"used": False,
}
return {
"request_uri": request_uri,
"expires_in": 60,
}
def retrieve_and_consume(self, request_uri: str) -> dict | None:
"""
Retrieve stored parameters and mark as used (single-use).
"""
stored = self._stored_requests.get(request_uri)
if not stored:
return None
if stored["used"]:
return None # Already consumed — replay attack
if datetime.utcnow() > stored["expires_at"]:
del self._stored_requests[request_uri]
return None # Expired
stored["used"] = True
return stored["params"]
def _validate_client(
self, client_id: str, cert_thumbprint: str
) -> bool:
"""
Validate that the client certificate matches the registered TPP.
In production, the bank maintains a registry of TPP certificates
(from the national competent authority or eIDAS trust framework).
"""
# Look up registered certificate for this client
return True # Simplified
class JWTRequestObjectBuilder:
"""
Build signed JWT request objects (JAR — RFC 9101).
A request object is a JWT containing all authorization parameters,
signed by the TPP's private key. This provides:
1. Integrity: parameters can't be modified in transit
2. Non-repudiation: the TPP can't deny making the request
3. Confidentiality: can be encrypted if needed (JWE)
"""
def __init__(self, client_id: str, private_key: ec.EllipticCurvePrivateKey):
self._client_id = client_id
self._private_key = private_key
def build_request_object(
self, authorization_params: dict,
audience: str # The bank's authorization endpoint
) -> str:
"""
Build a signed request object JWT.
The JWT header specifies the signing algorithm (ES256 for FAPI).
The payload contains all authorization parameters plus:
- iss: client_id (who created this request)
- aud: authorization server (who should accept it)
- exp: expiry (short-lived — 5 minutes max)
- jti: unique identifier (prevent replay)
- iat: issued-at timestamp
"""
header = {
"alg": "ES256", # ECDSA with P-256 (FAPI requirement)
"typ": "oauth-authz-req+jwt",
"kid": self._get_key_id(),
}
payload = {
**authorization_params,
"iss": self._client_id,
"aud": audience,
"exp": int(time.time()) + 300, # 5 minutes
"iat": int(time.time()),
"nbf": int(time.time()),
"jti": secrets.token_urlsafe(16),
}
# Sign with ES256
header_b64 = self._base64url(json.dumps(header).encode())
payload_b64 = self._base64url(json.dumps(payload).encode())
signing_input = f"{header_b64}.{payload_b64}"
signature = self._private_key.sign(
signing_input.encode(),
ec.ECDSA(hashes.SHA256())
)
signature_b64 = self._base64url(signature)
return f"{signing_input}.{signature_b64}"
def _base64url(self, data: bytes) -> str:
return base64.urlsafe_b64encode(data).rstrip(b'=').decode()
def _get_key_id(self) -> str:
"""
Key ID derived from the public key's JWK thumbprint (RFC 7638).
"""
pub = self._private_key.public_key()
numbers = pub.public_numbers()
# JWK thumbprint computation
jwk_members = json.dumps({
"crv": "P-256",
"kty": "EC",
"x": self._base64url(numbers.x.to_bytes(32, 'big')),
"y": self._base64url(numbers.y.to_bytes(32, 'big')),
}, separators=(',', ':'), sort_keys=True)
return self._base64url(hashlib.sha256(jwk_members.encode()).digest())
Token Introspection and Revocation
Financial APIs require immediate token revocation — when a customer revokes consent, all associated tokens must become invalid immediately, not at their natural expiry:
class TokenIntrospectionEndpoint:
"""
OAuth 2.0 Token Introspection (RFC 7662).
Resource servers call this endpoint to verify token validity
and retrieve token metadata. In financial APIs, this is
preferred over local JWT validation because:
1. Tokens can be revoked immediately (no waiting for JWT expiry)
2. The introspection response can include real-time consent status
3. Token metadata doesn't need to be crammed into a JWT
Tradeoff: adds a network round-trip per API call (~2-5ms latency).
Mitigation: cache introspection results for short periods (5-30 seconds)
with cache invalidation on revocation events.
"""
def __init__(self, token_store: dict):
self._tokens = token_store
self._revoked: set[str] = set()
def introspect(
self, token: str, client_cert_thumbprint: str
) -> dict:
"""
Introspect a token and return its metadata.
"""
if token in self._revoked:
return {"active": False}
token_data = self._tokens.get(token)
if not token_data:
return {"active": False}
# Check expiry
if time.time() > token_data.get("exp", 0):
return {"active": False}
# Verify certificate binding
expected_thumbprint = token_data.get("cnf", {}).get("x5t#S256")
if expected_thumbprint and expected_thumbprint != client_cert_thumbprint:
return {"active": False}
return {
"active": True,
"scope": token_data.get("scope"),
"client_id": token_data.get("client_id"),
"exp": token_data.get("exp"),
"iat": token_data.get("iat"),
"consent_id": token_data.get("consent_id"),
"cnf": token_data.get("cnf"),
}
def revoke(self, token: str, client_id: str) -> bool:
"""
Revoke a token. Must propagate immediately.
In a distributed system, this requires either:
- A shared revocation list (Redis, database)
- Event-based propagation (publish revocation event)
- Short-lived tokens (5 min) with no revocation needed
FAPI recommends access token lifetimes of 5 minutes or less,
making revocation primarily relevant for refresh tokens.
"""
token_data = self._tokens.get(token)
if not token_data:
return False
if token_data.get("client_id") != client_id:
return False # Can only revoke own tokens
self._revoked.add(token)
return True
Implementation Checklist
When implementing or auditing a FAPI-compliant API:
| Control | Attack Prevented | Implementation |
|---|---|---|
| mTLS everywhere | Client impersonation | Require client cert on all endpoints |
| Certificate-bound tokens | Token theft/replay | Embed cert thumbprint in token, verify on every request |
| PKCE (S256) | Auth code interception | Generate verifier client-side, verify server-side |
| PAR | Parameter tampering via browser | Send params server-to-server before redirect |
| Signed requests (JAR) | Request modification | Sign all authorization parameters with client key |
| Short token lifetime (5 min) | Token window of exposure | Set exp to 300 seconds max |
| Introspection | Delayed revocation | Verify token status in real-time |
| JARM (signed responses) | Response spoofing | Bank signs authorization responses |
Each control adds latency and implementation complexity. The financial industry accepts this because the alternative — a successful API breach — costs orders of magnitude more than the engineering investment.