如何在 Spring Cloud Kafka Streams 中为每个绑定设置 ValueSerde
How to set ValueSerde per Binding Spring Cloud Kafka Streams
我正在尝试为每个绑定单独设置 valueSerde,但实际生效的只有默认的 value serde。
AppSerde class
/**
 * Static holder for the per-type value serdes used by the stream bindings.
 *
 * <p>Each nested serde pairs a {@code ProtobufSerializer} with a {@code ProtobufDeserializer}
 * that is given the concrete message class it must produce. The holder itself carries no
 * state, so it is final and cannot be instantiated.
 */
public final class AppSerdes {

    private AppSerdes() {
        // Holder for nested serde types only; never instantiated.
    }

    /**
     * Serde for {@code Department} values.
     *
     * <p>The public no-arg constructor is kept as-is; presumably it is required so the class
     * can be created from a configured class name — confirm against the binder setup.
     */
    public static final class DepartmentSerde extends WrapperSerde<Department> {
        public DepartmentSerde() {
            super(new ProtobufSerializer<>(), new ProtobufDeserializer<>(Department.class));
        }
    }

    /** Serde for {@code Employee} values; see {@link DepartmentSerde} for the constructor note. */
    public static final class EmployeeSerde extends WrapperSerde<Employee> {
        public EmployeeSerde() {
            super(new ProtobufSerializer<>(), new ProtobufDeserializer<>(Employee.class));
        }
    }

    /** Serde for {@code DepartmentData} values; see {@link DepartmentSerde} for the constructor note. */
    public static final class DepartmentDataSerde extends WrapperSerde<DepartmentData> {
        public DepartmentDataSerde() {
            super(new ProtobufSerializer<>(), new ProtobufDeserializer<>(DepartmentData.class));
        }
    }
}
StreamsConfig.java
@Configuration
@Slf4j
public class StreamsConfiguration {
@Bean
public BiFunction<KStream<String, Employee>, KStream<String, Department>, KStream<String, DepartmentData>> process() {
return (Employee, Department) -> Employee.leftJoin(Department, (v1, v2) -> {
if (v2 == null) {
log.info("No Department is present");
return null;
} else {
var data = DepartmentData.newBuilder();
data.setId(v2.getId());
data.setName(v2.getName());
data.addEmployees(v1);
return data.build();
}
}, JoinWindows.of(Duration.ofMinutes(1))).peek((k, v) -> {
log.info("Key->{}, value->{}", k, v);
});
}
}
和application.yml
# NOTE(review): nesting below is reconstructed from the key names because the original
# indentation was lost — confirm against the real application.yml.
spring:
  application.name: kafka-join-example
spring.kafka.bootstrap-servers: 192.168.56.101:19092
# Per-binding value serdes, one for each input/output of the `process` function.
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde: io.github.kprasad99.streams.AppSerdes$EmployeeSerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.valueSerde: io.github.kprasad99.streams.AppSerdes$DepartmentSerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde: io.github.kprasad99.streams.AppSerdes$DepartmentDataSerde
spring.cloud.stream:
  bindings:
    process-in-0:
      destination: kp.sch
      consumer:
        use-native-decoding: true
    process-in-1:
      destination: kp.lm
      consumer:
        use-native-decoding: true
    process-out-0:
      destination: kp.lm.sch
      producer:
        use-native-encoding: true
  # NOTE(review): these repeat the serdes already set through the flat `valueSerde` keys
  # above, but spelled `value.serde`; presumably this spelling does not bind to the same
  # property — confirm, and keep only one form.
  kafka.streams.bindings:
    process-in-0:
      consumer:
        value.serde: io.github.kprasad99.streams.AppSerdes$EmployeeSerde
    process-in-1:
      consumer:
        value.serde: io.github.kprasad99.streams.AppSerdes$DepartmentSerde
    process-out-0:
      producer:
        value.serde: io.github.kprasad99.streams.AppSerdes$DepartmentDataSerde
  kafka.streams.binder:
    brokers:
      - 192.168.56.101:19092
    # replication-factor: 3
    # required-acks: 2
    min-partition-count: 5
    configuration:
      commit.interval.ms: 100
      # Fallback serdes used wherever no explicit serde is supplied.
      default:
        key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
        value.serde: io.github.kprasad99.streams.kafka.serde.ProtobufSerde
非常感谢任何对此的见解。
编辑
完整的示例代码见此处(原文为链接)。
堆栈跟踪:
2020-12-06 21:55:39.929 ERROR 141897 --- [-StreamThread-1]
o.a.k.s.p.i.AssignedStreamsTasks : stream-thread [kafka-join-example-4aa10729-e5b7-4a33-89c6-906dd8ab2e5d-StreamThread-1] Failed to process stream task 1_4 due to the following error:
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=1_4, processor=KSTREAM-SOURCE-0000000001, topic=kp.department, partition=4, offset=1, stacktrace=org.apache.kafka.common.errors.SerializationException: java.lang.InstantiationException: No target type provided
Caused by: java.lang.InstantiationException: No target type provided
at io.github.kprasad99.streams.protobuf.serialization.ProtobufDeserializer.deserialize(ProtobufDeserializer.java:45)
at io.github.kprasad99.streams.protobuf.serialization.ProtobufDeserializer.deserialize(ProtobufDeserializer.java:1)
at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:160)
at org.apache.kafka.streams.state.internals.MeteredWindowStoreIterator.next(MeteredWindowStoreIterator.java:56)
at org.apache.kafka.streams.state.internals.MeteredWindowStoreIterator.next(MeteredWindowStoreIterator.java:26)
at org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin$KStreamKStreamJoinProcessor.process(KStreamKStreamJoin.java:100)
at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process(ProcessorNode.java:142)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
at org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:55)
at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process(ProcessorNode.java:142)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:104)
at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process(StreamTask.java:383)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:383)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:475)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:550)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:400) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:475) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:550) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670) ~[kafka-streams-2.5.1.jar:na]
Caused by: org.apache.kafka.common.errors.SerializationException: java.lang.InstantiationException: No target type provided
Caused by: java.lang.InstantiationException: No target type provided
at io.github.kprasad99.streams.protobuf.serialization.ProtobufDeserializer.deserialize(ProtobufDeserializer.java:45) ~[classes/:na]
at io.github.kprasad99.streams.protobuf.serialization.ProtobufDeserializer.deserialize(ProtobufDeserializer.java:1) ~[classes/:na]
at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:160) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.state.internals.MeteredWindowStoreIterator.next(MeteredWindowStoreIterator.java:56) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.state.internals.MeteredWindowStoreIterator.next(MeteredWindowStoreIterator.java:26) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin$KStreamKStreamJoinProcessor.process(KStreamKStreamJoin.java:100) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process(ProcessorNode.java:142) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:55) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process(ProcessorNode.java:142) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:104) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process(StreamTask.java:383) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:383) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:475) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:550) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670) ~[kafka-streams-2.5.1.jar:na]
2020-12-06 21:55:39.930 ERROR 141897 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [kafka-join-example-4aa10729-e5b7-4a33-89c6-906dd8ab2e5d-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=1_4, processor=KSTREAM-SOURCE-0000000001, topic=kp.department, partition=4, offset=1, stacktrace=org.apache.kafka.common.errors.SerializationException: java.lang.InstantiationException: No target type provided
Caused by: java.lang.InstantiationException: No target type provided
at io.github.kprasad99.streams.protobuf.serialization.ProtobufDeserializer.deserialize(ProtobufDeserializer.java:45)
at io.github.kprasad99.streams.protobuf.serialization.ProtobufDeserializer.deserialize(ProtobufDeserializer.java:1)
at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:160)
at org.apache.kafka.streams.state.internals.MeteredWindowStoreIterator.next(MeteredWindowStoreIterator.java:56)
at org.apache.kafka.streams.state.internals.MeteredWindowStoreIterator.next(MeteredWindowStoreIterator.java:26)
at org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin$KStreamKStreamJoinProcessor.process(KStreamKStreamJoin.java:100)
at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process(ProcessorNode.java:142)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
at org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:55)
at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process(ProcessorNode.java:142)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:104)
at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process(StreamTask.java:383)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:383)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:475)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:550)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:400) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:475) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:550) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670) ~[kafka-streams-2.5.1.jar:na]
Caused by: org.apache.kafka.common.errors.SerializationException: java.lang.InstantiationException: No target type provided
Caused by: java.lang.InstantiationException: No target type provided
at io.github.kprasad99.streams.protobuf.serialization.ProtobufDeserializer.deserialize(ProtobufDeserializer.java:45) ~[classes/:na]
at io.github.kprasad99.streams.protobuf.serialization.ProtobufDeserializer.deserialize(ProtobufDeserializer.java:1) ~[classes/:na]
at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:160) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.state.internals.MeteredWindowStoreIterator.next(MeteredWindowStoreIterator.java:56) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.state.internals.MeteredWindowStoreIterator.next(MeteredWindowStoreIterator.java:26) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin$KStreamKStreamJoinProcessor.process(KStreamKStreamJoin.java:100) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process(ProcessorNode.java:142) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:55) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process(ProcessorNode.java:142) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:104) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process(StreamTask.java:383) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:383) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:475) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:550) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670) ~[kafka-streams-2.5.1.jar:na]
我稍微清理了你的配置,应该可以。如果没有,请创建一个小示例应用程序并分享,然后我们可以进一步研究。
# NOTE(review): nesting below is reconstructed from the key names because the original
# indentation was lost — confirm against the real application.yml.
spring:
  application.name: kafka-join-example
spring.kafka.bootstrap-servers: 192.168.56.101:19092
# Per-binding value serdes, one for each input/output of the `process` function.
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde: io.github.kprasad99.streams.AppSerdes$EmployeeSerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.valueSerde: io.github.kprasad99.streams.AppSerdes$DepartmentSerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde: io.github.kprasad99.streams.AppSerdes$DepartmentDataSerde
spring.cloud.stream:
  bindings:
    process-in-0:
      destination: kp.sch
    process-in-1:
      destination: kp.lm
    process-out-0:
      destination: kp.lm.sch
  kafka.streams.binder:
    brokers:
      - 192.168.56.101:19092
    # replication-factor: 3
    # required-acks: 2
    min-partition-count: 5
    configuration:
      commit.interval.ms: 100
      # Fallback serdes used wherever no explicit serde is supplied.
      default:
        key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
        value.serde: io.github.kprasad99.streams.kafka.serde.ProtobufSerde
您还可以在您的应用程序中定义这些 bean,如下所示。
/**
 * Exposes the {@code Department} value serde as a bean.
 *
 * @return a new {@code DepartmentSerde} instance
 */
@Bean
public Serde<Department> departmentSerde() {
    final Serde<Department> serde = new DepartmentSerde();
    return serde;
}
// add the other two Serde beans.
如果您在应用程序中定义了这些 Serde bean,那么就不再需要配置中对应的 3 个 valueSerde 属性,因为 bean 的优先级更高。
我正在尝试为每个绑定单独设置 valueSerde,但实际生效的只有默认的 value serde。
AppSerde class
/**
 * Static holder for the per-type value serdes used by the stream bindings.
 *
 * <p>Each nested serde pairs a {@code ProtobufSerializer} with a {@code ProtobufDeserializer}
 * that is given the concrete message class it must produce. The holder itself carries no
 * state, so it is final and cannot be instantiated.
 */
public final class AppSerdes {

    private AppSerdes() {
        // Holder for nested serde types only; never instantiated.
    }

    /**
     * Serde for {@code Department} values.
     *
     * <p>The public no-arg constructor is kept as-is; presumably it is required so the class
     * can be created from a configured class name — confirm against the binder setup.
     */
    public static final class DepartmentSerde extends WrapperSerde<Department> {
        public DepartmentSerde() {
            super(new ProtobufSerializer<>(), new ProtobufDeserializer<>(Department.class));
        }
    }

    /** Serde for {@code Employee} values; see {@link DepartmentSerde} for the constructor note. */
    public static final class EmployeeSerde extends WrapperSerde<Employee> {
        public EmployeeSerde() {
            super(new ProtobufSerializer<>(), new ProtobufDeserializer<>(Employee.class));
        }
    }

    /** Serde for {@code DepartmentData} values; see {@link DepartmentSerde} for the constructor note. */
    public static final class DepartmentDataSerde extends WrapperSerde<DepartmentData> {
        public DepartmentDataSerde() {
            super(new ProtobufSerializer<>(), new ProtobufDeserializer<>(DepartmentData.class));
        }
    }
}
StreamsConfig.java
@Configuration
@Slf4j
public class StreamsConfiguration {
@Bean
public BiFunction<KStream<String, Employee>, KStream<String, Department>, KStream<String, DepartmentData>> process() {
return (Employee, Department) -> Employee.leftJoin(Department, (v1, v2) -> {
if (v2 == null) {
log.info("No Department is present");
return null;
} else {
var data = DepartmentData.newBuilder();
data.setId(v2.getId());
data.setName(v2.getName());
data.addEmployees(v1);
return data.build();
}
}, JoinWindows.of(Duration.ofMinutes(1))).peek((k, v) -> {
log.info("Key->{}, value->{}", k, v);
});
}
}
和application.yml
# NOTE(review): nesting below is reconstructed from the key names because the original
# indentation was lost — confirm against the real application.yml.
spring:
  application.name: kafka-join-example
spring.kafka.bootstrap-servers: 192.168.56.101:19092
# Per-binding value serdes, one for each input/output of the `process` function.
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde: io.github.kprasad99.streams.AppSerdes$EmployeeSerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.valueSerde: io.github.kprasad99.streams.AppSerdes$DepartmentSerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde: io.github.kprasad99.streams.AppSerdes$DepartmentDataSerde
spring.cloud.stream:
  bindings:
    process-in-0:
      destination: kp.sch
      consumer:
        use-native-decoding: true
    process-in-1:
      destination: kp.lm
      consumer:
        use-native-decoding: true
    process-out-0:
      destination: kp.lm.sch
      producer:
        use-native-encoding: true
  # NOTE(review): these repeat the serdes already set through the flat `valueSerde` keys
  # above, but spelled `value.serde`; presumably this spelling does not bind to the same
  # property — confirm, and keep only one form.
  kafka.streams.bindings:
    process-in-0:
      consumer:
        value.serde: io.github.kprasad99.streams.AppSerdes$EmployeeSerde
    process-in-1:
      consumer:
        value.serde: io.github.kprasad99.streams.AppSerdes$DepartmentSerde
    process-out-0:
      producer:
        value.serde: io.github.kprasad99.streams.AppSerdes$DepartmentDataSerde
  kafka.streams.binder:
    brokers:
      - 192.168.56.101:19092
    # replication-factor: 3
    # required-acks: 2
    min-partition-count: 5
    configuration:
      commit.interval.ms: 100
      # Fallback serdes used wherever no explicit serde is supplied.
      default:
        key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
        value.serde: io.github.kprasad99.streams.kafka.serde.ProtobufSerde
非常感谢任何对此的见解。
编辑
完整的示例代码见此处(原文为链接)。
堆栈跟踪:
2020-12-06 21:55:39.929 ERROR 141897 --- [-StreamThread-1]
o.a.k.s.p.i.AssignedStreamsTasks : stream-thread [kafka-join-example-4aa10729-e5b7-4a33-89c6-906dd8ab2e5d-StreamThread-1] Failed to process stream task 1_4 due to the following error:
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=1_4, processor=KSTREAM-SOURCE-0000000001, topic=kp.department, partition=4, offset=1, stacktrace=org.apache.kafka.common.errors.SerializationException: java.lang.InstantiationException: No target type provided
Caused by: java.lang.InstantiationException: No target type provided
at io.github.kprasad99.streams.protobuf.serialization.ProtobufDeserializer.deserialize(ProtobufDeserializer.java:45)
at io.github.kprasad99.streams.protobuf.serialization.ProtobufDeserializer.deserialize(ProtobufDeserializer.java:1)
at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:160)
at org.apache.kafka.streams.state.internals.MeteredWindowStoreIterator.next(MeteredWindowStoreIterator.java:56)
at org.apache.kafka.streams.state.internals.MeteredWindowStoreIterator.next(MeteredWindowStoreIterator.java:26)
at org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin$KStreamKStreamJoinProcessor.process(KStreamKStreamJoin.java:100)
at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process(ProcessorNode.java:142)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
at org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:55)
at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process(ProcessorNode.java:142)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:104)
at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process(StreamTask.java:383)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:383)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:475)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:550)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:400) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:475) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:550) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670) ~[kafka-streams-2.5.1.jar:na]
Caused by: org.apache.kafka.common.errors.SerializationException: java.lang.InstantiationException: No target type provided
Caused by: java.lang.InstantiationException: No target type provided
at io.github.kprasad99.streams.protobuf.serialization.ProtobufDeserializer.deserialize(ProtobufDeserializer.java:45) ~[classes/:na]
at io.github.kprasad99.streams.protobuf.serialization.ProtobufDeserializer.deserialize(ProtobufDeserializer.java:1) ~[classes/:na]
at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:160) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.state.internals.MeteredWindowStoreIterator.next(MeteredWindowStoreIterator.java:56) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.state.internals.MeteredWindowStoreIterator.next(MeteredWindowStoreIterator.java:26) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin$KStreamKStreamJoinProcessor.process(KStreamKStreamJoin.java:100) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process(ProcessorNode.java:142) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:55) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process(ProcessorNode.java:142) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:104) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process(StreamTask.java:383) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:383) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:475) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:550) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670) ~[kafka-streams-2.5.1.jar:na]
2020-12-06 21:55:39.930 ERROR 141897 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [kafka-join-example-4aa10729-e5b7-4a33-89c6-906dd8ab2e5d-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=1_4, processor=KSTREAM-SOURCE-0000000001, topic=kp.department, partition=4, offset=1, stacktrace=org.apache.kafka.common.errors.SerializationException: java.lang.InstantiationException: No target type provided
Caused by: java.lang.InstantiationException: No target type provided
at io.github.kprasad99.streams.protobuf.serialization.ProtobufDeserializer.deserialize(ProtobufDeserializer.java:45)
at io.github.kprasad99.streams.protobuf.serialization.ProtobufDeserializer.deserialize(ProtobufDeserializer.java:1)
at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:160)
at org.apache.kafka.streams.state.internals.MeteredWindowStoreIterator.next(MeteredWindowStoreIterator.java:56)
at org.apache.kafka.streams.state.internals.MeteredWindowStoreIterator.next(MeteredWindowStoreIterator.java:26)
at org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin$KStreamKStreamJoinProcessor.process(KStreamKStreamJoin.java:100)
at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process(ProcessorNode.java:142)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
at org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:55)
at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process(ProcessorNode.java:142)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:104)
at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process(StreamTask.java:383)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:383)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:475)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:550)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:400) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:475) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:550) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670) ~[kafka-streams-2.5.1.jar:na]
Caused by: org.apache.kafka.common.errors.SerializationException: java.lang.InstantiationException: No target type provided
Caused by: java.lang.InstantiationException: No target type provided
at io.github.kprasad99.streams.protobuf.serialization.ProtobufDeserializer.deserialize(ProtobufDeserializer.java:45) ~[classes/:na]
at io.github.kprasad99.streams.protobuf.serialization.ProtobufDeserializer.deserialize(ProtobufDeserializer.java:1) ~[classes/:na]
at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:160) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.state.internals.MeteredWindowStoreIterator.next(MeteredWindowStoreIterator.java:56) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.state.internals.MeteredWindowStoreIterator.next(MeteredWindowStoreIterator.java:26) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin$KStreamKStreamJoinProcessor.process(KStreamKStreamJoin.java:100) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process(ProcessorNode.java:142) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:55) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process(ProcessorNode.java:142) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:104) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process(StreamTask.java:383) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:383) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:475) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:550) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697) ~[kafka-streams-2.5.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670) ~[kafka-streams-2.5.1.jar:na]
我稍微清理了一下你的配置,这样应该可以正常工作。如果仍然不行,请创建一个小型示例应用程序并分享出来,我们再进一步排查。
# Spring Cloud Stream (Kafka Streams binder) configuration for the join example.
# Nesting is significant in YAML: the keys below are indented so that
# bindings.* and kafka.streams.binder.* sit under "spring.cloud.stream".
spring:
  application.name: kafka-join-example
spring.kafka.bootstrap-servers: 192.168.56.101:19092

# Value Serde configured per binding (two inputs and one output of "process").
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde: io.github.kprasad99.streams.AppSerdes$EmployeeSerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.valueSerde: io.github.kprasad99.streams.AppSerdes$DepartmentSerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde: io.github.kprasad99.streams.AppSerdes$DepartmentDataSerde

spring.cloud.stream:
  # Topic (destination) bound to each input/output of the "process" function.
  bindings:
    process-in-0:
      destination: kp.sch
    process-in-1:
      destination: kp.lm
    process-out-0:
      destination: kp.lm.sch
  # Binder-level settings.
  kafka.streams.binder:
    brokers:
      - 192.168.56.101:19092
    # replication-factor: 3
    # required-acks: 2
    min-partition-count: 5
    configuration:
      commit.interval.ms: 100
      # Binder-wide default key/value Serdes.
      default:
        key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
        value.serde: io.github.kprasad99.streams.kafka.serde.ProtobufSerde
您也可以在应用程序中将这些 Serde 定义为 bean,如下所示:
/**
 * Exposes a {@code DepartmentSerde} instance as a Spring bean typed as
 * {@code Serde<Department>}.
 *
 * @return a newly constructed {@code DepartmentSerde}
 */
@Bean
public Serde<Department> departmentSerde() {
    final Serde<Department> serde = new DepartmentSerde();
    return serde;
}
// add the other two Serde beans.
如果您在应用程序中定义了这些 Serde bean,就不再需要配置中对应的 3 个 valueSerde 属性,因为 bean 的优先级更高。