Enterprise API Gateway
Part 3: Rate Limiting & Circuit Breaking
In this part, we’ll implement critical reliability patterns to ensure our API gateway can handle high loads and gracefully degrade when upstream services are unhealthy.
Reliability Patterns Overview
We’ll implement several key patterns:
- Rate Limiting: Protect against traffic spikes and abuse
- Circuit Breaking: Prevent cascading failures
- Bulkhead Isolation: Isolate different types of traffic
- Graceful Degradation: Maintain service during partial failures
Distributed Rate Limiting
We’ll use Redis to implement distributed rate limiting across multiple gateway instances:
// internal/ratelimit/redis.go
package ratelimit
import (
    "context"
    "fmt"
    "math/rand"
    "time"

    "github.com/go-redis/redis/v8"
)
type RedisRateLimiter struct {
client *redis.Client
}
func NewRedisRateLimiter(client *redis.Client) *RedisRateLimiter {
return &RedisRateLimiter{client: client}
}
func (r *RedisRateLimiter) Allow(ctx context.Context, key string, limit int, window time.Duration) (bool, error) {
    // Sliding window log: each request is stored in a sorted set, scored by
    // its timestamp, so the count over any window is exact.
    now := time.Now()
    windowStart := now.Add(-window)
    pipe := r.client.Pipeline()
    // Remove entries that have fallen out of the window
    pipe.ZRemRangeByScore(ctx, key, "0", fmt.Sprintf("%d", windowStart.UnixNano()))
    // Count the requests still inside the window (before adding this one)
    countCmd := pipe.ZCard(ctx, key)
    // Record the current request. The random suffix keeps members unique when
    // two gateway instances record requests in the same nanosecond.
    pipe.ZAdd(ctx, key, &redis.Z{
        Score:  float64(now.UnixNano()),
        Member: fmt.Sprintf("%d-%d", now.UnixNano(), rand.Int63()),
    })
    // Expire the key along with the window so idle keys don't pile up in Redis
    pipe.Expire(ctx, key, window)
    _, err := pipe.Exec(ctx)
    if err != nil {
        return false, err
    }
    // Allowed when fewer than `limit` prior requests remain in the window.
    // Note that rejected requests are still recorded, so a client hammering
    // past the limit keeps its own window full.
    return countCmd.Val() < int64(limit), nil
}
type RateLimitRule struct {
Pattern string // URL pattern
Limit int // Requests per window
Window time.Duration // Time window
Scope string // "user", "ip", "global"
}
type RateLimitService struct {
limiter *RedisRateLimiter
rules []RateLimitRule
}
func NewRateLimitService(limiter *RedisRateLimiter) *RateLimitService {
return &RateLimitService{
limiter: limiter,
rules: []RateLimitRule{
{
Pattern: "/api/v1/users",
Limit: 1000,
Window: time.Hour,
Scope: "user",
},
{
Pattern: "/api/v1/orders",
Limit: 500,
Window: time.Hour,
Scope: "user",
},
{
Pattern: "/api/v1/admin",
Limit: 100,
Window: time.Hour,
Scope: "user",
},
{
Pattern: "*",
Limit: 10000,
Window: time.Hour,
Scope: "ip",
},
},
}
}
func (s *RateLimitService) CheckLimit(ctx context.Context, path, userID, clientIP string) (*RateLimitResult, error) {
for _, rule := range s.rules {
if s.matchesPattern(path, rule.Pattern) {
key := s.buildKey(rule, userID, clientIP)
allowed, err := s.limiter.Allow(ctx, key, rule.Limit, rule.Window)
if err != nil {
return nil, err
}
if !allowed {
return &RateLimitResult{
Allowed: false,
Rule: rule,
RetryAfter: rule.Window,
}, nil
}
}
}
return &RateLimitResult{Allowed: true}, nil
}
type RateLimitResult struct {
Allowed bool
Rule RateLimitRule
RetryAfter time.Duration
}
Circuit Breaker Implementation
Next, we implement the circuit breaker pattern to prevent cascading failures:
// internal/circuitbreaker/breaker.go
package circuitbreaker
import (
"context"
"fmt"
"sync"
"time"
)
type State int
const (
StateClosed State = iota
StateOpen
StateHalfOpen
)
type CircuitBreaker struct {
mu sync.RWMutex
state State
failureCount int
successCount int
lastFailureTime time.Time
lastSuccessTime time.Time
// Configuration
failureThreshold int
successThreshold int
timeout time.Duration
}
func NewCircuitBreaker(failureThreshold, successThreshold int, timeout time.Duration) *CircuitBreaker {
return &CircuitBreaker{
state: StateClosed,
failureThreshold: failureThreshold,
successThreshold: successThreshold,
timeout: timeout,
}
}
func (cb *CircuitBreaker) Execute(ctx context.Context, fn func() error) error {
cb.mu.Lock()
state := cb.state
cb.mu.Unlock()
switch state {
case StateOpen:
if cb.shouldAttemptReset() {
cb.setState(StateHalfOpen)
} else {
return fmt.Errorf("circuit breaker is open")
}
case StateHalfOpen:
    // Half-open: let this request through as a probe. Note that concurrent
    // callers can all pass this check; a stricter implementation would cap
    // the number of in-flight probes.
case StateClosed:
    // Normal operation: the request flows through and its outcome is recorded
}
err := fn()
if err != nil {
cb.recordFailure()
return err
}
cb.recordSuccess()
return nil
}
func (cb *CircuitBreaker) recordFailure() {
cb.mu.Lock()
defer cb.mu.Unlock()
cb.failureCount++
cb.lastFailureTime = time.Now()
if cb.state == StateClosed && cb.failureCount >= cb.failureThreshold {
cb.state = StateOpen
} else if cb.state == StateHalfOpen {
cb.state = StateOpen
}
}
func (cb *CircuitBreaker) recordSuccess() {
    cb.mu.Lock()
    defer cb.mu.Unlock()
    cb.successCount++
    cb.lastSuccessTime = time.Now()
    switch cb.state {
    case StateClosed:
        // A success in the closed state resets the failure streak, so only
        // consecutive failures can trip the breaker.
        cb.failureCount = 0
    case StateHalfOpen:
        if cb.successCount >= cb.successThreshold {
            cb.state = StateClosed
            cb.failureCount = 0
            cb.successCount = 0
        }
    }
}
func (cb *CircuitBreaker) shouldAttemptReset() bool {
cb.mu.RLock()
defer cb.mu.RUnlock()
return time.Since(cb.lastFailureTime) >= cb.timeout
}
func (cb *CircuitBreaker) setState(state State) {
    cb.mu.Lock()
    defer cb.mu.Unlock()
    cb.state = state
    // Reset counters on every transition; otherwise a stale successCount from
    // the closed state could instantly re-close a breaker that just went
    // half-open.
    cb.failureCount = 0
    cb.successCount = 0
}
Circuit Breaker Manager
Manage circuit breakers for different services:
// internal/circuitbreaker/manager.go
package circuitbreaker
import (
"context"
"sync"
"time"
)
type Manager struct {
mu sync.RWMutex
breakers map[string]*CircuitBreaker
config Config
}
type Config struct {
FailureThreshold int
SuccessThreshold int
Timeout time.Duration
}
func NewManager(config Config) *Manager {
return &Manager{
breakers: make(map[string]*CircuitBreaker),
config: config,
}
}
func (m *Manager) GetBreaker(service string) *CircuitBreaker {
m.mu.RLock()
breaker, exists := m.breakers[service]
m.mu.RUnlock()
if exists {
return breaker
}
m.mu.Lock()
defer m.mu.Unlock()
// Double-check pattern
if breaker, exists := m.breakers[service]; exists {
return breaker
}
breaker = NewCircuitBreaker(
m.config.FailureThreshold,
m.config.SuccessThreshold,
m.config.Timeout,
)
m.breakers[service] = breaker
return breaker
}
func (m *Manager) ExecuteWithBreaker(ctx context.Context, service string, fn func() error) error {
breaker := m.GetBreaker(service)
return breaker.Execute(ctx, fn)
}
Bulkhead Pattern
Implement bulkhead isolation using separate connection pools:
// internal/bulkhead/pool.go
package bulkhead
import (
"context"
"fmt"
"net/http"
"sync"
"time"
)
type ConnectionPool struct {
    connections chan *http.Client
    maxConns    int
    timeout     time.Duration
}
func NewConnectionPool(maxConns int, timeout time.Duration) *ConnectionPool {
pool := &ConnectionPool{
connections: make(chan *http.Client, maxConns),
maxConns: maxConns,
timeout: timeout,
}
// Pre-fill the pool
for i := 0; i < maxConns; i++ {
client := &http.Client{
Timeout: timeout,
}
pool.connections <- client
}
return pool
}
func (p *ConnectionPool) Get(ctx context.Context) (*http.Client, error) {
select {
case client := <-p.connections:
return client, nil
case <-ctx.Done():
return nil, ctx.Err()
case <-time.After(p.timeout):
    // Reuse the pool's timeout as the acquisition timeout: if no client
    // frees up in time, fail fast rather than queueing indefinitely.
    return nil, fmt.Errorf("timeout acquiring connection")
}
}
func (p *ConnectionPool) Put(client *http.Client) {
select {
case p.connections <- client:
default:
// Pool is full, discard the connection
}
}
type BulkheadManager struct {
pools map[string]*ConnectionPool
mu sync.RWMutex
}
func NewBulkheadManager() *BulkheadManager {
return &BulkheadManager{
pools: make(map[string]*ConnectionPool),
}
}
func (bm *BulkheadManager) GetPool(service string, maxConns int, timeout time.Duration) *ConnectionPool {
bm.mu.RLock()
pool, exists := bm.pools[service]
bm.mu.RUnlock()
if exists {
return pool
}
bm.mu.Lock()
defer bm.mu.Unlock()
if pool, exists := bm.pools[service]; exists {
return pool
}
pool = NewConnectionPool(maxConns, timeout)
bm.pools[service] = pool
return pool
}
Graceful Degradation
Implement fallback mechanisms for when services are unavailable:
// internal/fallback/service.go
package fallback
import (
    "context"
    "encoding/json"
    "net/http"
    "strings"
    "time"
)
type FallbackService struct {
cacheService CacheService
staticData map[string]interface{}
}
type CacheService interface {
Get(ctx context.Context, key string) ([]byte, error)
Set(ctx context.Context, key string, value []byte, ttl time.Duration) error
}
func NewFallbackService(cache CacheService) *FallbackService {
return &FallbackService{
cacheService: cache,
staticData: map[string]interface{}{
"users": []map[string]interface{}{
{"id": "1", "name": "Cached User", "email": "cached@example.com"},
},
"products": []map[string]interface{}{
{"id": "1", "name": "Cached Product", "price": 99.99},
},
},
}
}
func (fs *FallbackService) GetFallbackResponse(ctx context.Context, path string) (*http.Response, error) {
// Try cache first
if cached, err := fs.cacheService.Get(ctx, path); err == nil {
return &http.Response{
StatusCode: 200,
Header: map[string][]string{
"Content-Type": {"application/json"},
"X-Fallback-Source": {"cache"},
},
Body: &responseBody{data: cached},
}, nil
}
// Fall back to static data
if data, exists := fs.staticData[getResourceType(path)]; exists {
jsonData, err := json.Marshal(data)
if err != nil {
return nil, err
}
return &http.Response{
StatusCode: 200,
Header: map[string][]string{
"Content-Type": {"application/json"},
"X-Fallback-Source": {"static"},
},
Body: &responseBody{data: jsonData},
}, nil
}
// Return service unavailable
return &http.Response{
StatusCode: 503,
Header: map[string][]string{
"Content-Type": {"application/json"},
},
Body: &responseBody{
data: []byte(`{"error": "Service temporarily unavailable"}`),
},
}, nil
}
func getResourceType(path string) string {
if strings.Contains(path, "/users") {
return "users"
}
if strings.Contains(path, "/products") {
return "products"
}
return "unknown"
}
Integration with Envoy
Envoy layers its own built-in protections in front of our Go services: local rate limiting at the edge, external authorization, and retries with per-try timeouts:
# envoy-reliability.yaml
static_resources:
listeners:
- name: listener_0
address:
socket_address:
address: 0.0.0.0
port_value: 8080
filter_chains:
- filters:
- name: envoy.filters.network.http_connection_manager
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
stat_prefix: ingress_http
http_filters:
- name: envoy.filters.http.local_ratelimit
typed_config:
"@type": type.googleapis.com/udpa.type.v1.TypedStruct
type_url: type.googleapis.com/envoy.extensions.filters.http.local_ratelimit.v3.LocalRateLimit
value:
stat_prefix: local_rate_limiter
token_bucket:
max_tokens: 1000
tokens_per_fill: 1000
fill_interval: 1s
- name: envoy.filters.http.ext_authz
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.ext_authz.v3.ExtAuthz
grpc_service:
envoy_grpc:
cluster_name: auth_service
- name: envoy.filters.http.router
route_config:
name: local_route
virtual_hosts:
- name: local_service
domains: ["*"]
routes:
- match:
prefix: "/"
route:
cluster: backend_service
timeout: 5s
retry_policy:
retry_on: "5xx,gateway-error,connect-failure,refused-stream"
num_retries: 3
per_try_timeout: 2s
Monitoring and Metrics
Add metrics collection for our reliability patterns:
// internal/metrics/collector.go
package metrics
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
RateLimitCounter = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "gateway_rate_limit_total",
Help: "Total number of rate limit checks",
},
[]string{"rule", "allowed"},
)
CircuitBreakerState = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "gateway_circuit_breaker_state",
Help: "Current circuit breaker state (0=closed, 1=open, 2=half-open)",
},
[]string{"service"},
)
FallbackRequests = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "gateway_fallback_requests_total",
Help: "Total number of fallback responses served",
},
[]string{"source", "path"},
)
)
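Because the collectors are package-level, recording is a one-liner at each decision point. A sketch of thin helpers that the rate-limit middleware and breaker transitions could call (the call sites themselves are assumed, not shown in this part):

// internal/metrics/record.go (sketch)
package metrics

import "strconv"

// RecordRateLimitDecision increments the counter with the matched rule
// pattern and whether the request was allowed.
func RecordRateLimitDecision(rulePattern string, allowed bool) {
    RateLimitCounter.WithLabelValues(rulePattern, strconv.FormatBool(allowed)).Inc()
}

// RecordBreakerState sets the gauge to the numeric state documented in the
// Help text (0=closed, 1=open, 2=half-open), matching the State iota order.
func RecordBreakerState(service string, state int) {
    CircuitBreakerState.WithLabelValues(service).Set(float64(state))
}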
Testing Reliability Patterns
Test the reliability features:
# Test rate limiting: print only the HTTP status code for each request.
# Expect 200s, then 429s once the 1000-requests-per-hour rule is exhausted.
for i in {1..1001}; do
  curl -s -o /dev/null -w "%{http_code}\n" http://localhost:8080/api/v1/users
done

# Test the circuit breaker by hitting an endpoint whose upstream is failing;
# after enough errors the breaker should fail fast instead of timing out
curl -s -o /dev/null -w "%{http_code}\n" http://localhost:8080/api/v1/failing-service
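Manual curl checks are useful, but the breaker’s state machine is easier to pin down in a unit test. A sketch exercising closed → open → half-open → closed (thresholds and timings are arbitrary test values):

// internal/circuitbreaker/breaker_test.go (sketch)
package circuitbreaker

import (
    "context"
    "errors"
    "testing"
    "time"
)

func TestBreakerOpensAfterFailures(t *testing.T) {
    cb := NewCircuitBreaker(3, 1, 50*time.Millisecond)
    failing := func() error { return errors.New("upstream down") }

    // Three consecutive failures should trip the breaker
    for i := 0; i < 3; i++ {
        _ = cb.Execute(context.Background(), failing)
    }

    // While open, calls fail fast without invoking fn
    if err := cb.Execute(context.Background(), failing); err == nil {
        t.Fatal("expected open-circuit error")
    }

    // After the timeout the breaker goes half-open; one success closes it
    time.Sleep(60 * time.Millisecond)
    if err := cb.Execute(context.Background(), func() error { return nil }); err != nil {
        t.Fatalf("expected success in half-open state, got %v", err)
    }
}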
Next Steps
In Part 4, we’ll implement comprehensive observability with metrics, distributed tracing, and centralized logging to monitor our API gateway in production.
Topics will include:
- Prometheus metrics collection
- Jaeger distributed tracing
- Structured logging with correlation IDs
- Alert rules and dashboards