Kafka Java API 偏移操作说明
Kafka Java API offset operations clarification
我正在尝试使用低级消费者 Java API 手动管理偏移量,最新的 kafka_2.10-0.8.2.1。为了验证我 commit/read 来自 Kafka 的偏移量是否正确,我使用 kafka.tools.ConsumerOffsetChecker 工具。
这是我的 topic/consumer 组的输出示例:
./bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group elastic_search_group --zookeeper localhost:2181 --topic my_log_topic
Group Topic Pid Offset logSize Lag Owner
elastic_search_group my_log_topic 0 5 29 24 none
这是我对结果的解释:
Offset = 5 --> 这是我的 'elastic_search_group' consumer
的当前偏移量
logSize = 29 --> 这是最新的偏移量 - 下一条消息的偏移量 topic/partition
滞后 = 24 --> 29-5 - 我的 'elastic_search_group' 消费者
尚未处理多少消息
Pid - 分区 ID
Q1:这样正确吗?
现在,我想从我的 Java 消费者那里获得相同的信息。在这里,我发现我必须使用两个不同的 APIs:
kafka.javaapi.OffsetRequest 获取最早和最新的偏移量,但 kafka.javaapi.OffsetFetchRequest 获取当前偏移量。
为了获得最早(或最晚)的偏移量,我这样做:
TopicAndPartition topicAndPartition = new TopicAndPartition(myTopic, myPartition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.EarliestTime(), 1));
// OR for Latest: requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.LatestTime(), 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
OffsetResponse response = simpleConsumer.getOffsetsBefore(request);
long[] offsets = response.offsets(topic, partition);
long myEarliestOffset = offsets[0];
// OR for Latest: long myLatestOffset = offsets[0];
为了获得当前偏移量,我必须使用完全不同的 API:
short versionID = 0;
int correlationId = 0;
List<TopicAndPartition> topicPartitionList = new ArrayList<TopicAndPartition>();
TopicAndPartition myTopicAndPartition = new TopicAndPartition(myTopic, myPartition);
topicPartitionList.add(myTopicAndPartition);
OffsetFetchRequest offsetFetchReq = new OffsetFetchRequest(
kafkaGroupId, topicPartitionList, versionID, correlationId, kafkaGroupId);
OffsetFetchResponse offsetFetchResponse = simpleConsumer.fetchOffsets(offsetFetchReq);
long currentOffset = offsetFetchResponse.offsets().get(myTopicAndPartition).offset();
Q2:是否正确?为什么有两个不同的 API 来获得非常相似的信息?
Q3:我在这里使用哪个versionId和correlationId有关系吗?我虽然 versionId 对于 0.8.2.1 之前的 kafka 应该是 0,对于 0.8.2.1 和更高版本应该是 1 - 但它似乎也适用于 0 对于 0.8.2.1 - 见下文 ?
所以,对于上面主题的示例状态,以及 ConsumerOffsetChecker 的上面输出,这是我从我的
Java代码:
currentOffset=5; earliestOffset=29; latestOffset=29
'currentOffset' 似乎可以,'latestOffset' 也正确,但是 'earliestOffset'?我希望它至少是“5”?
Q4:earliestOffset怎么会比currentOffset高呢?
我唯一的怀疑是,由于保留政策,可能来自该主题的消息被清除了……。还有其他可能发生的情况吗?
我正在寻找寻找分区滞后的方法。这涉及您已采取的相同步骤。到目前为止,根据我所学的,我可以给你答案。
- logSize 直接指向该特定分区中累积了多少消息。或者,它指定该分区中消息的最大偏移量。 Offset 是最后一次成功消费消息的偏移量。所以滞后只是日志大小和偏移量之间的差异。
- 是的,它是正确的。到目前为止,这是仅有的两种查找当前偏移量和最早或最新偏移量的方法
- 不知道为什么要指定versionId。您可以使用
kafka.api.OffsetRequest.CurrentVersion()
来获取 versionId。因此可以避免硬编码。您可以放心地将 correlationId 假定为 0。
这很奇怪。当我使用 EarliestTime() 时,即使我当前的偏移量已经取得了很大进展,我也会得到最早的偏移量 0。这意味着它是分区的开始。因此,当某些消息在未来某个时间过期时,这个最早的偏移量将是某个非零数。现在,如果消息因保留策略滞后而被清除,应该已经更改。我不确定这种行为。一种确定的方法是,运行 消费者在注意到此类阅读并检查其日志后。它应该显示像这样的行。
2015-06-09 18:49:15 :: DEBUG :: PartitionTopicInfo:52 :: 重置 requests:2 的消耗偏移量:获取的偏移量 = 405952:消耗的偏移量 = 335372 到 335372
2015-06-09 18:49:15 :: DEBUG :: PartitionTopicInfo:52 :: 重置 requests:2 的消耗偏移量:获取的偏移量 = 405952:消耗的偏移量 = 335373 到 335373
请注意,在上面的日志行中,获取的偏移量保持不变,消耗的偏移量在增加。最后它将以
结尾
2015-06-09 18:49:16 :: DEBUG :: PartitionTopicInfo:52 :: 重置 requests:2 的消耗偏移量:获取的偏移量 = 405952:消耗的偏移量 = 405952 到 405952
那么这意味着由于日志保留策略,从 335372 到 405952 的偏移量已过期
我正在尝试使用低级消费者 Java API 手动管理偏移量,最新的 kafka_2.10-0.8.2.1。为了验证我 commit/read 来自 Kafka 的偏移量是否正确,我使用 kafka.tools.ConsumerOffsetChecker 工具。
这是我的 topic/consumer 组的输出示例:
./bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group elastic_search_group --zookeeper localhost:2181 --topic my_log_topic
Group Topic Pid Offset logSize Lag Owner
elastic_search_group my_log_topic 0 5 29 24 none
这是我对结果的解释:
Offset = 5 --> 这是我的 'elastic_search_group' consumer
的当前偏移量logSize = 29 --> 这是最新的偏移量 - 下一条消息的偏移量 topic/partition
滞后 = 24 --> 29-5 - 我的 'elastic_search_group' 消费者
尚未处理多少消息Pid - 分区 ID
Q1:这样正确吗?
现在,我想从我的 Java 消费者那里获得相同的信息。在这里,我发现我必须使用两个不同的 APIs:
kafka.javaapi.OffsetRequest 获取最早和最新的偏移量,但 kafka.javaapi.OffsetFetchRequest 获取当前偏移量。
为了获得最早(或最晚)的偏移量,我这样做:
TopicAndPartition topicAndPartition = new TopicAndPartition(myTopic, myPartition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.EarliestTime(), 1));
// OR for Latest: requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.LatestTime(), 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
OffsetResponse response = simpleConsumer.getOffsetsBefore(request);
long[] offsets = response.offsets(topic, partition);
long myEarliestOffset = offsets[0];
// OR for Latest: long myLatestOffset = offsets[0];
为了获得当前偏移量,我必须使用完全不同的 API:
short versionID = 0;
int correlationId = 0;
List<TopicAndPartition> topicPartitionList = new ArrayList<TopicAndPartition>();
TopicAndPartition myTopicAndPartition = new TopicAndPartition(myTopic, myPartition);
topicPartitionList.add(myTopicAndPartition);
OffsetFetchRequest offsetFetchReq = new OffsetFetchRequest(
kafkaGroupId, topicPartitionList, versionID, correlationId, kafkaGroupId);
OffsetFetchResponse offsetFetchResponse = simpleConsumer.fetchOffsets(offsetFetchReq);
long currentOffset = offsetFetchResponse.offsets().get(myTopicAndPartition).offset();
Q2:是否正确?为什么有两个不同的 API 来获得非常相似的信息?
Q3:我在这里使用哪个versionId和correlationId有关系吗?我虽然 versionId 对于 0.8.2.1 之前的 kafka 应该是 0,对于 0.8.2.1 和更高版本应该是 1 - 但它似乎也适用于 0 对于 0.8.2.1 - 见下文 ?
所以,对于上面主题的示例状态,以及 ConsumerOffsetChecker 的上面输出,这是我从我的 Java代码:
currentOffset=5; earliestOffset=29; latestOffset=29
'currentOffset' 似乎可以,'latestOffset' 也正确,但是 'earliestOffset'?我希望它至少是“5”?
Q4:earliestOffset怎么会比currentOffset高呢? 我唯一的怀疑是,由于保留政策,可能来自该主题的消息被清除了……。还有其他可能发生的情况吗?
我正在寻找寻找分区滞后的方法。这涉及您已采取的相同步骤。到目前为止,根据我所学的,我可以给你答案。
- logSize 直接指向该特定分区中累积了多少消息。或者,它指定该分区中消息的最大偏移量。 Offset 是最后一次成功消费消息的偏移量。所以滞后只是日志大小和偏移量之间的差异。
- 是的,它是正确的。到目前为止,这是仅有的两种查找当前偏移量和最早或最新偏移量的方法
- 不知道为什么要指定versionId。您可以使用
kafka.api.OffsetRequest.CurrentVersion()
来获取 versionId。因此可以避免硬编码。您可以放心地将 correlationId 假定为 0。 这很奇怪。当我使用 EarliestTime() 时,即使我当前的偏移量已经取得了很大进展,我也会得到最早的偏移量 0。这意味着它是分区的开始。因此,当某些消息在未来某个时间过期时,这个最早的偏移量将是某个非零数。现在,如果消息因保留策略滞后而被清除,应该已经更改。我不确定这种行为。一种确定的方法是,运行 消费者在注意到此类阅读并检查其日志后。它应该显示像这样的行。
2015-06-09 18:49:15 :: DEBUG :: PartitionTopicInfo:52 :: 重置 requests:2 的消耗偏移量:获取的偏移量 = 405952:消耗的偏移量 = 335372 到 335372 2015-06-09 18:49:15 :: DEBUG :: PartitionTopicInfo:52 :: 重置 requests:2 的消耗偏移量:获取的偏移量 = 405952:消耗的偏移量 = 335373 到 335373
请注意,在上面的日志行中,获取的偏移量保持不变,消耗的偏移量在增加。最后它将以
结尾2015-06-09 18:49:16 :: DEBUG :: PartitionTopicInfo:52 :: 重置 requests:2 的消耗偏移量:获取的偏移量 = 405952:消耗的偏移量 = 405952 到 405952
那么这意味着由于日志保留策略,从 335372 到 405952 的偏移量已过期