Kafka Streams 物化存储构建错误
Kafka Streams Materialized Store Build Error
我正在尝试在此处构建 Materialized.as DSL 代码:https://kafka.apache.org/11/javadoc/org/apache/kafka/streams/state/Stores.html
但是我收到了错误
incompatible types: org.apache.kafka.common.serialization.Serde<java.lang.Long> cannot be converted to org.apache.kafka.common.serialization.Serde<java.lang.Object>
在线
.withKeySerde(Serdes.Long())
有谁知道这里可能出了什么问题?
final StreamsBuilder builder = new StreamsBuilder();
KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore("mystore");
KTable<Long,String> dataStore = builder.table(
"example_stream",
Materialized.as(storeSupplier)
.withKeySerde(Serdes.Long())
.withValueSerde(Serdes.String()));
没有代码示例我不能肯定地说,但是错误消息非常清楚。您正在向 Kafka 指定密钥的类型为 Long
。但是,您的密钥实际上是其他 Java 对象。例如,如果您有一条带有字符串键的消息,则此代码将更改为:.withKeySerde(Serdes.String())
。检查密钥的类型并为该类型指定正确的 Serde
。
问题是 builder.table
不知道默认为 <Object,Object>
的通用类型。后来,Serde 类型不匹配。您需要指定类型
KTable<Long,String> dataStore = builder.<Long,String>table(
"example_stream",
Materialized.as(storeSupplier)
.withKeySerde(Serdes.Long())
.withValueSerde(Serdes.String()));
我正在尝试在此处构建 Materialized.as DSL 代码:https://kafka.apache.org/11/javadoc/org/apache/kafka/streams/state/Stores.html
但是我收到了错误
incompatible types: org.apache.kafka.common.serialization.Serde<java.lang.Long> cannot be converted to org.apache.kafka.common.serialization.Serde<java.lang.Object>
在线
.withKeySerde(Serdes.Long())
有谁知道这里可能出了什么问题?
final StreamsBuilder builder = new StreamsBuilder();
KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore("mystore");
KTable<Long,String> dataStore = builder.table(
"example_stream",
Materialized.as(storeSupplier)
.withKeySerde(Serdes.Long())
.withValueSerde(Serdes.String()));
没有代码示例我不能肯定地说,但是错误消息非常清楚。您正在向 Kafka 指定密钥的类型为 Long
。但是,您的密钥实际上是其他 Java 对象。例如,如果您有一条带有字符串键的消息,则此代码将更改为:.withKeySerde(Serdes.String())
。检查密钥的类型并为该类型指定正确的 Serde
。
问题是 builder.table
不知道默认为 <Object,Object>
的通用类型。后来,Serde 类型不匹配。您需要指定类型
KTable<Long,String> dataStore = builder.<Long,String>table(
"example_stream",
Materialized.as(storeSupplier)
.withKeySerde(Serdes.Long())
.withValueSerde(Serdes.String()));