我可以在不检索所有消息的情况下检索 Kafka 分区的最新可用偏移量吗?
Can I retrieve the latest available offset for a Kafka partition without retrieving all the messages?
查看最新的 (v0.10) Kafka 消费者 documentation:
"The position of the consumer gives the offset of the next record that will be given out. It will be one larger than the highest offset the consumer has seen in that partition. It automatically advances every time the consumer receives data calls poll(long) and receives messages."
有没有办法在服务器端查询分区可用的最大偏移量,无需检索所有消息?
我尝试实现的逻辑如下:
- 每秒查询主题中待处理消息的数量 (A)
- 如果 A > 阈值,则唤醒将继续检索所有消息并处理它们的处理器
- 否则什么都不做(睡眠 1)
动机是我需要做一些批处理,但我希望处理器只有在有足够的数据时才被唤醒(我不想两次检索所有数据)。
遗憾的是,我看不出 0.10 消费者如何做到这一点。
但是,如果您有任何较低级别的 Kafka 客户端,这是可行的(抱歉,但我不确定 JVM 是否存在,但其他语言有很多)。
因此,如果您有一些时间和灵感来实现这一点,那就是方法 - 每个 FetchResponse
(这是对每个 "give me messages" 请求的响应)包含一个名为 [=11 的字段=],本质上是分区末尾的偏移量(https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchResponse)。这里的技巧是发送一个 FetchRequest
,它将立即 return (例如不会阻塞等待)除了 HighwaterMarkOffset.
为此,您的 FetchRequest
应该:
MaxWaitTime
设置为 0
,这意味着 "return immediately if cannot fetch at least MinBytes bytes".
MinBytes
设置为0
,即"I'm OK if you return me an empty response".
FetchOffset
在这种情况下无关紧要,如果我没记错的话,它甚至可能是无效偏移量,但最好是有效偏移量。
MaxBytes
设置为 0
,即 "give me no more than 0 bytes of data",例如没什么。
这样,此请求将立即 return 没有数据,但仍然将 highwatermark 偏移设置为适当的值。获得 highwatermark 偏移量后,您可以将其与当前偏移量进行比较,并计算出您落后了多少。
希望对您有所帮助。
您可以使用Consumer.seekToEnd()
方法,运行 Consumer.poll(0)
使它生效,但return立即生效,然后Consumer.position()
找到位置所有订阅(或分配)的主题分区。这些将是所有分区的当前最终偏移量。这也将开始从经纪人那里获取这些抵消的一些数据,但是如果您随后返回到不同的位置,任何 returned 数据都将被忽略。
目前的替代方法,如 serejja 所提到的,是使用旧的简单消费者,尽管这个过程要复杂得多,因为您需要手动为每个分区找到领导者。
您可以使用下面 API 中的方法 public OffsetAndMetadata committed(TopicPartition partition)
来获取最后提交的偏移量
查看最新的 (v0.10) Kafka 消费者 documentation:
"The position of the consumer gives the offset of the next record that will be given out. It will be one larger than the highest offset the consumer has seen in that partition. It automatically advances every time the consumer receives data calls poll(long) and receives messages."
有没有办法在服务器端查询分区可用的最大偏移量,无需检索所有消息?
我尝试实现的逻辑如下:
- 每秒查询主题中待处理消息的数量 (A)
- 如果 A > 阈值,则唤醒将继续检索所有消息并处理它们的处理器
- 否则什么都不做(睡眠 1)
动机是我需要做一些批处理,但我希望处理器只有在有足够的数据时才被唤醒(我不想两次检索所有数据)。
遗憾的是,我看不出 0.10 消费者如何做到这一点。
但是,如果您有任何较低级别的 Kafka 客户端,这是可行的(抱歉,但我不确定 JVM 是否存在,但其他语言有很多)。
因此,如果您有一些时间和灵感来实现这一点,那就是方法 - 每个 FetchResponse
(这是对每个 "give me messages" 请求的响应)包含一个名为 [=11 的字段=],本质上是分区末尾的偏移量(https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchResponse)。这里的技巧是发送一个 FetchRequest
,它将立即 return (例如不会阻塞等待)除了 HighwaterMarkOffset.
为此,您的 FetchRequest
应该:
MaxWaitTime
设置为0
,这意味着 "return immediately if cannot fetch at least MinBytes bytes".MinBytes
设置为0
,即"I'm OK if you return me an empty response".FetchOffset
在这种情况下无关紧要,如果我没记错的话,它甚至可能是无效偏移量,但最好是有效偏移量。MaxBytes
设置为0
,即 "give me no more than 0 bytes of data",例如没什么。
这样,此请求将立即 return 没有数据,但仍然将 highwatermark 偏移设置为适当的值。获得 highwatermark 偏移量后,您可以将其与当前偏移量进行比较,并计算出您落后了多少。
希望对您有所帮助。
您可以使用Consumer.seekToEnd()
方法,运行 Consumer.poll(0)
使它生效,但return立即生效,然后Consumer.position()
找到位置所有订阅(或分配)的主题分区。这些将是所有分区的当前最终偏移量。这也将开始从经纪人那里获取这些抵消的一些数据,但是如果您随后返回到不同的位置,任何 returned 数据都将被忽略。
目前的替代方法,如 serejja 所提到的,是使用旧的简单消费者,尽管这个过程要复杂得多,因为您需要手动为每个分区找到领导者。
您可以使用下面 API 中的方法 public OffsetAndMetadata committed(TopicPartition partition)
来获取最后提交的偏移量