如何为每个绑定设置 ValueSerde Spring Cloud Kafka Streams

How to set ValueSerde per Binding Spring Cloud Kafka Streams

我正在尝试为每个绑定单独设置 valueSerde,但实际生效的只有默认的 value serde。

AppSerdes 类

/**
 * Holder for the protobuf-backed value serdes used by the stream bindings.
 *
 * <p>Each nested serde is a concrete subclass with a public no-argument
 * constructor that pairs a {@code ProtobufSerializer} with a
 * {@code ProtobufDeserializer} bound to one message class.
 */
public class AppSerdes {

    /** Serde for {@link Department} values. */
    public static final class DepartmentSerde extends WrapperSerde<Department> {
        public DepartmentSerde() {
            super(
                new ProtobufSerializer<Department>(),
                new ProtobufDeserializer<Department>(Department.class));
        }
    }

    /** Serde for {@link Employee} values. */
    public static final class EmployeeSerde extends WrapperSerde<Employee> {
        public EmployeeSerde() {
            super(
                new ProtobufSerializer<Employee>(),
                new ProtobufDeserializer<Employee>(Employee.class));
        }
    }

    /** Serde for {@link DepartmentData} values. */
    public static final class DepartmentDataSerde extends WrapperSerde<DepartmentData> {
        public DepartmentDataSerde() {
            super(
                new ProtobufSerializer<DepartmentData>(),
                new ProtobufDeserializer<DepartmentData>(DepartmentData.class));
        }
    }
}

StreamsConfiguration.java

@Configuration
@Slf4j
public class StreamsConfiguration {

    @Bean
    public BiFunction<KStream<String, Employee>, KStream<String, Department>, KStream<String, DepartmentData>> process() {
        return (Employee, Department) -> Employee.leftJoin(Department, (v1, v2) -> {
            if (v2 == null) {
                log.info("No Department is present");
                return null;
            } else {
                var data = DepartmentData.newBuilder();
                data.setId(v2.getId());
                data.setName(v2.getName());
                data.addEmployees(v1);
                return data.build();
            }
        }, JoinWindows.of(Duration.ofMinutes(1))).peek((k, v) -> {
            log.info("Key->{}, value->{}", k, v);
        });
    }
}

以及 application.yml

# Configuration for the employee/department join application.
spring:
  application:
    name: kafka-join-example
  kafka:
    bootstrap-servers: 192.168.56.101:19092
  cloud:
    stream:
      # Topic wiring for the two inputs and the single output of the `process` function.
      bindings:
        process-in-0:
          destination: kp.sch
          consumer:
            use-native-decoding: true
        process-in-1:
          destination: kp.lm
          consumer:
            use-native-decoding: true
        process-out-0:
          destination: kp.lm.sch
          producer:
            use-native-encoding: true
      kafka:
        streams:
          # Per-binding value serdes. The binder property is `valueSerde`;
          # the former `value.serde` duplicates were not a binding property
          # and have been removed.
          bindings:
            process-in-0:
              consumer:
                valueSerde: io.github.kprasad99.streams.AppSerdes$EmployeeSerde
            process-in-1:
              consumer:
                valueSerde: io.github.kprasad99.streams.AppSerdes$DepartmentSerde
            process-out-0:
              producer:
                valueSerde: io.github.kprasad99.streams.AppSerdes$DepartmentDataSerde
          binder:
            brokers:
              - 192.168.56.101:19092
#            replication-factor: 3
#            required-acks: 2
            min-partition-count: 5
            # Passed through to Kafka Streams; the default serdes apply wherever
            # no explicit serde is supplied.
            configuration:
              commit.interval.ms: 100
              default:
                key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value.serde: io.github.kprasad99.streams.kafka.serde.ProtobufSerde

非常感谢任何对此的见解。


编辑

完整的示例代码是 here

堆栈跟踪:

2020-12-06 21:55:39.929 ERROR 141897 --- [-StreamThread-1] o.a.k.s.p.i.AssignedStreamsTasks         : stream-thread [kafka-join-example-4aa10729-e5b7-4a33-89c6-906dd8ab2e5d-StreamThread-1] Failed to process stream task 1_4 due to the following error:

org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=1_4, processor=KSTREAM-SOURCE-0000000001, topic=kp.department, partition=4, offset=1, stacktrace=org.apache.kafka.common.errors.SerializationException: java.lang.InstantiationException: No target type provided
Caused by: java.lang.InstantiationException: No target type provided
    at io.github.kprasad99.streams.protobuf.serialization.ProtobufDeserializer.deserialize(ProtobufDeserializer.java:45)
    at io.github.kprasad99.streams.protobuf.serialization.ProtobufDeserializer.deserialize(ProtobufDeserializer.java:1)
    at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:160)
    at org.apache.kafka.streams.state.internals.MeteredWindowStoreIterator.next(MeteredWindowStoreIterator.java:56)
    at org.apache.kafka.streams.state.internals.MeteredWindowStoreIterator.next(MeteredWindowStoreIterator.java:26)
    at org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin$KStreamKStreamJoinProcessor.process(KStreamKStreamJoin.java:100)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:55)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:104)
    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process(StreamTask.java:383)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:383)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:475)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:550)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)

    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:400) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:475) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:550) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670) ~[kafka-streams-2.5.1.jar:na]
Caused by: org.apache.kafka.common.errors.SerializationException: java.lang.InstantiationException: No target type provided
Caused by: java.lang.InstantiationException: No target type provided
    at io.github.kprasad99.streams.protobuf.serialization.ProtobufDeserializer.deserialize(ProtobufDeserializer.java:45) ~[classes/:na]
    at io.github.kprasad99.streams.protobuf.serialization.ProtobufDeserializer.deserialize(ProtobufDeserializer.java:1) ~[classes/:na]
    at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:160) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.state.internals.MeteredWindowStoreIterator.next(MeteredWindowStoreIterator.java:56) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.state.internals.MeteredWindowStoreIterator.next(MeteredWindowStoreIterator.java:26) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin$KStreamKStreamJoinProcessor.process(KStreamKStreamJoin.java:100) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process(ProcessorNode.java:142) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:55) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process(ProcessorNode.java:142) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:104) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process(StreamTask.java:383) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:383) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:475) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:550) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670) ~[kafka-streams-2.5.1.jar:na]

2020-12-06 21:55:39.930 ERROR 141897 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [kafka-join-example-4aa10729-e5b7-4a33-89c6-906dd8ab2e5d-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:

org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=1_4, processor=KSTREAM-SOURCE-0000000001, topic=kp.department, partition=4, offset=1, stacktrace=org.apache.kafka.common.errors.SerializationException: java.lang.InstantiationException: No target type provided
Caused by: java.lang.InstantiationException: No target type provided
    at io.github.kprasad99.streams.protobuf.serialization.ProtobufDeserializer.deserialize(ProtobufDeserializer.java:45)
    at io.github.kprasad99.streams.protobuf.serialization.ProtobufDeserializer.deserialize(ProtobufDeserializer.java:1)
    at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:160)
    at org.apache.kafka.streams.state.internals.MeteredWindowStoreIterator.next(MeteredWindowStoreIterator.java:56)
    at org.apache.kafka.streams.state.internals.MeteredWindowStoreIterator.next(MeteredWindowStoreIterator.java:26)
    at org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin$KStreamKStreamJoinProcessor.process(KStreamKStreamJoin.java:100)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:55)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:104)
    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process(StreamTask.java:383)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:383)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:475)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:550)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)

    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:400) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:475) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:550) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670) ~[kafka-streams-2.5.1.jar:na]
Caused by: org.apache.kafka.common.errors.SerializationException: java.lang.InstantiationException: No target type provided
Caused by: java.lang.InstantiationException: No target type provided
    at io.github.kprasad99.streams.protobuf.serialization.ProtobufDeserializer.deserialize(ProtobufDeserializer.java:45) ~[classes/:na]
    at io.github.kprasad99.streams.protobuf.serialization.ProtobufDeserializer.deserialize(ProtobufDeserializer.java:1) ~[classes/:na]
    at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:160) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.state.internals.MeteredWindowStoreIterator.next(MeteredWindowStoreIterator.java:56) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.state.internals.MeteredWindowStoreIterator.next(MeteredWindowStoreIterator.java:26) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin$KStreamKStreamJoinProcessor.process(KStreamKStreamJoin.java:100) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process(ProcessorNode.java:142) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:55) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process(ProcessorNode.java:142) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:104) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process(StreamTask.java:383) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:383) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:475) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:550) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697) ~[kafka-streams-2.5.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670) ~[kafka-streams-2.5.1.jar:na]

我稍微清理了你的配置,应该可以。如果没有,请创建一个小示例应用程序并分享,然后我们可以进一步研究。

# Cleaned-up configuration: one nested tree, per-binding serdes set via `valueSerde`.
spring:
  application:
    name: kafka-join-example
  kafka:
    bootstrap-servers: 192.168.56.101:19092
  cloud:
    stream:
      # Topic wiring for the `process` function.
      bindings:
        process-in-0:
          destination: kp.sch
        process-in-1:
          destination: kp.lm
        process-out-0:
          destination: kp.lm.sch
      kafka:
        streams:
          # Value serde for each binding.
          bindings:
            process-in-0:
              consumer:
                valueSerde: io.github.kprasad99.streams.AppSerdes$EmployeeSerde
            process-in-1:
              consumer:
                valueSerde: io.github.kprasad99.streams.AppSerdes$DepartmentSerde
            process-out-0:
              producer:
                valueSerde: io.github.kprasad99.streams.AppSerdes$DepartmentDataSerde
          binder:
            brokers:
              - 192.168.56.101:19092
#            replication-factor: 3
#            required-acks: 2
            min-partition-count: 5
            # Properties handed to Kafka Streams, including the default serdes.
            configuration:
              commit.interval.ms: 100
              default:
                key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value.serde: io.github.kprasad99.streams.kafka.serde.ProtobufSerde

您还可以在您的应用程序中定义这些 bean,如下所示。

/**
 * Registers the value serde for {@code Department} messages as a bean.
 *
 * @return a new {@code DepartmentSerde} instance
 */
@Bean
public Serde<Department> departmentSerde() {
  final Serde<Department> serde = new DepartmentSerde();
  return serde;
}

// add the other two Serde beans.

如果您在应用程序中定义这些 Serde bean,那么您不需要配置中的 3 个对应的 valueSerde 属性,因为 bean 具有优先级。