Kafka Consumer with Circuit Breaker, Retry Patterns using Resilience4j
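I need some help understanding how to implement a microservice call from my Kafka consumer using Spring Boot, Kafka, and Resilience4j. If the microservice is down, I need to use the circuit breaker pattern to tell my Kafka consumer to stop fetching messages/events until the microservice is up and running again.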
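If you are using Spring Kafka, you can use the pause and resume methods of the ConcurrentMessageListenerContainer class.
You can attach an event listener to the CircuitBreaker that listens for state transitions and pauses or resumes event processing. Inject the CircuitBreakerRegistry into your bean: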
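// "container" is the listener container of your consumer.
circuitBreakerRegistry.circuitBreaker("yourCBName").getEventPublisher().onStateTransition(
    event -> {
        // Every case needs a break; without it the switch falls through
        // and runs all of the pause/resume calls below the matching case.
        switch (event.getStateTransition()) {
            case CLOSED_TO_OPEN:
                container.pause();
                break;
            case OPEN_TO_HALF_OPEN:
                container.resume();
                break;
            case HALF_OPEN_TO_CLOSED:
                container.resume();
                break;
            case HALF_OPEN_TO_OPEN:
                container.pause();
                break;
            case CLOSED_TO_FORCED_OPEN:
                container.pause();
                break;
            case FORCED_OPEN_TO_CLOSED:
                container.resume();
                break;
            case FORCED_OPEN_TO_HALF_OPEN:
                container.resume();
                break;
            default:
                break;
        }
    }
);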
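To make the transition-to-action mapping easier to check in isolation, here is a minimal plain-Java sketch with no Spring or Resilience4j dependency. The names TransitionDemo, Transition and FakeContainer are hypothetical stand-ins I made up for illustration; they are not part of either library, and only the transitions mentioned above are modeled.

```java
import java.util.ArrayList;
import java.util.List;

class TransitionDemo {

    // Hypothetical stand-in for the circuit breaker's state transitions.
    enum Transition {
        CLOSED_TO_OPEN, OPEN_TO_HALF_OPEN, HALF_OPEN_TO_CLOSED, HALF_OPEN_TO_OPEN,
        CLOSED_TO_FORCED_OPEN, FORCED_OPEN_TO_CLOSED, FORCED_OPEN_TO_HALF_OPEN
    }

    // Hypothetical stand-in for a listener container that can be paused.
    static class FakeContainer {
        private boolean paused;
        private final List<String> calls = new ArrayList<>();

        void pause()  { paused = true;  calls.add("pause"); }
        void resume() { paused = false; calls.add("resume"); }
        boolean isPaused() { return paused; }
        List<String> calls() { return calls; }
    }

    // Each group ends with break, so exactly one action runs per transition.
    static void onTransition(Transition transition, FakeContainer container) {
        switch (transition) {
            case CLOSED_TO_OPEN:
            case HALF_OPEN_TO_OPEN:
            case CLOSED_TO_FORCED_OPEN:
                container.pause();
                break;
            case OPEN_TO_HALF_OPEN:
            case HALF_OPEN_TO_CLOSED:
            case FORCED_OPEN_TO_CLOSED:
            case FORCED_OPEN_TO_HALF_OPEN:
                container.resume();
                break;
            default:
                break;
        }
    }

    public static void main(String[] args) {
        FakeContainer container = new FakeContainer();
        onTransition(Transition.CLOSED_TO_OPEN, container);
        System.out.println("paused after CLOSED_TO_OPEN: " + container.isPaused());
        onTransition(Transition.OPEN_TO_HALF_OPEN, container);
        System.out.println("paused after OPEN_TO_HALF_OPEN: " + container.isPaused());
        System.out.println("calls: " + container.calls());
    }
}
```

Recording the calls in a list makes it visible that a single transition triggers a single pause or resume, which is the property the missing break statements would violate.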
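With Spring Kafka, you can call the pause and resume methods in response to CircuitBreaker state transitions. The best approach I found is to define a "supervisor" class annotated with @Configuration. Resilience4j is used here as well.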
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CircuitBreakerConsumerConfiguration {

    public CircuitBreakerConsumerConfiguration(CircuitBreakerRegistry circuitBreakerRegistry, KafkaManager kafkaManager) {
        circuitBreakerRegistry.circuitBreaker("yourCBName").getEventPublisher().onStateTransition(event -> {
            switch (event.getStateTransition()) {
                // The breaker opened: stop polling for new records.
                case CLOSED_TO_OPEN:
                case CLOSED_TO_FORCED_OPEN:
                case HALF_OPEN_TO_OPEN:
                    kafkaManager.pause();
                    break;
                // The breaker allows calls again: continue consuming.
                case OPEN_TO_HALF_OPEN:
                case HALF_OPEN_TO_CLOSED:
                case FORCED_OPEN_TO_CLOSED:
                case FORCED_OPEN_TO_HALF_OPEN:
                    kafkaManager.resume();
                    break;
                // Any transition not listed above ends up here.
                default:
                    throw new IllegalStateException("Unknown transition state: " + event.getStateTransition());
            }
        });
    }
}
This is the KafkaManager I use with it, annotated with @Component.
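import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;

@Component
public class KafkaManager {

    private final KafkaListenerEndpointRegistry registry;

    public KafkaManager(KafkaListenerEndpointRegistry registry) {
        this.registry = registry;
    }

    // Pauses every @KafkaListener container known to the registry.
    public void pause() {
        registry.getListenerContainers().forEach(MessageListenerContainer::pause);
    }

    // Resumes every @KafkaListener container known to the registry.
    public void resume() {
        registry.getListenerContainers().forEach(MessageListenerContainer::resume);
    }
}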
My consumer service looks like this:
@KafkaListener(topics = "#{'${topic.name}'}", concurrency = "1", id = "CBListener")
public void receive(final ConsumerRecord<String, ReplayData> replayData, Acknowledgment acknowledgment) throws Exception {
    try {
        httpClientServiceCB.receiveHandleCircuitBreaker(replayData);
        // Commit the offset only after the downstream call succeeded.
        acknowledgment.acknowledge();
    } catch (Exception e) {
        // Negative acknowledge: the record is redelivered after the sleep time.
        acknowledgment.nack(1000);
    }
}
And the method annotated with @CircuitBreaker:
@CircuitBreaker(name = "yourCBName")
public void receiveHandleCircuitBreaker(ConsumerRecord<String, ReplayData> replayData) throws Exception {
    try {
        String response = restTemplate.getForObject("http://localhost:8081/item", String.class);
    } catch (Exception e) {
        // throwing the exception is needed to trigger the Circuit Breaker state change;
        // passing e along keeps the original cause in the stack trace
        throw new Exception(e);
    }
}
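The comment about rethrowing is the key point here: a wrapper around a call can only count a failure it actually sees. The following plain-Java sketch shows that idea with a hypothetical FailureCountingGuard class of my own; it is a deliberately simplified illustration, not how Resilience4j is implemented.

```java
import java.util.concurrent.Callable;

// Hypothetical, simplified wrapper: it only counts failures that propagate out of the action.
class FailureCountingGuard {

    private int failures;

    <T> T call(Callable<T> action) throws Exception {
        try {
            return action.call();
        } catch (Exception e) {
            failures++; // the guard sees the failure only because it was thrown
            throw e;
        }
    }

    int failures() {
        return failures;
    }

    public static void main(String[] args) {
        FailureCountingGuard guard = new FailureCountingGuard();

        // Case 1: the action swallows its own error, so the guard sees a success.
        try {
            guard.call(() -> {
                try {
                    throw new IllegalStateException("downstream unavailable");
                } catch (IllegalStateException e) {
                    return "fallback";
                }
            });
        } catch (Exception unexpected) {
            throw new AssertionError(unexpected);
        }
        System.out.println("failures after swallowed error: " + guard.failures());

        // Case 2: the action lets the error propagate, so the guard records it.
        try {
            guard.call(() -> {
                throw new IllegalStateException("downstream unavailable");
            });
        } catch (Exception expected) {
            // the caller still has to handle the rethrown exception
        }
        System.out.println("failures after propagated error: " + guard.failures());
    }
}
```

In the listener above, the same propagated exception is what lands in the catch block and leads to the nack call.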
Additionally, add the following to application.properties:
resilience4j.circuitbreaker.instances.yourCBName.failure-rate-threshold=80
resilience4j.circuitbreaker.instances.yourCBName.sliding-window-type=COUNT_BASED
resilience4j.circuitbreaker.instances.yourCBName.sliding-window-size=5
resilience4j.circuitbreaker.instances.yourCBName.wait-duration-in-open-state=10000
resilience4j.circuitbreaker.instances.yourCBName.automatic-transition-from-open-to-half-open-enabled=true
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE
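To get a feel for what failure-rate-threshold=80 with a count-based window of size 5 means, here is a rough plain-Java model. It assumes the failure rate is only evaluated once the window holds 5 outcomes and that the breaker opens when the rate is greater than or equal to the threshold. CountWindowModel is a hypothetical class written for this illustration and is not Resilience4j's implementation.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified model of a count-based sliding window.
class CountWindowModel {

    private final int windowSize;
    private final double failureRateThreshold; // in percent
    private final Deque<Boolean> outcomes = new ArrayDeque<>(); // true = failed call

    CountWindowModel(int windowSize, double failureRateThreshold) {
        this.windowSize = windowSize;
        this.failureRateThreshold = failureRateThreshold;
    }

    // Records one call outcome and reports whether the threshold is reached.
    boolean recordAndCheck(boolean failed) {
        if (outcomes.size() == windowSize) {
            outcomes.removeFirst(); // drop the oldest outcome
        }
        outcomes.addLast(failed);
        if (outcomes.size() < windowSize) {
            return false; // assumption: no evaluation until the window is full
        }
        long failures = outcomes.stream().filter(f -> f).count();
        double rate = 100.0 * failures / windowSize;
        return rate >= failureRateThreshold;
    }

    public static void main(String[] args) {
        CountWindowModel model = new CountWindowModel(5, 80.0);
        boolean[] calls = {true, true, true, true, false}; // four failures, one success
        for (boolean failed : calls) {
            System.out.println("failed=" + failed + " thresholdReached=" + model.recordAndCheck(failed));
        }
    }
}
```

Under these assumptions, four failed calls out of the last five (80%) are enough to reach the threshold, while three out of five (60%) are not.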