Error while serializing aggregate state store with custom serde on Spring Cloud Stream
I'm trying to use Spring Cloud Stream to create a simple functional bean that processes messages from a KStream and a GlobalKTable, joins them, aggregates them, and outputs the result to a new stream, but I'm having trouble configuring the serdes it needs correctly.
Without further ado, here's my method:
@Bean
public BiFunction<KStream<GenericRecord, GenericRecord>, GlobalKTable<Long, GenericRecord>, KStream<String, MyCustomJavaClass>> joinAndAggregate() {
    return (stream, table) -> stream
            .join(table,
                    (streamKey, streamValue) -> (Long) streamValue.get("something"),
                    (streamValue, tableValue) -> {
                        return new MyCustomJavaClass(streamValue, tableValue);
                    })
            .selectKey((key, value) -> (Long) key.get("id"))
            .groupBy((key, value) -> value.getKey(), Grouped.with(Serdes.String(), new MyCustomSerde()))
            .aggregate(() -> {
                return new MyCustomJavaClass();
            }, (key, value, aggregatedValue) -> {
                // aggregation logic
                return new MyCustomJavaClass(aggregatedData);
            })
            .toStream()
            .peek((k, v) -> {
                if (v == null)
                    log.warn("No value for key:\n" + k.toString() + "\n");
                else
                    log.info("Aggregated result with key:\n" + k + "\nvalue:\n" + v.toString() + "\n");
            });
}

public static final class MyCustomSerde extends JsonSerde<MyCustomJavaClass> { }
And here's the configuration in my properties file:
spring.application.name: test-application
spring.cloud.stream.kafka.binder.brokers: kafka-svc:9092
spring.kafka.properties.schema.registry.url: http://schema-registry-svc:8081
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000
spring.cloud.function.definition: joinAndAggregate
spring.cloud.stream.bindings.joinAndAggregate-in-0.destination: input-stream
spring.cloud.stream.bindings.joinAndAggregate-in-1.destination: input-global-ktable
spring.cloud.stream.bindings.joinAndAggregate-out-0.destination: aggregate-output
# Serdes
spring.cloud.stream.kafka.streams.bindings.joinAndAggregate-in-0.consumer.application-id: joinAndAggregate-in-0-v0.1.0
spring.cloud.stream.kafka.streams.bindings.joinAndAggregate-in-0.consumer.key-serde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
spring.cloud.stream.kafka.streams.bindings.joinAndAggregate-in-0.consumer.value-serde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
spring.cloud.stream.kafka.streams.bindings.joinAndAggregate-in-1.consumer.application-id: joinAndAggregate-in-1-v0.1.0
spring.cloud.stream.kafka.streams.bindings.joinAndAggregate-in-1.consumer.value-serde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
spring.cloud.stream.kafka.streams.bindings.joinAndAggregate-out-0.producer.value-serde: com.package.MyClass$MyCustomSerde
When I run the code above, I get the following error:
Failed to process stream task 2_0 due to the following error:
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=2_0, processor=KSTREAM-SOURCE-0000000011, topic=joinAndAggregate-in-0-v0.1.0-KSTREAM-AGGREGATE-STATE-STORE-0000000007-repartition, partition=0, offset=0, stacktrace=org.apache.kafka.streams.errors.StreamsException:
A serializer (org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual value type (value type: com.package.model.MyCustomJavaClass). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:204)
... <omitting some lines here> ...
Caused by: java.lang.ClassCastException: class com.package.model.MyCustomJavaClass cannot be cast to class [B (com.package.model.MyCustomJavaClass is in unnamed module of loader 'app'; [B is in module java.base of loader 'bootstrap')
at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19)
The class com.package.model.MyCustomJavaClass lives in a different package than MyClass, where the functional stream method is defined. Could that be the problem?
I've also verified that MyCustomJavaClass can be serialized and deserialized correctly with the custom serde you see above (MyCustomSerde); it's just a simple serde extending JsonSerde.
In other functional methods that I've omitted here, I'm able to process messages whose values are serialized with MyCustomSerde on both input and output, so neither the serializer nor the custom Java class is the problem. Somehow only the aggregate state store stream has an issue with my custom serde, and I haven't been able to find a way around it by looking through samples and documentation.
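For reference, the round-trip check I used to verify the serde looks roughly like this (a minimal sketch; the topic name is arbitrary and the constructor arguments are placeholders):

// Round-trip check for MyCustomSerde (sketch; constructor args are placeholders)
MyCustomJavaClass original = new MyCustomJavaClass(streamValue, tableValue);
try (MyCustomSerde serde = new MyCustomSerde()) {
    byte[] bytes = serde.serializer().serialize("any-topic", original);            // object -> JSON bytes
    MyCustomJavaClass copy = serde.deserializer().deserialize("any-topic", bytes); // JSON bytes -> object
    assert original.getKey().equals(copy.getKey());                                // fields survive the trip
}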
What am I doing wrong?
Thanks in advance!
When you see an error like this:
serializer (org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual value type (value type: com.package.model.MyCustomJavaClass). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
it means the (de)serializer Kafka Streams used doesn't match the actual type it was handed. In this case, Kafka Streams fell back to the default serde, Serdes.ByteArraySerde. The serdes you pass to Grouped.with cover the repartition topic, but the aggregation's state store doesn't pick up the value serde from there (the aggregate produces a new value type), so the store falls back to the default.
If you update the aggregate method and add a third parameter, Materialized.with(Serdes.String(), new MyCustomSerde()), your application should get past this error.
.aggregate(() -> {
    return new MyCustomJavaClass();
}, (key, value, aggregatedValue) -> {
    // aggregation logic
    return new MyCustomJavaClass(aggregatedData);
}, Materialized.with(Serdes.String(), new MyCustomSerde()))
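If you also want to name the state store (to keep the changelog topic name stable, or to query the store later), you can put the same serdes on a named Materialized instead. A sketch, where "my-aggregate-store" is just an example name (KeyValueStore is org.apache.kafka.streams.state.KeyValueStore and Bytes is org.apache.kafka.common.utils.Bytes):

.aggregate(() -> {
    return new MyCustomJavaClass();
}, (key, value, aggregatedValue) -> {
    // aggregation logic
    return new MyCustomJavaClass(aggregatedData);
},
// "my-aggregate-store" is an example name; pick your own
Materialized.<String, MyCustomJavaClass, KeyValueStore<Bytes, byte[]>>as("my-aggregate-store")
        .withKeySerde(Serdes.String())
        .withValueSerde(new MyCustomSerde()))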
Let me know how it goes.
-Bill