如何在特定 doFun 执行结束时在 Apache Beam 中手动提交 Kafka 偏移量
How to manually commit Kafka offset in Apache Beam at the end of specific doFun execution
我创建了一个简单的 Apache Beam 流式传输管道,它从 Kafka 读取数据,进行一些处理并通过调用一些外部服务 API 来保存结果。我想确保在管道重启或失败期间没有数据丢失,所以我想在特定 doFun 执行结束时成功调用 API 后手动将记录偏移量提交给 Kafka。
根据我以前的 Kafka 经验,我知道通过使用下面的 Kafka Consumer API,我可以手动将记录偏移量提交给 Kafka。
consumer.commitSync(currentOffsets);
在 KafkaIO 设置中有关闭自动提交的设置,但是我没有找到任何可行的解决方案来在 Apache Beam 中手动提交偏移量,因为我似乎无法在 doFun 中访问消费者。如果有专家可以分享一些示例代码的提示,我们将不胜感激。
默认情况下,pipeline.apply(KafkaIO.read()...)
会 return 一个 PCollection<KafkaRecord<K, V>>
。因此,在您的管道下游,您可以从 KafkaRecord
元数据中获取偏移量并以您需要的方式手动提交它(只是不要忘记在 KafkaIO.read()
中禁用 AUTO_COMMIT)。
不过,您需要确保对外部 API 和偏移提交的调用是原子的,以防止潜在的数据丢失(如果它很重要)。
我创建了一个简单的 Apache Beam 流式传输管道,它从 Kafka 读取数据,进行一些处理并通过调用一些外部服务 API 来保存结果。我想确保在管道重启或失败期间没有数据丢失,所以我想在特定 doFun 执行结束时成功调用 API 后手动将记录偏移量提交给 Kafka。
根据我以前的 Kafka 经验,我知道通过使用下面的 Kafka Consumer API,我可以手动将记录偏移量提交给 Kafka。
consumer.commitSync(currentOffsets);
在 KafkaIO 设置中有关闭自动提交的设置,但是我没有找到任何可行的解决方案来在 Apache Beam 中手动提交偏移量,因为我似乎无法在 doFun 中访问消费者。如果有专家可以分享一些示例代码的提示,我们将不胜感激。
默认情况下,pipeline.apply(KafkaIO.read()...)
会 return 一个 PCollection<KafkaRecord<K, V>>
。因此,在您的管道下游,您可以从 KafkaRecord
元数据中获取偏移量并以您需要的方式手动提交它(只是不要忘记在 KafkaIO.read()
中禁用 AUTO_COMMIT)。
不过,您需要确保对外部 API 和偏移提交的调用是原子的,以防止潜在的数据丢失(如果它很重要)。