How to get RetryAdvice working for KafkaProducerMessageHandler

I am trying to write a RetryAdvice for a Kafka handler; when retries are exhausted, it should fall back to saving the message in MongoDB via a RecoveryCallback.

@Bean(name = "kafkaSuccessChannel")
public ExecutorChannel kafkaSuccessChannel() {
    return MessageChannels.executor("kafkaSuccessChannel", asyncExecutor()).get();
}

@Bean(name = "kafkaErrorChannel")
public ExecutorChannel kafkaErrorChannel() {
    return MessageChannels.executor("kafkaErrorChannel", asyncExecutor()).get();
}

@Bean
@ServiceActivator(inputChannel = "kafkaPublishChannel")
public KafkaProducerMessageHandler<String, String> kafkaProducerMessageHandler(
        @Autowired ExecutorChannel kafkaSuccessChannel,
        @Autowired RequestHandlerRetryAdvice retryAdvice) {
    KafkaProducerMessageHandler<String, String> handler = new KafkaProducerMessageHandler<>(kafkaTemplate());
    handler.setHeaderMapper(mapper());
    handler.setLoggingEnabled(true);
    handler.setTopicExpression(
            new SpelExpressionParser()
                    .parseExpression(
                            "headers['" + upstreamTypeHeader + "'] + '_' + headers['" + upstreamInstanceHeader + "']"));
    handler.setSendSuccessChannel(kafkaSuccessChannel);
    handler.setAdviceChain(Arrays.asList(retryAdvice));
    // sync true implies that this Kafka handler will wait for results of kafka operations; to be used only for testing purposes.
    handler.setSync(testMode);
    return handler;
}
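The topic expression above simply concatenates two header values with an underscore. As a plain-Java sketch of what the SpEL expression evaluates to (the header names are assumptions borrowed from the test further down; the real values of `upstreamTypeHeader` and `upstreamInstanceHeader` come from configuration):

```java
import java.util.HashMap;
import java.util.Map;

public class TopicExpressionDemo {

    // Hypothetical header names, matching the test message below.
    static final String UPSTREAM_TYPE_HEADER = "X-UPSTREAM-TYPE";
    static final String UPSTREAM_INSTANCE_HEADER = "X-INSTANCE-HEADER";

    // Mirrors what "headers['…'] + '_' + headers['…']" evaluates to in SpEL.
    static String resolveTopic(Map<String, Object> headers) {
        return headers.get(UPSTREAM_TYPE_HEADER) + "_" + headers.get(UPSTREAM_INSTANCE_HEADER);
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put(UPSTREAM_TYPE_HEADER, "alm");
        headers.put(UPSTREAM_INSTANCE_HEADER, "jira");
        System.out.println(resolveTopic(headers)); // prints "alm_jira"
    }
}
```

So a message carrying `alm`/`jira` headers is routed to a topic named `alm_jira`.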

Then, in the same class, I configure the advice as follows:

@Bean
public RequestHandlerRetryAdvice retryAdvice(@Autowired RetryTemplate retryTemplate,
                                             @Autowired ExecutorChannel kafkaErrorChannel) {
    RequestHandlerRetryAdvice retryAdvice = new RequestHandlerRetryAdvice();
    retryAdvice.setRecoveryCallback(new ErrorMessageSendingRecoverer(kafkaErrorChannel));
    retryAdvice.setRetryTemplate(retryTemplate);
    return retryAdvice;
}

@Bean
public RetryTemplate retryTemplate() {
    return new RetryTemplateBuilder().maxAttempts(3).exponentialBackoff(1000, 3.0, 30000)
            .retryOn(MessageHandlingException.class).build();
}
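For reference, here is a small sketch of the delay schedule this RetryTemplate should produce (my own calculation, not Spring Retry output): three attempts mean two waits, starting at the 1000 ms initial interval, multiplied by 3.0 after each retry, and capped at 30000 ms:

```java
public class BackoffScheduleDemo {

    // Computes the waits between attempts for exponentialBackoff(1000, 3.0, 30000)
    // with maxAttempts(3): the delay starts at the initial interval and is
    // multiplied by the multiplier after each retry, capped at the max interval.
    static long[] delays(int maxAttempts, long initial, double multiplier, long max) {
        long[] result = new long[maxAttempts - 1];
        long delay = initial;
        for (int i = 0; i < result.length; i++) {
            result[i] = delay;
            delay = Math.min((long) (delay * multiplier), max);
        }
        return result;
    }

    public static void main(String[] args) {
        for (long d : delays(3, 1000, 3.0, 30000)) {
            System.out.println("wait " + d + " ms"); // 1000 ms, then 3000 ms
        }
    }
}
```

With these settings the whole retry cycle should finish in roughly four seconds plus the time the sends themselves take.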

Finally, I have a Mongo handler that saves failed messages to a collection:

@Bean
@ServiceActivator(inputChannel = "kafkaErrorChannel")
public MongoDbStoringMessageHandler kafkaFailureHandler(@Autowired MongoDatabaseFactory mongoDbFactory,
                                                        @Autowired MongoConverter mongoConverter) {
    String collectionExpressionString = "headers['" + upstreamTypeHeader + "'] + '_'+ headers['" + upstreamInstanceHeader + "']+ '_FAIL'";
    return getMongoDbStoringMessageHandler(mongoDbFactory, mongoConverter, collectionExpressionString);
}

I am having a hard time figuring out whether I have wired all of this correctly, because the test never seems to work. In the test class I deliberately do *not* set up an embedded Kafka or connect to a broker, so that publishing the message fails; the expectation is that this triggers the retry advice and eventually saves the message to the dead-letter collection in Mongo:

@Test
void testFailedKafkaPublish() {

    //Dummy message
    Map<String, String> map = new HashMap<>();
    map.put("key", "value");
    // Publish Message
    Message<Map<String, String>> message = MessageBuilder.withPayload(map)
            .setHeader("X-UPSTREAM-TYPE", "alm")
            .setHeader("X-INSTANCE-HEADER", "jira")
            .build();

    kafkaGateway.publish(message);

    //assert successful message is saved in FAIL collection
    assertThat(mongoTemplate.findAll(DBObject.class, "alm_jira_FAIL"))
            .extracting("key")
            .containsOnly("value");
}

I read that we have to call setSync on the Kafka handler for it to wait for the results of Kafka operations, so I introduced

@Value("${digite.swiftalk.kafka.test-mode:false}")
private boolean testMode;

into the Kafka configuration; in the test above, I set it to true via the @TestPropertySource annotation:

@TestPropertySource(properties = {
        "spring.main.banner-mode=off",
        "spring.data.mongodb.database=swiftalk_db",
        "spring.data.mongodb.port=29019",
        "spring.data.mongodb.host=localhost",
        "digite.swiftalk.kafka.test-mode=true",

})

I still do not see any logging of the retry advice executing, nor any failed message saved in Mongo. Another idea was to use Awaitility, but I am not sure what condition to put in its until() method to make this work.
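Awaitility is not wired up in the code above; as a minimal plain-JDK sketch of the same idea (the `awaitUntil` helper below is hypothetical, not an Awaitility API), the `until()` condition would simply poll the FAIL collection until it is non-empty:

```java
import java.util.function.BooleanSupplier;

public class AwaitHelper {

    // Minimal stand-in for Awaitility's await().until(...): polls a condition
    // until it returns true or the timeout elapses.
    static boolean awaitUntil(BooleanSupplier condition, long timeoutMs, long pollMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (condition.getAsBoolean()) {
                return true;
            }
            Thread.sleep(pollMs);
        }
        return condition.getAsBoolean();
    }

    public static void main(String[] args) throws InterruptedException {
        // In the real test the condition would poll Mongo, e.g.:
        // () -> !mongoTemplate.findAll(DBObject.class, "alm_jira_FAIL").isEmpty()
        long start = System.currentTimeMillis();
        boolean done = awaitUntil(() -> System.currentTimeMillis() - start > 200, 2000, 50);
        System.out.println(done); // prints "true"
    }
}
```

The same `!mongoTemplate.findAll(...).isEmpty()` lambda is what would go into Awaitility's `until()`.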

Update

After enabling debug logging for Kafka, I noticed the producer enters a loop on a separate thread, repeatedly trying to reconnect to Kafka:

2021-03-25 10:56:02.640 DEBUG 66997 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Initiating connection to node localhost:9999 (id: -1 rack: null) using address localhost/127.0.0.1
2021-03-25 10:56:02.641 DEBUG 66997 --- [dPoolExecutor-1] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Kafka producer started
2021-03-25 10:56:02.666 DEBUG 66997 --- [ad | producer-1] o.apache.kafka.common.network.Selector   : [Producer clientId=producer-1] Connection with localhost/127.0.0.1 disconnected

java.net.ConnectException: Connection refused
    at java.base/sun.nio.ch.Net.pollConnect(Native Method) ~[na:na]
    at java.base/sun.nio.ch.Net.pollConnectNow(Net.java:660) ~[na:na]
    at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:875) ~[na:na]
    at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:219) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:530) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.common.network.Selector.poll(Selector.java:485) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:544) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) ~[kafka-clients-2.6.0.jar:na]
    at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]

while the test reaches the assertion and therefore fails:

    //assert successful message is saved in FAIL collection
    assertThat(mongoTemplate.findAll(DBObject.class, "alm_jira_FAIL"))
            .extracting("key")
            .containsOnly("value");

So it looks like the retry advice is not handling the first two failures in the first place.

Update 2

I updated the configuration class to add the property

@Value("${spring.kafka.producer.properties.max.block.ms:1000}")
private Integer productMaxBlockDurationMs;

and added the following line to the kafkaTemplate configuration method:

props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, productMaxBlockDurationMs);

This fixed it.

Update 3

As Gary pointed out, we can skip having to add all these props manually; I removed the following method from the class:

@Bean
KafkaTemplate<String, String> kafkaTemplate() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, productMaxBlockDurationMs);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
}

and instead injected the Kafka configuration from properties like this, removing the need to write a KafkaTemplate bean at all:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.properties.max.block.ms=1000
spring.kafka.producer.properties.enable.idempotence=true
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

A KafkaProducer blocks for 60 seconds by default before failing.

Try reducing the max.block.ms producer property.

https://kafka.apache.org/documentation/#producerconfigs_max.block.ms

EDIT

Here is an example:

@SpringBootApplication
public class So66768745Application {

    public static void main(String[] args) {
        SpringApplication.run(So66768745Application.class, args);
    }

    @Bean
    IntegrationFlow flow(KafkaTemplate<String, String> template, RequestHandlerRetryAdvice retryAdvice) {
        return IntegrationFlows.from(Gate.class)
                .handle(Kafka.outboundChannelAdapter(template)
                            .topic("testTopic"), e -> e
                        .advice(retryAdvice))
                .get();
    }

    @Bean
    RequestHandlerRetryAdvice retryAdvice(QueueChannel channel) {
        RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
        advice.setRecoveryCallback(new ErrorMessageSendingRecoverer(channel));
        return advice;
    }

    @Bean
    QueueChannel channel() {
        return new QueueChannel();
    }

}

interface Gate {

    void sendToKafka(String out);

}

@SpringBootTest
@TestPropertySource(properties = {
        "spring.kafka.bootstrap-servers: localhost:9999",
        "spring.kafka.producer.properties.max.block.ms: 500" })
class So66768745ApplicationTests {

    @Autowired
    Gate gate;

    @Autowired
    QueueChannel channel;

    @Test
    void test() {
        this.gate.sendToKafka("test");
        Message<?> em = this.channel.receive(60_000);
        assertThat(em).isNotNull();
        System.out.println(em);
    }

}
2021-03-23 15:16:13.908 ERROR 2668 --- [           main] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='test' to topic testTopic:

org.apache.kafka.common.errors.TimeoutException: Topic testTopic not present in metadata after 500 ms.

2021-03-23 15:16:14.343  WARN 2668 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Connection to node -1 (localhost/127.0.0.1:9999) could not be established. Broker may not be available.
2021-03-23 15:16:14.343  WARN 2668 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Bootstrap broker localhost:9999 (id: -1 rack: null) disconnected
2021-03-23 15:16:14.415 ERROR 2668 --- [           main] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='test' to topic testTopic:

org.apache.kafka.common.errors.TimeoutException: Topic testTopic not present in metadata after 500 ms.

2021-03-23 15:16:14.921 ERROR 2668 --- [           main] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='test' to topic testTopic:

org.apache.kafka.common.errors.TimeoutException: Topic testTopic not present in metadata after 500 ms.

ErrorMessage [payload=org.springframework.messaging.MessagingException: Failed to handle; nested exception is org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic testTopic not present in metadata after 500 ms., failedMessage=GenericMessage [payload=test, headers={replyChannel=nullChannel, errorChannel=, id=d8ce277a-3d9a-b0bc-c14b-80d63ca13858, timestamp=1616526973218}], headers={id=1a6c29d2-f8d8-adf0-7569-db7610b020ef, timestamp=1616526974921}]