spring.cloud.stream.kafka.binder.headers not working as expected
I am trying to use spring.cloud.stream.kafka.binder.headers to transport a custom header that I set earlier.
I read in the documentation:
spring.cloud.stream.kafka.binder.headers
The list of custom headers that will be transported by the binder.
Default: empty.
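This seems to suggest that setting a list (comma-separated?) will cause the custom headers to be transported in the Message<>, but the header is lost as soon as the Kafka write completes.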
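To make the expected behavior concrete, here is a minimal sketch of the allow-list idea the documentation describes: only headers named in the property are carried across. It is a conceptual illustration, not the binder's actual code. The class and method names (HeaderAllowListSketch, parseHeaderNames, transportable) are hypothetical, and the comma-separated parsing is an assumption based on how Spring Boot usually binds list properties. A real binder also carries some standard headers (contentType, for instance), which this sketch ignores.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Conceptual sketch only: this is NOT the Kafka binder's implementation.
final class HeaderAllowListSketch {

    // Parses a comma-separated property value such as "orderId,bar" into header names.
    static Set<String> parseHeaderNames(String propertyValue) {
        Set<String> names = new LinkedHashSet<>();
        if (propertyValue == null) {
            return names;
        }
        for (String name : propertyValue.split(",")) {
            String trimmed = name.trim();
            if (!trimmed.isEmpty()) {
                names.add(trimmed);
            }
        }
        return names;
    }

    // Keeps only the headers whose names appear in the allow-list.
    static Map<String, Object> transportable(Map<String, Object> headers, Set<String> allowed) {
        Map<String, Object> kept = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : headers.entrySet()) {
            if (allowed.contains(entry.getKey())) {
                kept.put(entry.getKey(), entry.getValue());
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put("orderId", "create");
        headers.put("timestamp", 1471288144882L);
        // Only "orderId" is listed, so "timestamp" is dropped.
        System.out.println(transportable(headers, parseHeaderNames("orderId")));
    }
}
```

Under this model, a header that is listed in the property should arrive on the consumer side, which is what I expected to see for orderId.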
My annotation creates the header as part of the call to the MessagingGateway:
@MessagingGateway(name = "redemptionGateway", defaultRequestChannel = Channels.GATEWAY_OUTPUT, defaultHeaders = @GatewayHeader(name = "orderId", expression = "#gatewayMethod.name"))
public interface RedemptionGateway {
...
}
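For context on the header value: in Spring Integration the #gatewayMethod variable refers to the java.lang.reflect.Method being invoked, so the expression #gatewayMethod.name evaluates to the method's name. The plain-reflection sketch below shows the same lookup without Spring. The interface RedemptionGatewayLike and its create method are hypothetical stand-ins, since the real interface body is elided above; the name create is only inferred from the orderId=create value in the logs.

```java
import java.lang.reflect.Method;

// Illustration only: mimics what "#gatewayMethod.name" resolves to, using plain reflection.
final class GatewayMethodNameSketch {

    // Hypothetical stand-in for the elided gateway interface.
    interface RedemptionGatewayLike {
        void create(Object redemption);
    }

    // Equivalent of evaluating the "name" property on the invoked Method.
    static String headerValueFor(Method gatewayMethod) {
        return gatewayMethod.getName();
    }

    public static void main(String[] args) throws NoSuchMethodException {
        Method method = RedemptionGatewayLike.class.getMethod("create", Object.class);
        // Prints the value the default header would carry for this method.
        System.out.println("orderId=" + headerValueFor(method));
    }
}
```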
I can see that the header is created correctly in the first preSend debug log:
2016-08-15 15:09:04 http-nio-8080-exec-2 DEBUG DirectChannel:430 - preSend on channel 'gatewayOutput', message: GenericMessage [payload=x.TrivialRedemption@2d052d2a[orderId=f72b2d9b-4e60-43fa-95d4-1b0b368fe49f], headers={orderId=create, id=5dccea6f-266e-82b9-54c6-57ec441a26ac, timestamp=1471288144882}] - {applicationSystemCode=x, clientIP=0:0:0:0:0:0:0:1, clusterId=Cluster-Id-NA, containerId=Container-Id-NA, correlationId=UNDEFINED, domainName=defaultDomain, hostName=Host-NA, messageId=10.113.21.144-eb8404d0-de93-4f94-80cb-e5b638e8aeef, userId=anonymous, webAnalyticsCorrelationId=|}
But on the next preSend, the header is lost:
2016-08-15 15:09:05 kafka-binder- DEBUG DirectChannel:430 - preSend on channel 'enrichingInput', message: GenericMessage [payload=x.TrivialRedemption@357bd4dd[orderId=f72b2d9b-4e60-43fa-95d4-1b0b368fe49f], headers={kafka_offset=10, orderId=create, kafka_messageKey=null, kafka_topic=received, kafka_partitionId=0, kafka_nextOffset=11, contentType=application/x-java-object;type=x.TrivialRedemption}] - {}
My properties contain:
spring.cloud.stream.kafka.binder.headers=orderId
What version of spring-cloud-stream are you using?
I just wrote a quick test case and it works fine...
spring.cloud.stream.kafka.binder.headers=bar
spring.cloud.stream.bindings.output.destination=foobar
spring.cloud.stream.bindings.input.destination=foobar
spring.cloud.stream.bindings.input.group=foo
Application:
package com.example;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;

@SpringBootApplication
@EnableBinding(Processor.class)
public class So38961697Application {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So38961697Application.class, args);
        Foo foo = context.getBean(Foo.class);
        foo.start();
        foo.send();
        Thread.sleep(30000);
        context.close();
    }

    @Bean
    public Foo foo() {
        return new Foo();
    }

    private static class Foo {

        @Autowired
        Processor processor;

        public void send() {
            Message<?> m = MessageBuilder.withPayload("foo")
                    .setHeader("bar", "baz")
                    .build();
            processor.output().send(m);
        }

        public void start() {
            this.processor.input().subscribe(new MessageHandler() {

                @Override
                public void handleMessage(Message<?> m) throws MessagingException {
                    System.out.println(m);
                }

            });
        }

    }

}
Result:
GenericMessage [payload=foo, headers={bar=baz, kafka_offset=0, kafka_messageKey=null, kafka_topic=foobar, kafka_partitionId=0, kafka_nextOffset=1, contentType=text/plain}]
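The complete project is here.
EDIT: See the comments; upgrading to 1.0.2.RELEASE solved the problem.
EDIT
Adding the group ensures that the consumer starts from the earliest message. See the comments below.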