SerializationFailedException:Spring 启用 kpl-kcl 的 Cloud Stream Kinesis 活页夹:true

SerializationFailedException: Spring Cloud Stream Kinesis binder with kpl-kcl-enabled:true

我目前正在评估在新项目中使用 spring cloud stream kinesis binder 的可能性,但我遇到了一些问题。

当我启用 kcl-kpl-false 时,一切正常。然而,当我启用 kcl-kop 时,我不断收到以下错误:

org.springframework.core.serializer.support.SerializationFailedException: Failed to deserialize payload. Is the byte array a result of corresponding serialization for DefaultDeserializer?; nested exception is java.io.StreamCorruptedException: invalid stream header: 61686868

这是我当前的配置:

spring:
  cloud:
    stream:
      kinesis:
        binder:
          checkpoint:
            create-delay: 0
            table: feeder_mycollection_changes_table
          kpl-kcl-enabled: true
        bindings:
          processEvent-in-0:
            consumer:
              shardIteratorType: TRIM_HORIZON
      bindings:
        processEvent-in-0:
          destination: mycollection_changes_stream
          content-type: application/json
          consumer:
            headerMode: none

我在测试中使用的依赖项版本是:

<spring-cloud.version>Hoxton.RC2</spring-cloud.version>
<spring-cloud-stream.version>Horsham.RC2</spring-cloud-stream.version>
<spring-cloud-stream-kinesis.version>2.0.0.BUILD-SNAPSHOT</spring-cloud-stream-kinesis.version>

修复在这里:https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/commit/b64cd10c5b5aac209b61399f81e2801f24fbbaf4

问题是 KclMessageDrivenChannelAdapter 中的默认 converterDeserializingConverter

Spring Cloud Stream 不处理 Java 序列化,它有自己的转换机制 byte[]。因此,我们需要修复 Binder 实现以依赖 Spring Cloud Stream 转换基础设施。