当一个主题有多个主题时,如何处理来自 Kafka(使用 Apache Beam)的 Avro 输入?

How to process Avro input from Kafka (with Apache Beam) when there are multiple subjects on one topic?

为了使用 KafkaIO, one needs to pass an instance of ConfluentSchemaRegistryDeserializerProvider 作为值反序列化器使用 Apache Beam 处理 Avro 编码的消息。

一个典型的例子是这样的:

PCollection<KafkaRecord<Long, GenericRecord>> input = pipeline
  .apply(KafkaIO.<Long, GenericRecord>read()
     .withBootstrapServers("kafka-broker:9092")
     .withTopic("my_topic")
     .withKeyDeserializer(LongDeserializer.class)
     .withValueDeserializer(
         ConfluentSchemaRegistryDeserializerProvider.of("http://my-local-schema-registry:8081", "my_subject"))

但是,我想使用的一些 Kafka 主题有多个不同的主题(事件类型)(出于排序原因)。因此,我无法预先提供一个固定的主题名称。如何解决这个难题?

(我的目标是,最终使用BigQueryIO将这些事件推送到云端。)

您可以进行多次阅读,每个主题一次,然后 Flatten 他们。