Performance Optimization¶
RestMachine is designed for high performance with minimal overhead. This guide covers optimization techniques, caching strategies, and best practices for building fast APIs.
Dependency Caching¶
Understanding Dependency Scope¶
RestMachine caches dependencies per request by default:
from restmachine import RestApplication
app = RestApplication()
@app.dependency()
def expensive_computation():
"""Cached per request."""
import time
print("Computing...")
time.sleep(0.1) # Simulate expensive operation
return {"result": "value"}
@app.dependency()
def uses_computation(expensive_computation):
"""Reuses cached computation."""
return {"data": expensive_computation["result"]}
@app.get('/api/data')
def get_data(expensive_computation, uses_computation):
"""Both dependencies use same cached computation."""
# expensive_computation only runs once per request
return {
"direct": expensive_computation,
"indirect": uses_computation
}
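With this in place, a single GET /api/data prints "Computing..." exactly once: the first resolution runs the function, and every other dependency in the same request reuses the cached value. A second request recomputes it, since the cache is scoped per request.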
Session-Scoped Dependencies¶
Use session scope for expensive resources:
@app.on_startup
def database_pool():
"""Created once at startup, reused across all requests."""
import psycopg2.pool
return psycopg2.pool.SimpleConnectionPool(
minconn=1,
maxconn=20,
host='localhost',
database='mydb'
)
@app.on_shutdown
def close_pool(database_pool):
"""Clean up on shutdown."""
database_pool.closeall()
@app.dependency()
def database_connection(database_pool):
"""Get connection from pool (per request)."""
conn = database_pool.getconn()
try:
yield conn
finally:
database_pool.putconn(conn)
@app.get('/users')
def list_users(database_connection):
"""Use pooled connection."""
with database_connection.cursor() as cur:
cur.execute("SELECT * FROM users")
return {"users": cur.fetchall()}
State Machine Optimization¶
Bypass State Machine for Simple Routes¶
For high-performance endpoints, consider bypassing the state machine:
# Standard route (with state machine)
@app.get('/api/data')
def get_data():
return {"message": "Hello"}
# Direct route (bypass state machine for maximum performance)
@app.get('/api/fast', bypass_state_machine=True)
def fast_endpoint():
"""Ultra-fast endpoint without state machine overhead."""
return {"message": "Fast!"}, 200, {'Content-Type': 'application/json'}
Optimize State Machine Decorators¶
Minimize state machine processing by using specific decorators:
from restmachine.decorators import (
skip_content_negotiation,
skip_auth_check,
cache_response
)
@app.get('/api/public-data')
@skip_content_negotiation # Only return JSON
@skip_auth_check # No authentication needed
@cache_response(max_age=3600) # Cache for 1 hour
def public_data():
"""Optimized public endpoint."""
return {"data": [...]}
ASGI Server Tuning¶
Uvicorn Configuration¶
Optimize Uvicorn for production:
# Production configuration
uvicorn app:asgi_app \
--host 0.0.0.0 \
--port 8000 \
--workers 4 \
--loop uvloop \
--http httptools \
--log-level warning \
--no-access-log \
--limit-concurrency 1000 \
--backlog 2048
Alternatively, run the Uvicorn workers under Gunicorn with a Python config file. Note that worker_class is a Gunicorn setting, so only Gunicorn options belong here; the UvicornWorker selects uvloop and httptools automatically when they are installed:
# uvicorn_config.py -- loaded by Gunicorn, which supervises the Uvicorn workers
import multiprocessing

bind = "0.0.0.0:8000"
workers = multiprocessing.cpu_count() * 2 + 1
worker_class = "uvicorn.workers.UvicornWorker"
loglevel = "warning"
accesslog = None  # disable access logging
backlog = 2048
Run it with Gunicorn, pointing at the config file (assuming the same app:asgi_app entry point as above):
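gunicorn app:asgi_app -c uvicorn_config.py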
Hypercorn Configuration¶
Optimize Hypercorn for high performance:
# hypercorn_config.toml
bind = ["0.0.0.0:8000"]
workers = 4
worker_class = "uvloop"
keep_alive_timeout = 5
graceful_timeout = 10
# Performance tuning
backlog = 2048
h11_max_incomplete_size = 16384
# Logging
accesslog = "-"
errorlog = "-"
loglevel = "warning"
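To launch with this file (again assuming the app:asgi_app entry point used above):

hypercorn app:asgi_app --config hypercorn_config.toml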
Connection Pooling¶
Database Connection Pool¶
Use connection pooling for databases:
from contextlib import contextmanager
import psycopg2.pool
@app.on_startup
def db_pool():
"""Create connection pool."""
return psycopg2.pool.ThreadedConnectionPool(
minconn=5,
maxconn=20,
host='localhost',
database='mydb',
user='user',
password='password',
connect_timeout=3
)
@contextmanager
def get_db_connection(db_pool):
"""Get connection from pool with context manager."""
conn = db_pool.getconn()
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
db_pool.putconn(conn)
@app.get('/users')
def list_users(db_pool):
"""Use pooled connection efficiently."""
with get_db_connection(db_pool) as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM users LIMIT 100")
return {"users": cur.fetchall()}
HTTP Client Pooling¶
Reuse HTTP connections:
import httpx
@app.on_startup
def http_client():
"""Create persistent HTTP client."""
return httpx.AsyncClient(
timeout=10.0,
limits=httpx.Limits(
max_keepalive_connections=20,
max_connections=100
)
)
@app.on_shutdown
async def close_http_client(http_client):
"""Close HTTP client."""
await http_client.aclose()
@app.get('/api/external')
async def call_external_api(http_client):
"""Use pooled HTTP client."""
response = await http_client.get('https://api.example.com/data')
return response.json()
Caching Strategies¶
In-Memory Caching¶
Implement simple in-memory cache:
from functools import lru_cache
from datetime import datetime, timedelta
# LRU cache for expensive computations
@lru_cache(maxsize=1000)
def expensive_function(param: str):
"""Cached with LRU eviction."""
# Expensive computation
return {"result": param.upper()}
# Time-based cache
CACHE = {}
CACHE_TTL = timedelta(minutes=5)
@app.dependency()
def get_cached_data():
"""Data cached with TTL."""
now = datetime.now()
# Check cache
if 'data' in CACHE:
cached_at, value = CACHE['data']
if now - cached_at < CACHE_TTL:
return value
    # Fetch fresh data (fetch_expensive_data is a placeholder for your own loader)
    data = fetch_expensive_data()
CACHE['data'] = (now, data)
return data
@app.get('/api/cached')
def cached_endpoint(get_cached_data):
return get_cached_data
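One caveat: the module-level CACHE dict is per process and not thread-safe, so with a threaded server two requests can race on the expiry check and both refetch. A minimal sketch that guards the cache with a lock, reusing the CACHE, CACHE_TTL, and fetch_expensive_data placeholders above:

import threading

CACHE_LOCK = threading.Lock()

@app.dependency()
def get_cached_data_safe():
    """TTL cache guarded by a lock so only one request refreshes at a time."""
    now = datetime.now()
    with CACHE_LOCK:
        if 'data' in CACHE:
            cached_at, value = CACHE['data']
            if now - cached_at < CACHE_TTL:
                return value
        # Refresh while holding the lock to avoid a thundering herd
        data = fetch_expensive_data()
        CACHE['data'] = (now, data)
        return data

This cache is still private to each worker process; for a cache shared across workers, use Redis as shown next.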
Redis Caching¶
Use Redis for distributed caching:
import redis
import json
from datetime import timedelta
@app.on_startup
def redis_client():
"""Create Redis connection pool."""
return redis.Redis(
host='localhost',
port=6379,
db=0,
decode_responses=True,
max_connections=20
)
@app.dependency()
def cache(redis_client):
"""Cache operations."""
class Cache:
def __init__(self, client):
self.client = client
def get(self, key: str):
value = self.client.get(key)
return json.loads(value) if value else None
def set(self, key: str, value, ttl: int = 300):
self.client.setex(
key,
ttl,
json.dumps(value)
)
def delete(self, key: str):
self.client.delete(key)
return Cache(redis_client)
@app.get('/users/{user_id}')
def get_user(request, cache, database):
"""Get user with caching."""
user_id = request.path_params['user_id']
cache_key = f"user:{user_id}"
# Try cache first
cached = cache.get(cache_key)
if cached:
return cached
# Fetch from database
user = database.get_user(user_id)
# Cache for 5 minutes
cache.set(cache_key, user, ttl=300)
return user
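Cached reads go stale after writes, so invalidate on update. A sketch of delete-on-write invalidation, assuming an @app.put route decorator analogous to @app.get plus hypothetical database.update_user and validate_user dependencies:

@app.put('/users/{user_id}')
def update_user(request, cache, database, validate_user):
    """Update the user, then drop the stale cache entry."""
    user_id = request.path_params['user_id']
    user = database.update_user(user_id, validate_user.model_dump())
    # Delete so the next read repopulates the cache
    cache.delete(f"user:{user_id}")
    return user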
Response Compression¶
Enable Compression in ASGI Server¶
Configure compression in Uvicorn/Hypercorn:
# With middleware
from starlette.middleware.gzip import GZipMiddleware
app = RestApplication()
# Add compression middleware to ASGI app
from restmachine import ASGIAdapter
asgi_app = ASGIAdapter(app)
asgi_app = GZipMiddleware(asgi_app, minimum_size=1000)
Or configure in reverse proxy (Nginx):
# nginx.conf
http {
gzip on;
gzip_vary on;
gzip_min_length 1000;
gzip_types
text/plain
text/css
text/javascript
application/json
application/javascript
        application/rss+xml;
}
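To verify compression end to end, send an Accept-Encoding header and inspect the response; the header only appears for bodies larger than the configured threshold (hypothetical local URL):

curl -s -i -H 'Accept-Encoding: gzip' http://localhost:8000/users | grep -i content-encoding
# content-encoding: gzip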
Async Operations¶
Async Dependencies¶
Use async for I/O-bound operations:
import asyncio
import httpx
@app.on_startup
async def async_http_client():
"""Async HTTP client."""
return httpx.AsyncClient(timeout=10.0)
@app.dependency()
async def fetch_user_data(async_http_client, request):
"""Async dependency."""
user_id = request.path_params['user_id']
# Parallel requests
user_response, posts_response = await asyncio.gather(
async_http_client.get(f'https://api.example.com/users/{user_id}'),
async_http_client.get(f'https://api.example.com/users/{user_id}/posts')
)
return {
"user": user_response.json(),
"posts": posts_response.json()
}
@app.get('/users/{user_id}/profile')
async def user_profile(fetch_user_data):
"""Async endpoint."""
return fetch_user_data
Background Tasks¶
Offload work to background:
from concurrent.futures import ThreadPoolExecutor
import threading
@app.on_startup
def executor():
"""Thread pool for background tasks."""
return ThreadPoolExecutor(max_workers=10)
@app.dependency()
def background_tasks(executor):
"""Background task runner."""
tasks = []
def add_task(func, *args, **kwargs):
future = executor.submit(func, *args, **kwargs)
tasks.append(future)
def wait_all():
for future in tasks:
future.result()
return add_task, wait_all
@app.post('/users')
def create_user(validate_user, database, background_tasks):
"""Create user with background email."""
add_task, wait_all = background_tasks
# Create user
user = database.create_user(validate_user.model_dump())
# Send email in background
add_task(send_welcome_email, user['email'])
# Don't wait for background tasks
return user, 201
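Drain the pool on shutdown so queued tasks are not dropped; a small sketch using the same lifecycle hook as the connection pools:

@app.on_shutdown
def stop_executor(executor):
    """Wait for in-flight background tasks, then release the threads."""
    executor.shutdown(wait=True)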
Query Optimization¶
Efficient Database Queries¶
Optimize database access:
@app.get('/users')
def list_users(request, database):
"""Optimized user listing."""
# Use query parameters for filtering
limit = min(int(request.query_params.get('limit', '20')), 100)
offset = int(request.query_params.get('offset', '0'))
# Efficient query with LIMIT/OFFSET
with database.cursor() as cur:
cur.execute(
"""
SELECT id, name, email, created_at
FROM users
ORDER BY created_at DESC
LIMIT %s OFFSET %s
""",
(limit, offset)
)
users = cur.fetchall()
return {
"users": users,
"limit": limit,
"offset": offset
}
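LIMIT/OFFSET with ORDER BY stays cheap only if the sort column is indexed; otherwise the database re-sorts the whole table on every request. Assuming the schema above and PostgreSQL, a supporting index would look like:

-- Lets the planner read rows already ordered by created_at
CREATE INDEX idx_users_created_at ON users (created_at DESC);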
N+1 Query Prevention¶
Avoid N+1 queries with eager loading:
@app.get('/posts')
def list_posts(database):
"""List posts with authors (avoid N+1)."""
with database.cursor() as cur:
# Single query with JOIN
cur.execute(
"""
SELECT
p.id, p.title, p.content,
u.id as author_id, u.name as author_name
FROM posts p
JOIN users u ON p.author_id = u.id
ORDER BY p.created_at DESC
LIMIT 20
"""
)
rows = cur.fetchall()
posts = [
{
"id": row[0],
"title": row[1],
"content": row[2],
"author": {
"id": row[3],
"name": row[4]
}
}
for row in rows
]
return {"posts": posts}
Response Optimization¶
Pagination¶
Implement efficient pagination:
from typing import Optional
class PaginationParams:
def __init__(self, page: int = 1, per_page: int = 20):
self.page = max(1, page)
self.per_page = min(per_page, 100) # Max 100 items
self.offset = (self.page - 1) * self.per_page
@app.dependency()
def pagination(request) -> PaginationParams:
    """Parse pagination parameters, falling back to defaults on bad input."""
    try:
        page = int(request.query_params.get('page', '1'))
        per_page = int(request.query_params.get('per_page', '20'))
    except ValueError:
        page, per_page = 1, 20
    return PaginationParams(page, per_page)
@app.get('/users')
def list_users(pagination: PaginationParams, database):
"""Paginated user list."""
with database.cursor() as cur:
# Get total count
cur.execute("SELECT COUNT(*) FROM users")
total = cur.fetchone()[0]
# Get page of results
cur.execute(
"SELECT * FROM users LIMIT %s OFFSET %s",
(pagination.per_page, pagination.offset)
)
users = cur.fetchall()
return {
"users": users,
"pagination": {
"page": pagination.page,
"per_page": pagination.per_page,
"total": total,
"pages": (total + pagination.per_page - 1) // pagination.per_page
}
}
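For example, GET /users?page=3&per_page=50 computes OFFSET 100 and returns a response shaped like this (values illustrative):

{
  "users": ["..."],
  "pagination": {"page": 3, "per_page": 50, "total": 1234, "pages": 25}
}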
Field Selection¶
Allow clients to select fields:
@app.get('/users/{user_id}')
def get_user(request, database):
"""Get user with field selection."""
user_id = request.path_params['user_id']
# Parse fields parameter
fields_param = request.query_params.get('fields', '')
allowed_fields = {'id', 'name', 'email', 'created_at', 'bio'}
    if fields_param:
        selected_fields = sorted(set(fields_param.split(',')) & allowed_fields)
    else:
        selected_fields = sorted(allowed_fields)
    if not selected_fields:
        selected_fields = sorted(allowed_fields)  # ignore requests with only unknown fields

    # Build query (names are validated against the allowlist, so interpolation is safe)
    field_list = ', '.join(selected_fields)
with database.cursor() as cur:
cur.execute(
f"SELECT {field_list} FROM users WHERE id = %s",
(user_id,)
)
row = cur.fetchone()
if not row:
from restmachine import Response
return Response(404, '{"error": "Not found"}')
# Build response with selected fields
user = dict(zip(selected_fields, row))
return user
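A request like GET /users/42?fields=id,name then returns only those two keys. Unknown names are dropped by the allowlist intersection, which is also what makes the f-string interpolation above safe from SQL injection.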
Monitoring and Profiling¶
Request Timing¶
Track request performance:
import time
import logging
logger = logging.getLogger(__name__)
@app.dependency()
def request_timer(request):
    """Track request timing."""
    start_time = time.time()
    yield start_time  # the handler receives the start time
    # Code after the yield runs once the handler has returned
    duration = time.time() - start_time
    # Log slow requests
    if duration > 1.0:
        logger.warning(
            f"Slow request: {request.method} {request.path} took {duration:.2f}s"
        )

@app.get('/api/data')
def get_data(request_timer):
    """Endpoint with timing."""
    time.sleep(0.1)  # Simulate work
    elapsed = time.time() - request_timer
    return {"message": "Hello"}, 200, {'X-Response-Time': f"{elapsed:.3f}"}
Memory Profiling¶
Monitor memory usage:
import tracemalloc
import logging
logger = logging.getLogger(__name__)
@app.on_startup
def start_memory_profiling():
"""Start memory profiling."""
tracemalloc.start()
@app.dependency()
def memory_monitor():
"""Monitor memory for request."""
snapshot1 = tracemalloc.take_snapshot()
yield
snapshot2 = tracemalloc.take_snapshot()
top_stats = snapshot2.compare_to(snapshot1, 'lineno')
# Log top memory allocations
for stat in top_stats[:3]:
logger.debug(f"Memory: {stat}")
@app.get('/api/data')
def get_data(memory_monitor):
"""Endpoint with memory monitoring."""
data = fetch_large_dataset()
return data
Complete Example¶
Here's a complete high-performance application:
from restmachine import RestApplication, Request
from functools import lru_cache
import psycopg2.pool
import redis
import json
from datetime import datetime
app = RestApplication()
# Connection pools
@app.on_startup
def db_pool():
return psycopg2.pool.ThreadedConnectionPool(
minconn=5, maxconn=20,
host='localhost', database='mydb'
)
@app.on_startup
def redis_client():
return redis.Redis(
host='localhost', decode_responses=True,
max_connections=20
)
# Caching dependency
@app.dependency()
def cache(redis_client):
class Cache:
def get(self, key):
val = redis_client.get(key)
return json.loads(val) if val else None
def set(self, key, value, ttl=300):
redis_client.setex(key, ttl, json.dumps(value))
return Cache()
# Optimized user endpoint
@app.get('/users/{user_id}')
def get_user(request, db_pool, cache):
"""High-performance user endpoint."""
user_id = request.path_params['user_id']
cache_key = f"user:{user_id}"
# Try cache
user = cache.get(cache_key)
if user:
return user, 200, {'X-Cache': 'HIT'}
# Database query
conn = db_pool.getconn()
try:
with conn.cursor() as cur:
cur.execute(
"SELECT id, name, email FROM users WHERE id = %s",
(user_id,)
)
row = cur.fetchone()
if not row:
from restmachine import Response
return Response(404, '{"error": "Not found"}')
user = {"id": row[0], "name": row[1], "email": row[2]}
# Cache result
cache.set(cache_key, user, ttl=300)
return user, 200, {'X-Cache': 'MISS'}
finally:
db_pool.putconn(conn)
# ASGI with optimizations
from restmachine import ASGIAdapter
from starlette.middleware.gzip import GZipMiddleware
asgi_app = ASGIAdapter(app)
asgi_app = GZipMiddleware(asgi_app, minimum_size=1000)
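Assuming the module is saved as app.py, serve it with the tuned Uvicorn invocation from earlier:

uvicorn app:asgi_app --workers 4 --loop uvloop --http httptools --no-access-log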
Best Practices¶
1. Use Connection Pooling¶
Always pool database and HTTP connections:
# Good: Connection pool
@app.on_startup
def db_pool():
return create_pool(min=5, max=20)
# Bad: New connection per request
@app.dependency()
def database():
return create_connection() # Don't do this!
2. Cache Expensive Operations¶
Cache computations and queries:
# Good: Cached
@lru_cache(maxsize=1000)
def expensive_computation(param):
return result
# Bad: Recompute every time
def expensive_computation(param):
return result # No caching
3. Use Async for I/O¶
Use async for I/O-bound operations:
# Good: Async I/O
@app.get('/data')
async def get_data(http_client):
return await http_client.get('https://api.example.com')
# Less optimal: Sync I/O (blocks worker)
@app.get('/data')
def get_data():
return requests.get('https://api.example.com')
4. Implement Pagination¶
Always paginate large result sets:
# Good: Paginated
@app.get('/users')
def list_users(pagination):
return db.query().limit(pagination.per_page).offset(pagination.offset)
# Bad: Return all results
@app.get('/users')
def list_users():
return db.query().all() # Could be millions!
5. Monitor Performance¶
Track and log performance metrics:
import time
@app.dependency()
def perf_monitor(request):
start = time.time()
yield
duration = time.time() - start
if duration > 1.0:
logger.warning(f"Slow: {request.path} {duration:.2f}s")
Next Steps¶
- Lifecycle → Manage application lifecycle
- State Machine → Understand request flow
- Testing → Performance testing
- Deployment → Production deployment