Spring Cloud Stream 中每个绑定的自定义密钥 serdes
Custom key serdes per binding in Spring Cloud Stream
我正在尝试了解如何在 Spring Cloud Stream 中使用不同的密钥 serializer/deserializer。我找到了指定全局 serdes 的方法,但我不明白如何为每个绑定指定不同的 serdes,以允许不同的键类型(整数、字符串等)。
例如,以下配置使用 属性 spring.kafka.consumer.keyDeserializer
和 spring.kafka.producer.keySerializer
指定全局密钥 serdes:
spring:
cloud:
stream:
bindings:
input:
contentType: application/*+avro
destination: user
group: my-group
output:
contentType: application/*+avro
destination: user
producer:
partition-count: 2
kafka:
binder:
brokers: default:9092
schemaRegistryClient:
endpoint: http://default:8081
kafka:
consumer:
keyDeserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
keySerializer: org.apache.kafka.common.serialization.StringSerializer
我可以使用以下代码片段使用和生成消息密钥:
// Consumer
@StreamListener(Sink.INPUT)
public void handle(@Payload UserValue user, @Headers Map<String, Object> headers,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
System.out.println("Received: " + user + " with key: " + key + " and headers: " + headers);
}
// Producer
UserValue user = UserValue.newBuilder().setName("Alessandro").setSurname("Dionisi").build();
output.send(MessageBuilder.withPayload(user).setHeader(KafkaHeaders.MESSAGE_KEY, "1").build());
您可以通过 configuration
属性
spring.cloud.stream.kafka.bindings.<bindingName>.consumer.configuration.<whatever-kafka-property-name>=. . .
查看更多信息here
我正在尝试了解如何在 Spring Cloud Stream 中使用不同的密钥 serializer/deserializer。我找到了指定全局 serdes 的方法,但我不明白如何为每个绑定指定不同的 serdes,以允许不同的键类型(整数、字符串等)。
例如,以下配置使用 属性 spring.kafka.consumer.keyDeserializer
和 spring.kafka.producer.keySerializer
指定全局密钥 serdes:
spring:
cloud:
stream:
bindings:
input:
contentType: application/*+avro
destination: user
group: my-group
output:
contentType: application/*+avro
destination: user
producer:
partition-count: 2
kafka:
binder:
brokers: default:9092
schemaRegistryClient:
endpoint: http://default:8081
kafka:
consumer:
keyDeserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
keySerializer: org.apache.kafka.common.serialization.StringSerializer
我可以使用以下代码片段使用和生成消息密钥:
// Consumer
@StreamListener(Sink.INPUT)
public void handle(@Payload UserValue user, @Headers Map<String, Object> headers,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
System.out.println("Received: " + user + " with key: " + key + " and headers: " + headers);
}
// Producer
UserValue user = UserValue.newBuilder().setName("Alessandro").setSurname("Dionisi").build();
output.send(MessageBuilder.withPayload(user).setHeader(KafkaHeaders.MESSAGE_KEY, "1").build());
您可以通过 configuration
属性
spring.cloud.stream.kafka.bindings.<bindingName>.consumer.configuration.<whatever-kafka-property-name>=. . .
查看更多信息here