当一个主题有多个主题时,如何处理来自 Kafka(使用 Apache Beam)的 Avro 输入?
How to process Avro input from Kafka (with Apache Beam) when there are multiple subjects on one topic?
为了使用 KafkaIO
, one needs to pass an instance of ConfluentSchemaRegistryDeserializerProvider
作为值反序列化器使用 Apache Beam 处理 Avro 编码的消息。
一个典型的例子是这样的:
PCollection<KafkaRecord<Long, GenericRecord>> input = pipeline
.apply(KafkaIO.<Long, GenericRecord>read()
.withBootstrapServers("kafka-broker:9092")
.withTopic("my_topic")
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(
ConfluentSchemaRegistryDeserializerProvider.of("http://my-local-schema-registry:8081", "my_subject"))
但是,我想使用的一些 Kafka 主题有多个不同的主题(事件类型)(出于排序原因)。因此,我无法预先提供一个固定的主题名称。如何解决这个难题?
(我的目标是,最终使用BigQueryIO
将这些事件推送到云端。)
您可以进行多次阅读,每个主题一次,然后 Flatten 他们。
为了使用 KafkaIO
, one needs to pass an instance of ConfluentSchemaRegistryDeserializerProvider
作为值反序列化器使用 Apache Beam 处理 Avro 编码的消息。
一个典型的例子是这样的:
PCollection<KafkaRecord<Long, GenericRecord>> input = pipeline
.apply(KafkaIO.<Long, GenericRecord>read()
.withBootstrapServers("kafka-broker:9092")
.withTopic("my_topic")
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(
ConfluentSchemaRegistryDeserializerProvider.of("http://my-local-schema-registry:8081", "my_subject"))
但是,我想使用的一些 Kafka 主题有多个不同的主题(事件类型)(出于排序原因)。因此,我无法预先提供一个固定的主题名称。如何解决这个难题?
(我的目标是,最终使用BigQueryIO
将这些事件推送到云端。)
您可以进行多次阅读,每个主题一次,然后 Flatten 他们。