在 Spring Cloud Stream 中为 Kafka 设置键(key)的 Serde
Set key Serde in spring cloud streams for kafka
我正在尝试使用 Kafka 搭建一个 Spring Cloud Stream 项目。除了消息键(key)的序列化/反序列化之外,一切都按预期工作。相关文件如下。pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the cloud-stream demo application. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Inherit plugin and dependency management from the Spring Boot parent. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.8</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>com.example</groupId>
    <artifactId>cloud-stream</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>cloud-stream</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>11</java.version>
        <!-- Referenced by the spring-cloud-dependencies import below. -->
        <spring-cloud.version>2020.0.5</spring-cloud.version>
    </properties>

    <dependencies>
        <!-- Web layer -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Kafka / Spring Cloud Stream -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <!-- Test-scoped dependencies -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
            <scope>test</scope>
            <classifier>test-binder</classifier>
            <type>test-jar</type>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <!-- Imports the Spring Cloud BOM at ${spring-cloud.version}. -->
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
主类(包含消费者):
package com.example;
import java.util.function.Consumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Spring Boot entry point that also exposes two message consumers as beans:
 * {@code consumeString} for {@link ChatMessage} payloads and
 * {@code consumeString2} for {@code String} payloads.
 */
@SpringBootApplication
public class App {

    // One shared, immutable logger for the class instead of one per instance.
    private static final Logger logger = LoggerFactory.getLogger(App.class);

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }

    /**
     * Consumer of messages carrying a {@link ChatMessage} payload.
     *
     * @return a consumer that logs a fixed line for every message received
     */
    @Bean
    public Consumer<Message<ChatMessage>> consumeString() {
        return message -> logger.info("sup");
    }

    /**
     * Consumer of messages carrying a {@code String} payload.
     *
     * @return a consumer that logs a fixed line for every message received
     */
    @Bean
    public Consumer<Message<String>> consumeString2() {
        return message -> logger.info("sup");
    }
}
一个使用 StreamBridge 按需发送 Kafka 消息的控制器(注意:streamBridge 会把消息键(key)header 作为字符串发送):
package com.example;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST controller that publishes a posted {@link ChatMessage} through a
 * {@link StreamBridge}.
 */
@RestController
public class Controller {

    // Assigned once in the constructor and never reassigned.
    private final StreamBridge streamBridge;

    /**
     * @param streamBridge bridge used to send messages to an output binding
     */
    public Controller(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    /**
     * Sends the request body to the {@code output-out-0} binding with the
     * message key header set to the String {@code "message"}.
     *
     * @param payload chat message taken from the request body
     * @return the literal {@code "Success"}
     */
    @PostMapping("/sendMessage/message")
    public String publishMessageString(@RequestBody ChatMessage payload) {
        // NOTE(review): the value returned by send(...) is ignored, so the
        // response is "Success" whenever send(...) returns without throwing.
        streamBridge.send(
                "output-out-0",
                MessageBuilder.withPayload(payload)
                        .setHeader(KafkaHeaders.MESSAGE_KEY, "message")
                        .build());
        return "Success";
    }
}
模型类:
package com.example;
/**
 * Mutable payload bean holding a chat message's text and a time value.
 */
public class ChatMessage {

    private String contents;
    private long time;

    /** Creates an empty message ({@code contents} is null, {@code time} is 0). */
    public ChatMessage() {
    }

    /**
     * @param contents message text
     * @param time     time value associated with the message
     */
    public ChatMessage(String contents, long time) {
        this.contents = contents;
        this.time = time;
    }

    /** @return the message text, or {@code null} if none has been set */
    public String getContents() {
        return contents;
    }

    /**
     * Sets the message text. Added so that an instance created with the
     * no-arg constructor can be populated, mirroring {@link #setTime(long)}.
     *
     * @param contents message text
     */
    public void setContents(String contents) {
        this.contents = contents;
    }

    /** @return the time value associated with the message */
    public long getTime() {
        return time;
    }

    /** @param time time value associated with the message */
    public void setTime(long time) {
        this.time = time;
    }

    @Override
    public String toString() {
        return "ChatMessage [contents=" + contents + ", time=" + time + "]";
    }
}
最后是 application.yml
# Spring Cloud Stream configuration: two consumer bindings and one output
# binding, assigned to two named binders.
spring:
  cloud:
    stream:
      function:
        definition: consumeString;consumeString2
      binders:
        binder1:
          type: kafka
          environment.spring.cloud.stream.kafka.streams.binder:
            brokers:
              - localhost:9092
        binder2:
          type: kafka
          environment.spring.cloud.stream.kafka.streams.binder:
            brokers:
              - localhost:9092
      kafka:
        streams:
          binder:
            configuration:
              default:
                # NOTE(review): this serde is configured under the Kafka Streams
                # binder settings, while both binders above are declared with
                # type: kafka - confirm that it applies to the bindings below.
                key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
      bindings:
        consumeString-in-0:
          binder: binder1
          destination: test
          group: input-group-1
        consumeString2-in-0:
          binder: binder2
          destination: test2
          group: input-group-2
        output-out-0:
          binder: binder1
          destination: test
当我尝试通过控制器发送消息时,出现以下错误:
java.lang.ClassCastException: class java.lang.String cannot be cast to class [B (java.lang.String and [B are in module java.base of loader 'bootstrap')
at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19) ~[kafka-clients-2.7.2.jar:na]
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) ~[kafka-clients-2.7.2.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:918) ~[kafka-clients-2.7.2.jar:na]
问题似乎在于:Kafka 生产者和消费者对键使用的是字节数组序列化/反序列化,而不是字符串序列化/反序列化。如何更改默认的序列化器/反序列化器?
您可以在 binder 级别或 binding 级别覆盖默认的序列化器。例如,针对某个特定的 binding:
# Per-binding override of the Kafka producer's key serializer.
spring:
  cloud:
    stream:
      kafka:
        bindings:
          output-out-0:
            producer:
              configuration:
                # "..." is a placeholder: supply the fully-qualified serializer
                # class for the record key, e.g.
                # org.apache.kafka.common.serialization.StringSerializer for String keys.
                "[key.serializer]": ...
我正在尝试使用 Kafka 搭建一个 Spring Cloud Stream 项目。除了消息键(key)的序列化/反序列化之外,一切都按预期工作。相关文件如下。pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the cloud-stream demo application. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Inherit plugin and dependency management from the Spring Boot parent. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.8</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>com.example</groupId>
    <artifactId>cloud-stream</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>cloud-stream</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>11</java.version>
        <!-- Referenced by the spring-cloud-dependencies import below. -->
        <spring-cloud.version>2020.0.5</spring-cloud.version>
    </properties>

    <dependencies>
        <!-- Web layer -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Kafka / Spring Cloud Stream -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <!-- Test-scoped dependencies -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
            <scope>test</scope>
            <classifier>test-binder</classifier>
            <type>test-jar</type>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <!-- Imports the Spring Cloud BOM at ${spring-cloud.version}. -->
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
主类(包含消费者):
package com.example;
import java.util.function.Consumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Spring Boot entry point that also exposes two message consumers as beans:
 * {@code consumeString} for {@link ChatMessage} payloads and
 * {@code consumeString2} for {@code String} payloads.
 */
@SpringBootApplication
public class App {

    // One shared, immutable logger for the class instead of one per instance.
    private static final Logger logger = LoggerFactory.getLogger(App.class);

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }

    /**
     * Consumer of messages carrying a {@link ChatMessage} payload.
     *
     * @return a consumer that logs a fixed line for every message received
     */
    @Bean
    public Consumer<Message<ChatMessage>> consumeString() {
        return message -> logger.info("sup");
    }

    /**
     * Consumer of messages carrying a {@code String} payload.
     *
     * @return a consumer that logs a fixed line for every message received
     */
    @Bean
    public Consumer<Message<String>> consumeString2() {
        return message -> logger.info("sup");
    }
}
一个使用 StreamBridge 按需发送 Kafka 消息的控制器(注意:streamBridge 会把消息键(key)header 作为字符串发送):
package com.example;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST controller that publishes a posted {@link ChatMessage} through a
 * {@link StreamBridge}.
 */
@RestController
public class Controller {

    // Assigned once in the constructor and never reassigned.
    private final StreamBridge streamBridge;

    /**
     * @param streamBridge bridge used to send messages to an output binding
     */
    public Controller(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    /**
     * Sends the request body to the {@code output-out-0} binding with the
     * message key header set to the String {@code "message"}.
     *
     * @param payload chat message taken from the request body
     * @return the literal {@code "Success"}
     */
    @PostMapping("/sendMessage/message")
    public String publishMessageString(@RequestBody ChatMessage payload) {
        // NOTE(review): the value returned by send(...) is ignored, so the
        // response is "Success" whenever send(...) returns without throwing.
        streamBridge.send(
                "output-out-0",
                MessageBuilder.withPayload(payload)
                        .setHeader(KafkaHeaders.MESSAGE_KEY, "message")
                        .build());
        return "Success";
    }
}
模型类:
package com.example;
/**
 * Mutable payload bean holding a chat message's text and a time value.
 */
public class ChatMessage {

    private String contents;
    private long time;

    /** Creates an empty message ({@code contents} is null, {@code time} is 0). */
    public ChatMessage() {
    }

    /**
     * @param contents message text
     * @param time     time value associated with the message
     */
    public ChatMessage(String contents, long time) {
        this.contents = contents;
        this.time = time;
    }

    /** @return the message text, or {@code null} if none has been set */
    public String getContents() {
        return contents;
    }

    /**
     * Sets the message text. Added so that an instance created with the
     * no-arg constructor can be populated, mirroring {@link #setTime(long)}.
     *
     * @param contents message text
     */
    public void setContents(String contents) {
        this.contents = contents;
    }

    /** @return the time value associated with the message */
    public long getTime() {
        return time;
    }

    /** @param time time value associated with the message */
    public void setTime(long time) {
        this.time = time;
    }

    @Override
    public String toString() {
        return "ChatMessage [contents=" + contents + ", time=" + time + "]";
    }
}
最后是 application.yml
# Spring Cloud Stream configuration: two consumer bindings and one output
# binding, assigned to two named binders.
spring:
  cloud:
    stream:
      function:
        definition: consumeString;consumeString2
      binders:
        binder1:
          type: kafka
          environment.spring.cloud.stream.kafka.streams.binder:
            brokers:
              - localhost:9092
        binder2:
          type: kafka
          environment.spring.cloud.stream.kafka.streams.binder:
            brokers:
              - localhost:9092
      kafka:
        streams:
          binder:
            configuration:
              default:
                # NOTE(review): this serde is configured under the Kafka Streams
                # binder settings, while both binders above are declared with
                # type: kafka - confirm that it applies to the bindings below.
                key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
      bindings:
        consumeString-in-0:
          binder: binder1
          destination: test
          group: input-group-1
        consumeString2-in-0:
          binder: binder2
          destination: test2
          group: input-group-2
        output-out-0:
          binder: binder1
          destination: test
当我尝试通过控制器发送消息时,出现以下错误:
java.lang.ClassCastException: class java.lang.String cannot be cast to class [B (java.lang.String and [B are in module java.base of loader 'bootstrap')
at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19) ~[kafka-clients-2.7.2.jar:na]
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) ~[kafka-clients-2.7.2.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:918) ~[kafka-clients-2.7.2.jar:na]
问题似乎在于:Kafka 生产者和消费者对键使用的是字节数组序列化/反序列化,而不是字符串序列化/反序列化。如何更改默认的序列化器/反序列化器?
您可以在 binder 级别或 binding 级别覆盖默认的序列化器。例如,针对某个特定的 binding:
# Per-binding override of the Kafka producer's key serializer.
spring:
  cloud:
    stream:
      kafka:
        bindings:
          output-out-0:
            producer:
              configuration:
                # "..." is a placeholder: supply the fully-qualified serializer
                # class for the record key, e.g.
                # org.apache.kafka.common.serialization.StringSerializer for String keys.
                "[key.serializer]": ...