初始消费后的 Kafka 保留

Kafka retention AFTER initial consuming

我有一个只有一个消费者的 Kafka 集群,它每天处理 TB 的数据。一旦消息被消费和提交,它可以立即删除(或保留几分钟后)。

看起来 log.retention.byteslog.retention.hours 配置从消息创建中计数。这对我不好。

如果消费者因 maintenance/incident 而宕机,我想保留数据直到它重新联机。如果我碰巧从 space 中 运行,我想拒绝接受来自生产者的新数据,并且不删除尚未使用的数据(所以 log.retention.bytes 没有帮助我)。

有什么想法吗?

如果您可以确保您的消息具有唯一键,则可以将您的主题配置为使用压缩而不是 timed-retention 策略。然后让您的消费者在处理每条消息后使用消息键但空值将消息发送回同一主题。 Kafka 会压缩这些消息。您可以根据需要调整压缩参数(和日志段文件大小,因为头段从不压缩,如果您希望压缩更快启动,您可能需要将其设置为较小的大小)。

但是,正如我之前提到的,这仅在消息具有唯一键时才有效,否则您不能简单地打开压缩,因为这会导致在您的消费者停机期间丢失具有相同键的先前消息(或者已经落后头部段)。