Java,如何在 apache kafka 中获取主题中的消息数
Java, How to get number of messages in a topic in apache kafka
我正在使用 apache kafka 进行消息传递。我已经在 Java 中实现了生产者和消费者。我们如何获取主题中的消息数?
从消费者的角度来看,想到的唯一方法是实际使用消息并计算它们。
Kafka 代理公开 JMX 计数器以记录自启动以来收到的消息数量,但您无法知道其中有多少已被清除。
在大多数常见情况下,最好将 Kafka 中的消息视为无限流,获取当前保存在磁盘上的消息数量的离散值并不重要。此外,在处理一组代理时,事情会变得更加复杂,这些代理都有一个主题中的消息子集。
我自己没试过this,不过好像有道理。
您也可以使用 kafka.tools.ConsumerOffsetChecker
(source)。
不是java,但可能有用
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list <broker>:<port> \
--topic <topic-name> \
| awk -F ":" '{sum += } END {print sum}'
使用https://prestodb.io/docs/current/connector/kafka-tutorial.html
一个超级 SQL 引擎,由 Facebook 提供,连接多个数据源(Cassandra、Kafka、JMX、Redis ...)。
PrestoDB 运行 作为一个带有可选 worker 的服务器(有一个没有额外 worker 的独立模式),然后你使用一个小的可执行 JAR(称为 presto CLI)来进行查询。
一旦你配置好了Presto服务器,你就可以使用traditionnal SQL:
SELECT count(*) FROM TOPIC_NAME;
我实际上用它来对我的 POC 进行基准测试。您要使用 ConsumerOffsetChecker 的项目。您可以使用如下 bash 脚本 运行 它。
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test --zookeeper localhost:2181 --group testgroup
下面是结果:
红框上可以看到,999是当前主题中的消息数。
更新:ConsumerOffsetChecker 自 0.10.0 起已弃用,您可能想开始使用 ConsumerGroupCommand。
Apache Kafka 命令获取主题所有分区上未处理的消息:
kafka-run-class kafka.tools.ConsumerOffsetChecker
--topic test --zookeeper localhost:2181
--group test_group
打印:
Group Topic Pid Offset logSize Lag Owner
test_group test 0 11051 11053 2 none
test_group test 1 10810 10812 2 none
test_group test 2 11027 11028 1 none
第 6 列是 un-handled 条消息。像这样加起来:
kafka-run-class kafka.tools.ConsumerOffsetChecker
--topic test --zookeeper localhost:2181
--group test_group 2>/dev/null | awk 'NR>1 {sum += }
END {print sum}'
awk 读取行,跳过 header 行并将第 6 列相加,最后打印总和。
版画
5
要获取为该主题存储的所有消息,您可以在每个分区的流的开头和结尾寻找消费者并对结果求和
List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
.map(p -> new TopicPartition(topic, p.partition()))
.collect(Collectors.toList());
consumer.assign(partitions);
consumer.seekToEnd(Collections.emptySet());
Map<TopicPartition, Long> endPartitions = partitions.stream()
.collect(Collectors.toMap(Function.identity(), consumer::position));
consumer.seekToBeginning(Collections.emptySet());
System.out.println(partitions.stream().mapToLong(p -> endPartitions.get(p) - consumer.position(p)).sum());
使用 Kafka 2.11-1.0.0 的 Java 客户端,你可以做以下事情:
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test"));
while(true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
// after each message, query the number of messages of the topic
Set<TopicPartition> partitions = consumer.assignment();
Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions);
for(TopicPartition partition : offsets.keySet()) {
System.out.printf("partition %s is at %d\n", partition.topic(), offsets.get(partition));
}
}
}
输出是这样的:
offset = 10, key = null, value = un
partition test is at 13
offset = 11, key = null, value = deux
partition test is at 13
offset = 12, key = null, value = trois
partition test is at 13
在最新版本的 Kafka Manager 中,有一列标题为 Summed Recent Offsets。
有时,人们感兴趣的是了解每个分区中的消息数量,例如,在测试自定义 partitioner.The 时,已经测试了后续步骤与来自 Confluent 3.2 的 Kafka 0.10.2.1-2 一起使用。给定一个 Kafka 主题,kt
和以下命令行:
$ kafka-run-class kafka.tools.GetOffsetShell \
--broker-list host01:9092,host02:9092,host02:9092 --topic kt
打印示例输出显示三个分区中的消息计数:
kt:2:6138
kt:1:6123
kt:0:6137
行数可能会更多或更少,具体取决于主题的分区数。
运行 以下(假设 kafka-console-consumer.sh
在路径上):
kafka-console-consumer.sh --from-beginning \
--bootstrap-server yourbroker:9092 --property print.key=true \
--property print.value=false --property print.partition \
--topic yourtopic --timeout-ms 5000 | tail -n 10|grep "Processed a total of"
Kafka 文档摘录
0.9.0.0 中的弃用
kafka-consumer-offset-checker.sh (kafka.tools.ConsumerOffsetChecker) 已被弃用。今后,请使用 kafka-consumer-groups.sh (kafka.admin.ConsumerGroupCommand) 来实现此功能。
我是 运行 Kafka 代理,为服务器和客户端启用了 SSL。下面的命令我使用
kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --list --command-config /tmp/ssl_config
kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --command-config /tmp/ssl_config --describe --group group_name_x
其中/tmp/ssl_config如下
security.protocol=SSL
ssl.truststore.location=truststore_file_path.jks
ssl.truststore.password=truststore_password
ssl.keystore.location=keystore_file_path.jks
ssl.keystore.password=keystore_password
ssl.key.password=key_password
如果您有权访问服务器的 JMX 接口,则开始和结束偏移量位于:
kafka.log:type=Log,name=LogStartOffset,topic=TOPICNAME,partition=PARTITIONNUMBER
kafka.log:type=Log,name=LogEndOffset,topic=TOPICNAME,partition=PARTITIONNUMBER
(您需要替换 TOPICNAME
& PARTITIONNUMBER
)。
请记住,您需要检查给定分区的每个副本,或者您需要找出哪个代理是 given 分区的领导者(这可以改变时间)。
或者,您可以使用 Kafka Consumer 方法 beginningOffsets
和 endOffsets
。
我发现的最简单的方法是使用 Kafdrop REST API /topic/topicName
并指定键:"Accept"
/ 值:"application/json"
header 以便返回 JSON 响应。
由于不再支持ConsumerOffsetChecker
,您可以使用此命令查看主题中的所有消息:
bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand \
--group my-group \
--bootstrap-server localhost:9092 \
--describe
其中 LAG
是主题分区中的消息数:
也可以尝试使用kafkacat。这是一个开源项目,可以帮助您从主题和分区中读取消息并将它们打印到标准输出。这是一个示例,它从 sample-kafka-topic
主题读取最后 10 条消息,然后退出:
kafkacat -b localhost:9092 -t sample-kafka-topic -p 0 -o -10 -e
我有同样的问题,这就是我在 Kotlin 中来自 KafkaConsumer 的方式:
val messageCount = consumer.listTopics().entries.filter { it.key == topicName }
.map {
it.value.map { topicInfo -> TopicPartition(topicInfo.topic(), topicInfo.partition()) }
}.map { consumer.endOffsets(it).values.sum() - consumer.beginningOffsets(it).values.sum()}
.first()
非常粗糙的代码,因为我刚刚让它工作,但基本上你想从结束偏移量中减去主题的开始偏移量,这将是该主题的当前消息数。
您不能仅仅依赖结束偏移量,因为其他配置(清理策略、保留时间等)可能最终导致从您的主题中删除旧消息。
仅向前偏移 "move",因此开始偏移将向前移动更接近结束偏移(或最终移动到相同的值,如果主题现在不包含消息)。
基本上,结束偏移量表示通过该主题的消息总数,两者之差表示该主题现在包含的消息数。
如果您需要为一个消费者组中的所有消费者(或不同的消费者组)计算结果,另一种选择是使用管理客户端并从 topic/partition 中减去消费者组偏移量偏移量,Kotlin 中的代码示例:
val topicName = "someTopic"
val groupId = "theGroupId"
val admin = Admin.create(kafkaProps.buildAdminProperties()) // Spring KafkaProperties
val parts = admin.describeTopics(listOf(topicName)).values()[topicName]!!.get().partitions()
val topicPartitionOffsets = admin.listOffsets(parts.associate { TopicPartition(topicName, it.partition()) to OffsetSpec.latest() }).all().get()
val consumerGroupOffsets = admin.listConsumerGroupOffsets(groupId)
.partitionsToOffsetAndMetadata().get()
val highWaterMark = topicPartitionOffsets.map { it.value.offset() }.sum()
val consumerPos = consumerGroupOffsets.map { it.value.offset() }.sum()
val unProcessedMessages = highWaterMark - consumerPos
这里还有一个 LeYAUable 示例代码的工作版本,它仅使用常规 (non-admin) 客户端:
val partitions = consumer.partitionsFor("topicName")
.map { TopicPartition(it.topic(), it.partition()) }
val highWaterMark = consumer.endOffsets(partitions).values.sum()
val consumerPosition = consumer.beginningOffsets(partitions).values.sum()
val msgCount = highWaterMark - consumerPosition
不过,这只会为您提供针对该特定消费者的补偿!通常需要注意的是,当主题被压缩时,这是不精确的。
我正在使用 apache kafka 进行消息传递。我已经在 Java 中实现了生产者和消费者。我们如何获取主题中的消息数?
从消费者的角度来看,想到的唯一方法是实际使用消息并计算它们。
Kafka 代理公开 JMX 计数器以记录自启动以来收到的消息数量,但您无法知道其中有多少已被清除。
在大多数常见情况下,最好将 Kafka 中的消息视为无限流,获取当前保存在磁盘上的消息数量的离散值并不重要。此外,在处理一组代理时,事情会变得更加复杂,这些代理都有一个主题中的消息子集。
我自己没试过this,不过好像有道理。
您也可以使用 kafka.tools.ConsumerOffsetChecker
(source)。
不是java,但可能有用
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list <broker>:<port> \
--topic <topic-name> \
| awk -F ":" '{sum += } END {print sum}'
使用https://prestodb.io/docs/current/connector/kafka-tutorial.html
一个超级 SQL 引擎,由 Facebook 提供,连接多个数据源(Cassandra、Kafka、JMX、Redis ...)。
PrestoDB 运行 作为一个带有可选 worker 的服务器(有一个没有额外 worker 的独立模式),然后你使用一个小的可执行 JAR(称为 presto CLI)来进行查询。
一旦你配置好了Presto服务器,你就可以使用traditionnal SQL:
SELECT count(*) FROM TOPIC_NAME;
我实际上用它来对我的 POC 进行基准测试。您要使用 ConsumerOffsetChecker 的项目。您可以使用如下 bash 脚本 运行 它。
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test --zookeeper localhost:2181 --group testgroup
下面是结果:
更新:ConsumerOffsetChecker 自 0.10.0 起已弃用,您可能想开始使用 ConsumerGroupCommand。
Apache Kafka 命令获取主题所有分区上未处理的消息:
kafka-run-class kafka.tools.ConsumerOffsetChecker
--topic test --zookeeper localhost:2181
--group test_group
打印:
Group Topic Pid Offset logSize Lag Owner
test_group test 0 11051 11053 2 none
test_group test 1 10810 10812 2 none
test_group test 2 11027 11028 1 none
第 6 列是 un-handled 条消息。像这样加起来:
kafka-run-class kafka.tools.ConsumerOffsetChecker
--topic test --zookeeper localhost:2181
--group test_group 2>/dev/null | awk 'NR>1 {sum += }
END {print sum}'
awk 读取行,跳过 header 行并将第 6 列相加,最后打印总和。
版画
5
要获取为该主题存储的所有消息,您可以在每个分区的流的开头和结尾寻找消费者并对结果求和
List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
.map(p -> new TopicPartition(topic, p.partition()))
.collect(Collectors.toList());
consumer.assign(partitions);
consumer.seekToEnd(Collections.emptySet());
Map<TopicPartition, Long> endPartitions = partitions.stream()
.collect(Collectors.toMap(Function.identity(), consumer::position));
consumer.seekToBeginning(Collections.emptySet());
System.out.println(partitions.stream().mapToLong(p -> endPartitions.get(p) - consumer.position(p)).sum());
使用 Kafka 2.11-1.0.0 的 Java 客户端,你可以做以下事情:
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test"));
while(true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
// after each message, query the number of messages of the topic
Set<TopicPartition> partitions = consumer.assignment();
Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions);
for(TopicPartition partition : offsets.keySet()) {
System.out.printf("partition %s is at %d\n", partition.topic(), offsets.get(partition));
}
}
}
输出是这样的:
offset = 10, key = null, value = un
partition test is at 13
offset = 11, key = null, value = deux
partition test is at 13
offset = 12, key = null, value = trois
partition test is at 13
在最新版本的 Kafka Manager 中,有一列标题为 Summed Recent Offsets。
有时,人们感兴趣的是了解每个分区中的消息数量,例如,在测试自定义 partitioner.The 时,已经测试了后续步骤与来自 Confluent 3.2 的 Kafka 0.10.2.1-2 一起使用。给定一个 Kafka 主题,kt
和以下命令行:
$ kafka-run-class kafka.tools.GetOffsetShell \
--broker-list host01:9092,host02:9092,host02:9092 --topic kt
打印示例输出显示三个分区中的消息计数:
kt:2:6138
kt:1:6123
kt:0:6137
行数可能会更多或更少,具体取决于主题的分区数。
运行 以下(假设 kafka-console-consumer.sh
在路径上):
kafka-console-consumer.sh --from-beginning \
--bootstrap-server yourbroker:9092 --property print.key=true \
--property print.value=false --property print.partition \
--topic yourtopic --timeout-ms 5000 | tail -n 10|grep "Processed a total of"
Kafka 文档摘录
0.9.0.0 中的弃用
kafka-consumer-offset-checker.sh (kafka.tools.ConsumerOffsetChecker) 已被弃用。今后,请使用 kafka-consumer-groups.sh (kafka.admin.ConsumerGroupCommand) 来实现此功能。
我是 运行 Kafka 代理,为服务器和客户端启用了 SSL。下面的命令我使用
kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --list --command-config /tmp/ssl_config
kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --command-config /tmp/ssl_config --describe --group group_name_x
其中/tmp/ssl_config如下
security.protocol=SSL
ssl.truststore.location=truststore_file_path.jks
ssl.truststore.password=truststore_password
ssl.keystore.location=keystore_file_path.jks
ssl.keystore.password=keystore_password
ssl.key.password=key_password
如果您有权访问服务器的 JMX 接口,则开始和结束偏移量位于:
kafka.log:type=Log,name=LogStartOffset,topic=TOPICNAME,partition=PARTITIONNUMBER
kafka.log:type=Log,name=LogEndOffset,topic=TOPICNAME,partition=PARTITIONNUMBER
(您需要替换 TOPICNAME
& PARTITIONNUMBER
)。
请记住,您需要检查给定分区的每个副本,或者您需要找出哪个代理是 given 分区的领导者(这可以改变时间)。
或者,您可以使用 Kafka Consumer 方法 beginningOffsets
和 endOffsets
。
我发现的最简单的方法是使用 Kafdrop REST API /topic/topicName
并指定键:"Accept"
/ 值:"application/json"
header 以便返回 JSON 响应。
由于不再支持ConsumerOffsetChecker
,您可以使用此命令查看主题中的所有消息:
bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand \
--group my-group \
--bootstrap-server localhost:9092 \
--describe
其中 LAG
是主题分区中的消息数:
也可以尝试使用kafkacat。这是一个开源项目,可以帮助您从主题和分区中读取消息并将它们打印到标准输出。这是一个示例,它从 sample-kafka-topic
主题读取最后 10 条消息,然后退出:
kafkacat -b localhost:9092 -t sample-kafka-topic -p 0 -o -10 -e
我有同样的问题,这就是我在 Kotlin 中来自 KafkaConsumer 的方式:
val messageCount = consumer.listTopics().entries.filter { it.key == topicName }
.map {
it.value.map { topicInfo -> TopicPartition(topicInfo.topic(), topicInfo.partition()) }
}.map { consumer.endOffsets(it).values.sum() - consumer.beginningOffsets(it).values.sum()}
.first()
非常粗糙的代码,因为我刚刚让它工作,但基本上你想从结束偏移量中减去主题的开始偏移量,这将是该主题的当前消息数。
您不能仅仅依赖结束偏移量,因为其他配置(清理策略、保留时间等)可能最终导致从您的主题中删除旧消息。 仅向前偏移 "move",因此开始偏移将向前移动更接近结束偏移(或最终移动到相同的值,如果主题现在不包含消息)。
基本上,结束偏移量表示通过该主题的消息总数,两者之差表示该主题现在包含的消息数。
如果您需要为一个消费者组中的所有消费者(或不同的消费者组)计算结果,另一种选择是使用管理客户端并从 topic/partition 中减去消费者组偏移量偏移量,Kotlin 中的代码示例:
val topicName = "someTopic"
val groupId = "theGroupId"
val admin = Admin.create(kafkaProps.buildAdminProperties()) // Spring KafkaProperties
val parts = admin.describeTopics(listOf(topicName)).values()[topicName]!!.get().partitions()
val topicPartitionOffsets = admin.listOffsets(parts.associate { TopicPartition(topicName, it.partition()) to OffsetSpec.latest() }).all().get()
val consumerGroupOffsets = admin.listConsumerGroupOffsets(groupId)
.partitionsToOffsetAndMetadata().get()
val highWaterMark = topicPartitionOffsets.map { it.value.offset() }.sum()
val consumerPos = consumerGroupOffsets.map { it.value.offset() }.sum()
val unProcessedMessages = highWaterMark - consumerPos
这里还有一个 LeYAUable 示例代码的工作版本,它仅使用常规 (non-admin) 客户端:
val partitions = consumer.partitionsFor("topicName")
.map { TopicPartition(it.topic(), it.partition()) }
val highWaterMark = consumer.endOffsets(partitions).values.sum()
val consumerPosition = consumer.beginningOffsets(partitions).values.sum()
val msgCount = highWaterMark - consumerPosition
不过,这只会为您提供针对该特定消费者的补偿!通常需要注意的是,当主题被压缩时,这是不精确的。