Acknowledgement Kafka Producer Apache Beam
How do I get hold of the records that received an acknowledgement in Apache Beam KafkaIO?
Basically, I want all the records that did not receive any acknowledgement to go to a BigQuery table so that they can be retried later. I used the following code snippet from the documentation:
.apply(KafkaIO.<Long, String>read()
.withBootstrapServers("broker_1:9092,broker_2:9092")
.withTopic("my_topic") // use withTopics(List<String>) to read from multiple topics.
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
// Above four are required configuration. returns PCollection<KafkaRecord<Long, String>>
// Rest of the settings are optional :
// you can further customize KafkaConsumer used to read the records by adding more
// settings for ConsumerConfig. e.g :
.updateConsumerProperties(ImmutableMap.of("group.id", "my_beam_app_1"))
// set event times and watermark based on LogAppendTime. To provide a custom
// policy see withTimestampPolicyFactory(). withProcessingTime() is the default.
.withLogAppendTime()
// restrict reader to committed messages on Kafka (see method documentation).
.withReadCommitted()
// offset consumed by the pipeline can be committed back.
.commitOffsetsInFinalize()
  // finally, if you don't need Kafka metadata, you can drop it.
.withoutMetadata() // PCollection<KV<Long, String>>
)
.apply(Values.<String>create()) // PCollection<String>
By default, Beam IOs are designed to keep retrying to write/read/process elements until they succeed. (A batch pipeline will fail after repeated errors.)
What you are referring to is commonly called a Dead Letter Queue (DLQ): it takes the records that failed and adds them to a PCollection, a Pub/Sub topic, a queueing service, etc. This is often desirable because it allows a streaming pipeline to make progress when errors are encountered writing some records, while the records that succeed still get written.
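To illustrate the idea, here is a minimal sketch of the dead-letter pattern using a multi-output ParDo. It is not part of KafkaIO; "input" is assumed to be the PCollection<String> produced by the snippet above, and attemptWrite() is a hypothetical call that throws on failure:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

// Illustrative tags for the two outputs.
final TupleTag<String> successTag = new TupleTag<String>() {};
final TupleTag<String> deadLetterTag = new TupleTag<String>() {};

PCollectionTuple results = input.apply("TryWrite", ParDo.of(
    new DoFn<String, String>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        try {
          attemptWrite(c.element());            // hypothetical call that throws on failure
          c.output(c.element());                // records that were acknowledged
        } catch (Exception e) {
          c.output(deadLetterTag, c.element()); // records that were not acknowledged
        }
      }
    }).withOutputTags(successTag, TupleTagList.of(deadLetterTag)));

PCollection<String> acknowledged = results.get(successTag);
PCollection<String> failed = results.get(deadLetterTag);

The failed PCollection can then be routed wherever you want to keep the dead letters.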
Unfortunately, unless I am mistaken, there is no dead letter queue implemented in KafkaIO. It may be possible to modify KafkaIO to support one. There has been some discussion on the Beam mailing list proposing ways to implement this, which might give you some ideas.
I suspect it could be added to KafkaWriter, catching the records that failed and outputting them to another PCollection. If you choose to implement this, please also contact the Beam community mailing list; if you want help merging it into master, they will be able to make sure the change covers the necessary requirements so that it can be merged and makes sense for Beam as a whole.
Your pipeline could then write those records somewhere else (i.e. a different source). Of course, if that secondary source has an outage/issue at the same time, you would need another DLQ.
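For example, once the failed records are available as a PCollection, a sketch like the following could write them to a BigQuery table for later retry. BigQueryIO.writeTableRows() is the real API; the project/dataset/table spec and the "payload" column are made up for this example:

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

// "failed" is the dead-letter PCollection<String> from the sketch above.
failed
    .apply("ToTableRow", MapElements
        .into(TypeDescriptor.of(TableRow.class))
        .via(record -> new TableRow().set("payload", record)))  // made-up column
    .setCoder(TableRowJsonCoder.of())
    .apply("WriteDeadLetters", BigQueryIO.writeTableRows()
        .to("my-project:my_dataset.kafka_dead_letters")          // made-up table spec
        .withCreateDisposition(CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(WriteDisposition.WRITE_APPEND));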