Kafka中针对不同消费者的不同重试策略
Different retry strategies for different consumers in Kafka
我们正在处理这样一种情况,我们需要为同一应用程序中的不同消费者使用不同的重试策略。
请参考下图(简要架构图):
main_consumer
使用来自 main_topic
的负载并尝试将其发送到 API。如果 API 处理失败,我们将把这个失败的有效负载写入另一个名为 error_topic
的主题。有一个不同的消费者 (error_consumer)
从 error_topic
消费并通过 3 次重试再次将有效负载发送到 API。如果仍然失败,则 error_consumer
将此有效载荷推送到 DLQ
.
我们面临的问题:
我们需要 main_consumer
不重试失败,error_consumer
失败重试 3 次。我们将 maxAttempts
作为 main_consumer
的 1,将 maxAttempts
作为 error_consumer
的 3。但是使用此配置,main_consumer
重试 3 次,error_consumer
重试一次。它的工作与我们的预期完全相反。
P.S :我们尝试为两个消费者交换 maxAttempts
(这是不合逻辑的),但没有成功。
下面是我们使用的Spring云流应用配置:
我们是 运行 具有以下两个配置文件的应用程序。
应用-main.yml
spring:
cloud:
stream:
kafka:
bindings:
main-consumer-channel:
consumer:
autoCommitOffset: false
bindings:
main-consumer-channel:
destination: main_topic
consumer:
maxAttempts: 1
backOffInitialInterval: 5000
backOffMultiplier: 2
应用程序错误-retry.yml
spring:
cloud:
stream:
kafka:
bindings:
error-consumer-channel:
consumer:
autoCommitOffset: false
bindings:
error-consumer-channel:
destination: error_topic
consumer:
maxAttempts: 3
backOffInitialInterval: 5000
backOffMultiplier: 2
根据 spring 文档 - https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/_configuration_options.html,maxAttempts 配置应在
下
"spring.cloud.stream.bindings.<channelName>.consumer."
在你的配置中,它看起来像在
"spring.cloud.stream.kafka.bindings.<channelName>.consumer."
MaxAttempts 似乎不是 kafka 绑定道具的有效配置 - https://github.com/spring-cloud/spring-cloud-stream-binder-kafka
这对我来说很好...
@SpringBootApplication
@EnableBinding(Inputs.class)
public class So57522645Application {
public static void main(String[] args) {
SpringApplication.run(So57522645Application.class, args);
}
@StreamListener("input1")
public void listen1(String in) {
System.out.println("main: " + in);
throw new RuntimeException("fail");
}
@StreamListener("input2")
public void listen2(String in) {
System.out.println("error: " + in);
throw new RuntimeException("fail");
}
@StreamListener("input3")
public void listen3(String in) {
System.out.println("final: " + in);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
return args -> template.send("main", "test".getBytes());
}
}
interface Inputs {
@Input
MessageChannel input1();
@Input
MessageChannel input2();
@Input
MessageChannel input3();
}
spring:
cloud:
stream:
bindings:
input1:
consumer:
max-attempts: 1
destination: main
group: grp1
input2:
consumer:
max-attempts: 3
destination: error.main.grp1
group: grp2
input3:
destination: error.error.main.grp1.grp2
group: grp3
kafka:
bindings:
input1:
consumer:
enable-dlq: true
input2:
consumer:
enable-dlq: true
和
main: test
error: test
error: test
error: test
final: test
我们正在处理这样一种情况,我们需要为同一应用程序中的不同消费者使用不同的重试策略。
请参考下图(简要架构图):
main_consumer
使用来自 main_topic
的负载并尝试将其发送到 API。如果 API 处理失败,我们将把这个失败的有效负载写入另一个名为 error_topic
的主题。有一个不同的消费者 (error_consumer)
从 error_topic
消费并通过 3 次重试再次将有效负载发送到 API。如果仍然失败,则 error_consumer
将此有效载荷推送到 DLQ
.
我们面临的问题:
我们需要 main_consumer
不重试失败,error_consumer
失败重试 3 次。我们将 maxAttempts
作为 main_consumer
的 1,将 maxAttempts
作为 error_consumer
的 3。但是使用此配置,main_consumer
重试 3 次,error_consumer
重试一次。它的工作与我们的预期完全相反。
P.S :我们尝试为两个消费者交换 maxAttempts
(这是不合逻辑的),但没有成功。
下面是我们使用的Spring云流应用配置:
我们是 运行 具有以下两个配置文件的应用程序。
应用-main.yml
spring:
cloud:
stream:
kafka:
bindings:
main-consumer-channel:
consumer:
autoCommitOffset: false
bindings:
main-consumer-channel:
destination: main_topic
consumer:
maxAttempts: 1
backOffInitialInterval: 5000
backOffMultiplier: 2
应用程序错误-retry.yml
spring:
cloud:
stream:
kafka:
bindings:
error-consumer-channel:
consumer:
autoCommitOffset: false
bindings:
error-consumer-channel:
destination: error_topic
consumer:
maxAttempts: 3
backOffInitialInterval: 5000
backOffMultiplier: 2
根据 spring 文档 - https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/_configuration_options.html,maxAttempts 配置应在
下"spring.cloud.stream.bindings.<channelName>.consumer."
在你的配置中,它看起来像在
"spring.cloud.stream.kafka.bindings.<channelName>.consumer."
MaxAttempts 似乎不是 kafka 绑定道具的有效配置 - https://github.com/spring-cloud/spring-cloud-stream-binder-kafka
这对我来说很好...
@SpringBootApplication
@EnableBinding(Inputs.class)
public class So57522645Application {
public static void main(String[] args) {
SpringApplication.run(So57522645Application.class, args);
}
@StreamListener("input1")
public void listen1(String in) {
System.out.println("main: " + in);
throw new RuntimeException("fail");
}
@StreamListener("input2")
public void listen2(String in) {
System.out.println("error: " + in);
throw new RuntimeException("fail");
}
@StreamListener("input3")
public void listen3(String in) {
System.out.println("final: " + in);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
return args -> template.send("main", "test".getBytes());
}
}
interface Inputs {
@Input
MessageChannel input1();
@Input
MessageChannel input2();
@Input
MessageChannel input3();
}
spring:
cloud:
stream:
bindings:
input1:
consumer:
max-attempts: 1
destination: main
group: grp1
input2:
consumer:
max-attempts: 3
destination: error.main.grp1
group: grp2
input3:
destination: error.error.main.grp1.grp2
group: grp3
kafka:
bindings:
input1:
consumer:
enable-dlq: true
input2:
consumer:
enable-dlq: true
和
main: test
error: test
error: test
error: test
final: test