如何使用 Spring Cloud Stream Supplier 将键控消息发送到 Kafka

How to send keyed message to Kafka using Spring Cloud Stream Supplier

我想使用 Spring Cloud Stream 向 Kafka 生成键控(具有特定键的消息)消息。

@SpringBootApplication
public class SpringCloudStreamKafkaApplication {

  public static void main(String[] args) {
    SpringApplication.run(SpringCloudStreamKafkaApplication.class, args);
  }

  @Bean
  Supplier<DataRecord> process(){
    return () -> new DataRecord(42L);
  }

}

我需要在供应商代码中更改什么以提供密钥? API 的新样式是否可能(使用 lambda)?

谢谢

Return a Message<?> 并设置 KafkaHeaders.MESSAGE_KEY header:

@Bean
Supplier<Message<String>> process() {
    return () -> MessageBuilder.withPayload("foo")
            .setHeader(KafkaHeaders.MESSAGE_KEY, "bar".getBytes())
            .build();
}

(假定默认密钥序列化器(byte[])。

编辑

这会被无限调用。

如果你想发送有限流,我相信你必须切换到反应模型。

@Bean
Supplier<Flux<Message<String>>> processFinite() {
    Message<String> msg1 = MessageBuilder.withPayload("foo")
            .setHeader(KafkaHeaders.MESSAGE_KEY, "bar".getBytes())
            .build();
    Message<String> msg2 = MessageBuilder.withPayload("baz")
            .setHeader(KafkaHeaders.MESSAGE_KEY, "qux".getBytes())
            .build();
    return () -> {
        return Flux.just(msg1, msg2);
    };
}

还有Flux.fromStream(myStream).

这将在流的末尾结束。

EDIT2

您也可以使用 StreamBridge

https://docs.spring.io/spring-cloud-stream/docs/3.1.4/reference/html/spring-cloud-stream.html#_sending_arbitrary_data_to_an_output_e_g_foreign_event_driven_sources