Apache Beam KafkaIO 生产者将不同的消息路由到不同的主题
Apache Beam KafkaIO producer routing different messages to different topics
我有一个用例,其中传入数据有一个标识不同类型数据的键。有一个单一的输入 kafka 主题,所有类型的数据都被抛给它。 Beam 管道从输入的 kafka 主题中读取所有消息,并且必须根据密钥路由到不同的 kafka 主题。
目前,KafkaIO 不支持使用单个生产者写入多个主题。以下代码是KafkaIO.write()
的内部工作代码
final class AutoValue_KafkaIO_Write<K, V> extends Write<K, V> {
private final String topic;
private final WriteRecords<K, V> writeRecordsTransform;
private AutoValue_KafkaIO_Write(@Nullable String topic, WriteRecords<K, V> writeRecordsTransform) {
this.topic = topic;
this.writeRecordsTransform = writeRecordsTransform;
}
如何使用 apache beam 的 kafkaIO 生产者做到这一点?
经过几天尝试实现消息路由,目前kafkaIO不支持消息路由到不同的主题。
解决方法是为每个不同的主题创建一个 kafka 生产者,并根据发送到不同 kafka 主题的元素将元素隔离到不同的 pcollections。
我有一个用例,其中传入数据有一个标识不同类型数据的键。有一个单一的输入 kafka 主题,所有类型的数据都被抛给它。 Beam 管道从输入的 kafka 主题中读取所有消息,并且必须根据密钥路由到不同的 kafka 主题。
目前,KafkaIO 不支持使用单个生产者写入多个主题。以下代码是KafkaIO.write()
final class AutoValue_KafkaIO_Write<K, V> extends Write<K, V> {
private final String topic;
private final WriteRecords<K, V> writeRecordsTransform;
private AutoValue_KafkaIO_Write(@Nullable String topic, WriteRecords<K, V> writeRecordsTransform) {
this.topic = topic;
this.writeRecordsTransform = writeRecordsTransform;
}
如何使用 apache beam 的 kafkaIO 生产者做到这一点?
经过几天尝试实现消息路由,目前kafkaIO不支持消息路由到不同的主题。
解决方法是为每个不同的主题创建一个 kafka 生产者,并根据发送到不同 kafka 主题的元素将元素隔离到不同的 pcollections。