当未指定默认 serdes 并使用自定义 serdes 时,KStream 上的映射操作失败 -> org.apache.kafka.streams.errors.StreamsException
Map operation over a KStream fails when not specifying default serdes and using custom ones -> org.apache.kafka.streams.errors.StreamsException
因为我使用的是 Json 值,所以我没有设置默认的 serdes。
我处理了一个 KStream,使用必要的 spring 和产品 (json) serdes,但下一步(映射操作)失败了:
val props = Properties()
props[StreamsConfig.APPLICATION_ID_CONFIG] = applicationName
props[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = kafkaBootstrapServers
val productSerde: Serde<Product> = Serdes.serdeFrom(JsonPojoSerializer<Product>(), JsonPojoDeserializer(Product::class.java))
builder.stream(INVENTORY_TOPIC, Consumed.with(Serdes.String(), productSerde))
.map { key, value ->
KeyValue(key, XXX)
}
.aggregate(...)
如果我删除地图操作,执行就会正常。
我还没有找到为 map() 指定 serdes 的方法,怎么办?
错误:
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: com.codependent.kafkastreams.inventory.dto.Product). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:92)
多个问题:
调用 map()
后调用 groupByKey().aggregate()
。这会触发数据重新分区,因此在 map()
数据被写入内部主题以进行数据重新分区之后。因此,您也需要在groupByKey()
内提供相应的Serde
。
但是,因为您没有修改密钥,所以您实际上应该调用 mapValues()
,以避免不必要的重新分区。
请注意,您需要为不应使用配置中的默认 Serde
的每个运算符提供 Serde
s。 Serde
s 不会沿下游传递,而是运算符就地覆盖。 (Kafka 2.1 正在改进这一点。)
如果有人像我一样登陆这里,这里有一个演示 Matthias 解决方案的实际示例。由于 ClassCastException,以下代码将失败。
builder.stream(
"my-topic",
Consumed.with(Serdes.String(), customSerde))
.map((k, v) -> {
// You've modified the key. Get ready for an error!
String newKey = k.split(":")[0];
return KeyValue.pair(newKey.toString(), v);
})
.groupByKey()
.aggregate(
MyAggregate::new,
(key, data, aggregation) -> {
// You'll never reach this block due to an error similar to:
// ClassCastException while producing data to topic
return aggregation.updateFrom(shot);
},
Materialized.<String, MyAggregate> as(storeSupplier)
.withKeySerde(Serdes.String())
.withValueSerde(aggregateSerde)
)
正如 Matthias 提到的,“您需要在 groupByKey 中提供相应的 Serdes”。方法如下:
builder.stream(
"my-topic",
Consumed.with(Serdes.String(), customSerde))
.map((k, v) -> {
// You've modified the key. Get ready for an error!
String newKey = k.split(":")[0];
return KeyValue.pair(newKey.toString(), v);
})
// Voila, you've specified the serdes required by the new internal
// repartition topic and you can carry on with your work
.groupByKey(Grouped.with(Serdes.String(), customSerde))
.aggregate(
MyAggregate::new,
(key, data, aggregation) -> {
// You'll never reach this block due to an error similar to:
// ClassCastException while producing data to topic
return aggregation.updateFrom(shot);
},
Materialized.<String, MyAggregate> as(storeSupplier)
.withKeySerde(Serdes.String())
.withValueSerde(aggregateSerde)
)
因为我使用的是 Json 值,所以我没有设置默认的 serdes。
我处理了一个 KStream,使用必要的 spring 和产品 (json) serdes,但下一步(映射操作)失败了:
val props = Properties()
props[StreamsConfig.APPLICATION_ID_CONFIG] = applicationName
props[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = kafkaBootstrapServers
val productSerde: Serde<Product> = Serdes.serdeFrom(JsonPojoSerializer<Product>(), JsonPojoDeserializer(Product::class.java))
builder.stream(INVENTORY_TOPIC, Consumed.with(Serdes.String(), productSerde))
.map { key, value ->
KeyValue(key, XXX)
}
.aggregate(...)
如果我删除地图操作,执行就会正常。
我还没有找到为 map() 指定 serdes 的方法,怎么办?
错误:
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: com.codependent.kafkastreams.inventory.dto.Product). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:92)
多个问题:
调用
map()
后调用groupByKey().aggregate()
。这会触发数据重新分区,因此在map()
数据被写入内部主题以进行数据重新分区之后。因此,您也需要在groupByKey()
内提供相应的Serde
。但是,因为您没有修改密钥,所以您实际上应该调用
mapValues()
,以避免不必要的重新分区。请注意,您需要为不应使用配置中的默认
Serde
的每个运算符提供Serde
s。Serde
s 不会沿下游传递,而是运算符就地覆盖。 (Kafka 2.1 正在改进这一点。)
如果有人像我一样登陆这里,这里有一个演示 Matthias 解决方案的实际示例。由于 ClassCastException,以下代码将失败。
builder.stream(
"my-topic",
Consumed.with(Serdes.String(), customSerde))
.map((k, v) -> {
// You've modified the key. Get ready for an error!
String newKey = k.split(":")[0];
return KeyValue.pair(newKey.toString(), v);
})
.groupByKey()
.aggregate(
MyAggregate::new,
(key, data, aggregation) -> {
// You'll never reach this block due to an error similar to:
// ClassCastException while producing data to topic
return aggregation.updateFrom(shot);
},
Materialized.<String, MyAggregate> as(storeSupplier)
.withKeySerde(Serdes.String())
.withValueSerde(aggregateSerde)
)
正如 Matthias 提到的,“您需要在 groupByKey 中提供相应的 Serdes”。方法如下:
builder.stream(
"my-topic",
Consumed.with(Serdes.String(), customSerde))
.map((k, v) -> {
// You've modified the key. Get ready for an error!
String newKey = k.split(":")[0];
return KeyValue.pair(newKey.toString(), v);
})
// Voila, you've specified the serdes required by the new internal
// repartition topic and you can carry on with your work
.groupByKey(Grouped.with(Serdes.String(), customSerde))
.aggregate(
MyAggregate::new,
(key, data, aggregation) -> {
// You'll never reach this block due to an error similar to:
// ClassCastException while producing data to topic
return aggregation.updateFrom(shot);
},
Materialized.<String, MyAggregate> as(storeSupplier)
.withKeySerde(Serdes.String())
.withValueSerde(aggregateSerde)
)