如何获取 Kafka Producer 消息数
How to get Kafka Producer messages count
我使用以下代码创建了一个产生大约 2000 条消息的生产者。
public class ProducerDemoWithCallback {
public static void main(String[] args) {
final Logger logger = LoggerFactory.getLogger(ProducerDemoWithCallback.class);
String bootstrapServers = "localhost:9092";
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// create the producer
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
for (int i=0; i<2000; i++ ) {
// create a producer record
ProducerRecord<String, String> record =
new ProducerRecord<String, String>("TwitterProducer", "Hello World " + Integer.toString(i));
// send data - asynchronous
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
// executes every time a record is successfully sent or an exception is thrown
if (e == null) {
// the record was successfully sent
logger .info("Received new metadata. \n" +
"Topic:" + recordMetadata.topic() + "\n" +
"Partition: " + recordMetadata.partition() + "\n" +
"Offset: " + recordMetadata.offset() + "\n" +
"Timestamp: " + recordMetadata.timestamp());
} else {
logger .error("Error while producing", e);
}
}
});
}
// flush data
producer.flush();
// flush and close producer
producer.close();
}
}
我想对这些消息进行计数并获取 int 值。
我使用此命令并且它有效,但我正在尝试使用代码获取此计数。
"bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic TwitterProducer --time -1"
结果是
- TwitterProducer:0:2000
我以编程方式执行相同操作的代码看起来像这样,但我不确定这是否是获取计数的正确方法:
// Reads the offset reported for one record, narrows it to int, and prints it.
// NOTE(review): this is the offset of a single record, not a message count;
// presumably offset() returns a long, so the (int) cast could truncate — confirm.
int valueCount = (int) recordMetadata.offset();
System.out.println("Offset value " + valueCount);
有人可以帮助我使用代码获取 Kafka 消息偏移值的计数吗?
你为什么需要这个值?如果你能多说明一些用途,我可以给出更有针对性的建议。
对于你的最后一个问题:直接把偏移量当作消息数并不是正确的做法。只有当主题只有一个分区、并且只有一个生产者时才可以这样用;你还需要考虑主题有多个分区的情况。
如果想获取每个生产者发送的消息数,可以在 onCompletion() 回调函数中进行统计。
或者,你也可以像下面这样使用 Consumer 客户端获取各分区的末尾偏移量:
// Reads the end offset of every partition of "topic_name" and adds them up.
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "your-brokers");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

long totalEndOffset = 0L;
// Key type is String to match the configured StringDeserializer;
// try-with-resources closes the consumer when done.
try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
    // subscribe() only assigns partitions during poll(), so assignment() would
    // still be empty here. Look the partitions up and assign them explicitly
    // (manual assignment also needs no group.id).
    Collection<TopicPartition> partitions = new java.util.ArrayList<>();
    consumer.partitionsFor("topic_name")
            .forEach(info -> partitions.add(new TopicPartition(info.topic(), info.partition())));
    consumer.assign(partitions);

    // seekToEnd() is evaluated lazily; position() forces the lookup.
    consumer.seekToEnd(partitions);
    for (TopicPartition tp : partitions) {
        long offsetPosition = consumer.position(tp);
        totalEndOffset += offsetPosition;
    }
}
System.out.println("topic_name:" + totalEndOffset);
您可以查看 GetOffsetShell 的实现细节。
这是在 Java 中重写的简化代码:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.*;
import java.util.stream.Collectors;
/**
 * Prints the end offset of every partition of the configured topics, followed
 * by the per-topic sum of those end offsets.
 *
 * <p>The sum equals the number of messages only while every partition's log
 * still starts at offset 0 (nothing removed by retention or compaction).
 */
public class GetOffsetCommand {

    /** Topics to inspect; topics unknown to the cluster yield an empty result. */
    private static final Set<String> TOPIC_NAMES = new HashSet<>();

    static {
        TOPIC_NAMES.add("my-topic");
        TOPIC_NAMES.add("not-my-topic");
    }

    /**
     * Prints one {@code partition=offset} line per partition and one
     * {@code topic:sum} line per topic.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        TOPIC_NAMES.forEach(topicName -> {
            final Map<TopicPartition, Long> offsets = getOffsets(topicName);
            offsets.entrySet().forEach(System.out::println);
            // primitive stream avoids boxing every partial sum
            final long total = offsets.values().stream().mapToLong(Long::longValue).sum();
            System.out.println(topicName + ":" + total);
        });
    }

    /**
     * Fetches the end offset of every partition of {@code topicName}.
     *
     * @param topicName topic to inspect
     * @return end offset per partition; empty when the topic has no known partitions
     */
    private static Map<TopicPartition, Long> getOffsets(String topicName) {
        // One consumer is created per call, so it must be closed here or it leaks.
        try (KafkaConsumer<String, String> consumer = makeKafkaConsumer()) {
            final List<TopicPartition> partitions = listTopicPartitions(consumer, topicName);
            return consumer.endOffsets(partitions);
        }
    }

    /** Builds a String/String consumer for the broker at localhost:9092. */
    private static KafkaConsumer<String, String> makeKafkaConsumer() {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "get-offset-command");
        return new KafkaConsumer<>(props);
    }

    /**
     * Lists the partitions of {@code topicName} as reported by the cluster.
     *
     * @param consumer  consumer used to query topic metadata
     * @param topicName topic whose partitions are wanted
     * @return the topic's partitions, or an empty list if the topic is not listed
     */
    private static List<TopicPartition> listTopicPartitions(KafkaConsumer<String, String> consumer, String topicName) {
        // Direct map lookup instead of streaming over every topic's metadata.
        return consumer.listTopics()
                .getOrDefault(topicName, Collections.emptyList())
                .stream()
                .map(p -> new TopicPartition(p.topic(), p.partition()))
                .collect(Collectors.toList());
    }
}
它会输出每个主题各分区的末尾偏移量,以及这些偏移量的总和(即消息总数),例如:
my-topic-0=184
my-topic-2=187
my-topic-4=189
my-topic-1=196
my-topic-3=243
my-topic:999
我使用以下代码创建了一个产生大约 2000 条消息的生产者。
public class ProducerDemoWithCallback {
public static void main(String[] args) {
final Logger logger = LoggerFactory.getLogger(ProducerDemoWithCallback.class);
String bootstrapServers = "localhost:9092";
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// create the producer
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
for (int i=0; i<2000; i++ ) {
// create a producer record
ProducerRecord<String, String> record =
new ProducerRecord<String, String>("TwitterProducer", "Hello World " + Integer.toString(i));
// send data - asynchronous
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
// executes every time a record is successfully sent or an exception is thrown
if (e == null) {
// the record was successfully sent
logger .info("Received new metadata. \n" +
"Topic:" + recordMetadata.topic() + "\n" +
"Partition: " + recordMetadata.partition() + "\n" +
"Offset: " + recordMetadata.offset() + "\n" +
"Timestamp: " + recordMetadata.timestamp());
} else {
logger .error("Error while producing", e);
}
}
});
}
// flush data
producer.flush();
// flush and close producer
producer.close();
}
}
我想对这些消息进行计数并获取 int 值。 我使用此命令并且它有效,但我正在尝试使用代码获取此计数。
"bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic TwitterProducer --time -1"
结果是
- TwitterProducer:0:2000
我以编程方式执行相同操作的代码看起来像这样,但我不确定这是否是获取计数的正确方法:
// Reads the offset reported for one record, narrows it to int, and prints it.
// NOTE(review): this is the offset of a single record, not a message count;
// presumably offset() returns a long, so the (int) cast could truncate — confirm.
int valueCount = (int) recordMetadata.offset();
System.out.println("Offset value " + valueCount);
有人可以帮助我使用代码获取 Kafka 消息偏移值的计数吗?
你为什么需要这个值?如果你能多说明一些用途,我可以给出更有针对性的建议。
对于你的最后一个问题:直接把偏移量当作消息数并不是正确的做法。只有当主题只有一个分区、并且只有一个生产者时才可以这样用;你还需要考虑主题有多个分区的情况。
如果想获取每个生产者发送的消息数,可以在 onCompletion() 回调函数中进行统计。
或者,你也可以像下面这样使用 Consumer 客户端获取各分区的末尾偏移量:
// Reads the end offset of every partition of "topic_name" and adds them up.
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "your-brokers");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

long totalEndOffset = 0L;
// Key type is String to match the configured StringDeserializer;
// try-with-resources closes the consumer when done.
try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
    // subscribe() only assigns partitions during poll(), so assignment() would
    // still be empty here. Look the partitions up and assign them explicitly
    // (manual assignment also needs no group.id).
    Collection<TopicPartition> partitions = new java.util.ArrayList<>();
    consumer.partitionsFor("topic_name")
            .forEach(info -> partitions.add(new TopicPartition(info.topic(), info.partition())));
    consumer.assign(partitions);

    // seekToEnd() is evaluated lazily; position() forces the lookup.
    consumer.seekToEnd(partitions);
    for (TopicPartition tp : partitions) {
        long offsetPosition = consumer.position(tp);
        totalEndOffset += offsetPosition;
    }
}
System.out.println("topic_name:" + totalEndOffset);
您可以查看 GetOffsetShell 的实现细节。
这是在 Java 中重写的简化代码:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.*;
import java.util.stream.Collectors;
/**
 * Prints the end offset of every partition of the configured topics, followed
 * by the per-topic sum of those end offsets.
 *
 * <p>The sum equals the number of messages only while every partition's log
 * still starts at offset 0 (nothing removed by retention or compaction).
 */
public class GetOffsetCommand {

    /** Topics to inspect; topics unknown to the cluster yield an empty result. */
    private static final Set<String> TOPIC_NAMES = new HashSet<>();

    static {
        TOPIC_NAMES.add("my-topic");
        TOPIC_NAMES.add("not-my-topic");
    }

    /**
     * Prints one {@code partition=offset} line per partition and one
     * {@code topic:sum} line per topic.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        TOPIC_NAMES.forEach(topicName -> {
            final Map<TopicPartition, Long> offsets = getOffsets(topicName);
            offsets.entrySet().forEach(System.out::println);
            // primitive stream avoids boxing every partial sum
            final long total = offsets.values().stream().mapToLong(Long::longValue).sum();
            System.out.println(topicName + ":" + total);
        });
    }

    /**
     * Fetches the end offset of every partition of {@code topicName}.
     *
     * @param topicName topic to inspect
     * @return end offset per partition; empty when the topic has no known partitions
     */
    private static Map<TopicPartition, Long> getOffsets(String topicName) {
        // One consumer is created per call, so it must be closed here or it leaks.
        try (KafkaConsumer<String, String> consumer = makeKafkaConsumer()) {
            final List<TopicPartition> partitions = listTopicPartitions(consumer, topicName);
            return consumer.endOffsets(partitions);
        }
    }

    /** Builds a String/String consumer for the broker at localhost:9092. */
    private static KafkaConsumer<String, String> makeKafkaConsumer() {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "get-offset-command");
        return new KafkaConsumer<>(props);
    }

    /**
     * Lists the partitions of {@code topicName} as reported by the cluster.
     *
     * @param consumer  consumer used to query topic metadata
     * @param topicName topic whose partitions are wanted
     * @return the topic's partitions, or an empty list if the topic is not listed
     */
    private static List<TopicPartition> listTopicPartitions(KafkaConsumer<String, String> consumer, String topicName) {
        // Direct map lookup instead of streaming over every topic's metadata.
        return consumer.listTopics()
                .getOrDefault(topicName, Collections.emptyList())
                .stream()
                .map(p -> new TopicPartition(p.topic(), p.partition()))
                .collect(Collectors.toList());
    }
}
它会输出每个主题各分区的末尾偏移量,以及这些偏移量的总和(即消息总数),例如:
my-topic-0=184
my-topic-2=187
my-topic-4=189
my-topic-1=196
my-topic-3=243
my-topic:999