从kafka主题读取时如何验证无效的分区名称
How to validate invalid partition name while reading from a kafka topic
我正在编写一些代码来根据分区号和偏移量从Kafka Topic读取特定消息,代码如下
TopicPartition topicPartition = new TopicPartition(this.topicName, partition);
this.consumer.assign(Collections.singletonList(topicPartition));
this.consumer.seek(topicPartition, offset);
ConsumerRecords<Object, Object> records = this.consumer.poll(50000L);
现在,如果在 partition
变量中传递了错误的值,我希望代码抛出错误,但现在它可以工作,只是不获取任何记录。所以我无法区分给定输入是没有记录还是给定输入是错误的。
您可以使用其中一种检索集群元数据的方法来查找现有分区。
例如,您可以调用 partitionsFor()
, or listTopics()
来获取主题的现有分区列表。然后你可以检测提供的partition
是否存在
我正在编写一些代码来根据分区号和偏移量从Kafka Topic读取特定消息,代码如下
TopicPartition topicPartition = new TopicPartition(this.topicName, partition);
this.consumer.assign(Collections.singletonList(topicPartition));
this.consumer.seek(topicPartition, offset);
ConsumerRecords<Object, Object> records = this.consumer.poll(50000L);
现在,如果在 partition
变量中传递了错误的值,我希望代码抛出错误,但现在它可以工作,只是不获取任何记录。所以我无法区分给定输入是没有记录还是给定输入是错误的。
您可以使用其中一种检索集群元数据的方法来查找现有分区。
例如,您可以调用 partitionsFor()
, or listTopics()
来获取主题的现有分区列表。然后你可以检测提供的partition
是否存在