Spring Kafka @SendTo Not Sending Headers
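I am sending a message to Kafka using ReplyingKafkaTemplate, and it sends the message with a kafka_correlationId header. However, when the message reaches my @KafkaListener method and is forwarded to the reply topic, the headers are lost.
How do I preserve the Kafka headers?
Here is my method signature: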
@KafkaListener(topics = "input")
@SendTo("reply")
public List<CustomOutput> consume(List<CustomInput> inputs) {
    ... /* some processing */
    return outputs;
}
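I have created a ProducerInterceptor so that I can see which headers are sent by the ReplyingKafkaTemplate and by the @SendTo annotation. From that, another strange thing is that the ReplyingKafkaTemplate does not add the documented kafka_replyTopic header to the message.
Here is how the ReplyingKafkaTemplate is configured: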
@Bean
public KafkaMessageListenerContainer<Object, Object> replyContainer(ConsumerFactory<Object, Object> cf) {
    ContainerProperties containerProperties = new ContainerProperties(requestReplyTopic);
    return new KafkaMessageListenerContainer<>(cf, containerProperties);
}

@Bean
public ReplyingKafkaTemplate<Object, Object, Object> replyingKafkaTemplate(
        ProducerFactory<Object, Object> pf,
        KafkaMessageListenerContainer<Object, Object> container) {
    return new ReplyingKafkaTemplate<>(pf, container);
}
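I'm not sure whether this is related, but I have also added Spring Cloud Sleuth as a dependency. The span/trace headers are there when I send the message, but new ones are generated when the message is forwarded.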
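By default, arbitrary headers in the request message are not copied to the reply message; only kafka_correlationId is.
Starting with version 2.2, you can configure a ReplyHeadersConfigurer, which is called to determine which header(s) should be copied.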
Starting with version 2.2, you can add a ReplyHeadersConfigurer to the listener container factory. This is consulted to determine which headers you want to set in the reply message.
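As a rough illustration of the 2.2 approach, here is a minimal sketch of a listener container factory with a ReplyHeadersConfigurer. The class name ReplyHeaderConfig, the injected bean parameters, and the header name "myHeader" are assumptions made for this example; adjust the predicate to whichever headers you want copied to the reply.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;

// Hypothetical configuration class; requires spring-kafka 2.2 or later.
@Configuration
public class ReplyHeaderConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
            ConsumerFactory<Object, Object> consumerFactory,
            KafkaTemplate<Object, Object> replyTemplate) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // Template used by @SendTo to publish the listener's return value.
        factory.setReplyTemplate(replyTemplate);
        // Called for each request header; return true to copy it to the reply.
        // "myHeader" is an assumed header name used only for illustration.
        factory.setReplyHeadersConfigurer((name, value) -> "myHeader".equals(name));
        return factory;
    }
}
```

With a factory like this in place, a listener with a plain return type such as the consume method in the question should not need to build the reply Message<?> by hand just to carry the selected headers over.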
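Edit
By the way, in 2.2 the RKT (ReplyingKafkaTemplate) sets the replyTo header automatically if there is no such header.
With 2.1.x it can be done, but it's a bit involved and you have to do some of the work yourself. The key is to receive and reply with a Message<?> ...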
@KafkaListener(id = "so55622224", topics = "so55622224")
@SendTo("dummy.we.use.the.header.instead")
public Message<?> listen(Message<String> in) {
    System.out.println(in);
    Headers nativeHeaders = in.getHeaders().get(KafkaHeaders.NATIVE_HEADERS, Headers.class);
    byte[] replyTo = nativeHeaders.lastHeader(KafkaHeaders.REPLY_TOPIC).value();
    byte[] correlation = nativeHeaders.lastHeader(KafkaHeaders.CORRELATION_ID).value();
    return MessageBuilder.withPayload(in.getPayload().toUpperCase())
            .setHeader("myHeader", nativeHeaders.lastHeader("myHeader").value())
            .setHeader(KafkaHeaders.CORRELATION_ID, correlation)
            .setHeader(KafkaHeaders.TOPIC, replyTo)
            .build();
}

// This is used to send the reply - needs a header mapper
@Bean
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory) {
    KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
    MessagingMessageConverter messageConverter = new MessagingMessageConverter();
    messageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper("*")); // map all byte[] headers
    kafkaTemplate.setMessageConverter(messageConverter);
    return kafkaTemplate;
}

@Bean
public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
    return args -> {
        Headers headers = new RecordHeaders();
        headers.add(new RecordHeader("myHeader", "myHeaderValue".getBytes()));
        headers.add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "so55622224.replies".getBytes())); // automatic in 2.2
        ProducerRecord<String, String> record = new ProducerRecord<>("so55622224", null, null, "foo", headers);
        RequestReplyFuture<String, String, String> future = template.sendAndReceive(record);
        ConsumerRecord<String, String> reply = future.get();
        System.out.println("Reply: " + reply.value() + " myHeader="
                + new String(reply.headers().lastHeader("myHeader").value()));
    };
}