使用 KafkaAvroDeserializer 的 Kafka 流 "Consumed.with()"

Kafka Streams "Consumed.with()" with KafkaAvroDeserializer

我需要从使用 KafkaAvroDeserializer 而不是标准的 kafka 反序列化器使用的主题创建流。这是因为进一步向下,它将被发送到汇合 JDBC 接收器连接器(不支持标准 serializer/deserializers)中使用的主题。创建主题时,我对键和值都使用了 KafkaAvroSerializer。

我的原始代码(在我更改为使用 Kafka Avro 序列化器作为密钥之前)是:

final KStream<String, DocumentUpload> uploadStream = builder.stream(UPLOADS_TOPIC, Consumed.with(Serdes.String(), uploadSerde));

注意:上面的 Serdes.string 将无法正确反序列化,因为密钥是使用 KafkaAvroSerializer 序列化的。所以,也许还有另一种形式的代码可以让我构建一个流 而无需 设置密钥 serdes(因此它默认为配置中的内容)我可以设置值 serde (uploadSerde)?

如果没有,有人能告诉我如何将 "Serdes.String()" 标准反序列化器更改为 KafkaAvroDeserializer 吗?例如

final KStream<String, DocumentUpload> uploadStream = builder.stream(UPLOADS_TOPIC, Consumed.with(<What can I insert here for the KafkaAvroDeserializer.String???>, uploadSerde));

在我的消费者中,我设置了正确的默认反序列化器:

streamsConfiguration.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
streamsConfiguration.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);

如果使用表单(并允许在我的消费者中指定的默认值,即 KafkaAvro):

final KStream<String, DocumentUpload> uploadStream = builder.stream(UPLOADS_TOPIC);

我得到以下信息:

2018-04-08 00:24:53,433] ERROR [fc-demo-client-StreamThread-1] stream-thread [fc-demo-client-StreamThread-1] Failed to process stream task 0_0 due to the following error:    (org.apache.kafka.streams.processor.internals.AssignedTasks)
java.lang.ClassCastException: [B cannot be cast to java.lang.String
at     org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42)
at    org.apache.kafka.streams.processor.internals.ProcessorNode.run(ProcessorNode.java:46)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
....

我正在使用 Java 从 avsc 文件生成的 classes 并且使用从 avro 模式生成的 Java class 初始化 uploadSerde。

谢谢。

键和值的逻辑相同。因此,您可以以相同的方式处理两者。

您的困惑在于在配置中设置消费者反序列化器。请注意,这些配置将被忽略(出于内部原因)。您不能直接配置消费者的反序列化器。你总是需要使用 Serdes。因此,如果你想为消费者设置默认的反序列化器,你需要在配置中指定默认的 Serde。

So I create a wrapper around the KafkaAvroSerializer and KafkaAvroDeserializer, that instantiates these and then use the wrapper for the key parameter in Consumed.with

没错。或者您也可以在配置中将此 Serde 设置为默认值。

Would have thought creating a stream from a topic with a KafkaAvroSerialize'd String key was a common use case

不确定。如果它是一个普通的字符串,我假设人们可能会直接使用 StringDeserializer 而不是将字符串包装为 Avro(不确定)。另请注意,建议在处理 Avro 时使用模式注册表。 Confluent 的模式注册表附带相应的 Avro Serdes:https://github.com/confluentinc/schema-registry/免责声明:我是 Confluent 的员工。