Spring Cloud Stream Elmhurst.RELEASE cannot change Serde
I cannot change the Serde of a channel (or binding) using the syntax specified in the documentation (https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_configuration_options_3).
Say my channel is pcin. As I understand it, I should set the valueSerde and keySerde with the properties spring.cloud.stream.kafka.streams.bindings.pcin.producer.valueSerde and spring.cloud.stream.kafka.streams.bindings.pcin.producer.keySerde.
However, I get an exception:
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: java.lang.Long). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
I am trying to adapt the example from Josh Long's Spring Tips: https://github.com/spring-tips/spring-cloud-stream-kafka-streams
I only changed the class PageViewEventProcessor as follows:
@Component
public static class PageViewEventProcessor {

    @StreamListener
    @SendTo(AnalyticsBinding.PAGE_COUNT_OUT)
    public KStream<String, Long> process(@Input(AnalyticsBinding.PAGE_VIEWS_IN) KStream<String, PageViewEvent> events) {
        return events
                .filter((key, value) -> value.getDuration() > 10)
                .map((key, value) -> new KeyValue<>(value.getPage(), value.getDuration()))
                .groupByKey()
                .aggregate(() -> 0L,
                        (cle, val, valAgregee) -> valAgregee + val,
                        Materialized.as(AnalyticsBinding.PAGE_COUNT_MV))
                .toStream();
    }
}
Instead of counting the events (page views), I sum the duration of each visit.
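To make the intended result concrete, here is a minimal sketch of the same computation over an in-memory list, without Kafka. The class `DurationSumSketch` and its nested `View` type are hypothetical stand-ins for the sample's `PageViewEvent`; only the filter threshold (duration > 10) and the per-page sum come from the processor above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

final class DurationSumSketch {

    // Hypothetical stand-in for PageViewEvent: just a page name and a duration.
    static final class View {
        final String page;
        final long duration;

        View(String page, long duration) {
            this.page = page;
            this.duration = duration;
        }
    }

    // Keeps visits longer than 10, then sums the durations per page.
    static Map<String, Long> sumDurations(List<View> views) {
        return views.stream()
                .filter(v -> v.duration > 10)
                .collect(Collectors.groupingBy(
                        v -> v.page,
                        TreeMap::new,
                        Collectors.summingLong(v -> v.duration)));
    }

    public static void main(String[] args) {
        List<View> views = Arrays.asList(
                new View("about", 100),
                new View("about", 5),   // dropped by the filter
                new View("blog", 20),
                new View("about", 30));
        System.out.println(sumDurations(views)); // prints {about=130, blog=20}
    }
}
```

The result is keyed by a String and valued by a Long, which is the key/value pair the output binding has to serialize.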
Here is an excerpt of application.properties (from the Spring Tips sample):
# page counts out
spring.cloud.stream.bindings.pcout.destination=pcs
spring.cloud.stream.bindings.pcout.producer.use-native-encoding=true
spring.cloud.stream.kafka.streams.bindings.pcout.producer.key-serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.pcout.producer.value-serde=org.apache.kafka.common.serialization.Serdes$LongSerde
#
# page counts in
spring.cloud.stream.bindings.pcin.destination=pcs
spring.cloud.stream.bindings.pcin.consumer.use-native-decoding=true
spring.cloud.stream.bindings.pcin.group=pcs
spring.cloud.stream.bindings.pcin.content-type=application/json
spring.cloud.stream.bindings.pcin.consumer.header-mode=raw
spring.cloud.stream.kafka.streams.bindings.pcin.consumer.key-serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.pcin.consumer.value-serde=org.apache.kafka.common.serialization.Serdes$LongSerde
Is there anything else that needs to be changed?
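Is pcin bound to a consumer (an input)? If so, you should use these properties as
spring.cloud.stream.kafka.streams.bindings.pcin.consumer.valueSerde
and
spring.cloud.stream.kafka.streams.bindings.pcin.consumer.keySerde
Your incoming value type is PageViewEvent. However, you are setting the value Serde to LongSerde.
You can remove this property altogether: spring.cloud.stream.bindings.pcin.consumer.use-native-decoding=true and let the framework do the JSON conversion for you. That way, the incoming type is converted to PageViewEvent automatically, without you having to provide a value Serde explicitly.
If you must provide a value Serde (in which case the native-decoding property must be set to true), then you have to provide an appropriate JsonSerde as the value Serde.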
Update:
With the following changes, I was able to run the application without any errors.
I modified your code like this.
@StreamListener
@SendTo(AnalyticsBinding.PAGE_COUNT_OUT)
public KStream<String, Long> process(@Input(AnalyticsBinding.PAGE_VIEWS_IN) KStream<String, PageViewEvent> events) {
    return events
            .filter((key, value) -> value.getDuration() > 10)
            .map((key, value) -> new KeyValue<>(value.getPage(), value.getDuration()))
            .groupByKey(Serialized.with(Serdes.String(), Serdes.Long()))
            .aggregate(() -> 0L,
                    (cle, val, valAgregee) -> valAgregee + val,
                    Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(AnalyticsBinding.PAGE_COUNT_MV)
                            .withKeySerde(Serdes.String()).withValueSerde(Serdes.Long())
            )
            .toStream();
}
The inner Serdes in the groupByKey and aggregate calls are required because the key/value types at those points differ from the default key/value Serde combination.
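For intuition about the exception in the question, here is a stripped-down model of the mismatch that does not use Kafka at all. `ToySerializer` and `SerdeMismatchDemo` are hypothetical names and not Kafka's real `Serializer` API; the assumption is only that a framework applying a configured String serializer to a value that is actually a Long fails on a type cast, which is the situation the StreamsException message describes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a serializer; not Kafka's real interface.
interface ToySerializer<T> {
    byte[] serialize(T data);
}

final class SerdeMismatchDemo {

    static final ToySerializer<String> STRING =
            s -> s.getBytes(StandardCharsets.UTF_8);

    static final ToySerializer<Long> LONG =
            l -> ByteBuffer.allocate(Long.BYTES).putLong(l).array();

    // Mimics a framework that only knows the configured serializer and
    // applies it to whatever value shows up at runtime.
    @SuppressWarnings("unchecked")
    static byte[] writeWith(ToySerializer<?> configured, Object value) {
        return ((ToySerializer<Object>) configured).serialize(value);
    }

    // Returns true when the configured serializer cannot handle the value.
    static boolean fails(ToySerializer<?> configured, Object value) {
        try {
            writeWith(configured, value);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(fails(STRING, 42L)); // prints true: default-like String serializer, Long value
        System.out.println(fails(LONG, 42L));   // prints false: serializer matches the value type
    }
}
```

Passing explicit Serdes to groupByKey and aggregate plays the role of the second call: the serializer matches the Long value instead of falling back to the String default.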
I also changed your configuration and cleaned it up:
#
# defaults
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
#
# page views out
spring.cloud.stream.bindings.pvout.destination=pvs
#
# page views in
spring.cloud.stream.bindings.pvin.destination=pvs
#
# page counts out
spring.cloud.stream.bindings.pcout.destination=pcs
spring.cloud.stream.bindings.pcout.producer.use-native-encoding=true
spring.cloud.stream.kafka.streams.bindings.pcout.producer.key-serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.pcout.producer.value-serde=org.apache.kafka.common.serialization.Serdes$LongSerde
#
# page counts in
spring.cloud.stream.bindings.pcin.destination=pcs
spring.cloud.stream.bindings.pcin.consumer.use-native-decoding=true
spring.cloud.stream.bindings.pcin.group=pcs
spring.cloud.stream.kafka.streams.bindings.pcin.consumer.key-serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.pcin.consumer.value-serde=org.apache.kafka.common.serialization.Serdes$LongSerde