KafkaStreams 如何在流聚合中指定 Serdes?
KafkaStreams How to specify Serdes in stream aggregation?
我正在开发一个 Kafka 流应用程序,但我在弄清楚如何进行聚合时遇到了一些问题。
我有一个 KStream bankTransactions
,其中键的类型为 String
,值的类型为 JsonNode
,因此我使用
配置了我应用程序的 Serdes
// Definition of the different Serdes used in the streams
final Serde<String> stringSerde = Serdes.String();
final Serde<JsonNode> jsonSerde = new JsonSerde();
final Serde<Long> longSerde = Serdes.Long();
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, stringSerde.getClass().getName());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, jsonSerde.getClass().getName());
我想聚合 KTable<String, Long>
中的值,其中键相同,但值将从我的 Json.
中提取 Long
所以我首先写道:
KTable<String, Long> totalBalances = bankTransactions
.groupByKey()
.aggregate(
() -> 0L,
(key, transaction, balance) -> (Long)((Long)balance + transaction.get("amount").asLong()),
Materialized.as("bank-total-balance")
);
我在运行时收到以下错误:
Caused by: org.apache.kafka.streams.errors.StreamsException:
A serializer (value: org.apache.kafka.connect.json.JsonSerializer) is not compatible to
the actual value type (value type: java.lang.Long).
Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
我知道 Kafka 在抱怨,因为我正在尝试使用默认的 Json serdes 来序列化一个 Long
。所以阅读 confluent's doc 我试过这个
KTable<String, Long> totalBalances = bankTransactions
.groupByKey()
.aggregate(
() -> 0L,
(key, transaction, balance) -> (Long)((Long)balance + transaction.get("amount").asLong()),
Materialized.as("bank-total-balance").withValueSerde(Serdes.Long())
);
但是我在编译时遇到错误:
Error:(121, 89) java: incompatible types:
org.apache.kafka.common.serialization.Serde<java.lang.Long> cannot be converted
to org.apache.kafka.common.serialization.Serde<java.lang.Object>
我尝试了不同的方式来编写此代码(例如,使用 Serdes.long()
而不是我的 longSerdes
,尝试参数化 Materialize
的类型,甚至尝试编写我的初始化程序和聚合器作为函数,Java 7 种风格)但我不知道我做错了什么。
所以我的问题很简单:当 aggregate
不是默认 Serdes 时,如何正确指定它们应该使用的 Serdes?
似乎正确的语法如下:
KTable<String, Long> totalBalances = bankTransactions
.groupByKey()
.aggregate(
() -> 0L,
(key, transaction, balance) -> (Long)((Long)balance + transaction.get("amount").asLong()),
Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("bank-total-balances")
.withKeySerde(stringSerde)
.withValueSerde(longSerde)
);
Materialize.
之后的三种类型分别是key、value和store用于实体化KTable的三种类型,这一种不应该改变。那么我们就可以在这个key-value store中定义用来写入的Serdes了。
注意 我从 github 上的一个随机回购中得到了这个语法,我仍然很乐意接受一些文档支持的更精确答案的答案。
我正在开发一个 Kafka 流应用程序,但我在弄清楚如何进行聚合时遇到了一些问题。
我有一个 KStream bankTransactions
,其中键的类型为 String
,值的类型为 JsonNode
,因此我使用
// Definition of the different Serdes used in the streams
final Serde<String> stringSerde = Serdes.String();
final Serde<JsonNode> jsonSerde = new JsonSerde();
final Serde<Long> longSerde = Serdes.Long();
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, stringSerde.getClass().getName());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, jsonSerde.getClass().getName());
我想聚合 KTable<String, Long>
中的值,其中键相同,但值将从我的 Json.
Long
所以我首先写道:
KTable<String, Long> totalBalances = bankTransactions
.groupByKey()
.aggregate(
() -> 0L,
(key, transaction, balance) -> (Long)((Long)balance + transaction.get("amount").asLong()),
Materialized.as("bank-total-balance")
);
我在运行时收到以下错误:
Caused by: org.apache.kafka.streams.errors.StreamsException:
A serializer (value: org.apache.kafka.connect.json.JsonSerializer) is not compatible to
the actual value type (value type: java.lang.Long).
Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
我知道 Kafka 在抱怨,因为我正在尝试使用默认的 Json serdes 来序列化一个 Long
。所以阅读 confluent's doc 我试过这个
KTable<String, Long> totalBalances = bankTransactions
.groupByKey()
.aggregate(
() -> 0L,
(key, transaction, balance) -> (Long)((Long)balance + transaction.get("amount").asLong()),
Materialized.as("bank-total-balance").withValueSerde(Serdes.Long())
);
但是我在编译时遇到错误:
Error:(121, 89) java: incompatible types:
org.apache.kafka.common.serialization.Serde<java.lang.Long> cannot be converted
to org.apache.kafka.common.serialization.Serde<java.lang.Object>
我尝试了不同的方式来编写此代码(例如,使用 Serdes.long()
而不是我的 longSerdes
,尝试参数化 Materialize
的类型,甚至尝试编写我的初始化程序和聚合器作为函数,Java 7 种风格)但我不知道我做错了什么。
所以我的问题很简单:当 aggregate
不是默认 Serdes 时,如何正确指定它们应该使用的 Serdes?
似乎正确的语法如下:
KTable<String, Long> totalBalances = bankTransactions
.groupByKey()
.aggregate(
() -> 0L,
(key, transaction, balance) -> (Long)((Long)balance + transaction.get("amount").asLong()),
Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("bank-total-balances")
.withKeySerde(stringSerde)
.withValueSerde(longSerde)
);
Materialize.
之后的三种类型分别是key、value和store用于实体化KTable的三种类型,这一种不应该改变。那么我们就可以在这个key-value store中定义用来写入的Serdes了。
注意 我从 github 上的一个随机回购中得到了这个语法,我仍然很乐意接受一些文档支持的更精确答案的答案。