Apache Beam KafkaIO 提到主题分区而不是主题名称
Apache Beam KafkaIO mention topic partition instead of topic name
Apache Beam KafkaIO 支持 kafka 消费者仅从指定分区读取。我有以下代码。
KafkaIO.<String, String>read()
.withCreateTime(Duration.standardMinutes(1))
.withReadCommitted()
.withBootstrapServers(endPoint)
.withConsumerConfigUpdates(new ImmutableMap.Builder<String, Object>()
.put(ConsumerConfig.GROUP_ID_CONFIG, groupName)
.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5)
.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
.build())
.commitOffsetsInFinalize()
.withTopicPartitions(List<TopicPartitions>)
我有以下2个问题。
- 如何从kafka获取分区名称?我如何在kafkaIO中提及它?
- Apache beam 产生的 kafka 消费者的数量是否等于创建 kafka 消费者时提到的分区列表?
我自己找到了答案。
如何让 kafkaIO 从特定分区读取数据?
kafkaIO 有方法 withTopicPartitions(List<TopicPartitions>)
接受 TopicPartition
个对象的列表。
主题分区被命名为从零开始的连续数字。因此,以下应该有效
KafkaIO.<String, String>read()
.withCreateTime(Duration.standardMinutes(1))
.withReadCommitted()
.withBootstrapServers(endPoint)
.withConsumerConfigUpdates(new ImmutableMap.Builder<String, Object>()
.put(ConsumerConfig.GROUP_ID_CONFIG, groupName)
.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5)
.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
.build())
.commitOffsetsInFinalize()
.withTopicPartitions(Arrays.asList(new TopicPartition(topicName, 0),new TopicPartition(topicName, 1),new TopicPartition(topicName, 2)))
要测试它,请使用 kafkacat
和以下命令
kafkacat -P -b localhost:9092 -t sample -p 0
- 此命令生成指定分区。
Apache beam 产生的 kafka 消费者的数量是否等于创建 kafka 消费者时提到的分区列表?
它将生成一个消费者组,其消费者数量与在构建 kafka Producer 对象时明确提到的分区数量相同。
Apache Beam KafkaIO 支持 kafka 消费者仅从指定分区读取。我有以下代码。
KafkaIO.<String, String>read()
.withCreateTime(Duration.standardMinutes(1))
.withReadCommitted()
.withBootstrapServers(endPoint)
.withConsumerConfigUpdates(new ImmutableMap.Builder<String, Object>()
.put(ConsumerConfig.GROUP_ID_CONFIG, groupName)
.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5)
.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
.build())
.commitOffsetsInFinalize()
.withTopicPartitions(List<TopicPartitions>)
我有以下2个问题。
- 如何从kafka获取分区名称?我如何在kafkaIO中提及它?
- Apache beam 产生的 kafka 消费者的数量是否等于创建 kafka 消费者时提到的分区列表?
我自己找到了答案。
如何让 kafkaIO 从特定分区读取数据?
kafkaIO 有方法 withTopicPartitions(List<TopicPartitions>)
接受 TopicPartition
个对象的列表。
主题分区被命名为从零开始的连续数字。因此,以下应该有效
KafkaIO.<String, String>read()
.withCreateTime(Duration.standardMinutes(1))
.withReadCommitted()
.withBootstrapServers(endPoint)
.withConsumerConfigUpdates(new ImmutableMap.Builder<String, Object>()
.put(ConsumerConfig.GROUP_ID_CONFIG, groupName)
.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5)
.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
.build())
.commitOffsetsInFinalize()
.withTopicPartitions(Arrays.asList(new TopicPartition(topicName, 0),new TopicPartition(topicName, 1),new TopicPartition(topicName, 2)))
要测试它,请使用 kafkacat
和以下命令
kafkacat -P -b localhost:9092 -t sample -p 0
- 此命令生成指定分区。
Apache beam 产生的 kafka 消费者的数量是否等于创建 kafka 消费者时提到的分区列表?
它将生成一个消费者组,其消费者数量与在构建 kafka Producer 对象时明确提到的分区数量相同。