服务限流与熔断降级构建弹性微服务一、限流降级概述1.1 为什么需要限流降级在微服务架构中系统面临以下挑战流量突增促销活动、热点事件导致流量激增级联故障一个服务不可用导致整个系统崩溃资源耗尽连接池、线程池耗尽导致服务不可用用户体验系统过载时如何保证核心功能可用1.2 限流降级价值┌─────────────────────────────────────────────────────────────────────────┐ │ 限流降级架构 │ ├─────────────────────────────────────────────────────────────────────────┤ │ │ │ ┌─────────┐ │ │ │ 用户 │ │ │ │ 请求 │ │ │ └────┬────┘ │ │ │ │ │ ▼ │ │ ┌─────────────────────────────────────┐ │ │ │ 限流层 │ │ │ │ ┌─────────┐ ┌─────────┐ │ │ │ │ │ 令牌桶 │ │ 滑动窗口│ │ │ │ │ │ 限流 │ │ 限流 │ │ │ │ │ └─────────┘ └─────────┘ │ │ │ └───────────────┬─────────────────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────────────────────────────┐ │ │ │ 熔断层 │ │ │ │ ┌─────────┐ ┌─────────┐ │ │ │ │ │ 断路器 │ │ 舱壁隔离│ │ │ │ │ │ Circuit │ │ Bulkhead│ │ │ │ │ └─────────┘ └─────────┘ │ │ │ └───────────────┬─────────────────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────────────────────────────┐ │ │ │ 降级层 │ │ │ │ ┌─────────┐ ┌─────────┐ │ │ │ │ │ 核心业务│ │ 兜底返回│ │ │ │ │ │ 优先 │ │ 默认值 │ │ │ │ │ └─────────┘ └─────────┘ │ │ │ └───────────────┬─────────────────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────┐ │ │ │ 正常响应 │ │ │ └─────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────────────┘二、限流算法2.1 令牌桶算法/** * 令牌桶限流器 */ public class TokenBucketRateLimiter { private final AtomicLong tokens; private final long maxTokens; private final long refillRate; // 每秒补充的令牌数 private final long refillIntervalMillis; private volatile long lastRefillTime; public TokenBucketRateLimiter(long maxTokens, long refillRate) { this.maxTokens maxTokens; this.refillRate refillRate; this.refillIntervalMillis 1000; this.tokens new AtomicLong(maxTokens); this.lastRefillTime System.currentTimeMillis(); } public boolean tryAcquire() { refill(); long current tokens.get(); if (current 0) { return tokens.compareAndSet(current, current - 1); } return false; } public boolean tryAcquire(int permits) { refill(); long current tokens.get(); if (current permits) { return tokens.compareAndSet(current, current - permits); } return false; } private void refill() { long now System.currentTimeMillis(); long elapsed now - lastRefillTime; if (elapsed refillIntervalMillis) { long tokensToAdd (elapsed / refillIntervalMillis) * refillRate; long newTokens Math.min(maxTokens, tokens.get() tokensToAdd); if (tokens.compareAndSet(tokens.get(), newTokens)) { lastRefillTime now; } } } public long availableTokens() { refill(); return tokens.get(); } }2.2 滑动窗口算法/** * 滑动窗口限流器 */ public class SlidingWindowRateLimiter { private final int windowSize; // 窗口大小秒 private final int maxRequests; // 窗口内最大请求数 private final DequeLong timestamps; public SlidingWindowRateLimiter(int windowSize, int maxRequests) { this.windowSize windowSize; this.maxRequests maxRequests; this.timestamps new ConcurrentLinkedDeque(); } public synchronized boolean tryAcquire() { long now System.currentTimeMillis(); long windowStart now - windowSize * 1000L; // 清理过期的请求记录 while (!timestamps.isEmpty() timestamps.peekFirst() windowStart) { timestamps.pollFirst(); } // 检查是否在限制内 if (timestamps.size() maxRequests) { timestamps.addLast(now); return true; } return false; } public synchronized int getCurrentCount() { long now System.currentTimeMillis(); long windowStart now - windowSize * 1000L; while (!timestamps.isEmpty() timestamps.peekFirst() windowStart) { timestamps.pollFirst(); } return timestamps.size(); } }2.3 漏桶算法/** * 漏桶限流器 */ public class LeakyBucketRateLimiter { private final long capacity; // 桶容量 private final long leakRate; // 漏桶速率每秒漏出的请求数 private volatile double water; // 当前水量 private volatile long lastLeakTime; private final Object lock new Object(); public LeakyBucketRateLimiter(long capacity, long leakRate) { this.capacity capacity; this.leakRate leakRate; this.water 0; this.lastLeakTime System.currentTimeMillis(); } public boolean tryAcquire() { synchronized (lock) { leak(); if (water capacity) { water; return true; } return false; } } private void leak() { long now System.currentTimeMillis(); long elapsed now - lastLeakTime; if (elapsed 0) { double leaked (elapsed / 1000.0) * leakRate; water Math.max(0, water - leaked); lastLeakTime now; } } public double getCurrentWater() { synchronized (lock) { leak(); return water; } } }三、Resilience4j限流3.1 依赖配置dependency groupIdio.github.resilience4j/groupId artifactIdresilience4j-spring-boot3/artifactId version2.2.0/version /dependency dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-aop/artifactId /dependency3.2 RateLimiter配置resilience4j: ratelimiter: instances: backendA: limitForPeriod: 10 # 每个周期最大请求数 limitRefreshPeriod: 1s # 周期刷新时间 timeoutDuration: 5s # 获取许可超时时间 eventConsumerBufferSize: 100 # 事件缓冲区大小 registerHealthIndicator: true # 注册健康检查 backendB: limitForPeriod: 100 limitRefreshPeriod: 1s timeoutDuration: 3s3.3 RateLimiter使用Service public class RateLimitedService { private final RateLimiter rateLimiter; public RateLimitedService(RateLimiterRegistry registry) { this.rateLimiter registry.rateLimiter(backendA); } public String getData() { RateLimiter.waitForPermission(rateLimiter); return restTemplate.getForObject(http://backend/api/data, String.class); } RateLimiter(name backendA, fallbackMethod getDataFallback) public String getDataWithAnnotation() { return restTemplate.getForObject(http://backend/api/data, String.class); } private String getDataFallback(Exception e) { return Fallback response - Service temporarily unavailable; } } RestController RateLimiter(name api, fallbackMethod fallback) public class ApiController { private final RateLimiter rateLimiter; GetMapping(/api/data) public ResponseEntityString getData() { return ResponseEntity.ok(externalService.getData()); } private ResponseEntityString fallback(Exception e) { return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE) .body(Rate limit exceeded. Please try again later.); } }四、熔断器模式4.1 熔断器原理关闭状态 (CLOSED) │ │ 失败率超过阈值 ▼ ┌───────────────────┐ │ 熔断打开 │ ◀──────────────┐ │ (OPEN) │ │ │ 所有请求快速失败 │ │ 经过超时时间 └───────────────────┘ │ │ │ │ 探测请求成功 │ ▼ │ ┌───────────────────┐ │ │ 半开状态 │────────────────┘ │ (HALF_OPEN) │ 允许探测请求 └───────────────────┘ │ │ 探测失败 └──────────▶ 回到OPEN状态4.2 Resilience4j熔断配置resilience4j: circuitbreaker: instances: backend: registerHealthIndicator: true slidingWindowType: COUNT_BASED # 或 TIME_BASED slidingWindowSize: 10 # 滑动窗口大小 minimumNumberOfCalls: 5 # 最小调用数 permittedNumberOfCallsInHalfOpenState: 3 automaticTransitionFromOpenToHalfOpenEnabled: true waitDurationInOpenState: 30s # 熔断持续时间 failureRateThreshold: 50 # 失败率阈值 eventConsumerBufferSize: 10 recordExceptions: - java.io.IOException - java.util.concurrent.TimeoutException - com.example.BusinessException ignoreExceptions: - java.lang.IllegalArgumentException4.3 熔断器使用Service public class CircuitBreakerService { private final CircuitBreakerRegistry registry; public CircuitBreakerService() { this.registry CircuitBreakerRegistry.ofDefaults(); } public String getData() { CircuitBreaker circuitBreaker registry.circuitBreaker(backend); SupplierString decoratedSupplier Decorators.ofSupplier(() - restTemplate.getForObject(http://backend/api/data, String.class)) .withCircuitBreaker(circuitBreaker) .withFallback(List.of(Exception.class), e - Fallback data) .decorate(); return decoratedSupplier.get(); } } Component public class CircuitBreakerAspect { Autowired private CircuitBreakerRegistry registry; Around(annotation(circuitBreaker)) public Object handleCircuitBreaker(ProceedingJoinPoint joinPoint, CircuitBreaker circuitBreaker) throws Throwable { CircuitBreaker breaker registry.circuitBreaker(default); return Decorators.ofSupplier(() - { try { return joinPoint.proceed(); } catch (Throwable e) { throw new RuntimeException(e); } }) .withCircuitBreaker(breaker) .withFallback(List.of(Exception.class), e - getFallback(joinPoint)) .decorate() .get(); } }4.4 熔断事件监听Component Slf4j public class CircuitBreakerEventListener { Autowired private CircuitBreakerRegistry registry; PostConstruct public void init() { registry.getAllCircuitBreakers().forEach(this::registerEventListener); } private void registerEventListener(CircuitBreaker circuitBreaker) { circuitBreaker.getEventPublisher() .onStateTransition(event - { CircuitBreakerStateTransition transition event.getStateTransition(); log.warn(CircuitBreaker {} state changed: {}, circuitBreaker.getName(), transition); // 发送告警 if (transition.getToState() CircuitBreaker.State.OPEN) { sendAlert(circuitBreaker.getName()); } }) .onError(event - { log.error(CircuitBreaker {} recorded error, circuitBreaker.getName(), event.getThrowable()); }) .onSuccess(event - { log.debug(CircuitBreaker {} call succeeded in {}ms, circuitBreaker.getName(), event.getElapsedDuration().toMillis()); }) .onCallNotPermitted(event - { log.warn(CircuitBreaker {} rejected call, circuitBreaker.getName()); }); } private void sendAlert(String serviceName) { // 发送告警通知 } }五、舱壁隔离5.1 舱壁模式原理无舱壁隔离 ┌─────────────────────────────────────────────────┐ │ 共享线程池 │ │ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ │ │ │服务A │ │服务B │ │服务C │ │服务D │ │服务E │ │ │ └─────┘ └─────┘ └─────┘ └─────┘ └─────┘ │ │ 全部共享资源一个服务耗尽影响其他服务 │ └─────────────────────────────────────────────────┘ 舱壁隔离 ┌──────────────────┬──────────────────┬──────────────────┐ │ 服务A线程池 │ 服务B线程池 │ 服务C线程池 │ │ ┌─────┐ ┌─────┐│ ┌─────┐ ┌─────┐│ ┌─────┐ ┌─────┐│ │ │线程1│ │线程2││ │线程1│ │线程2││ │线程1│ │线程2││ │ └─────┘ └─────┘│ └─────┘ └─────┘│ └─────┘ └─────┘│ └──────────────────┴──────────────────┴──────────────────┘5.2 Bulkhead配置resilience4j: bulkhead: instances: backendA: maxConcurrentCalls: 10 # 最大并发调用数 maxWaitDuration: 100ms # 获取信号量的最大等待时间 backendB: maxConcurrentCalls: 100 maxWaitDuration: 500ms5.3 Bulkhead使用Service public class BulkheadService { private final BulkheadRegistry registry; public BulkheadService() { this.registry BulkheadRegistry.ofDefaults(); } public String getDataA() { Bulkhead bulkhead registry.bulkhead(backendA); return Decorators.ofSupplier(() - restTemplate.getForObject(http://backend-a/api/data, String.class)) .withBulkhead(bulkhead) .withFallback(List.of(BulkheadFullException.class), e - Service busy) .decorate() .get(); } }六、重试机制6.1 Retry配置resilience4j: retry: instances: backend: maxAttempts: 3 # 最大重试次数 waitDuration: 500ms # 重试间隔 enableExponentialBackoff: true # 指数退避 exponentialBackoffMultiplier: 2 # 退避倍数 retryExceptions: - java.io.IOException - java.util.concurrent.TimeoutException ignoreExceptions: - java.lang.IllegalArgumentException - com.example.BusinessException recordExceptions: - java.net.ConnectException failAfterMaxAttempts: true # 达到最大次数后失败6.2 Retry使用Service public class RetryableService { private final RetryRegistry registry; public RetryableService() { this.registry RetryRegistry.ofDefaults(); } public String getData() { Retry retry registry.retry(backend); Retry.Metrics metrics retry.getMetrics(); log.info(Retry stats: {}, metrics.getAttemptedCalls()); SupplierString decoratedSupplier Decorators.ofSupplier(() - restTemplate.getForObject(http://backend/api/data, String.class)) .withRetry(retry) .decorate(); return decoratedSupplier.get(); } } Retry(name backend, maxAttempts 3, waitDuration 500) public String getDataWithAnnotation() { return restTemplate.getForObject(http://backend/api/data, String.class); }6.3 重试事件监听Component Slf4j public class RetryEventListener { Autowired private RetryRegistry registry; PostConstruct public void init() { registry.getAllRetries().forEach(this::registerEventListener); } private void registerEventListener(Retry retry) { retry.getEventPublisher() .onRetry(event - { log.warn(Retry #{} for {} due to {}, event.getNumberOfRetryAttempts(), event.getName(), event.getLastThrowable().getMessage()); }) .onSuccess(event - { log.debug(Successful call after {} attempts, event.getNumberOfRetryAttempts()); }) .onError(event - { log.error(Failed after {} attempts, event.getNumberOfRetryAttempts(), event.getLastThrowable()); }); } }七、舱壁熔断组合7.1 组合模式配置resilience4j: circuitbreaker: instances: backend: slidingWindowSize: 10 failureRateThreshold: 50 waitDurationInOpenState: 30s bulkhead: instances: backend: maxConcurrentCalls: 20 retry: instances: backend: maxAttempts: 3 waitDuration: 500ms7.2 组合使用Service public class ResilientService { private final CircuitBreakerRegistry circuitBreakerRegistry; private final BulkheadRegistry bulkheadRegistry; private final RetryRegistry retryRegistry; public ResilientService( CircuitBreakerRegistry circuitBreakerRegistry, BulkheadRegistry bulkheadRegistry, RetryRegistry retryRegistry) { this.circuitBreakerRegistry circuitBreakerRegistry; this.bulkheadRegistry bulkheadRegistry; this.retryRegistry retryRegistry; } public String getData() { CircuitBreaker circuitBreaker circuitBreakerRegistry.circuitBreaker(backend); Bulkhead bulkhead bulkheadRegistry.bulkhead(backend); Retry retry retryRegistry.retry(backend); SupplierString supplier Decorators.ofSupplier(() - restTemplate.getForObject(http://backend/api/data, String.class)) .withRetry(retry) .withBulkhead(bulkhead) .withCircuitBreaker(circuitBreaker) .withFallback(List.of(Exception.class), e - getFallback(e)) .decorate(); return supplier.get(); } private String getFallback(Exception e) { log.error(All resilience patterns failed, e); return {\error\: \Service unavailable\}; } }八、降级策略8.1 降级方法设计Component Slf4j public class FallbackProvider { public String userServiceFallback(Exception e) { log.warn(User service fallback triggered, e); return {\users\: [], \fallback\: true}; } public String productServiceFallback(Exception e) { log.warn(Product service fallback triggered, e); return {\products\: [], \fallback\: true}; } public String orderServiceFallback(Exception e) { log.warn(Order service fallback triggered, e); return {\orders\: [], \fallback\: true, \message\: \Order service temporarily unavailable\}; } public User getDefaultUser() { return User.builder() .id(0L) .username(guest) .email(guestexample.com) .build(); } public ListProduct getDefaultProducts() { return Collections.emptyList(); } }8.2 多级降级Service public class MultiLevelFallbackService { public String getData() { try { // 第一级尝试正常调用 return callService(); } catch (CircuitBreakerOpenException e) { log.warn(Circuit breaker open, trying cache, e); return getFromCache(); } catch (Exception e) { log.error(Service call failed, using fallback, e); return getDefaultValue(); } } private String callService() { // 调用远程服务 return restTemplate.getForObject(http://service/api/data, String.class); } private String getFromCache() { String cached cacheService.get(data); if (cached ! null) { return cached; } return getDefaultValue(); } private String getDefaultValue() { return {\data\: \default\, \source\: \fallback\}; } }九、实战案例9.1 网关限流配置spring: cloud: gateway: routes: - id: user-service uri: http://user-service predicates: - Path/api/users/** filters: - name: RequestRateLimiter args: redis-rate-limiter.replenishRate: 100 redis-rate-limiter.burstCapacity: 200 redis-rate-limiter.requestedTokens: 1 - name: CircuitBreaker args: name: userServiceCircuitBreaker fallbackUri: forward:/fallback/user9.2 Sentinel集成spring: cloud: sentinel: enabled: true eager: true transport: dashboard: localhost:8080 port: 8719 filter: order: -2147483648 url-patterns: /** datasource: ds: file: file: classpath: flow-rule.json >Configuration public class SentinelConfig { PostConstruct public void initRules() { ListFlowRule rules new ArrayList(); FlowRule rule new FlowRule(userService) .setCount(100) .setGrade(RuleConstant.FLOW_GRADE_QPS) .setLimitApp(default) .setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_DEFAULT); rules.add(rule); FlowManager.loadRules(rules); } }十、最佳实践10.1 限流配置建议场景推荐策略参数建议API网关令牌桶/滑动窗口根据服务容量设置秒杀活动令牌桶预热预热时间根据流量预估文件上传漏桶限制上传速率数据库操作信号量根据连接池设置10.2 熔断配置建议参数建议值说明滑动窗口大小10-100统计样本量失败率阈值50%超过此值熔断熔断持续时间30-60s半开探测间隔最小调用数5-10统计门槛10.3 监控指标Aspect Component public class ResilienceMetrics { Autowired private MeterRegistry meterRegistry; Around(execution(* com.example.service..*.*(..))) public Object recordMetrics(ProceedingJoinPoint joinPoint) throws Throwable { Timer.Sample sample Timer.start(meterRegistry); try { Object result joinPoint.proceed(); meterRegistry.counter(resilience.success, service, joinPoint.getSignature().getDeclaringTypeName(), method, joinPoint.getSignature().getName()) .increment(); return result; } catch (Exception e) { meterRegistry.counter(resilience.failure, service, joinPoint.getSignature().getDeclaringTypeName(), method, joinPoint.getSignature().getName(), exception, e.getClass().getSimpleName()) .increment(); throw e; } finally { sample.stop(Timer.builder(resilience.latency) .tag(service, joinPoint.getSignature().getDeclaringTypeName()) .register(meterRegistry)); } } }十一、总结限流和熔断降级是构建弹性微服务系统的关键技术通过本文的介绍你可以限流算法令牌桶、滑动窗口、漏桶算法实现Resilience4j限流RateLimiter的使用和配置熔断器模式CircuitBreaker的原理和实现舱壁隔离Bulkhead模式保护系统资源重试机制Retry模式的配置和使用降级策略多级降级和降级方法设计组合模式限流熔断重试组合使用实战案例网关限流和Sentinel集成合理的限流和熔断降级策略可以有效保护系统免受过载和故障影响提高系统的可用性和稳定性。
服务限流与熔断降级:构建弹性微服务
发布时间:2026/5/15 23:11:10
服务限流与熔断降级构建弹性微服务一、限流降级概述1.1 为什么需要限流降级在微服务架构中系统面临以下挑战流量突增促销活动、热点事件导致流量激增级联故障一个服务不可用导致整个系统崩溃资源耗尽连接池、线程池耗尽导致服务不可用用户体验系统过载时如何保证核心功能可用1.2 限流降级价值┌─────────────────────────────────────────────────────────────────────────┐ │ 限流降级架构 │ ├─────────────────────────────────────────────────────────────────────────┤ │ │ │ ┌─────────┐ │ │ │ 用户 │ │ │ │ 请求 │ │ │ └────┬────┘ │ │ │ │ │ ▼ │ │ ┌─────────────────────────────────────┐ │ │ │ 限流层 │ │ │ │ ┌─────────┐ ┌─────────┐ │ │ │ │ │ 令牌桶 │ │ 滑动窗口│ │ │ │ │ │ 限流 │ │ 限流 │ │ │ │ │ └─────────┘ └─────────┘ │ │ │ └───────────────┬─────────────────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────────────────────────────┐ │ │ │ 熔断层 │ │ │ │ ┌─────────┐ ┌─────────┐ │ │ │ │ │ 断路器 │ │ 舱壁隔离│ │ │ │ │ │ Circuit │ │ Bulkhead│ │ │ │ │ └─────────┘ └─────────┘ │ │ │ └───────────────┬─────────────────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────────────────────────────┐ │ │ │ 降级层 │ │ │ │ ┌─────────┐ ┌─────────┐ │ │ │ │ │ 核心业务│ │ 兜底返回│ │ │ │ │ │ 优先 │ │ 默认值 │ │ │ │ │ └─────────┘ └─────────┘ │ │ │ └───────────────┬─────────────────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────┐ │ │ │ 正常响应 │ │ │ └─────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────────────┘二、限流算法2.1 令牌桶算法/** * 令牌桶限流器 */ public class TokenBucketRateLimiter { private final AtomicLong tokens; private final long maxTokens; private final long refillRate; // 每秒补充的令牌数 private final long refillIntervalMillis; private volatile long lastRefillTime; public TokenBucketRateLimiter(long maxTokens, long refillRate) { this.maxTokens maxTokens; this.refillRate refillRate; this.refillIntervalMillis 1000; this.tokens new AtomicLong(maxTokens); this.lastRefillTime System.currentTimeMillis(); } public boolean tryAcquire() { refill(); long current tokens.get(); if (current 0) { return tokens.compareAndSet(current, current - 1); } return false; } public boolean tryAcquire(int permits) { refill(); long current tokens.get(); if (current permits) { return tokens.compareAndSet(current, current - permits); } return false; } private void refill() { long now System.currentTimeMillis(); long elapsed now - lastRefillTime; if (elapsed refillIntervalMillis) { long tokensToAdd (elapsed / refillIntervalMillis) * refillRate; long newTokens Math.min(maxTokens, tokens.get() tokensToAdd); if (tokens.compareAndSet(tokens.get(), newTokens)) { lastRefillTime now; } } } public long availableTokens() { refill(); return tokens.get(); } }2.2 滑动窗口算法/** * 滑动窗口限流器 */ public class SlidingWindowRateLimiter { private final int windowSize; // 窗口大小秒 private final int maxRequests; // 窗口内最大请求数 private final DequeLong timestamps; public SlidingWindowRateLimiter(int windowSize, int maxRequests) { this.windowSize windowSize; this.maxRequests maxRequests; this.timestamps new ConcurrentLinkedDeque(); } public synchronized boolean tryAcquire() { long now System.currentTimeMillis(); long windowStart now - windowSize * 1000L; // 清理过期的请求记录 while (!timestamps.isEmpty() timestamps.peekFirst() windowStart) { timestamps.pollFirst(); } // 检查是否在限制内 if (timestamps.size() maxRequests) { timestamps.addLast(now); return true; } return false; } public synchronized int getCurrentCount() { long now System.currentTimeMillis(); long windowStart now - windowSize * 1000L; while (!timestamps.isEmpty() timestamps.peekFirst() windowStart) { timestamps.pollFirst(); } return timestamps.size(); } }2.3 漏桶算法/** * 漏桶限流器 */ public class LeakyBucketRateLimiter { private final long capacity; // 桶容量 private final long leakRate; // 漏桶速率每秒漏出的请求数 private volatile double water; // 当前水量 private volatile long lastLeakTime; private final Object lock new Object(); public LeakyBucketRateLimiter(long capacity, long leakRate) { this.capacity capacity; this.leakRate leakRate; this.water 0; this.lastLeakTime System.currentTimeMillis(); } public boolean tryAcquire() { synchronized (lock) { leak(); if (water capacity) { water; return true; } return false; } } private void leak() { long now System.currentTimeMillis(); long elapsed now - lastLeakTime; if (elapsed 0) { double leaked (elapsed / 1000.0) * leakRate; water Math.max(0, water - leaked); lastLeakTime now; } } public double getCurrentWater() { synchronized (lock) { leak(); return water; } } }三、Resilience4j限流3.1 依赖配置dependency groupIdio.github.resilience4j/groupId artifactIdresilience4j-spring-boot3/artifactId version2.2.0/version /dependency dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-aop/artifactId /dependency3.2 RateLimiter配置resilience4j: ratelimiter: instances: backendA: limitForPeriod: 10 # 每个周期最大请求数 limitRefreshPeriod: 1s # 周期刷新时间 timeoutDuration: 5s # 获取许可超时时间 eventConsumerBufferSize: 100 # 事件缓冲区大小 registerHealthIndicator: true # 注册健康检查 backendB: limitForPeriod: 100 limitRefreshPeriod: 1s timeoutDuration: 3s3.3 RateLimiter使用Service public class RateLimitedService { private final RateLimiter rateLimiter; public RateLimitedService(RateLimiterRegistry registry) { this.rateLimiter registry.rateLimiter(backendA); } public String getData() { RateLimiter.waitForPermission(rateLimiter); return restTemplate.getForObject(http://backend/api/data, String.class); } RateLimiter(name backendA, fallbackMethod getDataFallback) public String getDataWithAnnotation() { return restTemplate.getForObject(http://backend/api/data, String.class); } private String getDataFallback(Exception e) { return Fallback response - Service temporarily unavailable; } } RestController RateLimiter(name api, fallbackMethod fallback) public class ApiController { private final RateLimiter rateLimiter; GetMapping(/api/data) public ResponseEntityString getData() { return ResponseEntity.ok(externalService.getData()); } private ResponseEntityString fallback(Exception e) { return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE) .body(Rate limit exceeded. Please try again later.); } }四、熔断器模式4.1 熔断器原理关闭状态 (CLOSED) │ │ 失败率超过阈值 ▼ ┌───────────────────┐ │ 熔断打开 │ ◀──────────────┐ │ (OPEN) │ │ │ 所有请求快速失败 │ │ 经过超时时间 └───────────────────┘ │ │ │ │ 探测请求成功 │ ▼ │ ┌───────────────────┐ │ │ 半开状态 │────────────────┘ │ (HALF_OPEN) │ 允许探测请求 └───────────────────┘ │ │ 探测失败 └──────────▶ 回到OPEN状态4.2 Resilience4j熔断配置resilience4j: circuitbreaker: instances: backend: registerHealthIndicator: true slidingWindowType: COUNT_BASED # 或 TIME_BASED slidingWindowSize: 10 # 滑动窗口大小 minimumNumberOfCalls: 5 # 最小调用数 permittedNumberOfCallsInHalfOpenState: 3 automaticTransitionFromOpenToHalfOpenEnabled: true waitDurationInOpenState: 30s # 熔断持续时间 failureRateThreshold: 50 # 失败率阈值 eventConsumerBufferSize: 10 recordExceptions: - java.io.IOException - java.util.concurrent.TimeoutException - com.example.BusinessException ignoreExceptions: - java.lang.IllegalArgumentException4.3 熔断器使用Service public class CircuitBreakerService { private final CircuitBreakerRegistry registry; public CircuitBreakerService() { this.registry CircuitBreakerRegistry.ofDefaults(); } public String getData() { CircuitBreaker circuitBreaker registry.circuitBreaker(backend); SupplierString decoratedSupplier Decorators.ofSupplier(() - restTemplate.getForObject(http://backend/api/data, String.class)) .withCircuitBreaker(circuitBreaker) .withFallback(List.of(Exception.class), e - Fallback data) .decorate(); return decoratedSupplier.get(); } } Component public class CircuitBreakerAspect { Autowired private CircuitBreakerRegistry registry; Around(annotation(circuitBreaker)) public Object handleCircuitBreaker(ProceedingJoinPoint joinPoint, CircuitBreaker circuitBreaker) throws Throwable { CircuitBreaker breaker registry.circuitBreaker(default); return Decorators.ofSupplier(() - { try { return joinPoint.proceed(); } catch (Throwable e) { throw new RuntimeException(e); } }) .withCircuitBreaker(breaker) .withFallback(List.of(Exception.class), e - getFallback(joinPoint)) .decorate() .get(); } }4.4 熔断事件监听Component Slf4j public class CircuitBreakerEventListener { Autowired private CircuitBreakerRegistry registry; PostConstruct public void init() { registry.getAllCircuitBreakers().forEach(this::registerEventListener); } private void registerEventListener(CircuitBreaker circuitBreaker) { circuitBreaker.getEventPublisher() .onStateTransition(event - { CircuitBreakerStateTransition transition event.getStateTransition(); log.warn(CircuitBreaker {} state changed: {}, circuitBreaker.getName(), transition); // 发送告警 if (transition.getToState() CircuitBreaker.State.OPEN) { sendAlert(circuitBreaker.getName()); } }) .onError(event - { log.error(CircuitBreaker {} recorded error, circuitBreaker.getName(), event.getThrowable()); }) .onSuccess(event - { log.debug(CircuitBreaker {} call succeeded in {}ms, circuitBreaker.getName(), event.getElapsedDuration().toMillis()); }) .onCallNotPermitted(event - { log.warn(CircuitBreaker {} rejected call, circuitBreaker.getName()); }); } private void sendAlert(String serviceName) { // 发送告警通知 } }五、舱壁隔离5.1 舱壁模式原理无舱壁隔离 ┌─────────────────────────────────────────────────┐ │ 共享线程池 │ │ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ │ │ │服务A │ │服务B │ │服务C │ │服务D │ │服务E │ │ │ └─────┘ └─────┘ └─────┘ └─────┘ └─────┘ │ │ 全部共享资源一个服务耗尽影响其他服务 │ └─────────────────────────────────────────────────┘ 舱壁隔离 ┌──────────────────┬──────────────────┬──────────────────┐ │ 服务A线程池 │ 服务B线程池 │ 服务C线程池 │ │ ┌─────┐ ┌─────┐│ ┌─────┐ ┌─────┐│ ┌─────┐ ┌─────┐│ │ │线程1│ │线程2││ │线程1│ │线程2││ │线程1│ │线程2││ │ └─────┘ └─────┘│ └─────┘ └─────┘│ └─────┘ └─────┘│ └──────────────────┴──────────────────┴──────────────────┘5.2 Bulkhead配置resilience4j: bulkhead: instances: backendA: maxConcurrentCalls: 10 # 最大并发调用数 maxWaitDuration: 100ms # 获取信号量的最大等待时间 backendB: maxConcurrentCalls: 100 maxWaitDuration: 500ms5.3 Bulkhead使用Service public class BulkheadService { private final BulkheadRegistry registry; public BulkheadService() { this.registry BulkheadRegistry.ofDefaults(); } public String getDataA() { Bulkhead bulkhead registry.bulkhead(backendA); return Decorators.ofSupplier(() - restTemplate.getForObject(http://backend-a/api/data, String.class)) .withBulkhead(bulkhead) .withFallback(List.of(BulkheadFullException.class), e - Service busy) .decorate() .get(); } }六、重试机制6.1 Retry配置resilience4j: retry: instances: backend: maxAttempts: 3 # 最大重试次数 waitDuration: 500ms # 重试间隔 enableExponentialBackoff: true # 指数退避 exponentialBackoffMultiplier: 2 # 退避倍数 retryExceptions: - java.io.IOException - java.util.concurrent.TimeoutException ignoreExceptions: - java.lang.IllegalArgumentException - com.example.BusinessException recordExceptions: - java.net.ConnectException failAfterMaxAttempts: true # 达到最大次数后失败6.2 Retry使用Service public class RetryableService { private final RetryRegistry registry; public RetryableService() { this.registry RetryRegistry.ofDefaults(); } public String getData() { Retry retry registry.retry(backend); Retry.Metrics metrics retry.getMetrics(); log.info(Retry stats: {}, metrics.getAttemptedCalls()); SupplierString decoratedSupplier Decorators.ofSupplier(() - restTemplate.getForObject(http://backend/api/data, String.class)) .withRetry(retry) .decorate(); return decoratedSupplier.get(); } } Retry(name backend, maxAttempts 3, waitDuration 500) public String getDataWithAnnotation() { return restTemplate.getForObject(http://backend/api/data, String.class); }6.3 重试事件监听Component Slf4j public class RetryEventListener { Autowired private RetryRegistry registry; PostConstruct public void init() { registry.getAllRetries().forEach(this::registerEventListener); } private void registerEventListener(Retry retry) { retry.getEventPublisher() .onRetry(event - { log.warn(Retry #{} for {} due to {}, event.getNumberOfRetryAttempts(), event.getName(), event.getLastThrowable().getMessage()); }) .onSuccess(event - { log.debug(Successful call after {} attempts, event.getNumberOfRetryAttempts()); }) .onError(event - { log.error(Failed after {} attempts, event.getNumberOfRetryAttempts(), event.getLastThrowable()); }); } }七、舱壁熔断组合7.1 组合模式配置resilience4j: circuitbreaker: instances: backend: slidingWindowSize: 10 failureRateThreshold: 50 waitDurationInOpenState: 30s bulkhead: instances: backend: maxConcurrentCalls: 20 retry: instances: backend: maxAttempts: 3 waitDuration: 500ms7.2 组合使用Service public class ResilientService { private final CircuitBreakerRegistry circuitBreakerRegistry; private final BulkheadRegistry bulkheadRegistry; private final RetryRegistry retryRegistry; public ResilientService( CircuitBreakerRegistry circuitBreakerRegistry, BulkheadRegistry bulkheadRegistry, RetryRegistry retryRegistry) { this.circuitBreakerRegistry circuitBreakerRegistry; this.bulkheadRegistry bulkheadRegistry; this.retryRegistry retryRegistry; } public String getData() { CircuitBreaker circuitBreaker circuitBreakerRegistry.circuitBreaker(backend); Bulkhead bulkhead bulkheadRegistry.bulkhead(backend); Retry retry retryRegistry.retry(backend); SupplierString supplier Decorators.ofSupplier(() - restTemplate.getForObject(http://backend/api/data, String.class)) .withRetry(retry) .withBulkhead(bulkhead) .withCircuitBreaker(circuitBreaker) .withFallback(List.of(Exception.class), e - getFallback(e)) .decorate(); return supplier.get(); } private String getFallback(Exception e) { log.error(All resilience patterns failed, e); return {\error\: \Service unavailable\}; } }八、降级策略8.1 降级方法设计Component Slf4j public class FallbackProvider { public String userServiceFallback(Exception e) { log.warn(User service fallback triggered, e); return {\users\: [], \fallback\: true}; } public String productServiceFallback(Exception e) { log.warn(Product service fallback triggered, e); return {\products\: [], \fallback\: true}; } public String orderServiceFallback(Exception e) { log.warn(Order service fallback triggered, e); return {\orders\: [], \fallback\: true, \message\: \Order service temporarily unavailable\}; } public User getDefaultUser() { return User.builder() .id(0L) .username(guest) .email(guestexample.com) .build(); } public ListProduct getDefaultProducts() { return Collections.emptyList(); } }8.2 多级降级Service public class MultiLevelFallbackService { public String getData() { try { // 第一级尝试正常调用 return callService(); } catch (CircuitBreakerOpenException e) { log.warn(Circuit breaker open, trying cache, e); return getFromCache(); } catch (Exception e) { log.error(Service call failed, using fallback, e); return getDefaultValue(); } } private String callService() { // 调用远程服务 return restTemplate.getForObject(http://service/api/data, String.class); } private String getFromCache() { String cached cacheService.get(data); if (cached ! null) { return cached; } return getDefaultValue(); } private String getDefaultValue() { return {\data\: \default\, \source\: \fallback\}; } }九、实战案例9.1 网关限流配置spring: cloud: gateway: routes: - id: user-service uri: http://user-service predicates: - Path/api/users/** filters: - name: RequestRateLimiter args: redis-rate-limiter.replenishRate: 100 redis-rate-limiter.burstCapacity: 200 redis-rate-limiter.requestedTokens: 1 - name: CircuitBreaker args: name: userServiceCircuitBreaker fallbackUri: forward:/fallback/user9.2 Sentinel集成spring: cloud: sentinel: enabled: true eager: true transport: dashboard: localhost:8080 port: 8719 filter: order: -2147483648 url-patterns: /** datasource: ds: file: file: classpath: flow-rule.json >Configuration public class SentinelConfig { PostConstruct public void initRules() { ListFlowRule rules new ArrayList(); FlowRule rule new FlowRule(userService) .setCount(100) .setGrade(RuleConstant.FLOW_GRADE_QPS) .setLimitApp(default) .setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_DEFAULT); rules.add(rule); FlowManager.loadRules(rules); } }十、最佳实践10.1 限流配置建议场景推荐策略参数建议API网关令牌桶/滑动窗口根据服务容量设置秒杀活动令牌桶预热预热时间根据流量预估文件上传漏桶限制上传速率数据库操作信号量根据连接池设置10.2 熔断配置建议参数建议值说明滑动窗口大小10-100统计样本量失败率阈值50%超过此值熔断熔断持续时间30-60s半开探测间隔最小调用数5-10统计门槛10.3 监控指标Aspect Component public class ResilienceMetrics { Autowired private MeterRegistry meterRegistry; Around(execution(* com.example.service..*.*(..))) public Object recordMetrics(ProceedingJoinPoint joinPoint) throws Throwable { Timer.Sample sample Timer.start(meterRegistry); try { Object result joinPoint.proceed(); meterRegistry.counter(resilience.success, service, joinPoint.getSignature().getDeclaringTypeName(), method, joinPoint.getSignature().getName()) .increment(); return result; } catch (Exception e) { meterRegistry.counter(resilience.failure, service, joinPoint.getSignature().getDeclaringTypeName(), method, joinPoint.getSignature().getName(), exception, e.getClass().getSimpleName()) .increment(); throw e; } finally { sample.stop(Timer.builder(resilience.latency) .tag(service, joinPoint.getSignature().getDeclaringTypeName()) .register(meterRegistry)); } } }十一、总结限流和熔断降级是构建弹性微服务系统的关键技术通过本文的介绍你可以限流算法令牌桶、滑动窗口、漏桶算法实现Resilience4j限流RateLimiter的使用和配置熔断器模式CircuitBreaker的原理和实现舱壁隔离Bulkhead模式保护系统资源重试机制Retry模式的配置和使用降级策略多级降级和降级方法设计组合模式限流熔断重试组合使用实战案例网关限流和Sentinel集成合理的限流和熔断降级策略可以有效保护系统免受过载和故障影响提高系统的可用性和稳定性。