Using the KafkaAvroDeserializer with Alpakka

I have a SchemaRegistry and a KafkaBroker from which I pull data with Avro v1.8.1. For deserialization I've been using Confluent's KafkaAvroDeserializer. Now I meant to refactor my code in order to use the Elasticsearch API provided by Alpakka, but unfortunately this breaks the deserialization, as it leads to NullPointerExceptions:

Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition topic-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 2
Caused by: java.lang.NullPointerException
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:116)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:88)
    at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:58)
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1030)
    at org.apache.kafka.clients.consumer.internals.Fetcher.access00(Fetcher.java:110)
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1250)
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access00(Fetcher.java:1099)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:545)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:506)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1269)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1200)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1176)
    at de.adesso.fds.connectors.dpa.news.NewsConsumer.main(MyConsumer.java:58)

I've been using Alpakka's ConsumerSettings API as described in the example:

val system = ActorSystem.create();

// necessary to convert timestamps correctly in Avro Version 1.8.1 to avoid ClassCastExceptions
SpecificData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion());

val consumerSettings = ConsumerSettings.create(system, new StringDeserializer(), new KafkaAvroDeserializer())
    .withBootstrapServers(kafkaBootstrapServerUrl)
    .withClientId(InetAddress.getLocalHost().getHostName())
    .withGroupId("" + new Random().nextInt())
    .withProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl)
    .withProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    .withStopTimeout(Duration.ofSeconds(5));

These settings lead to the NullPointerExceptions above, while this vanilla Kafka consumer configuration works fine:

val props = new Properties();
props.put(ConsumerConfig.CLIENT_ID_CONFIG, InetAddress.getLocalHost().getHostName()); 
props.put(ConsumerConfig.GROUP_ID_CONFIG, "" + new Random().nextInt());
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerUrl);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// necessary to convert timestamps correctly in newer Avro Versions and to avoid ClassCastExceptions
SpecificData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
val consumer = new KafkaConsumer<String, MyClass>(props);

In the working example, the values of the ConsumerRecords are successfully deserialized into the classes that the AvroMavenPlugin generated from the schema.

Any hints would be appreciated!

I think you need to pull new KafkaAvroDeserializer() out into its own variable, then call the .configure() method on that instance to pass in a non-null Schema Registry URL.

Then pass the configured instance into ConsumerSettings.create.
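
A minimal sketch of what that could look like, reusing the variables from your snippet (the two withProperty calls for the registry URL and the specific reader then become redundant, since the instance is configured directly):

Map<String, Object> avroConfig = new HashMap<>();
avroConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
avroConfig.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

// The underlying KafkaConsumer does not call configure() on deserializer
// instances that are passed in directly, so configure this one up front;
// otherwise its schema registry client stays null and deserialize() NPEs.
val deserializer = new KafkaAvroDeserializer();
deserializer.configure(avroConfig, false); // false = value (not key) deserializer

val consumerSettings = ConsumerSettings.create(system, new StringDeserializer(), deserializer)
    .withBootstrapServers(kafkaBootstrapServerUrl)
    .withClientId(InetAddress.getLocalHost().getHostName())
    .withGroupId("" + new Random().nextInt())
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    .withStopTimeout(Duration.ofSeconds(5));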

FWIW, depending on your needs, Kafka Connect works well for loading data into Elasticsearch.
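
For reference, a minimal sketch of what such a sink could look like with Confluent's Elasticsearch sink connector; the name, topic, and URLs below are placeholders to adapt:

# standalone worker connector properties; all values are placeholders
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
topics=topic
connection.url=http://localhost:9200
type.name=_doc
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081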