Apache Beam KafkaIO 提到主题分区而不是主题名称

Apache Beam KafkaIO mention topic partition instead of topic name

Apache Beam KafkaIO 支持 kafka 消费者仅从指定分区读取。我有以下代码。

KafkaIO.<String, String>read()
                .withCreateTime(Duration.standardMinutes(1))
                .withReadCommitted()
                .withBootstrapServers(endPoint)
                .withConsumerConfigUpdates(new ImmutableMap.Builder<String, Object>()
                        .put(ConsumerConfig.GROUP_ID_CONFIG, groupName)
                        .put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5)
                        .put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
                        .build())
                .commitOffsetsInFinalize()
                .withTopicPartitions(List<TopicPartitions>)

我有以下2个问题。

  1. 如何从kafka获取分区名称?我如何在kafkaIO中提及它?
  2. A​​pache beam 产生的 kafka 消费者的数量是否等于创建 kafka 消费者时提到的分区列表?

我自己找到了答案。

如何让 kafkaIO 从特定分区读取数据?

kafkaIO 有方法 withTopicPartitions(List<TopicPartitions>) 接受 TopicPartition 个对象的列表。

主题分区被命名为从零开始的连续数字。因此,以下应该有效

KafkaIO.<String, String>read()
                .withCreateTime(Duration.standardMinutes(1))
                .withReadCommitted()
                .withBootstrapServers(endPoint)
                .withConsumerConfigUpdates(new ImmutableMap.Builder<String, Object>()
                        .put(ConsumerConfig.GROUP_ID_CONFIG, groupName)
                        .put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5)
                        .put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
                        .build())
                .commitOffsetsInFinalize()
                .withTopicPartitions(Arrays.asList(new TopicPartition(topicName, 0),new TopicPartition(topicName, 1),new TopicPartition(topicName, 2)))

要测试它,请使用 kafkacat 和以下命令

kafkacat -P -b localhost:9092 -t sample -p 0 - 此命令生成指定分区。

Apache beam 产生的 kafka 消费者的数量是否等于创建 kafka 消费者时提到的分区列表?

它将生成一个消费者组,其消费者数量与在构建 kafka Producer 对象时明确提到的分区数量相同。