Spring Cloud Stream Elmhurst.RELEASE cannot change Serde

I can't change the Serde of a channel (or binding) using the syntax specified in the documentation (https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_configuration_options_3).

Assuming my channel is pcin, I understand I should specify the valueSerde and keySerde with the properties spring.cloud.stream.kafka.streams.bindings.pcin.producer.valueSerde and spring.cloud.stream.kafka.streams.bindings.pcin.producer.keySerde.

However, I get an exception:

Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: java.lang.Long). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.

I am trying to adapt the example from Josh Long's Spring Tips: https://github.com/spring-tips/spring-cloud-stream-kafka-streams

I just changed the PageViewEventProcessor class as follows:

@Component
public static class PageViewEventProcessor {

    @StreamListener
    @SendTo(AnalyticsBinding.PAGE_COUNT_OUT)
    public KStream<String, Long> process(@Input(AnalyticsBinding.PAGE_VIEWS_IN) KStream<String, PageViewEvent> events) {
        return events
            .filter((key, value) -> value.getDuration() > 10)
            .map((key, value) -> new KeyValue<>(value.getPage(), value.getDuration()))
            .groupByKey()
            .aggregate(() -> 0L,
                (cle, val, valAgregee) -> valAgregee + val,
                Materialized.as(AnalyticsBinding.PAGE_COUNT_MV))
            .toStream();
    }
}

Instead of counting the number of events (page views), I am summing the duration of each view.

Here is an excerpt from application.properties (based on the Spring Tips sample):

# page counts out
spring.cloud.stream.bindings.pcout.destination=pcs
spring.cloud.stream.bindings.pcout.producer.use-native-encoding=true
spring.cloud.stream.kafka.streams.bindings.pcout.producer.key-serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.pcout.producer.value-serde=org.apache.kafka.common.serialization.Serdes$LongSerde
#
# page counts in
spring.cloud.stream.bindings.pcin.destination=pcs
spring.cloud.stream.bindings.pcin.consumer.use-native-decoding=true
spring.cloud.stream.bindings.pcin.group=pcs
spring.cloud.stream.bindings.pcin.content-type=application/json
spring.cloud.stream.bindings.pcin.consumer.header-mode=raw
spring.cloud.stream.kafka.streams.bindings.pcin.consumer.key-serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.pcin.consumer.value-serde=org.apache.kafka.common.serialization.Serdes$LongSerde

Is there anything else I need to change?

Is pcin the binding for a consumer (input)? If so, you should use the properties spring.cloud.stream.kafka.streams.bindings.pcin.consumer.valueSerde and spring.cloud.stream.kafka.streams.bindings.pcin.consumer.keySerde.
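In properties form, with the kebab-case relaxed binding that the rest of this post uses, the consumer-side versions look like this:

```properties
# consumer (input) side: note "consumer", not "producer", in the property path
spring.cloud.stream.kafka.streams.bindings.pcin.consumer.key-serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.pcin.consumer.value-serde=org.apache.kafka.common.serialization.Serdes$LongSerde
```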

Your incoming value type is PageViewEvent, but you are setting the value Serde to LongSerde.

You can remove the property spring.cloud.stream.bindings.pcin.consumer.use-native-decoding=true entirely and let the framework do the JSON conversion for you. That way, the incoming type is automatically converted to PageViewEvent without you having to provide a value Serde explicitly.

If you must provide a value Serde (in which case the native-decoding property must be set to true), then you need to supply an appropriate JsonSerde as the value Serde.
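A sketch of that variant, assuming Spring Kafka's org.springframework.kafka.support.serializer.JsonSerde is on the classpath (the binding name pvin is taken from the Spring Tips sample; adjust it to whichever binding actually receives PageViewEvent):

```properties
# keep native decoding on and hand the binder a JSON-capable Serde for the value
spring.cloud.stream.bindings.pvin.consumer.use-native-decoding=true
spring.cloud.stream.kafka.streams.bindings.pvin.consumer.value-serde=org.springframework.kafka.support.serializer.JsonSerde
```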

Update:

With the following changes, I can run the application without any errors.

I modified your code as follows.

@StreamListener
@SendTo(AnalyticsBinding.PAGE_COUNT_OUT)
public KStream<String, Long> process(@Input(AnalyticsBinding.PAGE_VIEWS_IN) KStream<String, PageViewEvent> events) {
    return events
        .filter((key, value) -> value.getDuration() > 10)
        .map((key, value) -> new KeyValue<>(value.getPage(), value.getDuration()))
        .groupByKey(Serialized.with(Serdes.String(), Serdes.Long()))
        .aggregate(() -> 0L,
            (cle, val, valAgregee) -> valAgregee + val,
            Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(AnalyticsBinding.PAGE_COUNT_MV)
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Long()))
        .toStream();
}

The inner Serdes in the groupByKey and aggregate calls are required because they differ from the default key/value Serde combination.
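Alternatively (a sketch, not part of the sample), the binder-wide default value Serde could be switched to LongSerde so the inline Serialized/Materialized overrides become unnecessary; note that this changes the default for every binding and internal topic in the application, so the explicit per-call Serdes above are the safer choice:

```properties
# illustrative only: make String/Long the application-wide default Serdes
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$LongSerde
```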

I also changed your configuration and cleaned it up:

#
# defaults
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
#
# page views out
spring.cloud.stream.bindings.pvout.destination=pvs
#
# page views in
spring.cloud.stream.bindings.pvin.destination=pvs
#
# page counts out
spring.cloud.stream.bindings.pcout.destination=pcs
spring.cloud.stream.bindings.pcout.producer.use-native-encoding=true
spring.cloud.stream.kafka.streams.bindings.pcout.producer.key-serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.pcout.producer.value-serde=org.apache.kafka.common.serialization.Serdes$LongSerde
#
# page counts in
spring.cloud.stream.bindings.pcin.destination=pcs
spring.cloud.stream.bindings.pcin.consumer.use-native-decoding=true
spring.cloud.stream.bindings.pcin.group=pcs
spring.cloud.stream.kafka.streams.bindings.pcin.consumer.key-serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.pcin.consumer.value-serde=org.apache.kafka.common.serialization.Serdes$LongSerde