使用 Spring Cloud Stream Kafka Streams with Avro input/output with nativeEncoding/decoding=false

Using Spring Cloud Stream Kafka Streams with Avro input/output with nativeEncoding/decoding=false

我们正在通过 Spring Cloud Stream 功能支持 Avro input/output 记录来测试 Kafka Streams 的使用,但设置 nativeEncoding=falsenativeDecoding=false 以便使用我们进行 Avro 转换的自定义 MessageConverter

默认的 serdes 键为 StringSerde,值为 ByteArraySerde

当我们只使用一个KStream转KStream函数时一切正常,例如:

    @Bean
    public Function<KStream<String, DataRecordAvro>, KStream<String, DataRecordAvro>> wordsCount() {
      return input -> input
          .flatMapValues(value -> Arrays.asList(value.getName().toString().toLowerCase().split("\W+")))
          .map((key, value) -> new KeyValue<>(value, value))
          .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
          .windowedBy(TimeWindows.of(Duration.ofSeconds(5)).grace(Duration.ofMillis(0)))
          .count()
          .toStream()
          .map((key, value) -> new KeyValue<>(key.key(), new DataRecordAvro(key.key(), value)));
    }

但是当我们尝试更复杂的示例时,涉及这样的输入 KTable:

    @Bean
    public BiFunction<KStream<String, DataRecordAvro>, KTable<String, DataRecordAvro>, KStream<String, DataRecordAvro>> userClicksRegionKTableAvro() {
      return (userClicksStream, usersRegionKTable) -> userClicksStream
          .leftJoin(usersRegionKTable,
              (clicks, region) -> new RegionWithClicks(region == null ? "UNKNOWN" : region.getName().toString(), clicks.getCount()))
          .map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(), regionWithClicks.getClicks()))
          .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
          .reduce(Long::sum)
          .mapValues((key, value) -> new DataRecordAvro(key, value))
          .toStream();
    }

(DataRecordAvroclass只有两个成员:CharSequence name; Long count;)

收到第一条记录时抛出此异常:

ClassCastException invoking Processor. Do the Processor's input types match the deserialized types? Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. Make sure the Processor can accept the deserialized input of type key: java.lang.String, and value: com.xxxx.kstreams.fixtures.avro.DataRecordAvro.
Note that although incorrect Serdes are a common cause of error, the cast exception might have another cause (in user code, for example). For example, if a processor wires in a store, but casts the generics incorrectly, a class cast exception could be raised during processing, but the cause would not be wrong Serdes.

抛出异常的处理器似乎是:

KSTREAM-LEFTJOIN-0000000011:
    states:     [user-regions-avro-STATE-STORE-0000000008]

我们不知道为什么它在这种情况下不起作用。也许 leftJoin 操作将信息持久化到一个内部主题,而 useNativeEncoding/Decoding=false 没有被考虑在内?但为什么上面的 kstream->kstream 示例确实有效?我们认为 Avro 转换仅在拓扑的开始和结束时完成,为什么在使用 leftJoin 时出现此转换异常?

这是另一个工作正常的例子(没有输入 Avro 记录,让消费者 useNativeDecoding 默认为真):

    @Bean
    public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, DataRecordAvro>> userClicksRegionKTable() {
      return (userClicksStream, usersRegionKTable) -> userClicksStream
          .leftJoin(usersRegionKTable,
              (clicks, region) -> new RegionWithClicks(region == null ? "UNKNOWN" : region, clicks))
          .map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(), regionWithClicks.getClicks()))
          .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
          .reduce(Long::sum)
          .mapValues((key, value) -> new DataRecordAvro(key, value))
          .toStream();
    }

请帮忙!

对于 Spring Cloud Stream 中的 Kafka Streams 活页夹,我们建议使用原生 decoding/encoding 和 Serdes,除非您有充分的理由依赖消息转换方法。迫使您在这里使用消息转换器的用例是什么?实际上,在 Spring Cloud Stream 中的 Kafka Streams 应用程序中使用消息转换器进行序列化会在您的拓扑中添加一个额外的层并使其更深,因此建议使用原生 decoding/encoding.

如您所述,对于 KTable,活页夹始终使用本机解码 - 目前,无法在那里使用消息转换器。当您关闭 KTable 绑定上的 useNativeDecoding 时,绑定程序会忽略它并仅使用默认字节 serde。我建议在 KTable 绑定上使用默认值,然后在您的应用程序配置中添加以下 bean。

@Bean
public Serde< DataRecordAvro> dataRecordAvroSerde() {
   // return Serde
}

这样绑定器将检测到这个 bean 并意识到 Serde 类型与函数签名中的类型匹配,然后在这些输入上使用它。

如果您对此应用还有其他问题,请随时分享 MCRE。到时候可以再看一下。