如何使用 Spring Cloud Stream Supplier 将键控消息发送到 Kafka
How to send keyed message to Kafka using Spring Cloud Stream Supplier
我想使用 Spring Cloud Stream 向 Kafka 生成键控(具有特定键的消息)消息。
@SpringBootApplication
public class SpringCloudStreamKafkaApplication {
public static void main(String[] args) {
SpringApplication.run(SpringCloudStreamKafkaApplication.class, args);
}
@Bean
Supplier<DataRecord> process(){
return () -> new DataRecord(42L);
}
}
我需要在供应商代码中更改什么以提供密钥?
API 的新样式是否可能(使用 lambda)?
谢谢
Return a Message<?>
并设置 KafkaHeaders.MESSAGE_KEY
header:
@Bean
Supplier<Message<String>> process() {
return () -> MessageBuilder.withPayload("foo")
.setHeader(KafkaHeaders.MESSAGE_KEY, "bar".getBytes())
.build();
}
(假定默认密钥序列化器(byte[])。
编辑
这会被无限调用。
如果你想发送有限流,我相信你必须切换到反应模型。
@Bean
Supplier<Flux<Message<String>>> processFinite() {
Message<String> msg1 = MessageBuilder.withPayload("foo")
.setHeader(KafkaHeaders.MESSAGE_KEY, "bar".getBytes())
.build();
Message<String> msg2 = MessageBuilder.withPayload("baz")
.setHeader(KafkaHeaders.MESSAGE_KEY, "qux".getBytes())
.build();
return () -> {
return Flux.just(msg1, msg2);
};
}
还有Flux.fromStream(myStream)
.
这将在流的末尾结束。
EDIT2
您也可以使用 StreamBridge
。
我想使用 Spring Cloud Stream 向 Kafka 生成键控(具有特定键的消息)消息。
@SpringBootApplication
public class SpringCloudStreamKafkaApplication {
public static void main(String[] args) {
SpringApplication.run(SpringCloudStreamKafkaApplication.class, args);
}
@Bean
Supplier<DataRecord> process(){
return () -> new DataRecord(42L);
}
}
我需要在供应商代码中更改什么以提供密钥? API 的新样式是否可能(使用 lambda)?
谢谢
Return a Message<?>
并设置 KafkaHeaders.MESSAGE_KEY
header:
@Bean
Supplier<Message<String>> process() {
return () -> MessageBuilder.withPayload("foo")
.setHeader(KafkaHeaders.MESSAGE_KEY, "bar".getBytes())
.build();
}
(假定默认密钥序列化器(byte[])。
编辑
这会被无限调用。
如果你想发送有限流,我相信你必须切换到反应模型。
@Bean
Supplier<Flux<Message<String>>> processFinite() {
Message<String> msg1 = MessageBuilder.withPayload("foo")
.setHeader(KafkaHeaders.MESSAGE_KEY, "bar".getBytes())
.build();
Message<String> msg2 = MessageBuilder.withPayload("baz")
.setHeader(KafkaHeaders.MESSAGE_KEY, "qux".getBytes())
.build();
return () -> {
return Flux.just(msg1, msg2);
};
}
还有Flux.fromStream(myStream)
.
这将在流的末尾结束。
EDIT2
您也可以使用 StreamBridge
。