Kafka - 消费者承诺的偏移量在应用程序停止时关闭 + 从过去提交偏移量
Kafka - commiting offset before consumer is shut down on app stop + commiting offset from the past
我已经设置了 spring-kafka 消费者。它正在使用主题中的 avro 数据,映射值并写入 CSV 文件。一旦文件长度为 25000 条记录或每 5 分钟 - 以先到者为准,我手动提交偏移量。
由于patching/releases.
,我们重启应用程序时出现问题
我有这样的方法:
@PreDestroy
public void destroy() {
LOGGER.info("shutting down");
writeCsv(true);
acknowledgment.acknowledge(); // this normally commits the current offset
LOGGER.info("package commited: " + acknowledgment.toString());
LOGGER.info("shutting down completed");
}
所以我在那里添加了一些记录器,这就是日志的样子:
08:05:47 INFO KafkaMessageListenerContainer$ListenerConsumer - myManualConsumer: Consumer stopped
08:05:47 INFO CsvWriter - shutting down
08:05:47 INFO CsvWriter - created file: FEEDBACK1630476236079.csv
08:05:47 INFO CsvWriter - package commited: Acknowledgment for ConsumerRecord(topic = feedback-topic, partition = 1, leaderEpoch = 17, offset = 544, CreateTime = 1630415419703, serialized key size = -1, serialized value size = 156)
08:05:47 INFO CsvWriter - shutting down completed
由于消费者在调用 acknowledge() 方法之前停止工作,因此永远不会提交偏移量。日志中没有错误,我们在应用程序再次启动后得到重复。
- 有没有办法在消费者关闭之前调用方法?
还有一个问题:
我想像这样对消费者设置过滤器:
if(event.getValue().equals("GOOD") {
addCsvRecord(event)
} else {
acknowledgement.acknowledge() //to let it read next event
假设我得到偏移量 100 - 好事件来了,我将它添加到 csv 文件,文件等待更多记录并且偏移量尚未提交。
接下来出现 BAD 事件,它被过滤掉并立即提交偏移量 101。
然后文件达到超时并即将关闭并调用
acknowlegdment.acknowledge()
偏移量 100。
- 那里可能会发生什么?可以提交之前的offset吗?
@PreDestroy
在上下文生命周期中为时已晚 - 那时容器已经停止。
实施 SmartLifecycle
并在 stop()
中做出确认。
对于你的第二个问题,不要提交错误的偏移量;您仍将获得下一条记录。
Kafka维护了两个指针position
和committed
。它们是相关的,但对于 运行 应用程序是独立的。
我已经设置了 spring-kafka 消费者。它正在使用主题中的 avro 数据,映射值并写入 CSV 文件。一旦文件长度为 25000 条记录或每 5 分钟 - 以先到者为准,我手动提交偏移量。
由于patching/releases.
,我们重启应用程序时出现问题我有这样的方法:
@PreDestroy
public void destroy() {
LOGGER.info("shutting down");
writeCsv(true);
acknowledgment.acknowledge(); // this normally commits the current offset
LOGGER.info("package commited: " + acknowledgment.toString());
LOGGER.info("shutting down completed");
}
所以我在那里添加了一些记录器,这就是日志的样子:
08:05:47 INFO KafkaMessageListenerContainer$ListenerConsumer - myManualConsumer: Consumer stopped
08:05:47 INFO CsvWriter - shutting down
08:05:47 INFO CsvWriter - created file: FEEDBACK1630476236079.csv
08:05:47 INFO CsvWriter - package commited: Acknowledgment for ConsumerRecord(topic = feedback-topic, partition = 1, leaderEpoch = 17, offset = 544, CreateTime = 1630415419703, serialized key size = -1, serialized value size = 156)
08:05:47 INFO CsvWriter - shutting down completed
由于消费者在调用 acknowledge() 方法之前停止工作,因此永远不会提交偏移量。日志中没有错误,我们在应用程序再次启动后得到重复。
- 有没有办法在消费者关闭之前调用方法?
还有一个问题:
我想像这样对消费者设置过滤器:
if(event.getValue().equals("GOOD") {
addCsvRecord(event)
} else {
acknowledgement.acknowledge() //to let it read next event
假设我得到偏移量 100 - 好事件来了,我将它添加到 csv 文件,文件等待更多记录并且偏移量尚未提交。 接下来出现 BAD 事件,它被过滤掉并立即提交偏移量 101。 然后文件达到超时并即将关闭并调用
acknowlegdment.acknowledge()
偏移量 100。
- 那里可能会发生什么?可以提交之前的offset吗?
@PreDestroy
在上下文生命周期中为时已晚 - 那时容器已经停止。
实施 SmartLifecycle
并在 stop()
中做出确认。
对于你的第二个问题,不要提交错误的偏移量;您仍将获得下一条记录。
Kafka维护了两个指针position
和committed
。它们是相关的,但对于 运行 应用程序是独立的。