在 spring 云流中为 kafka 设置密钥 Serde

Set key Serde in spring cloud streams for kafka

我正在尝试使用 kafka 设置一个 spring 云流项目。一切都按预期工作,但关键 de/serialization。这些文件是 pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.8</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>cloud-stream</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>cloud-stream</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>11</java.version>
        <spring-cloud.version>2020.0.5</spring-cloud.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
            <scope>test</scope>
            <classifier>test-binder</classifier>
            <type>test-jar</type>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

主要消费者class

package com.example;

import java.util.function.Consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SpringBootApplication
public class App {

    private Logger logger = LoggerFactory.getLogger(App.class);

    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }

    @Bean
    public Consumer<Message<ChatMessage>> consumeString(){
        return message -> {
            MessageHeaders headers = message.getHeaders();
            logger.info("sup");
        };
    }

    @Bean
    public Consumer<Message<String>> consumeString2(){
        return message -> {
            MessageHeaders headers = message.getHeaders();
            logger.info("sup");
        };
    }
}

一个使用 StreamBridge 按需发送 kafka 消息的控制器(注意 streamBridge 正在尝试将消息密钥 header 作为字符串发送)

package com.example;

import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class Controller {

    private StreamBridge streamBridge;

    public Controller(StreamBridge streamBridge){
        this.streamBridge = streamBridge;
    }

    @PostMapping("/sendMessage/message")
    public String publishMessageString(@RequestBody ChatMessage payload) {
        streamBridge.send("output-out-0",
            MessageBuilder.withPayload(payload)
                .setHeader(KafkaHeaders.MESSAGE_KEY, "message")
                .build());
        return "Success";
    }

}

模特class

package com.example;

public class ChatMessage {

    private String contents;
    private long time;

    public ChatMessage() {

    }

    public ChatMessage(String contents, long time) {

        this.contents = contents;
        this.time = time;
    }

    public String getContents() {

        return contents;
    }

    public long getTime() {

        return time;
    }

    public void setTime(long time) {

        this.time = time;
    }

    @Override
    public String toString() {

        return "ChatMessage [contents=" + contents + ", time=" + time + "]";
    }

}

最后是 application.yml

spring:
  cloud:
    stream:
      function:
        definition: consumeString;consumeString2
      binders:
        binder1:
          type: kafka
          environment.spring.cloud.stream.kafka.streams.binder:
            brokers:
              - localhost:9092
        binder2:
          type: kafka
          environment.spring.cloud.stream.kafka.streams.binder:
            brokers:
              - localhost:9092
      kafka:
        streams:
          binder:
            configuration:
              default:
                key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde

      bindings:
        consumeString-in-0:
          binder: binder1
          destination: test
          group: input-group-1
        consumeString2-in-0:
          binder: binder2
          destination: test2
          group: input-group-2
        output-out-0:
          binder: binder1
          destination: test

当我尝试通过控制器发送消息时,出现以下错误:

java.lang.ClassCastException: class java.lang.String cannot be cast to class [B (java.lang.String and [B are in module java.base of loader 'bootstrap')
    at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19) ~[kafka-clients-2.7.2.jar:na]
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) ~[kafka-clients-2.7.2.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:918) ~[kafka-clients-2.7.2.jar:na]

问题似乎是 kafka 生产者和消费者不是在做一个字符串 de/serialization,而是一个字节数组 de/serialization。如何更改默认 de/serializer?

您可以在联编程序或绑定级别覆盖默认序列化程序。例如对于特定绑定:

spring:
  cloud:
    stream:
      kafka:
        bindings:
          output-out-0:
            producer:
              configuration:
                "[key.serializer]": ...

https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.2.1/reference/html/spring-cloud-stream-binder-kafka.html#kafka-producer-properties