使用 Spring Cloud Stream Kafka Binder 批量使用 Kafka 消息及其密钥

Consuming Kafka messages with its key in batches using Spring Cloud Stream Kafka Binder

consuming them as batches时是否有可能以某种方式获取kafka消息的密钥?

我在使用 Message<String> 作为我的消费者函数的输入时设法访问了消息键,但这只适用于非批处理模式:

@SpringBootApplication
class KafkaSink {

    private val log = logger()

    @Bean
    fun sink() : Consumer<Message<String>> {
        return Consumer {
            log.info("key: ${it.headers[KafkaHeaders.RECEIVED_MESSAGE_KEY]} value: ${it.payload}")      
        }
    }
}

当设置 属性 spring.cloud.stream.binding.sink.consumer.batch-mode=true 时,我只能使用 List<String> 作为消费者的参数,而不能使用 List<Message<String>>

使用Message<List<String>>;然后,KafkaHeaders.RECEIVED_MESSAGE_KEY 是一个 List<?>,与有效载荷的顺序相同 - 所有其他 headers.

也是如此

编辑

在下方重新评论;默认 content-type 是 application/json。添加

      bindings:
        sink-in-0:
          content-type: text/plain

修复你的测试。

不幸的是,它在任何地方都没有提到:-(,但是将输入事件包装到org.springframework.messaging.Message中(例如获取记录密钥),这是批处理时正确的签名:

@Bean
public Consumer<Message<List<String>>> sink() {
    return messages -> {
        List<?> keys = messages.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY, List.class);
        log.info("headers {}", messages.getHeaders());
        keys.forEach(k -> log.info("key {}", k));
        messages.getPayload().forEach(string -> log.info("processing {}", string));
    };
}

您注意到签名是相反的:Message<List<?>> 而不是 List<Message<?>>