Kafka Java API 偏移操作说明

Kafka Java API offset operations clarification

我正在尝试使用低级消费者 Java API 手动管理偏移量,最新的 kafka_2.10-0.8.2.1。为了验证我 commit/read 来自 Kafka 的偏移量是否正确,我使用 kafka.tools.ConsumerOffsetChecker 工具。

这是我的 topic/consumer 组的输出示例:

./bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group   elastic_search_group --zookeeper localhost:2181 --topic my_log_topic
Group           Topic                          Pid Offset          logSize         Lag             Owner
elastic_search_group my_log_topic              0   5               29              24              none

这是我对结果的解释:

Offset = 5 --> 这是我的 'elastic_search_group' consumer

的当前偏移量

logSize = 29 --> 这是最新的偏移量 - 下一条消息的偏移量 topic/partition

滞后 = 24 --> 29-5 - 我的 'elastic_search_group' 消费者

尚未处理多少消息

Pid - 分区 ID

Q1:这样正确吗?

现在,我想从我的 Java 消费者那里获得相同的信息。在这里,我发现我必须使用两个不同的 APIs:

kafka.javaapi.OffsetRequest 获取最早和最新的偏移量,但 kafka.javaapi.OffsetFetchRequest 获取当前偏移量。

为了获得最早(或最晚)的偏移量,我这样做:

TopicAndPartition topicAndPartition = new TopicAndPartition(myTopic, myPartition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.EarliestTime(), 1));
// OR for Latest: requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.LatestTime(), 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
OffsetResponse response = simpleConsumer.getOffsetsBefore(request);
long[] offsets = response.offsets(topic, partition);
long myEarliestOffset = offsets[0];
// OR for Latest: long myLatestOffset = offsets[0];

为了获得当前偏移量,我必须使用完全不同的 API:

short versionID = 0;
int correlationId = 0;
List<TopicAndPartition> topicPartitionList = new ArrayList<TopicAndPartition>(); 
TopicAndPartition myTopicAndPartition = new TopicAndPartition(myTopic, myPartition);
topicPartitionList.add(myTopicAndPartition); 
OffsetFetchRequest offsetFetchReq = new OffsetFetchRequest(
kafkaGroupId, topicPartitionList, versionID, correlationId, kafkaGroupId);
OffsetFetchResponse offsetFetchResponse = simpleConsumer.fetchOffsets(offsetFetchReq);
long currentOffset = offsetFetchResponse.offsets().get(myTopicAndPartition).offset();

Q2:是否正确?为什么有两个不同的 API 来获得非常相似的信息?

Q3:我在这里使用哪个versionId和correlationId有关系吗?我虽然 versionId 对于 0.8.2.1 之前的 kafka 应该是 0,对于 0.8.2.1 和更高版本应该是 1 - 但它似乎也适用于 0 对于 0.8.2.1 - 见下文 ?

所以,对于上面主题的示例状态,以及 ConsumerOffsetChecker 的上面输出,这是我从我的 Java代码:

currentOffset=5; earliestOffset=29; latestOffset=29

'currentOffset' 似乎可以,'latestOffset' 也正确,但是 'earliestOffset'?我希望它至少是“5”?

Q4:earliestOffset怎么会比currentOffset高呢? 我唯一的怀疑是,由于保留政策,可能来自该主题的消息被清除了……。还有其他可能发生的情况吗?

我正在寻找寻找分区滞后的方法。这涉及您已采取的相同步骤。到目前为止,根据我所学的,我可以给你答案。

  1. logSize 直接指向该特定分区中累积了多少消息。或者,它指定该分区中消息的最大偏移量。 Offset 是最后一次成功消费消息的偏移量。所以滞后只是日志大小和偏移量之间的差异。
  2. 是的,它是正确的。到目前为止,这是仅有的两种查找当前偏移量和最早或最新偏移量的方法
  3. 不知道为什么要指定versionId。您可以使用 kafka.api.OffsetRequest.CurrentVersion() 来获取 versionId。因此可以避免硬编码。您可以放心地将 correlationId 假定为 0。
  4. 这很奇怪。当我使用 EarliestTime() 时,即使我当前的偏移量已经取得了很大进展,我也会得到最早的偏移量 0。这意味着它是分区的开始。因此,当某些消息在未来某个时间过期时,这个最早的偏移量将是某个非零数。现在,如果消息因保留策略滞后而被清除,应该已经更改。我不确定这种行为。一种确定的方法是,运行 消费者在注意到此类阅读并检查其日志后。它应该显示像这样的行。

    2015-06-09 18:49:15 :: DEBUG :: PartitionTopicInfo:52 :: 重置 requests:2 的消耗偏移量:获取的偏移量 = 405952:消耗的偏移量 = 335372 到 335372 2015-06-09 18:49:15 :: DEBUG :: PartitionTopicInfo:52 :: 重置 requests:2 的消耗偏移量:获取的偏移量 = 405952:消耗的偏移量 = 335373 到 335373

请注意,在上面的日志行中,获取的偏移量保持不变,消耗的偏移量在增加。最后它将以

结尾

2015-06-09 18:49:16 :: DEBUG :: PartitionTopicInfo:52 :: 重置 requests:2 的消耗偏移量:获取的偏移量 = 405952:消耗的偏移量 = 405952 到 405952

那么这意味着由于日志保留策略,从 335372 到 405952 的偏移量已过期