Metrics & Observability¶
RestMachine provides a lightweight, platform-agnostic metrics collection system for monitoring application performance and tracking business metrics.
Overview¶
The metrics system is designed to be:
- Platform-agnostic - Core collection works anywhere
- Auto-configured for AWS Lambda - CloudWatch EMF enabled automatically when using `AwsApiGatewayAdapter`
- Extensible - Easy to add publishers for any metrics platform
- Zero-overhead when disabled - Metrics collection can be bypassed
- Dependency-injected - Access via standard DI pattern
Platform Support¶
| Platform | Auto-Detection | Support | Documentation |
|---|---|---|---|
| AWS Lambda | ✅ Automatic | CloudWatch EMF enabled by default | AWS Metrics Guide |
| ASGI on AWS | ✅ Automatic | CloudWatch EMF enabled when AWS detected | ASGI Integration |
| ASGI (Non-AWS) | ❌ Manual | Custom publisher required | ASGI Integration |
| Other | ❌ Manual | Custom publisher required | Custom Publishers |
Quick Start¶
AWS Lambda (Auto-configured)¶
When running on AWS Lambda, metrics are automatically enabled and published to CloudWatch using EMF (Embedded Metric Format). No additional configuration is required.
The `AwsApiGatewayAdapter` automatically detects that it's running in an AWS Lambda environment and:
- ✅ Creates a CloudWatch EMF publisher by default
- ✅ Configures logging to output EMF-formatted metrics
- ✅ Uses your Lambda function name as the service dimension
- ✅ Publishes metrics after each request
```python
from restmachine import RestApplication
from restmachine_aws import AwsApiGatewayAdapter

app = RestApplication()

@app.get("/users/{id}")
def get_user(id: str, metrics):
    # Add custom metrics
    metrics.add_metric("users.fetched", 1, unit="Count")
    return {"user": id}

# Metrics automatically enabled with CloudWatch EMF
# No explicit configuration needed!
adapter = AwsApiGatewayAdapter(app)

def lambda_handler(event, context):
    return adapter.handle_event(event, context)
```
To customize the configuration or disable metrics, see AWS CloudWatch Metrics Guide.
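For reference, EMF records are plain JSON log lines with an `_aws` metadata envelope that CloudWatch parses into metrics. The sketch below shows the general EMF shape only; the exact fields and helper names RestMachine uses internally may differ:

```python
import json

def to_emf(namespace, service, metrics, timestamp_ms):
    """Build a CloudWatch EMF log record.

    `metrics` maps metric name -> (value, unit). This is the general EMF
    structure, not RestMachine's actual emitter.
    """
    return {
        "_aws": {
            "Timestamp": timestamp_ms,
            "CloudWatchMetrics": [{
                "Namespace": namespace,
                "Dimensions": [["service"]],
                "Metrics": [{"Name": n, "Unit": u} for n, (_, u) in metrics.items()],
            }],
        },
        "service": service,
        # Metric values live at the top level, keyed by metric name
        **{n: v for n, (v, _) in metrics.items()},
    }

record = to_emf("RestMachine", "my-lambda", {"users.fetched": (1, "Count")}, 1700000000000)
print(json.dumps(record))
```

Writing one such line to stdout is all it takes for CloudWatch to ingest the metric, which is why the Lambda adapter can publish "via logs" with no SDK calls.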
Other Platforms¶
For non-AWS platforms, implement a custom publisher:
```python
from restmachine import RestApplication
from restmachine.metrics import MetricsPublisher

class MyPublisher(MetricsPublisher):
    def is_enabled(self) -> bool:
        return True

    def publish(self, collector, request=None, response=None, context=None):
        # Send metrics to your platform
        for name, values in collector.metrics.items():
            for metric_value in values:
                print(f"{name}: {metric_value.value}")

# Use custom publisher
publisher = MyPublisher()
# Integration depends on platform - see Custom Publishers section
```
Core Concepts¶
MetricsCollector¶
The `MetricsCollector` is injected as a dependency and provides methods to record metrics:
```python
@app.get("/endpoint")
def handler(metrics):
    # Add metrics
    metrics.add_metric("requests", 1, unit="Count")

    # Add dimensions (low-cardinality grouping)
    metrics.add_dimension("environment", "production")

    # Add metadata (high-cardinality context)
    metrics.add_metadata("request_id", request_id)

    # Time operations
    metrics.start_timer("operation")
    do_work()
    metrics.stop_timer("operation")

    return {"ok": True}
```
MetricsPublisher¶
The abstract base class for publishing metrics to any platform:
```python
from restmachine.metrics import MetricsPublisher

class MyPublisher(MetricsPublisher):
    def is_enabled(self) -> bool:
        """Return True if publishing is enabled."""
        return True

    def publish(self, collector, request=None, response=None, context=None):
        """Publish collected metrics."""
        # collector.metrics - Dict[str, List[MetricValue]]
        # collector.get_all_dimensions() - Dict[str, str]
        # collector.metadata - Dict[str, Any]
        pass
```
Adding Metrics¶
Basic Metrics¶
```python
@app.get("/orders")
def list_orders(metrics):
    # Count metrics
    metrics.add_metric("orders.listed", 1, unit="Count")

    # Value metrics
    metrics.add_metric("orders.total_value", 1250.00, unit="None")

    # Size metrics
    metrics.add_metric("response.size", 1024, unit="Bytes")

    return orders
```
Using Timers¶
```python
@app.get("/data")
def fetch_data(metrics):
    metrics.start_timer("database.query")
    data = db.query("SELECT * FROM users")
    metrics.stop_timer("database.query")  # Adds metric in milliseconds
    return data
```
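Under the hood, a start/stop timer pair only needs a monotonic clock. A minimal sketch of the semantics using `time.perf_counter` (illustrative, not RestMachine's actual implementation):

```python
import time

class TimerDemo:
    """Sketch of start_timer/stop_timer semantics: stop records the
    elapsed time in milliseconds as a metric value."""

    def __init__(self):
        self._starts = {}   # timer name -> start timestamp
        self.metrics = {}   # metric name -> list of recorded values

    def start_timer(self, name):
        self._starts[name] = time.perf_counter()

    def stop_timer(self, name):
        elapsed_ms = (time.perf_counter() - self._starts.pop(name)) * 1000.0
        self.metrics.setdefault(name, []).append(elapsed_ms)
        return elapsed_ms

t = TimerDemo()
t.start_timer("database.query")
time.sleep(0.01)
t.stop_timer("database.query")
print(t.metrics["database.query"])  # one elapsed-time sample, in ms
```

Using `perf_counter` rather than `time.time` keeps measurements immune to wall-clock adjustments.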
Multiple Values (Aggregation)¶
```python
@app.get("/batch")
def process_batch(metrics):
    for item in items:
        # Each call adds to the metric
        metrics.add_metric("items.processed", 1, unit="Count")
        metrics.add_metric("processing.time", process(item), unit="Milliseconds")

    # Publisher receives all values for aggregation
    return {"processed": len(items)}
```
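Because the publisher receives every recorded value, it can collapse repeated samples into summary statistics before shipping them. A minimal sketch of that aggregation step:

```python
def summarize(values):
    """Aggregate repeated metric values the way a publisher might
    (count / sum / min / max / average)."""
    return {
        "count": len(values),
        "sum": sum(values),
        "min": min(values),
        "max": max(values),
        "avg": sum(values) / len(values),
    }

# Three "processing.time" samples from one batch request
print(summarize([12.0, 30.0, 18.0]))  # {'count': 3, 'sum': 60.0, 'min': 12.0, 'max': 30.0, 'avg': 20.0}
```

CloudWatch EMF performs this kind of aggregation server-side, which is why sending all raw values is the right default.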
Dimensions vs Metadata¶
Dimensions are for grouping/filtering (low-cardinality):
```python
# Good - few unique values
metrics.add_dimension("environment", "production")  # ~3-5 values
metrics.add_dimension("region", "us-east-1")        # ~10-20 values
metrics.add_dimension("user_type", "premium")       # ~3-10 values
```
Metadata is for context/debugging (high-cardinality):
```python
# Good - many unique values
metrics.add_metadata("user_id", "12345")            # Thousands of values
metrics.add_metadata("request_id", "abc-def-...")   # Unique per request
metrics.add_metadata("order_id", order_id)          # Unique identifiers
```
⚠️ Some platforms (like CloudWatch) limit dimensions (max 30). Use metadata for high-cardinality data.
Metric Units¶
Available units from the `MetricUnit` enum:
```python
from restmachine.metrics import MetricUnit

# Time
MetricUnit.Seconds
MetricUnit.Milliseconds
MetricUnit.Microseconds

# Bytes
MetricUnit.Bytes
MetricUnit.Kilobytes
MetricUnit.Megabytes
MetricUnit.Gigabytes

# Count
MetricUnit.Count

# Rates
MetricUnit.CountPerSecond
MetricUnit.BytesPerSecond

# Other
MetricUnit.Percent
# The unit-less "None" unit is passed as the string "None" (see
# unit="None" above); `None` is a reserved word in Python, so it
# cannot be an attribute name.
```
Usage:
```python
metrics.add_metric("api.latency", 45.2, unit=MetricUnit.Milliseconds)
metrics.add_metric("requests", 1, unit=MetricUnit.Count)
metrics.add_metric("response.size", 2048, unit=MetricUnit.Bytes)
```
Default Dimensions¶
Set dimensions that apply to all metrics in a request:
```python
@app.get("/endpoint")
def handler(metrics):
    # Apply to all metrics
    metrics.set_default_dimensions(
        environment="production",
        version="v2"
    )

    metrics.add_metric("requests", 1)
    # Includes: environment=production, version=v2
    return {"ok": True}
```
Default dimensions can also be cleared if they should not apply to the rest of the request.
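To illustrate how default dimensions merge with per-metric ones (and what clearing them does), here is a self-contained sketch; `clear_default_dimensions()` is a hypothetical method name used only in this demo, not a confirmed RestMachine API:

```python
class DimensionDemo:
    """Demonstrates default-dimension merge semantics.

    Per-metric dimensions override defaults with the same name;
    clear_default_dimensions() (hypothetical name) drops only the defaults.
    """

    def __init__(self):
        self._defaults = {}
        self._dimensions = {}

    def set_default_dimensions(self, **dims):
        self._defaults.update(dims)

    def clear_default_dimensions(self):  # hypothetical name
        self._defaults.clear()

    def add_dimension(self, name, value):
        self._dimensions[name] = value

    def get_all_dimensions(self):
        # Explicit dimensions win over defaults on a name clash
        return {**self._defaults, **self._dimensions}

d = DimensionDemo()
d.set_default_dimensions(environment="production", version="v2")
d.add_dimension("region", "us-east-1")
print(d.get_all_dimensions())
# {'environment': 'production', 'version': 'v2', 'region': 'us-east-1'}
```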
Isolated Metrics (Advanced)¶
Use `EphemeralMetrics` for isolated metric collection:
```python
from restmachine.metrics import EphemeralMetrics

@app.get("/data")
def get_data(metrics, tenant_id: str):
    # Main request metrics
    metrics.add_metric("api.requests", 1)

    # Isolated tenant metrics (no shared dimensions)
    tenant_metrics = EphemeralMetrics()
    tenant_metrics.add_dimension("tenant_id", tenant_id)
    tenant_metrics.add_metric("tenant.requests", 1)

    # EphemeralMetrics won't be auto-published
    # Use for custom processing
    return data
```
Custom Publishers¶
Create publishers for any metrics platform.
Publisher Interface¶
```python
from restmachine.metrics import MetricsPublisher, MetricsCollector

class MyPublisher(MetricsPublisher):
    def is_enabled(self) -> bool:
        """Return True if publishing is enabled."""
        return True

    def publish(self, collector: MetricsCollector,
                request=None, response=None, context=None):
        """Publish collected metrics.

        Args:
            collector: MetricsCollector with metrics/dimensions/metadata
            request: Optional Request object
            response: Optional Response object
            context: Optional platform context
        """
        # Access metrics
        for name, values in collector.metrics.items():
            for metric_value in values:
                # metric_value.value - The numeric value
                # metric_value.unit - MetricUnit enum
                pass

        # Access dimensions
        dimensions = collector.get_all_dimensions()

        # Access metadata
        metadata = collector.metadata
```
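The publisher contract can be exercised end to end with small stand-ins for the collector and base class (illustrative only; the real `MetricsCollector` wraps each value with its unit and carries more behavior):

```python
from abc import ABC, abstractmethod

class FakeCollector:
    """Minimal stand-in mirroring the collector surface a publisher uses."""

    def __init__(self):
        self.metrics = {}      # name -> list of raw values (real ones carry units)
        self.metadata = {}
        self._dimensions = {}

    def add_metric(self, name, value):
        self.metrics.setdefault(name, []).append(value)

    def add_dimension(self, name, value):
        self._dimensions[name] = value

    def get_all_dimensions(self):
        return dict(self._dimensions)

class BasePublisher(ABC):
    @abstractmethod
    def is_enabled(self) -> bool: ...

    @abstractmethod
    def publish(self, collector, request=None, response=None, context=None): ...

class StdoutPublisher(BasePublisher):
    def is_enabled(self) -> bool:
        return True

    def publish(self, collector, request=None, response=None, context=None):
        dims = collector.get_all_dimensions()
        for name, values in collector.metrics.items():
            print(f"{name} {values} dims={dims}")

c = FakeCollector()
c.add_dimension("environment", "production")
c.add_metric("requests", 1)
StdoutPublisher().publish(c)
```

Swapping `StdoutPublisher` for a real backend only changes the body of `publish`; the adapter calls it the same way after each request.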
Example: Datadog¶
```python
import time

import datadog

from restmachine.metrics import MetricsPublisher

class DatadogPublisher(MetricsPublisher):
    def __init__(self, api_key: str):
        datadog.initialize(api_key=api_key)

    def is_enabled(self) -> bool:
        return True

    def publish(self, collector, request=None, response=None, context=None):
        dimensions = collector.get_all_dimensions()
        tags = [f"{k}:{v}" for k, v in dimensions.items()]

        for name, values in collector.metrics.items():
            for metric_value in values:
                datadog.api.Metric.send(
                    metric=name,
                    points=[(int(time.time()), metric_value.value)],
                    tags=tags
                )
```
Example: Prometheus¶
```python
from prometheus_client import Counter, Histogram

from restmachine.metrics import MetricsPublisher

class PrometheusPublisher(MetricsPublisher):
    def __init__(self):
        self.counters = {}
        self.histograms = {}

    def is_enabled(self) -> bool:
        return True

    def publish(self, collector, request=None, response=None, context=None):
        dimensions = collector.get_all_dimensions()
        label_names = list(dimensions.keys())

        for name, values in collector.metrics.items():
            if 'time' in name or 'latency' in name:
                # Use Histogram for timing
                if name not in self.histograms:
                    self.histograms[name] = Histogram(
                        name.replace('.', '_'),
                        f'Metric {name}',
                        label_names
                    )
                for value in values:
                    self.histograms[name].labels(**dimensions).observe(value.value)
            else:
                # Use Counter for counts
                if name not in self.counters:
                    self.counters[name] = Counter(
                        name.replace('.', '_'),
                        f'Metric {name}',
                        label_names
                    )
                for value in values:
                    self.counters[name].labels(**dimensions).inc(value.value)
```
Example: Multi-Publisher¶
Publish to multiple backends simultaneously:
```python
import logging

from restmachine.metrics import MetricsPublisher

class MultiPublisher(MetricsPublisher):
    def __init__(self, *publishers):
        self.publishers = publishers

    def is_enabled(self) -> bool:
        return any(p.is_enabled() for p in self.publishers)

    def publish(self, collector, request=None, response=None, context=None):
        for publisher in self.publishers:
            if publisher.is_enabled():
                try:
                    publisher.publish(collector, request, response, context)
                except Exception as e:
                    # One failing backend must not block the others
                    logging.error(f"Publisher {publisher} failed: {e}")
```
ASGI Integration¶
The ASGI adapter automatically detects AWS environments and enables CloudWatch EMF metrics!
When the `ASGIAdapter` detects that it's running on AWS (via environment variables like `AWS_REGION` and `AWS_EXECUTION_ENV`, or ECS metadata), it automatically configures CloudWatch EMF metrics - just like the Lambda adapter.
AWS Auto-Detection (ECS, App Runner, EC2, etc.)¶
If running on AWS infrastructure, metrics are automatically enabled with CloudWatch EMF:
```python
from restmachine import RestApplication
from restmachine.adapters import create_asgi_app

app = RestApplication()

@app.get("/users/{id}")
def get_user(id: str, metrics):
    # Metrics automatically available!
    metrics.add_metric("users.fetched", 1, unit="Count")
    return {"user": id}

# Auto-detects AWS and enables CloudWatch EMF
# Works on: ECS, App Runner, EC2, Lambda (via ASGI), etc.
asgi_app = create_asgi_app(app)

# Run with uvicorn, hypercorn, etc.
# uvicorn module:asgi_app
```
The adapter automatically detects AWS by checking for:
- `AWS_REGION` environment variable
- `AWS_EXECUTION_ENV` environment variable (Lambda)
- `ECS_CONTAINER_METADATA_URI` (ECS/Fargate)
- `AWS_DEFAULT_REGION` environment variable
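The detection described above amounts to checking a handful of environment variables. A sketch of that logic (illustrative, not the adapter's actual code):

```python
import os

# The four hints listed above
AWS_ENV_HINTS = (
    "AWS_REGION",
    "AWS_EXECUTION_ENV",
    "ECS_CONTAINER_METADATA_URI",
    "AWS_DEFAULT_REGION",
)

def looks_like_aws(environ=os.environ):
    """Return True if any documented AWS hint is set to a non-empty value."""
    return any(environ.get(name) for name in AWS_ENV_HINTS)

print(looks_like_aws({"AWS_REGION": "us-east-1"}))  # True
print(looks_like_aws({}))                           # False
```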
When AWS is detected:
- ✅ CloudWatch EMF publisher is automatically configured
- ✅ Metrics are published to CloudWatch via logs
- ✅ Service name defaults to `asgi-app` (customizable)
- ✅ Namespace defaults to `RestMachine` (customizable)
Customizing AWS Configuration¶
```python
from restmachine.adapters import create_asgi_app

# Custom namespace and service name
asgi_app = create_asgi_app(
    app,
    namespace="MyApp/Production",
    service_name="user-api"
)

# High-resolution metrics (1-second)
asgi_app = create_asgi_app(
    app,
    namespace="MyApp/API",
    metrics_resolution=1
)
```
Environment Variables¶
Configure via environment variables (useful for different environments):
| Variable | Description | Default |
|---|---|---|
| `RESTMACHINE_METRICS_ENABLED` | Force enable/disable metrics | Auto-detect AWS |
| `RESTMACHINE_METRICS_NAMESPACE` | CloudWatch namespace | `RestMachine` |
| `RESTMACHINE_SERVICE_NAME` | Service name dimension | `asgi-app` |
| `RESTMACHINE_METRICS_RESOLUTION` | Resolution (1 or 60 seconds) | `60` |
Non-AWS Environments¶
For non-AWS platforms (local dev, GCP, Azure, on-prem), provide a custom publisher:
```python
from restmachine import RestApplication
from restmachine.adapters import create_asgi_app
from restmachine.metrics import MetricsPublisher

# Example: Prometheus publisher
class PrometheusPublisher(MetricsPublisher):
    def is_enabled(self) -> bool:
        return True

    def publish(self, collector, request=None, response=None, context=None):
        # Publish to Prometheus
        pass

# Explicit custom publisher
asgi_app = create_asgi_app(app, metrics_publisher=PrometheusPublisher())
```
Disabling Metrics¶
```python
# Explicitly disable (overrides auto-detection)
asgi_app = create_asgi_app(app, enable_metrics=False)

# Or via environment variable
# RESTMACHINE_METRICS_ENABLED=false
```
Priority Order¶
The adapter determines metrics configuration in this priority:
1. Explicit `enable_metrics` parameter - overrides everything
2. `RESTMACHINE_METRICS_ENABLED` env var - overrides auto-detection
3. AWS auto-detection - enables EMF if AWS detected
4. Default: disabled - no metrics if not in AWS
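That priority order can be sketched as a small resolver. The set of truthy strings accepted for the env var is an assumption here, for illustration only:

```python
import os

def metrics_enabled(explicit=None, environ=os.environ, aws_detected=False):
    """Resolve metrics configuration in the documented priority order."""
    if explicit is not None:          # 1. explicit parameter wins
        return explicit
    env = environ.get("RESTMACHINE_METRICS_ENABLED")
    if env is not None:               # 2. env var overrides auto-detection
        # Accepted truthy spellings are an assumption for this sketch
        return env.strip().lower() in ("1", "true", "yes", "on")
    return aws_detected               # 3. auto-detection, else 4. disabled

print(metrics_enabled(explicit=False, aws_detected=True))   # False: explicit wins
print(metrics_enabled(environ={"RESTMACHINE_METRICS_ENABLED": "false"}, aws_detected=True))  # False
print(metrics_enabled(environ={}, aws_detected=True))       # True: auto-detected
```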
Using with Server Drivers¶
The auto-detection also works when using RestMachine's server drivers (Uvicorn, Hypercorn):
```python
from restmachine import RestApplication
from restmachine.servers import serve

app = RestApplication()

@app.get("/data")
def get_data(metrics):
    metrics.add_metric("requests", 1)
    return {"data": "value"}

# Auto-detects AWS and enables EMF
# Metrics parameters passed through to ASGIAdapter
serve(
    app,
    server="uvicorn",
    host="0.0.0.0",
    port=8000,
    namespace="MyApp/API",  # Passed to ASGIAdapter
    service_name="api-server"
)
```
Best Practices¶
1. Use Descriptive Metric Names¶
```python
# Good
metrics.add_metric("users.created", 1)
metrics.add_metric("db.query.latency", query_time)

# Avoid
metrics.add_metric("count", 1)
metrics.add_metric("time", query_time)
```
2. Consistent Dimensions¶
```python
# Good - consistent dimensions across metrics
metrics.add_dimension("environment", env)
metrics.add_dimension("region", region)
```
3. Appropriate Units¶
```python
# Correct units for clarity
metrics.add_metric("api.latency", 45.2, unit=MetricUnit.Milliseconds)
metrics.add_metric("db.connections", 5, unit=MetricUnit.Count)
metrics.add_metric("response.size", 1024, unit=MetricUnit.Bytes)
```
4. Timer Pattern¶
```python
@app.get("/data")
def fetch_data(metrics):
    metrics.start_timer("operation.total")

    metrics.start_timer("operation.step1")
    step1()
    metrics.stop_timer("operation.step1")

    metrics.start_timer("operation.step2")
    step2()
    metrics.stop_timer("operation.step2")

    metrics.stop_timer("operation.total")
    return result
```
5. Don't Over-dimension¶
```python
# Avoid - too many dimensions
for key, value in request.headers.items():
    metrics.add_dimension(key, value)

# Better - selective dimensions + metadata
metrics.add_dimension("user_type", user.type)
metrics.add_metadata("user_id", user.id)
```
Disabling Metrics¶
When metrics are disabled, the collector is still created but publishing is skipped:
```python
# Disable via the adapter
adapter = AwsApiGatewayAdapter(app, enable_metrics=False)

# Handler code works unchanged
@app.get("/test")
def handler(metrics):
    # Metrics collected but not published
    metrics.add_metric("requests", 1)
    return {"ok": True}
```
This allows you to keep metrics in code and control publishing via configuration.
Platform-Specific Documentation¶
- AWS CloudWatch (EMF) - Auto-configured CloudWatch metrics for Lambda
- Logging Configuration - Custom logging setup
- Performance Optimization - Metrics overhead and optimization
Logging Configuration¶
Metrics use a custom `METRICS` log level (25, between INFO and WARNING).
Custom Log Level¶
```python
import logging
from restmachine.metrics import METRICS

# METRICS = 25 (between INFO=20 and WARNING=30)
```
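RestMachine presumably registers this level itself; with the stdlib alone, the same effect looks like the following sketch, with `METRICS = 25` hard-coded for the demo:

```python
import logging

METRICS = 25  # between INFO (20) and WARNING (30), as documented above

# Registering the name makes records render as "METRICS" instead of "Level 25"
logging.addLevelName(METRICS, "METRICS")

logger = logging.getLogger("restmachine.metrics.emf")
logger.setLevel(METRICS)

print(logging.getLevelName(METRICS))          # METRICS
print(logger.isEnabledFor(logging.INFO))      # False: INFO (20) is below METRICS (25)
print(logger.isEnabledFor(logging.WARNING))   # True: WARNING (30) is above it
```

Placing the level between INFO and WARNING lets metric records survive a `WARNING`-only filter being applied to INFO noise, while still being suppressible independently of warnings.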
Manual Configuration¶
```python
import logging
from restmachine.metrics import METRICS

# Configure metrics logger
metrics_logger = logging.getLogger("restmachine.metrics.emf")
metrics_logger.setLevel(METRICS)

# Add handler
handler = logging.StreamHandler()
handler.setLevel(METRICS)
metrics_logger.addHandler(handler)
```
Control Metrics Output¶
```python
# Disable metrics logging
logging.getLogger("restmachine.metrics.emf").setLevel(logging.WARNING)

# Re-enable
logging.getLogger("restmachine.metrics.emf").setLevel(METRICS)
```
Environment-Specific¶
```python
import os

if os.environ.get("ENV") == "production":
    logging.getLogger("restmachine.metrics.emf").setLevel(METRICS)
else:
    # Disable in development
    logging.getLogger("restmachine.metrics.emf").setLevel(logging.CRITICAL)
```
Examples¶
E-commerce API¶
```python
@app.post("/orders")
def create_order(order_data, metrics):
    # Dimensions for grouping
    metrics.add_dimension("order_type", order_data.type)
    metrics.add_dimension("payment_method", order_data.payment)

    # Time validation
    metrics.start_timer("order.validation")
    validate(order_data)
    metrics.stop_timer("order.validation")

    # Time creation
    metrics.start_timer("order.creation")
    order = db.create_order(order_data)
    metrics.stop_timer("order.creation")

    # Business metrics
    metrics.add_metric("orders.created", 1, unit=MetricUnit.Count)
    # "None" unit passed as a string (None is a reserved word in Python)
    metrics.add_metric("order.total", order.total, unit="None")

    # Context metadata
    metrics.add_metadata("order_id", order.id)
    metrics.add_metadata("customer_id", order.customer_id)

    return order
```
Multi-Region API¶
```python
import sys

@app.get("/data")
def get_data(metrics, region: str):
    # Add region dimension
    metrics.add_dimension("region", region)

    # Track by region
    metrics.start_timer(f"data.fetch.{region}")
    data = fetch_from_region(region)
    metrics.stop_timer(f"data.fetch.{region}")

    # Count and size
    metrics.add_metric("data.fetched", len(data), unit=MetricUnit.Count)
    metrics.add_metric("data.size", sys.getsizeof(data), unit=MetricUnit.Bytes)

    return data
```
Troubleshooting¶
Metrics dependency is None¶
The `metrics` dependency is always available. If you're getting `None`:
- Check that your platform adapter is properly configured
- Verify that dependency injection is working
Publisher not being called¶
- Check that `publisher.is_enabled()` returns `True`
- Verify the publisher is passed to the adapter
- Check the logging configuration
Performance concerns¶
- Metrics collection is lightweight (~0.1-0.5ms per request)
- Publishing happens after response is sent (non-blocking)
- Disable in development to reduce noise
Related Documentation¶
- AWS CloudWatch Metrics - CloudWatch EMF configuration
- Performance Optimization - Overhead and optimization
- Dependency Injection - Understanding DI in RestMachine