TLS and Mutual TLS (mTLS)¶
RestMachine supports TLS/SSL and mutual TLS authentication for secure communications. This guide covers TLS setup with ASGI servers, AWS integration, and client certificate authentication.
TLS with ASGI Servers¶
Uvicorn with TLS¶
Run Uvicorn with TLS certificates:
# Generate self-signed certificates for development
openssl req -x509 -newkey rsa:4096 -nodes \
-keyout key.pem -out cert.pem -days 365 \
-subj "/CN=localhost"
# Run with TLS
uvicorn app:asgi_app \
--host 0.0.0.0 \
--port 8443 \
--ssl-keyfile key.pem \
--ssl-certfile cert.pem
Production configuration:
# With Let's Encrypt certificates
uvicorn app:asgi_app \
--host 0.0.0.0 \
--port 443 \
--ssl-keyfile /etc/letsencrypt/live/example.com/privkey.pem \
--ssl-certfile /etc/letsencrypt/live/example.com/fullchain.pem \
--workers 4
Hypercorn with TLS¶
Run Hypercorn with TLS support:
# Basic TLS
hypercorn app:asgi_app \
--bind 0.0.0.0:8443 \
--certfile cert.pem \
--keyfile key.pem
# Production with HTTP/2
hypercorn app:asgi_app \
--bind 0.0.0.0:443 \
--certfile /etc/letsencrypt/live/example.com/fullchain.pem \
--keyfile /etc/letsencrypt/live/example.com/privkey.pem \
--workers 4 \
--worker-class uvloop
Hypercorn Configuration File¶
Use a configuration file for advanced settings:
# hypercorn_config.toml
bind = ["0.0.0.0:443"]
certfile = "/etc/letsencrypt/live/example.com/fullchain.pem"
keyfile = "/etc/letsencrypt/live/example.com/privkey.pem"
workers = 4
worker_class = "uvloop"
accesslog = "-"
errorlog = "-"
Run with config:
Mutual TLS (mTLS)¶
Client Certificate Authentication¶
Enable client certificate verification:
Uvicorn with mTLS¶
# Run with client certificate verification
uvicorn app:asgi_app \
--host 0.0.0.0 \
--port 8443 \
--ssl-keyfile server-key.pem \
--ssl-certfile server-cert.pem \
--ssl-ca-certs ca-cert.pem \
--ssl-cert-reqs 2 # CERT_REQUIRED
Hypercorn with mTLS¶
# hypercorn_mtls_config.toml
bind = ["0.0.0.0:8443"]
certfile = "server-cert.pem"
keyfile = "server-key.pem"
ca_certs = "ca-cert.pem"
verify_mode = "CERT_REQUIRED"
Accessing Client Certificate¶
RestMachine provides client certificate information through the TLS extension:
from restmachine import RestApplication, Request
from restmachine.extensions.tls import TLSExtension
app = RestApplication()
# Enable TLS extension
app.add_extension(TLSExtension())
@app.dependency()
def client_cert(request: Request):
"""Extract client certificate from request."""
tls = request.extensions.get('tls')
if not tls:
raise ValueError("TLS information not available")
client_cert = tls.get('client_cert')
if not client_cert:
raise ValueError("Client certificate required")
return client_cert
@app.dependency()
def authenticated_user(client_cert):
"""Extract user identity from certificate."""
subject = client_cert.get('subject', {})
# Extract Common Name (CN)
cn = None
for rdn in subject.get('rdnSequence', []):
for attr in rdn:
if attr.get('type') == 'commonName':
cn = attr.get('value')
break
if not cn:
raise ValueError("Certificate missing Common Name")
return {
'id': cn,
'cert_serial': client_cert.get('serialNumber'),
'cert_issuer': client_cert.get('issuer')
}
@app.get('/api/secure')
def secure_endpoint(authenticated_user):
"""Endpoint requiring client certificate."""
return {
"message": f"Hello, {authenticated_user['id']}",
"cert_serial": authenticated_user['cert_serial']
}
Certificate Validation¶
Implement custom certificate validation:
from datetime import datetime
@app.dependency()
def validate_client_cert(client_cert):
"""Validate client certificate."""
# Check expiration
not_after = client_cert.get('notAfter')
if not_after:
expiry = datetime.strptime(not_after, '%b %d %H:%M:%S %Y %Z')
if expiry < datetime.now():
raise ValueError("Certificate expired")
# Check issuer
issuer = client_cert.get('issuer', {})
org = None
for rdn in issuer.get('rdnSequence', []):
for attr in rdn:
if attr.get('type') == 'organizationName':
org = attr.get('value')
break
if org != 'Trusted CA Inc':
raise ValueError("Certificate from untrusted CA")
return client_cert
@app.get('/api/validated')
def validated_endpoint(validate_client_cert):
"""Endpoint with certificate validation."""
subject = validate_client_cert.get('subject', {})
return {"status": "authenticated", "subject": subject}
AWS Integration¶
Application Load Balancer (ALB) with TLS¶
ALB terminates TLS and forwards requests to RestMachine:
from restmachine import RestApplication, Request
app = RestApplication()
@app.dependency()
def client_cert_from_alb(request: Request):
"""Extract client certificate from ALB headers."""
# ALB adds client cert to X-Amzn-Mtls-Clientcert header
cert_header = request.headers.get('x-amzn-mtls-clientcert')
if not cert_header:
raise ValueError("Client certificate required")
# Decode URL-encoded certificate
import urllib.parse
cert_pem = urllib.parse.unquote(cert_header)
# Parse certificate
from cryptography import x509
from cryptography.hazmat.backends import default_backend
cert = x509.load_pem_x509_certificate(
cert_pem.encode(),
default_backend()
)
return {
'subject': cert.subject.rfc4514_string(),
'issuer': cert.issuer.rfc4514_string(),
'serial': str(cert.serial_number),
'not_before': cert.not_valid_before.isoformat(),
'not_after': cert.not_valid_after.isoformat()
}
@app.dependency()
def authenticated_user_alb(client_cert_from_alb):
"""Get user from ALB client certificate."""
# Extract CN from subject
subject = client_cert_from_alb['subject']
cn = None
for part in subject.split(','):
if part.strip().startswith('CN='):
cn = part.strip()[3:]
break
if not cn:
raise ValueError("Certificate missing CN")
return {'id': cn, 'cert': client_cert_from_alb}
@app.get('/api/alb-secure')
def alb_secure_endpoint(authenticated_user_alb):
"""Endpoint with ALB mTLS."""
return {
"message": f"Hello from ALB, {authenticated_user_alb['id']}",
"cert_info": authenticated_user_alb['cert']
}
ALB Configuration¶
Configure ALB for mTLS:
# CloudFormation/Terraform example
Listener:
Port: 443
Protocol: HTTPS
Certificates:
- CertificateArn: arn:aws:acm:region:account:certificate/id
MutualAuthentication:
Mode: verify # or 'passthrough'
TrustStoreArn: arn:aws:elasticloadbalancing:region:account:truststore/name/id
DefaultActions:
- Type: forward
TargetGroupArn: !Ref TargetGroup
API Gateway with TLS¶
API Gateway with mutual TLS:
from restmachine import RestApplication, Request
app = RestApplication()
@app.dependency()
def client_cert_from_apigw(request: Request):
"""Extract client certificate from API Gateway."""
# API Gateway adds certificate info to request context
# Accessed via APIGW-specific event structure
# For RestMachine with API Gateway, certificate info
# is in the request context
context = getattr(request, 'request_context', {})
identity = context.get('identity', {})
cert_info = identity.get('clientCert', {})
if not cert_info:
raise ValueError("Client certificate required")
return {
'subject_dn': cert_info.get('subjectDN'),
'issuer_dn': cert_info.get('issuerDN'),
'serial': cert_info.get('serialNumber'),
'validity': {
'not_before': cert_info.get('validity', {}).get('notBefore'),
'not_after': cert_info.get('validity', {}).get('notAfter')
}
}
@app.dependency()
def authenticated_user_apigw(client_cert_from_apigw):
"""Get user from API Gateway client certificate."""
subject_dn = client_cert_from_apigw['subject_dn']
# Parse subject DN (e.g., "CN=user,OU=dept,O=org")
cn = None
for part in subject_dn.split(','):
if part.strip().startswith('CN='):
cn = part.strip()[3:]
break
if not cn:
raise ValueError("Certificate missing CN")
return {'id': cn, 'cert': client_cert_from_apigw}
@app.get('/api/apigw-secure')
def apigw_secure_endpoint(authenticated_user_apigw):
"""Endpoint with API Gateway mTLS."""
return {
"message": f"Hello from API Gateway, {authenticated_user_apigw['id']}",
"cert_info": authenticated_user_apigw['cert']
}
# For AWS Lambda with API Gateway
from restmachine_aws import AwsApiGatewayAdapter
adapter = AwsApiGatewayAdapter(app)
def lambda_handler(event, context):
return adapter.handle_event(event, context)
API Gateway Configuration¶
Configure API Gateway for mTLS:
# API Gateway REST API with mTLS
DomainName:
DomainName: api.example.com
MutualTlsAuthentication:
TruststoreUri: s3://bucket-name/truststore.pem
TruststoreVersion: version-id
RegionalCertificateArn: arn:aws:acm:region:account:certificate/id
BasePathMapping:
DomainName: !Ref DomainName
RestApiId: !Ref RestApi
Stage: prod
Certificate-Based Authorization¶
Role-Based Access from Certificates¶
Map certificate attributes to roles:
from enum import Enum
class Role(str, Enum):
ADMIN = "admin"
USER = "user"
SERVICE = "service"
# Certificate OU to Role mapping
OU_TO_ROLE = {
'Administrators': Role.ADMIN,
'Users': Role.USER,
'Services': Role.SERVICE
}
@app.dependency()
def user_role(client_cert):
"""Extract role from certificate OU."""
subject = client_cert.get('subject', {})
# Find OU (Organizational Unit)
ou = None
for rdn in subject.get('rdnSequence', []):
for attr in rdn:
if attr.get('type') == 'organizationalUnitName':
ou = attr.get('value')
break
if not ou:
return Role.USER # Default role
return OU_TO_ROLE.get(ou, Role.USER)
@app.dependency()
def require_admin_cert(user_role: Role):
"""Require admin certificate."""
if user_role != Role.ADMIN:
raise PermissionError(f"Admin certificate required, got: {user_role}")
return True
@app.get('/admin/users')
def admin_endpoint(require_admin_cert, authenticated_user):
"""Admin-only endpoint requiring admin certificate."""
return {
"message": "Admin access granted",
"user": authenticated_user['id']
}
Certificate Pinning¶
Validate specific certificate fingerprints:
import hashlib
# Allowed certificate fingerprints (SHA-256)
ALLOWED_CERT_FINGERPRINTS = {
'aa:bb:cc:dd:ee:ff:00:11:22:33:44:55:66:77:88:99:aa:bb:cc:dd:ee:ff:00:11:22:33:44:55:66:77:88:99',
'11:22:33:44:55:66:77:88:99:aa:bb:cc:dd:ee:ff:00:11:22:33:44:55:66:77:88:99:aa:bb:cc:dd:ee:ff'
}
@app.dependency()
def pinned_certificate(client_cert):
"""Validate certificate against pinned fingerprints."""
# Get DER-encoded certificate
der = client_cert.get('der')
if not der:
raise ValueError("Certificate DER encoding not available")
# Calculate SHA-256 fingerprint
fingerprint = hashlib.sha256(der).hexdigest()
fingerprint_formatted = ':'.join(
fingerprint[i:i+2] for i in range(0, len(fingerprint), 2)
)
if fingerprint_formatted not in ALLOWED_CERT_FINGERPRINTS:
raise ValueError("Certificate not in allowed list")
return client_cert
@app.get('/api/pinned')
def pinned_endpoint(pinned_certificate, authenticated_user):
"""Endpoint requiring pinned certificate."""
return {
"message": "Certificate pinning successful",
"user": authenticated_user['id']
}
Complete Examples¶
Full mTLS Application¶
from restmachine import RestApplication, Request
from restmachine.extensions.tls import TLSExtension
from datetime import datetime
from typing import Dict, Any
import logging
app = RestApplication()
# Enable TLS extension
app.add_extension(TLSExtension())
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Certificate validation
@app.dependency()
def client_cert(request: Request) -> Dict[str, Any]:
"""Extract and validate client certificate."""
tls = request.extensions.get('tls')
if not tls:
raise ValueError("TLS not enabled")
cert = tls.get('client_cert')
if not cert:
raise ValueError("Client certificate required")
# Validate expiration
not_after = cert.get('notAfter')
if not_after:
try:
expiry = datetime.strptime(not_after, '%b %d %H:%M:%S %Y %Z')
if expiry < datetime.now():
raise ValueError("Certificate expired")
except ValueError as e:
logger.warning(f"Could not parse certificate expiry: {e}")
return cert
@app.dependency()
def authenticated_user(client_cert: Dict[str, Any]) -> Dict[str, Any]:
"""Extract user from certificate."""
subject = client_cert.get('subject', {})
# Extract attributes
cn = None
email = None
ou = None
for rdn in subject.get('rdnSequence', []):
for attr in rdn:
attr_type = attr.get('type')
if attr_type == 'commonName':
cn = attr.get('value')
elif attr_type == 'emailAddress':
email = attr.get('value')
elif attr_type == 'organizationalUnitName':
ou = attr.get('value')
if not cn:
raise ValueError("Certificate missing Common Name")
user = {
'id': cn,
'email': email,
'department': ou,
'cert_serial': client_cert.get('serialNumber')
}
logger.info(f"User authenticated via mTLS: {cn}")
return user
# Role-based access
@app.dependency()
def user_role(authenticated_user: Dict[str, Any]) -> str:
"""Determine user role from department."""
dept = authenticated_user.get('department', '')
if dept == 'Administrators':
return 'admin'
elif dept == 'Services':
return 'service'
else:
return 'user'
@app.dependency()
def require_admin(user_role: str):
"""Require admin role."""
if user_role != 'admin':
raise PermissionError("Admin access required")
return True
# Routes
@app.get('/api/profile')
def get_profile(authenticated_user: Dict[str, Any]):
"""Get user profile from certificate."""
return {
"user": authenticated_user,
"authenticated_via": "mTLS"
}
@app.get('/api/admin/status')
def admin_status(require_admin, authenticated_user: Dict[str, Any]):
"""Admin endpoint requiring admin certificate."""
return {
"status": "OK",
"admin": authenticated_user['id'],
"timestamp": datetime.now().isoformat()
}
@app.get('/api/cert-info')
def cert_info(client_cert: Dict[str, Any]):
"""Get detailed certificate information."""
return {
"subject": client_cert.get('subject'),
"issuer": client_cert.get('issuer'),
"serial": client_cert.get('serialNumber'),
"validity": {
"not_before": client_cert.get('notBefore'),
"not_after": client_cert.get('notAfter')
}
}
# Error handlers
@app.error_handler(401)
def unauthorized(request, message, **kwargs):
logger.warning(f"Unauthorized access attempt: {message}")
return {
"error": "Unauthorized",
"message": message,
"hint": "Valid client certificate required"
}
@app.error_handler(403)
def forbidden(request, message, **kwargs):
logger.warning(f"Forbidden access attempt: {message}")
return {
"error": "Forbidden",
"message": message
}
# ASGI
from restmachine import ASGIAdapter
asgi_app = ASGIAdapter(app)
if __name__ == '__main__':
# Run with uvicorn
# uvicorn app:asgi_app --host 0.0.0.0 --port 8443 \
# --ssl-keyfile server-key.pem \
# --ssl-certfile server-cert.pem \
# --ssl-ca-certs ca-cert.pem \
# --ssl-cert-reqs 2
pass
AWS ALB mTLS Example¶
from restmachine import RestApplication, Request
from cryptography import x509
from cryptography.hazmat.backends import default_backend
import urllib.parse
import logging
app = RestApplication()
logger = logging.getLogger(__name__)
@app.dependency()
def alb_client_cert(request: Request):
"""Extract client certificate from ALB."""
cert_header = request.headers.get('x-amzn-mtls-clientcert')
if not cert_header:
raise ValueError("Client certificate required")
# Decode URL-encoded certificate
cert_pem = urllib.parse.unquote(cert_header)
# Parse certificate
cert = x509.load_pem_x509_certificate(
cert_pem.encode(),
default_backend()
)
return {
'subject': cert.subject.rfc4514_string(),
'issuer': cert.issuer.rfc4514_string(),
'serial': str(cert.serial_number),
'not_before': cert.not_valid_before,
'not_after': cert.not_valid_after,
'raw_cert': cert
}
@app.dependency()
def validate_alb_cert(alb_client_cert):
"""Validate ALB client certificate."""
from datetime import datetime, timezone
# Check expiration
now = datetime.now(timezone.utc)
if alb_client_cert['not_after'] < now:
raise ValueError("Certificate expired")
if alb_client_cert['not_before'] > now:
raise ValueError("Certificate not yet valid")
# Extract CN
subject = alb_client_cert['subject']
cn = None
for part in subject.split(','):
if part.strip().startswith('CN='):
cn = part.strip()[3:]
break
if not cn:
raise ValueError("Certificate missing CN")
return {'id': cn, 'cert': alb_client_cert}
@app.get('/api/alb-mtls')
def alb_mtls_endpoint(validate_alb_cert):
"""ALB mTLS endpoint."""
return {
"message": f"Authenticated via ALB mTLS",
"user": validate_alb_cert['id'],
"cert_serial": validate_alb_cert['cert']['serial']
}
# ASGI for ALB
from restmachine import ASGIAdapter
asgi_app = ASGIAdapter(app)
Testing mTLS Locally¶
Generate Test Certificates¶
#!/bin/bash
# generate-mtls-certs.sh
# Create CA
openssl req -x509 -newkey rsa:4096 -nodes \
-keyout ca-key.pem -out ca-cert.pem -days 365 \
-subj "/CN=Test CA"
# Create server certificate
openssl req -newkey rsa:4096 -nodes \
-keyout server-key.pem -out server-req.pem \
-subj "/CN=localhost"
openssl x509 -req -in server-req.pem -days 365 \
-CA ca-cert.pem -CAkey ca-key.pem -CAcreateserial \
-out server-cert.pem
# Create client certificate (admin)
openssl req -newkey rsa:4096 -nodes \
-keyout client-admin-key.pem -out client-admin-req.pem \
-subj "/CN=admin/OU=Administrators/emailAddress=admin@example.com"
openssl x509 -req -in client-admin-req.pem -days 365 \
-CA ca-cert.pem -CAkey ca-key.pem -CAcreateserial \
-out client-admin-cert.pem
# Create client certificate (user)
openssl req -newkey rsa:4096 -nodes \
-keyout client-user-key.pem -out client-user-req.pem \
-subj "/CN=user1/OU=Users/emailAddress=user1@example.com"
openssl x509 -req -in client-user-req.pem -days 365 \
-CA ca-cert.pem -CAkey ca-key.pem -CAcreateserial \
-out client-user-cert.pem
echo "Certificates generated successfully"
Test with cURL¶
# Test with admin certificate
curl https://localhost:8443/api/profile \
--cert client-admin-cert.pem \
--key client-admin-key.pem \
--cacert ca-cert.pem
# Test admin endpoint
curl https://localhost:8443/api/admin/status \
--cert client-admin-cert.pem \
--key client-admin-key.pem \
--cacert ca-cert.pem
# Test with user certificate (should fail for admin endpoint)
curl https://localhost:8443/api/admin/status \
--cert client-user-cert.pem \
--key client-user-key.pem \
--cacert ca-cert.pem
# Test without certificate (should fail)
curl https://localhost:8443/api/profile \
--cacert ca-cert.pem
Best Practices¶
1. Always Validate Certificate Expiration¶
@app.dependency()
def validate_cert_expiry(client_cert):
from datetime import datetime
not_after = client_cert.get('notAfter')
if not_after:
expiry = datetime.strptime(not_after, '%b %d %H:%M:%S %Y %Z')
if expiry < datetime.now():
raise ValueError("Certificate expired")
return client_cert
2. Use Certificate Revocation Lists (CRL)¶
@app.dependency()
def check_revocation(client_cert):
"""Check if certificate is revoked."""
serial = client_cert.get('serialNumber')
# Check against CRL or OCSP
if is_revoked(serial):
raise ValueError("Certificate has been revoked")
return client_cert
3. Log Certificate Usage¶
@app.dependency()
def log_cert_usage(authenticated_user, request: Request):
"""Log certificate usage for audit."""
logger.info(
"Certificate authentication",
extra={
'user': authenticated_user['id'],
'path': request.path,
'cert_serial': authenticated_user.get('cert_serial'),
'timestamp': datetime.now().isoformat()
}
)
return authenticated_user
4. Use Strong Cipher Suites¶
# Uvicorn with strong ciphers
uvicorn app:asgi_app \
--ssl-ciphers "ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20:!aNULL:!MD5:!DSS"
5. Implement Certificate Rotation¶
@app.dependency()
def check_cert_age(client_cert):
"""Warn if certificate is old."""
from datetime import datetime, timedelta
not_after = client_cert.get('notAfter')
if not_after:
expiry = datetime.strptime(not_after, '%b %d %H:%M:%S %Y %Z')
days_until_expiry = (expiry - datetime.now()).days
if days_until_expiry < 30:
logger.warning(
f"Certificate expires in {days_until_expiry} days. "
f"Serial: {client_cert.get('serialNumber')}"
)
return client_cert
Next Steps¶
- Authentication → - Combine with other auth methods
- Error Handling → - Handle certificate errors
- Deployment → - Deploy with mTLS
- Testing → - Test mTLS endpoints