是否可以使用 spring 云流 api 聚合对象而不是字符串?
Is it possible to aggregate an object instad of a string with spring cloud stream api?
我想使用 spring 云流 api 来聚合来自主题的事件。
因此我使用 KStream 作为输入。
KStream<Object, LoggerCreatedMessage>
现在我想使用聚合器将我的新对象存储在键值存储中,所以我使用以下代码:
input
.map((key, value) -> {
return new KeyValue<>(value.logger_id,value);
})
/*.groupBy(
(s, loggerEvent) -> loggerEvent.logger_id,
Serialized.with(null, loggerEventSerde))*/
.groupByKey()
.aggregate(
String::new,
(s, loggerEvent, vr) -> {
return vr;
},
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(STORE_NAME).withKeySerde(Serdes.String()).
withValueSerde(Serdes.String())
);
为什么我只能使用字符串作为初始化器而不可以使用任何对象?
我想使用 LoggerDomain::new 而不是 String::new,但我只收到此错误消息:
Bad return type in method reference: cannot convert LoggerDomain to VR
我错过了什么吗?
您通过 Materialized.<String, String, KeyValueStore<Bytes, byte[]>>
将 <key,value>
定义为 <String, String>
-- 如果您的值类型应为 LoggerDomain
,则应为 Materialized.<KeyType, LoggerDomain, KeyValueStore<Bytes, byte[]>>()
.
请注意,您还需要为 LoggerDomain
为 Materialized
提供自定义 Serde
。
我想使用 spring 云流 api 来聚合来自主题的事件。 因此我使用 KStream 作为输入。
KStream<Object, LoggerCreatedMessage>
现在我想使用聚合器将我的新对象存储在键值存储中,所以我使用以下代码:
input
.map((key, value) -> {
return new KeyValue<>(value.logger_id,value);
})
/*.groupBy(
(s, loggerEvent) -> loggerEvent.logger_id,
Serialized.with(null, loggerEventSerde))*/
.groupByKey()
.aggregate(
String::new,
(s, loggerEvent, vr) -> {
return vr;
},
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(STORE_NAME).withKeySerde(Serdes.String()).
withValueSerde(Serdes.String())
);
为什么我只能使用字符串作为初始化器而不可以使用任何对象?
我想使用 LoggerDomain::new 而不是 String::new,但我只收到此错误消息:
Bad return type in method reference: cannot convert LoggerDomain to VR
我错过了什么吗?
您通过 Materialized.<String, String, KeyValueStore<Bytes, byte[]>>
将 <key,value>
定义为 <String, String>
-- 如果您的值类型应为 LoggerDomain
,则应为 Materialized.<KeyType, LoggerDomain, KeyValueStore<Bytes, byte[]>>()
.
请注意,您还需要为 LoggerDomain
为 Materialized
提供自定义 Serde
。