kafka批处理监听器的正确错误处理

Proper error handling for kafka batch listener

我正在研究错误处理实现并有一个问题。
让我解释一下问题:
我收到一批消息,我在 for 循环中对每个消息进行数据库查找,然后我需要收集列表中所有查找的对象,并使用此对象列表调用批量插入存储过程和批量更新存储过程。

现在假设在查找过程中发生了一些异常。我想重试此消息。对于这种情况,我尝试使用 DefaultErrorHandler。但是有一个问题,根据文档,当抛出带有元素索引的 BatchListenerFailedException 时,它会在索引之前提交记录的偏移量。但正如我所说,我需要在查找后执行批量插入和更新,所以我不想在索引之前提交偏移量,这些记录还没有 inserted/updated 到数据库。

这是否意味着我唯一的选择是使用每次重试整个批次的 RetryingBatchErrorHandler?我能否以某种方式继续处理不会产生错误的消息?

此外,如果 RetryingBatchErrorHandler 是唯一的选择,我如何确定在退避期较长(指数退避)的情况下,kafka 不会杀死我的消费者并且不会启动重新平衡?

我当前的实现:

RetryingBatchErrorHandler retryingBatchErrorHandler =
                new RetryingBatchErrorHandler(backoff,
                        (consumerRecord, e) ->
                                log.error("Backoff attempts exhausted for the record with offset={}, partition={}, value={}, offset committed.",
                consumerRecord.offset(), consumerRecord.partition(), consumerRecord.value()));

factory.setBatchErrorHandler(retryingBatchErrorHandler);

更新:查看 Artem 回答中的评论。
这就是查找步骤可以包装到 retryTemplate

中的方式
LookedUpRequest lookedUpRequest = retryTemplate.execute(ctx -> {
   //Lookup step
   return lookup.process(request);
});

如果失败,它将进一步抛出批处理错误处理程序的异常,其中RetryingBatchErrorHandler根据其策略重试批处理

查看其 JavaDocs:

/**
 * A batch error handler that invokes the listener according to the supplied
 * {@link BackOff}. The consumer is paused/polled/resumed before each retry in order to
 * avoid a rebalance. If/when retries are exhausted, the provided
 * {@link ConsumerRecordRecoverer} is invoked for each record in the batch. If the
 * recoverer throws an exception, or the thread is interrupted while sleeping, seeks are
 * performed so that the batch will be redelivered on the next poll.
 *
 * @author Gary Russell
 * @since 2.3.7
 *
 */
public class RetryingBatchErrorHandler extends KafkaExceptionLogLevelAware
        implements ListenerInvokingBatchErrorHandler {

所以,没有重新平衡,因为消费者在两者之间暂停。

您可能需要考虑为您的查找部分缓存一些东西,这样您就不会用失败前已经请求的那些记录给数据库带来压力。

您也可以考虑自己围绕该查找重试。参见 RetryTemplate。但在这种情况下,您需要确保整个操作的时间不够长(请参阅 max.poll.interval.ms)以使消费者离开其组。