消费者组中的 Apache Beam KafkaIO 消费者阅读相同的消息
Apache Beam KafkaIO consumers in consumer group reading same message
我在数据流中使用 KafkaIO 来读取来自一个主题的消息。我使用以下代码。
KafkaIO.<String, String>read()
.withReadCommitted()
.withBootstrapServers(endPoint)
.withConsumerConfigUpdates(new ImmutableMap.Builder<String, Object>()
.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
.put(ConsumerConfig.GROUP_ID_CONFIG, groupName)
.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 8000).put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 2000)
.build())
// .commitOffsetsInFinalize()
.withTopics(Collections.singletonList(topicNames))
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.withoutMetadata();
我运行我本地的数据流程序直接使用运行ner。一切 运行 都很好。我 运行 同一程序的另一个并行实例,即另一个消费者。现在我在管道处理中看到重复的消息。
虽然我提供了消费者组 ID,但启动另一个具有相同消费者组 ID(同一程序的不同实例)的消费者不应该处理由另一个消费者处理的相同元素吗?
使用数据流 运行ner 的结果如何?
我认为您设置的选项不能保证跨管道的消息不重复传递。
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG:这是一个 flag for the Kafka consumer 不适用于 Beam 管道本身。似乎这是尽力而为且周期性的,因此您可能仍会在多个管道中看到重复项。
withReadCommitted():这只是意味着 Beam 不会读取未提交的消息。同样,它不会阻止跨多个管道的重复。
请参阅 here 了解 Beam 源使用的协议,以确定 Kafka 源的起点。
为了保证不重复交付,您可能必须阅读不同的主题或不同的订阅。
我在数据流中使用 KafkaIO 来读取来自一个主题的消息。我使用以下代码。
KafkaIO.<String, String>read()
.withReadCommitted()
.withBootstrapServers(endPoint)
.withConsumerConfigUpdates(new ImmutableMap.Builder<String, Object>()
.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
.put(ConsumerConfig.GROUP_ID_CONFIG, groupName)
.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 8000).put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 2000)
.build())
// .commitOffsetsInFinalize()
.withTopics(Collections.singletonList(topicNames))
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.withoutMetadata();
我运行我本地的数据流程序直接使用运行ner。一切 运行 都很好。我 运行 同一程序的另一个并行实例,即另一个消费者。现在我在管道处理中看到重复的消息。
虽然我提供了消费者组 ID,但启动另一个具有相同消费者组 ID(同一程序的不同实例)的消费者不应该处理由另一个消费者处理的相同元素吗?
使用数据流 运行ner 的结果如何?
我认为您设置的选项不能保证跨管道的消息不重复传递。
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG:这是一个 flag for the Kafka consumer 不适用于 Beam 管道本身。似乎这是尽力而为且周期性的,因此您可能仍会在多个管道中看到重复项。
withReadCommitted():这只是意味着 Beam 不会读取未提交的消息。同样,它不会阻止跨多个管道的重复。
请参阅 here 了解 Beam 源使用的协议,以确定 Kafka 源的起点。
为了保证不重复交付,您可能必须阅读不同的主题或不同的订阅。