聚合上使用了错误的序列化程序
Wrong serializers used on aggregate
我正在处理一个 kafka-streams 应用程序,我在其中处理日志事件。在这种情况下,我想将 WorkflowInput 类型聚合到一个 Workflow 类型中。我在聚合工作时遇到问题。
final KStream<String, WorkflowInput> filteredStream = someStream;
final KTable<String, Workflow> aggregatedWorkflows = filteredStream
.peek((k, v) -> {
if (!(v instanceof WorkflowInput)) {
throw new AssertionError("Type not expected");
}
})
.groupByKey()
.<Workflow>aggregate(Workflow::new, (k, input, workflow) -> workflow.updateFrom(input),
Materialized.<String, Workflow, KeyValueStore<Bytes, byte[]>>as("worflow-cache")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.serdeFrom(new JsonSerializer<Workflow>(), new JsonDeserializer<Workflow>(Workflow.class)));
我得到以下 exception:Caused 通过:org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: workflowauditstreamer.WorkflowInput).
需要注意两点:
* 值序列化器是一个 StringSerializer,而我使用 withValueSerde
配置了一些不同的东西。
* 实际值类型是 WorkflowInput
而我期望 Workflow
因为那是我的聚合值类型。
我是 kafka-streams 的新手,所以我可能遗漏了一些明显的东西,但我无法弄清楚。我在这里错过了什么?
如果您从配置中覆盖默认值 Serde
,它将在操作员就地覆盖中进行。它不会传播到下游(Kafka 2.0 —— 有改进的 WIP)。
因此,您还需要将在 someStream = builder.stream(...)
中使用的 Serde
传递给 .groupByKey(Serialized.with(...))
。
我正在处理一个 kafka-streams 应用程序,我在其中处理日志事件。在这种情况下,我想将 WorkflowInput 类型聚合到一个 Workflow 类型中。我在聚合工作时遇到问题。
final KStream<String, WorkflowInput> filteredStream = someStream;
final KTable<String, Workflow> aggregatedWorkflows = filteredStream
.peek((k, v) -> {
if (!(v instanceof WorkflowInput)) {
throw new AssertionError("Type not expected");
}
})
.groupByKey()
.<Workflow>aggregate(Workflow::new, (k, input, workflow) -> workflow.updateFrom(input),
Materialized.<String, Workflow, KeyValueStore<Bytes, byte[]>>as("worflow-cache")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.serdeFrom(new JsonSerializer<Workflow>(), new JsonDeserializer<Workflow>(Workflow.class)));
我得到以下 exception:Caused 通过:org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: workflowauditstreamer.WorkflowInput).
需要注意两点:
* 值序列化器是一个 StringSerializer,而我使用 withValueSerde
配置了一些不同的东西。
* 实际值类型是 WorkflowInput
而我期望 Workflow
因为那是我的聚合值类型。
我是 kafka-streams 的新手,所以我可能遗漏了一些明显的东西,但我无法弄清楚。我在这里错过了什么?
如果您从配置中覆盖默认值 Serde
,它将在操作员就地覆盖中进行。它不会传播到下游(Kafka 2.0 —— 有改进的 WIP)。
因此,您还需要将在 someStream = builder.stream(...)
中使用的 Serde
传递给 .groupByKey(Serialized.with(...))
。