Kafka - 消费者承诺的偏移量在应用程序停止时关闭 + 从过去提交偏移量

Kafka - commiting offset before consumer is shut down on app stop + commiting offset from the past

我已经设置了 spring-kafka 消费者。它正在使用主题中的 avro 数据,映射值并写入 CSV 文件。一旦文件长度为 25000 条记录或每 5 分钟 - 以先到者为准,我手动提交偏移量。

由于patching/releases.

,我们重启应用程序时出现问题

我有这样的方法:

   @PreDestroy
    public void destroy() {
        LOGGER.info("shutting down");
        writeCsv(true); 
        acknowledgment.acknowledge(); // this normally commits the current offset
        LOGGER.info("package commited: " + acknowledgment.toString());
        LOGGER.info("shutting down completed");
    }

所以我在那里添加了一些记录器,这就是日志的样子:

08:05:47  INFO KafkaMessageListenerContainer$ListenerConsumer - myManualConsumer: Consumer stopped
08:05:47  INFO CsvWriter - shutting down
08:05:47  INFO CsvWriter - created file: FEEDBACK1630476236079.csv
08:05:47  INFO CsvWriter - package commited: Acknowledgment for ConsumerRecord(topic = feedback-topic, partition = 1, leaderEpoch = 17, offset = 544, CreateTime = 1630415419703, serialized key size = -1, serialized value size = 156)
08:05:47  INFO CsvWriter - shutting down completed

由于消费者在调用 acknowledge() 方法之前停止工作,因此永远不会提交偏移量。日志中没有错误,我们在应用程序再次启动后得到重复。

  1. 有没有办法在消费者关闭之前调用方法?

还有一个问题:

我想像这样对消费者设置过滤器:

if(event.getValue().equals("GOOD") {
addCsvRecord(event) 
} else {
acknowledgement.acknowledge() //to let it read next event

假设我得到偏移量 100 - 好事件来了,我将它添加到 csv 文件,文件等待更多记录并且偏移量尚未提交。 接下来出现 BAD 事件,它被过滤掉并立即提交偏移量 101。 然后文件达到超时并即将关闭并调用

acknowlegdment.acknowledge() 

偏移量 100。

  1. 那里可能会发生什么?可以提交之前的offset吗?

@PreDestroy 在上下文生命周期中为时已晚 - 那时容器已经停止。

实施 SmartLifecycle 并在 stop() 中做出确认。

对于你的第二个问题,不要提交错误的偏移量;您仍将获得下一条记录。

Kafka维护了两个指针positioncommitted。它们是相关的,但对于 运行 应用程序是独立的。