Enterprise API Gateway

Part 3: Rate Limiting & Circuit Breaking

software active Part 3 of 4
Technologies:
Go, Envoy, gRPC, Kubernetes, Prometheus

In this part, we’ll implement critical reliability patterns to ensure our API gateway can handle high loads and gracefully degrade when upstream services are unhealthy.

Reliability Patterns Overview

We’ll implement several key patterns:

  • Rate Limiting: Protect against traffic spikes and abuse
  • Circuit Breaking: Prevent cascade failures
  • Bulkhead Isolation: Isolate different types of traffic
  • Graceful Degradation: Maintain service during partial failures

Distributed Rate Limiting

We’ll use Redis to implement distributed rate limiting across multiple gateway instances:

// internal/ratelimit/redis.go
package ratelimit

import (
    "context"
    "fmt"
    "time"
    
    "github.com/go-redis/redis/v8"
)

// RedisRateLimiter is a rate limiter whose request log lives in Redis and is
// accessed through the wrapped go-redis client.
type RedisRateLimiter struct {
    client *redis.Client // client used by Allow to issue every limiter command
}

// NewRedisRateLimiter returns a RedisRateLimiter that issues its commands
// through client. The client is stored as given; nothing is sent to Redis
// here.
func NewRedisRateLimiter(client *redis.Client) *RedisRateLimiter {
    limiter := new(RedisRateLimiter)
    limiter.client = client
    return limiter
}

// Allow records one request for key and reports whether it fits within limit
// requests per sliding window.
//
// The limiter keeps one sorted set per key whose scores are request
// timestamps in Unix nanoseconds (sliding window log). Every call trims the
// entries older than the window, counts what is left, logs the current
// request and refreshes the key's TTL to window.
//
// The four commands are queued on a transactional pipeline (MULTI/EXEC)
// rather than a plain pipeline, so commands issued by other gateway instances
// cannot interleave between the count and the add and let two callers pass
// the same check.
//
// The request is allowed when the number of entries found in the window
// before this one is strictly below limit. The current request is logged even
// when it is denied, so rejected traffic keeps occupying the window.
//
// It returns (false, err) when executing the pipeline fails.
func (r *RedisRateLimiter) Allow(ctx context.Context, key string, limit int, window time.Duration) (bool, error) {
    now := time.Now()
    nowNanos := now.UnixNano()
    cutoffNanos := now.Add(-window).UnixNano()

    pipe := r.client.TxPipeline()

    // Drop entries that have fallen out of the window.
    pipe.ZRemRangeByScore(ctx, key, "0", fmt.Sprintf("%d", cutoffNanos))

    // Count the requests that were already in the window.
    countCmd := pipe.ZCard(ctx, key)

    // Log the current request.
    // NOTE(review): the member is the nanosecond timestamp itself, so two
    // requests carrying the same timestamp collapse into a single entry.
    pipe.ZAdd(ctx, key, &redis.Z{
        Score:  float64(nowNanos),
        Member: fmt.Sprintf("%d", nowNanos),
    })

    // Let idle keys disappear once their whole window has elapsed.
    pipe.Expire(ctx, key, window)

    if _, err := pipe.Exec(ctx); err != nil {
        return false, err
    }

    return countCmd.Val() < int64(limit), nil
}

// RateLimitRule describes a single rate limit: the paths it applies to, how
// many requests are permitted per time window, and what the limit is counted
// against.
type RateLimitRule struct {
    // Pattern is the URL pattern the rule applies to.
    Pattern string
    // Limit is the number of requests permitted per Window.
    Limit   int
    // Window is the length of the time window.
    Window  time.Duration
    // Scope is one of "user", "ip" or "global".
    Scope   string
}

// RateLimitService checks requests against a list of rate limit rules,
// delegating the per-key counting to a RedisRateLimiter.
type RateLimitService struct {
    limiter *RedisRateLimiter // performs the actual allow/deny check for a key
    rules   []RateLimitRule   // rules examined by CheckLimit, in slice order
}

// NewRateLimitService returns a RateLimitService backed by limiter and
// preloaded with the built-in rule set: three per-user hourly limits for the
// users, orders and admin API prefixes, followed by a "*" rule scoped by IP.
func NewRateLimitService(limiter *RedisRateLimiter) *RateLimitService {
    // hourly builds a rule with a one-hour window; every built-in rule uses one.
    hourly := func(pattern string, limit int, scope string) RateLimitRule {
        return RateLimitRule{
            Pattern: pattern,
            Limit:   limit,
            Window:  time.Hour,
            Scope:   scope,
        }
    }

    builtin := []RateLimitRule{
        hourly("/api/v1/users", 1000, "user"),
        hourly("/api/v1/orders", 500, "user"),
        hourly("/api/v1/admin", 100, "user"),
        hourly("*", 10000, "ip"),
    }

    return &RateLimitService{limiter: limiter, rules: builtin}
}

// CheckLimit runs path through the configured rules in order. For every rule
// whose pattern matches, one request is charged against the key built from
// the rule, userID and clientIP.
//
// The first rule that denies the request ends the check: the result has
// Allowed set to false, carries that rule, and sets RetryAfter to the rule's
// full window. If no matching rule denies, the result has Allowed set to
// true. A limiter error aborts the check and is returned with a nil result.
func (s *RateLimitService) CheckLimit(ctx context.Context, path, userID, clientIP string) (*RateLimitResult, error) {
    for i := range s.rules {
        rule := s.rules[i]
        if !s.matchesPattern(path, rule.Pattern) {
            continue
        }

        limiterKey := s.buildKey(rule, userID, clientIP)
        ok, err := s.limiter.Allow(ctx, limiterKey, rule.Limit, rule.Window)
        if err != nil {
            return nil, err
        }
        if ok {
            continue
        }

        denied := &RateLimitResult{
            Allowed:    false,
            Rule:       rule,
            RetryAfter: rule.Window,
        }
        return denied, nil
    }

    return &RateLimitResult{Allowed: true}, nil
}

// RateLimitResult is the outcome of RateLimitService.CheckLimit.
type RateLimitResult struct {
    Allowed    bool          // true when no matching rule denied the request
    Rule       RateLimitRule // the rule that denied the request; zero value when Allowed
    RetryAfter time.Duration // set by CheckLimit to the denying rule's Window; zero when Allowed
}

Circuit Breaker Implementation

Implement the circuit breaker pattern to prevent cascade failures:

// internal/circuitbreaker/breaker.go
package circuitbreaker

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// State is the operating mode of a CircuitBreaker.
type State int

const (
    // StateClosed lets every call through and counts consecutive failures.
    StateClosed State = iota
    // StateOpen rejects calls until the timeout since the last failure elapses.
    StateOpen
    // StateHalfOpen lets probe calls through to decide whether to close again.
    StateHalfOpen
)

// CircuitBreaker guards calls to an unreliable dependency.
//
// While closed it passes calls through and opens after failureThreshold
// consecutive failures. While open it rejects calls until timeout has elapsed
// since the last recorded failure, then moves to half-open. While half-open,
// successThreshold successes close it again and a single failure re-opens it.
//
// All methods are safe for concurrent use; every field below mu is guarded
// by mu.
type CircuitBreaker struct {
    mu              sync.RWMutex
    state           State
    failureCount    int       // consecutive failures since the last success or close
    successCount    int       // successes observed during the current half-open phase
    lastFailureTime time.Time // start of the open-state timeout
    lastSuccessTime time.Time

    // Configuration
    failureThreshold int
    successThreshold int
    timeout          time.Duration
}

// NewCircuitBreaker returns a closed breaker that opens after failureThreshold
// consecutive failures, waits timeout before probing again, and closes after
// successThreshold successful probes.
func NewCircuitBreaker(failureThreshold, successThreshold int, timeout time.Duration) *CircuitBreaker {
    return &CircuitBreaker{
        state:            StateClosed,
        failureThreshold: failureThreshold,
        successThreshold: successThreshold,
        timeout:          timeout,
    }
}

// Execute runs fn unless the breaker is open and still inside its timeout, in
// which case fn is not called and an error with the message
// "circuit breaker is open" is returned.
//
// When fn runs, its error (if any) is recorded as a failure and returned
// unchanged; a nil error is recorded as a success. ctx is not consulted by the
// breaker itself; fn is expected to honor it. The half-open state does not
// cap the number of concurrent probe calls.
func (cb *CircuitBreaker) Execute(ctx context.Context, fn func() error) error {
    if !cb.admit() {
        return fmt.Errorf("circuit breaker is open")
    }

    if err := fn(); err != nil {
        cb.recordFailure()
        return err
    }

    cb.recordSuccess()
    return nil
}

// admit reports whether a call may proceed. The open -> half-open decision is
// made under a single lock acquisition so it cannot act on a stale state.
func (cb *CircuitBreaker) admit() bool {
    cb.mu.Lock()
    defer cb.mu.Unlock()

    if cb.state != StateOpen {
        return true
    }
    if time.Since(cb.lastFailureTime) < cb.timeout {
        return false
    }

    cb.transition(StateHalfOpen)
    return true
}

// transition switches to next and resets the counters that belong to the new
// phase. The caller must hold cb.mu for writing.
func (cb *CircuitBreaker) transition(next State) {
    cb.state = next

    switch next {
    case StateClosed:
        cb.failureCount = 0
        cb.successCount = 0
    case StateHalfOpen:
        // Only successes observed while probing may count towards closing.
        cb.successCount = 0
    }
}

// recordFailure notes a failed call. It opens the breaker when the failure
// threshold is reached while closed, or immediately while half-open.
func (cb *CircuitBreaker) recordFailure() {
    cb.mu.Lock()
    defer cb.mu.Unlock()

    cb.failureCount++
    cb.lastFailureTime = time.Now()

    switch cb.state {
    case StateClosed:
        if cb.failureCount >= cb.failureThreshold {
            cb.transition(StateOpen)
        }
    case StateHalfOpen:
        cb.transition(StateOpen)
    }
}

// recordSuccess notes a successful call. While closed it clears the failure
// streak, so only consecutive failures can open the breaker. While half-open
// it counts towards successThreshold and closes the breaker once reached.
func (cb *CircuitBreaker) recordSuccess() {
    cb.mu.Lock()
    defer cb.mu.Unlock()

    cb.lastSuccessTime = time.Now()

    switch cb.state {
    case StateClosed:
        cb.failureCount = 0
    case StateHalfOpen:
        cb.successCount++
        if cb.successCount >= cb.successThreshold {
            cb.transition(StateClosed)
        }
    }
}

// shouldAttemptReset reports whether at least timeout has elapsed since the
// last recorded failure.
func (cb *CircuitBreaker) shouldAttemptReset() bool {
    cb.mu.RLock()
    defer cb.mu.RUnlock()

    return time.Since(cb.lastFailureTime) >= cb.timeout
}

// setState forces the breaker into state, resetting the counters that belong
// to the new phase.
func (cb *CircuitBreaker) setState(state State) {
    cb.mu.Lock()
    defer cb.mu.Unlock()

    cb.transition(state)
}

Circuit Breaker Manager

Manage circuit breakers for different services:

// internal/circuitbreaker/manager.go
package circuitbreaker

import (
    "context"
    "sync"
    "time"
)

// Manager keeps one CircuitBreaker per service name, creating breakers on
// first use from a shared Config.
type Manager struct {
    mu       sync.RWMutex               // guards breakers
    breakers map[string]*CircuitBreaker // keyed by service name
    config   Config                     // settings applied to every breaker the manager creates
}

// Config holds the settings GetBreaker passes to NewCircuitBreaker, in the
// order failure threshold, success threshold, timeout.
type Config struct {
    FailureThreshold int
    SuccessThreshold int
    Timeout         time.Duration
}

// NewManager returns a Manager with no breakers yet; each breaker it later
// creates is configured from config.
func NewManager(config Config) *Manager {
    m := &Manager{config: config}
    m.breakers = make(map[string]*CircuitBreaker)
    return m
}

// GetBreaker returns the circuit breaker registered for service, creating and
// registering one from the manager's config when none exists yet. Repeated
// calls with the same service name return the same breaker.
func (m *Manager) GetBreaker(service string) *CircuitBreaker {
    // Common case: the breaker already exists, so a read lock is enough.
    m.mu.RLock()
    existing, found := m.breakers[service]
    m.mu.RUnlock()
    if found {
        return existing
    }

    m.mu.Lock()
    defer m.mu.Unlock()

    // Look again under the write lock: another goroutine may have registered
    // the breaker after the read lock was released.
    if existing, found = m.breakers[service]; found {
        return existing
    }

    created := NewCircuitBreaker(
        m.config.FailureThreshold,
        m.config.SuccessThreshold,
        m.config.Timeout,
    )
    m.breakers[service] = created
    return created
}

// ExecuteWithBreaker runs fn through the circuit breaker for service
// (creating the breaker on first use) and returns whatever the breaker's
// Execute returns.
func (m *Manager) ExecuteWithBreaker(ctx context.Context, service string, fn func() error) error {
    return m.GetBreaker(service).Execute(ctx, fn)
}

Bulkhead Pattern

Implement bulkhead isolation using separate connection pools:

// internal/bulkhead/pool.go
package bulkhead

import (
    "context"
    "fmt"
    "net/http"
    "sync"
    "time"
)

// ConnectionPool is a fixed-size pool of HTTP clients. At most maxConns
// clients exist, so at most maxConns callers can hold one at the same time.
type ConnectionPool struct {
    mu          sync.Mutex        // NOTE(review): not used by any method shown here
    connections chan *http.Client // idle clients; buffered to maxConns
    maxConns    int
    timeout     time.Duration // client timeout and maximum wait in Get
}

// NewConnectionPool returns a pool pre-filled with maxConns HTTP clients,
// each created with Timeout set to timeout. The same timeout bounds how long
// Get waits for a free client.
func NewConnectionPool(maxConns int, timeout time.Duration) *ConnectionPool {
    pool := &ConnectionPool{
        connections: make(chan *http.Client, maxConns),
        maxConns:    maxConns,
        timeout:     timeout,
    }

    // Pre-fill the pool; no clients are created after construction.
    for i := 0; i < maxConns; i++ {
        pool.connections <- &http.Client{Timeout: timeout}
    }

    return pool
}

// Get hands out an idle client, waiting up to the pool timeout for one to be
// returned.
//
// An idle client is returned immediately when one is available, regardless of
// the timeout value or the state of ctx. Otherwise Get blocks until a client
// is returned, ctx is done (ctx.Err() is returned), or the pool timeout
// elapses (an error with the message "timeout acquiring connection").
// Callers must give the client back with Put.
func (p *ConnectionPool) Get(ctx context.Context) (*http.Client, error) {
    // Fast path: without it, a zero timeout or an already-cancelled ctx could
    // win the select below even though a client is sitting in the pool.
    select {
    case client := <-p.connections:
        return client, nil
    default:
    }

    // An explicit timer is stopped on return instead of lingering until it
    // fires, which matters when Get is called at a high rate.
    timer := time.NewTimer(p.timeout)
    defer timer.Stop()

    select {
    case client := <-p.connections:
        return client, nil
    case <-ctx.Done():
        return nil, ctx.Err()
    case <-timer.C:
        return nil, fmt.Errorf("timeout acquiring connection")
    }
}

// Put returns client to the pool. A nil client is ignored so that a later Get
// can never hand out nil, and a client that does not fit because the pool is
// already full is discarded.
func (p *ConnectionPool) Put(client *http.Client) {
    if client == nil {
        return
    }

    select {
    case p.connections <- client:
    default:
        // Pool is full, discard the connection
    }
}

// BulkheadManager keeps one ConnectionPool per service name so that each
// service draws from its own pool.
type BulkheadManager struct {
    pools map[string]*ConnectionPool // keyed by service name
    mu    sync.RWMutex               // guards pools
}

// NewBulkheadManager returns a BulkheadManager with no pools; pools are
// created on demand by GetPool.
func NewBulkheadManager() *BulkheadManager {
    bm := new(BulkheadManager)
    bm.pools = map[string]*ConnectionPool{}
    return bm
}

// GetPool returns the connection pool registered for service. When none
// exists yet, a pool is created with maxConns and timeout and registered.
// For a service that already has a pool, maxConns and timeout are ignored and
// the existing pool is returned unchanged.
func (bm *BulkheadManager) GetPool(service string, maxConns int, timeout time.Duration) *ConnectionPool {
    // Common case: the pool already exists, so a read lock is enough.
    bm.mu.RLock()
    existing, found := bm.pools[service]
    bm.mu.RUnlock()
    if found {
        return existing
    }

    bm.mu.Lock()
    defer bm.mu.Unlock()

    // Look again under the write lock: another goroutine may have registered
    // the pool after the read lock was released.
    if existing, found = bm.pools[service]; found {
        return existing
    }

    created := NewConnectionPool(maxConns, timeout)
    bm.pools[service] = created
    return created
}

Graceful Degradation

Implement fallback mechanisms for when services are unavailable:

// internal/fallback/service.go
package fallback

import (
    "context"
    "encoding/json"
    "fmt"
    "net/http"
    "strings"
    "time"
)

// FallbackService produces substitute HTTP responses, taken from a cache or
// from built-in static data, for use when the real service cannot answer.
type FallbackService struct {
    cacheService CacheService           // first source consulted by GetFallbackResponse
    staticData   map[string]interface{} // canned payloads keyed by resource type ("users", "products")
}

// CacheService is the byte-oriented cache that FallbackService reads
// previously stored responses from.
type CacheService interface {
    // Get returns the bytes stored under key. GetFallbackResponse treats a
    // non-nil error as "nothing usable in the cache".
    Get(ctx context.Context, key string) ([]byte, error)
    // Set stores value under key with the given ttl.
    Set(ctx context.Context, key string, value []byte, ttl time.Duration) error
}

// NewFallbackService returns a FallbackService that consults cache first and
// otherwise serves the built-in static payloads: one placeholder user under
// "users" and one placeholder product under "products".
func NewFallbackService(cache CacheService) *FallbackService {
    placeholderUsers := []map[string]interface{}{
        {"id": "1", "name": "Cached User", "email": "cached@example.com"},
    }
    placeholderProducts := []map[string]interface{}{
        {"id": "1", "name": "Cached Product", "price": 99.99},
    }

    svc := &FallbackService{cacheService: cache}
    svc.staticData = map[string]interface{}{
        "users":    placeholderUsers,
        "products": placeholderProducts,
    }
    return svc
}

// GetFallbackResponse builds a substitute response for path. Sources are
// tried in this order:
//
//  1. The cache entry stored under path: 200 with X-Fallback-Source "cache".
//     An entry that comes back empty is skipped, because an empty body is not
//     a valid JSON document and must not be served as a successful response.
//  2. The static data for path's resource type: 200 with X-Fallback-Source
//     "static".
//  3. Otherwise a 503 with a JSON error body and no X-Fallback-Source header.
//
// Every response has Content-Type application/json. The returned error is
// non-nil only when the static data cannot be marshalled to JSON.
func (fs *FallbackService) GetFallbackResponse(ctx context.Context, path string) (*http.Response, error) {
    // Try cache first
    if cached, err := fs.cacheService.Get(ctx, path); err == nil && len(cached) > 0 {
        return newFallbackResponse(http.StatusOK, "cache", cached), nil
    }

    // Fall back to static data
    if data, ok := fs.staticData[getResourceType(path)]; ok {
        payload, err := json.Marshal(data)
        if err != nil {
            return nil, err
        }
        return newFallbackResponse(http.StatusOK, "static", payload), nil
    }

    // Return service unavailable
    unavailable := []byte(`{"error": "Service temporarily unavailable"}`)
    return newFallbackResponse(http.StatusServiceUnavailable, "", unavailable), nil
}

// newFallbackResponse assembles a JSON response with the given status and
// body. A non-empty source is reported in the X-Fallback-Source header.
func newFallbackResponse(status int, source string, body []byte) *http.Response {
    header := http.Header{"Content-Type": {"application/json"}}
    if source != "" {
        header["X-Fallback-Source"] = []string{source}
    }

    return &http.Response{
        StatusCode: status,
        Header:     header,
        Body:       &responseBody{data: body},
    }
}

// getResourceType classifies path by the resource it addresses. It returns
// "users" or "products" when path contains that exact segment, and "unknown"
// otherwise. When both segments are present, "users" wins.
//
// Matching is done on whole path segments, with any query string removed
// first, so look-alikes such as "/api/v1/usersettings" or
// "/api/v1/orders?next=/users" are not mistaken for the users resource.
func getResourceType(path string) string {
    if i := strings.IndexByte(path, '?'); i >= 0 {
        path = path[:i]
    }
    segments := strings.Split(path, "/")

    // Order matters: it fixes which resource wins when a path names both.
    for _, resource := range [...]string{"users", "products"} {
        for _, segment := range segments {
            if segment == resource {
                return resource
            }
        }
    }
    return "unknown"
}

Integration with Envoy

Configure Envoy to use our reliability patterns:

# envoy-reliability.yaml
# NOTE: the auth_service and backend_service clusters referenced below must be
# defined under static_resources.clusters; they are not part of this excerpt.
static_resources:
  listeners:
  - name: listener_0
    address:
      socket_address:
        address: 0.0.0.0
        port_value: 8080
    filter_chains:
    - filters:
      - name: envoy.filters.network.http_connection_manager
        typed_config:
          "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
          stat_prefix: ingress_http
          http_filters:
          - name: envoy.filters.http.local_ratelimit
            typed_config:
              "@type": type.googleapis.com/udpa.type.v1.TypedStruct
              type_url: type.googleapis.com/envoy.extensions.filters.http.local_ratelimit.v3.LocalRateLimit
              value:
                stat_prefix: local_rate_limiter
                token_bucket:
                  max_tokens: 1000
                  tokens_per_fill: 1000
                  fill_interval: 1s
                # Both fractions default to 0%, which leaves the filter
                # inactive. Enable and enforce it for all requests.
                filter_enabled:
                  runtime_key: local_rate_limit_enabled
                  default_value:
                    numerator: 100
                    denominator: HUNDRED
                filter_enforced:
                  runtime_key: local_rate_limit_enforced
                  default_value:
                    numerator: 100
                    denominator: HUNDRED
          - name: envoy.filters.http.ext_authz
            typed_config:
              "@type": type.googleapis.com/envoy.extensions.filters.http.ext_authz.v3.ExtAuthz
              grpc_service:
                envoy_grpc:
                  cluster_name: auth_service
          # The router must be the last HTTP filter; recent Envoy releases
          # require its typed_config to be spelled out.
          - name: envoy.filters.http.router
            typed_config:
              "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
          route_config:
            name: local_route
            virtual_hosts:
            - name: local_service
              domains: ["*"]
              routes:
              - match:
                  prefix: "/"
                route:
                  cluster: backend_service
                  # Overall deadline for the request, retries included.
                  timeout: 5s
                  retry_policy:
                    retry_on: "5xx,gateway-error,connect-failure,refused-stream"
                    num_retries: 3
                    per_try_timeout: 2s

Monitoring and Metrics

Add metrics collection for our reliability patterns:

// internal/metrics/collector.go
package metrics

import (
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
)

// Metrics for the gateway's reliability features. They are created through
// promauto when the package is initialised.
var (
    // RateLimitCounter counts rate limit checks, labelled by the rule that
    // was evaluated and by whether the request was allowed.
    RateLimitCounter = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "gateway_rate_limit_total",
            Help: "Total number of rate limit checks",
        },
        []string{"rule", "allowed"},
    )

    // CircuitBreakerState exposes the current state of each service's
    // circuit breaker as a number: 0 = closed, 1 = open, 2 = half-open.
    CircuitBreakerState = promauto.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: "gateway_circuit_breaker_state",
            Help: "Current circuit breaker state (0=closed, 1=open, 2=half-open)",
        },
        []string{"service"},
    )

    // FallbackRequests counts fallback responses served, labelled by the
    // fallback source and the request path.
    // NOTE(review): if "path" receives raw request paths (with IDs in them),
    // the number of time series is unbounded; confirm callers pass a route
    // template or another bounded value.
    FallbackRequests = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "gateway_fallback_requests_total",
            Help: "Total number of fallback responses served",
        },
        []string{"source", "path"},
    )
)

Testing Reliability Patterns

Test the reliability features:

# Test rate limiting
# Print only the HTTP status code of each response so the switch from 200 to
# 429 is visible; without these flags curl prints the response bodies.
for i in {1..1001}; do
  curl -s -o /dev/null -w "%{http_code}\n" http://localhost:8080/api/v1/users
done
# Should start returning 429 after 1000 requests

# Test circuit breaker
# Simulate service failures to trigger circuit breaker
curl http://localhost:8080/api/v1/failing-service

Next Steps

In Part 4, we’ll implement comprehensive observability with metrics, distributed tracing, and centralized logging to monitor our API gateway in production.

Topics will include:

  • Prometheus metrics collection
  • Jaeger distributed tracing
  • Structured logging with correlation IDs
  • Alert rules and dashboards