Spring Kafka @SendTo Not Sending Headers

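I am using ReplyingKafkaTemplate to send messages to Kafka, and it sends them with the kafka_correlationId header. However, when a message reaches my @KafkaListener method and the result is forwarded to the reply topic, the headers are lost.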

How do I preserve the Kafka headers?

Here is my method signature:

@KafkaListener(topics = "input")
@SendTo("reply")
public List<CustomOutput> consume(List<CustomInput> inputs) {
  ... /* some processing */
  return outputs;
}

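I created a ProducerInterceptor so that I could see which headers are sent by the ReplyingKafkaTemplate and by the @SendTo annotation. From that I noticed another odd thing: the ReplyingKafkaTemplate does not add the documented kafka_replyTopic header to the message.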

The ReplyingKafkaTemplate is configured as follows:

@Bean
public KafkaMessageListenerContainer<Object, Object> replyContainer(ConsumerFactory<Object, Object> cf) {
  ContainerProperties containerProperties = new ContainerProperties(requestReplyTopic);
  return new KafkaMessageListenerContainer<>(cf, containerProperties);
}

@Bean
public ReplyingKafkaTemplate<Object, Object, Object> replyingKafkaTemplate(ProducerFactory<Object, Object> pf, KafkaMessageListenerContainer<Object, Object> container) {
  return new ReplyingKafkaTemplate<>(pf, container);
}

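I am not sure whether this is relevant, but I have also added Spring Cloud Sleuth as a dependency. The span/trace headers are present when I send the message, but new ones are generated when the message is forwarded.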

By default, arbitrary headers from the request message are not copied to the reply message; only kafka_correlationId is.

Starting with version 2.2, you can configure a ReplyHeadersConfigurer, which is called to determine which header(s) should be copied.

From the documentation:

Starting with version 2.2, you can add a ReplyHeadersConfigurer to the listener container factory. This is consulted to determine which headers you want to set in the reply message.
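To make the copy decision concrete, here is a minimal plain-Java sketch of the kind of predicate a ReplyHeadersConfigurer supplies. The class name ReplyHeaderPolicy, the allow-list, and the replyHeaders helper are hypothetical and only stand in for what the listener adapter does; the only part taken from Spring Kafka is the shape of shouldCopy(headerName, headerValue).

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: decides which request headers are propagated to the reply.
final class ReplyHeaderPolicy {

    // Assumed allow-list; "myHeader" is the custom header name used in this thread.
    private static final Set<String> COPIED = Set.of("myHeader");

    private ReplyHeaderPolicy() {
    }

    // Same parameter shape as ReplyHeadersConfigurer.shouldCopy(headerName, headerValue).
    static boolean shouldCopy(String headerName, Object headerValue) {
        return headerValue != null && COPIED.contains(headerName);
    }

    // Plain-Java stand-in for applying the predicate to the request headers.
    static Map<String, Object> replyHeaders(Map<String, Object> requestHeaders) {
        Map<String, Object> reply = new LinkedHashMap<>();
        requestHeaders.forEach((name, value) -> {
            if (shouldCopy(name, value)) {
                reply.put(name, value);
            }
        });
        return reply;
    }

    public static void main(String[] args) {
        Map<String, Object> request = new LinkedHashMap<>();
        request.put("myHeader", "myHeaderValue".getBytes(StandardCharsets.UTF_8));
        request.put("unrelated", "x".getBytes(StandardCharsets.UTF_8));
        // Only the allow-listed header survives.
        System.out.println(replyHeaders(request).keySet()); // prints [myHeader]
    }
}
```

With Spring Kafka 2.2 or later on the classpath, the same predicate could presumably be registered on the listener container factory with factory.setReplyHeadersConfigurer((name, value) -> ReplyHeaderPolicy.shouldCopy(name, value)). An allow-list is used here rather than copying everything, so that transport-level headers are not echoed back by accident.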

EDIT

By the way, in 2.2 the ReplyingKafkaTemplate (RKT) sets up the replyTo header automatically if there is no such header.

With 2.1.x it can be done, but it is a bit involved and you have to do some of the work yourself. The key is to receive and reply with a Message<?>...

@KafkaListener(id = "so55622224", topics = "so55622224")
@SendTo("dummy.we.use.the.header.instead")
public Message<?> listen(Message<String> in) {
    System.out.println(in);
    Headers nativeHeaders = in.getHeaders().get(KafkaHeaders.NATIVE_HEADERS, Headers.class);
    byte[] replyTo = nativeHeaders.lastHeader(KafkaHeaders.REPLY_TOPIC).value();
    byte[] correlation = nativeHeaders.lastHeader(KafkaHeaders.CORRELATION_ID).value();
    return MessageBuilder.withPayload(in.getPayload().toUpperCase())
            .setHeader("myHeader", nativeHeaders.lastHeader("myHeader").value())
            .setHeader(KafkaHeaders.CORRELATION_ID, correlation)
            .setHeader(KafkaHeaders.TOPIC, replyTo)
            .build();
}

// This is used to send the reply - needs a header mapper
@Bean
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory) {
    KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
    MessagingMessageConverter messageConverter = new MessagingMessageConverter();
    messageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper("*")); // map all byte[] headers
    kafkaTemplate.setMessageConverter(messageConverter);
    return kafkaTemplate;
}

@Bean
public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
    return args -> {
        Headers headers = new RecordHeaders();
        headers.add(new RecordHeader("myHeader", "myHeaderValue".getBytes()));
        headers.add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "so55622224.replies".getBytes())); // automatic in 2.2
        ProducerRecord<String, String> record = new ProducerRecord<>("so55622224", null, null, "foo", headers);
        RequestReplyFuture<String, String, String> future = template.sendAndReceive(record);
        ConsumerRecord<String, String> reply = future.get();
        System.out.println("Reply: " + reply.value() + " myHeader="
                + new String(reply.headers().lastHeader("myHeader").value()));
    };
}