How to get Kafka Producer messages count

I created a producer with the following code; it produces around 2000 messages.

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

public class ProducerDemoWithCallback {

    public static void main(String[] args) {

        final Logger logger = LoggerFactory.getLogger(ProducerDemoWithCallback.class);

        String bootstrapServers = "localhost:9092";
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for (int i = 0; i < 2000; i++) {
            // create a producer record
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("TwitterProducer", "Hello World " + i);

            // send data - asynchronous
            producer.send(record, new Callback() {
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    // executes every time a record is successfully sent or an exception is thrown
                    if (e == null) {
                        // the record was successfully sent
                        logger.info("Received new metadata. \n" +
                                "Topic: " + recordMetadata.topic() + "\n" +
                                "Partition: " + recordMetadata.partition() + "\n" +
                                "Offset: " + recordMetadata.offset() + "\n" +
                                "Timestamp: " + recordMetadata.timestamp());
                    } else {
                        logger.error("Error while producing", e);
                    }
                }
            });
        }

        // flush data
        producer.flush();
        // flush and close producer
        producer.close();
    }
}

I want to count these messages and get the value as an int. I used this command and it works, but I am trying to get the count from code instead.

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic TwitterProducer --time -1

The result is:

- TwitterProducer:0:2000

My code to do the same thing programmatically looks like this, but I'm not sure whether this is the right way to get the count:

int valueCount = (int) recordMetadata.offset();
System.out.println("Offset value " + valueCount);

Can someone help me get the count of Kafka messages from the offset value in code?

Why do you need that value? If you share more detail about your purpose, I can give you better advice.

As for your last question: no, that is not the right way to get the message count from an offset value. It only works if the topic has a single partition and a single producer writes to it. You need to account for the topic having multiple partitions.

If you want the number of messages each producer sends, you can count them in the onCompletion() callback.
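For example, a minimal sketch (the counter name and wiring are my own, not the poster's code): keep a thread-safe counter and increment it in the callback, since callbacks fire on the producer's I/O thread, not the sending thread.

```java
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.clients.producer.Callback;

public class SendCounter {

    public static void main(String[] args) {
        // thread-safe counter: callbacks run on the producer's I/O thread
        final AtomicLong sentCount = new AtomicLong();

        // pass this as the second argument of producer.send(record, callback)
        Callback countingCallback = (recordMetadata, e) -> {
            if (e == null) {
                // only count records the broker actually acknowledged
                sentCount.incrementAndGet();
            }
        };

        // after producer.flush() returns, every callback has fired,
        // so sentCount.get() is the number of successfully sent records
    }
}
```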

Or you can get the last offset with a Consumer client, like this:

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "your-brokers");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "offset-check");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("topic_name"));

// assignment() is empty until the first poll() completes the group rebalance
consumer.poll(Duration.ofSeconds(1));
Collection<TopicPartition> partitions = consumer.assignment();

consumer.seekToEnd(partitions);

for (TopicPartition tp : partitions) {
    long offsetPosition = consumer.position(tp);
}

You can check the implementation details of GetOffsetShell.

Here is simplified code rewritten in Java:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.*;
import java.util.stream.Collectors;

public class GetOffsetCommand {

    private static final Set<String> TopicNames = new HashSet<>();

    static {
        TopicNames.add("my-topic");
        TopicNames.add("not-my-topic");
    }

    public static void main(String[] args) {
        TopicNames.forEach(topicName -> {
            final Map<TopicPartition, Long> offsets = getOffsets(topicName);

            offsets.entrySet().forEach(System.out::println);
            System.out.println(topicName + ":" + offsets.values().stream().reduce(0L, Long::sum));
        });
    }

    private static Map<TopicPartition, Long> getOffsets(String topicName) {
        final KafkaConsumer<String, String> consumer = makeKafkaConsumer();
        final List<TopicPartition> partitions = listTopicPartitions(consumer, topicName);
        return consumer.endOffsets(partitions);
    }

    private static KafkaConsumer<String, String> makeKafkaConsumer() {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "get-offset-command");

        return new KafkaConsumer<>(props);
    }

    private static List<TopicPartition> listTopicPartitions(KafkaConsumer<String, String> consumer, String topicName) {
        return consumer.listTopics().entrySet().stream()
                .filter(t -> topicName.equals(t.getKey()))
                .flatMap(t -> t.getValue().stream())
                .map(p -> new TopicPartition(p.topic(), p.partition()))
                .collect(Collectors.toList());
    }
}

It prints the end offset of each partition of the topic, plus the sum (the total message count), for example:

my-topic-0=184
my-topic-2=187
my-topic-4=189
my-topic-1=196
my-topic-3=243
my-topic:999
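If you prefer not to create a consumer at all, the AdminClient exposes the same end offsets. A sketch, assuming Kafka clients 2.5+ (where Admin.create and Admin#listOffsets are available) and the same hypothetical topic name as above:

```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class AdminOffsetCount {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            String topic = "my-topic";

            // request the latest offset of every partition of the topic
            Map<TopicPartition, OffsetSpec> request = new HashMap<>();
            admin.describeTopics(Collections.singletonList(topic)).all().get()
                    .get(topic).partitions()
                    .forEach(p -> request.put(new TopicPartition(topic, p.partition()), OffsetSpec.latest()));

            ListOffsetsResult result = admin.listOffsets(request);

            // sum the per-partition end offsets into the total message count
            long total = 0;
            for (TopicPartition tp : request.keySet()) {
                total += result.partitionResult(tp).get().offset();
            }
            System.out.println(topic + ":" + total);
        }
    }
}
```

Like GetOffsetShell, this reports end offsets, so the sum equals the message count only if no records have been deleted by retention or compaction.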