How to get the Message headers in a consumer
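With Spring Cloud Stream's new function-based programming model, I need access to some of the headers of the messages I consume. I figured I would consume Message<MyDto> instead of just MyDto, but if I do that, all of MyDto's properties are null.
This is what I am trying to do: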
@ToString
@Getter
@Setter
public class MyDto {
    private OtherDto otherDto;
}

@Bean
public Consumer<Message<MyDto>> onRawImport() {
    return message -> logger.info("Received {}", message.getPayload()); // <-- prints "Received MyDto(otherDto=null)"
}
whereas the following works perfectly in my configuration:
@Bean
public Consumer<MyDto> onRawImport() {
    return myDto -> logger.info("Received {}", myDto); // <-- prints "Received MyDto(otherDto=OtherDto(...))"
}
Is there a simple way to consume the Message directly?
Addendum:
If I turn on DEBUG for org.springframework.cloud.function.context.catalog, I see this for Consumer<MyDto>:
BeanFactoryAwareFunctionRegistry : Applying function: onRawImport
BeanFactoryAwareFunctionRegistry : Applying type conversion on input value GenericMessage [payload=byte[288], headers=...snip...]
BeanFactoryAwareFunctionRegistry : Function type: java.util.function.Consumer<MyDto>
BeanFactoryAwareFunctionRegistry : Raw type of value: GenericMessage [payload=byte[288], ...snip...}] is class MyDto
BeanFactoryAwareFunctionRegistry : Converted from Message: MyDto(otherDto=OtherDto(...))
BeanFactoryAwareFunctionRegistry : Converted input value MyDto(otherDto=OtherDto(...))
MyOwnListener : Received MyDto(id=5, message=test)
whereas with Consumer<Message<MyDto>> I see:
BeanFactoryAwareFunctionRegistry : Applying function: onRawImport
BeanFactoryAwareFunctionRegistry : Applying type conversion on input value GenericMessage [payload=byte[288], headers=...snip...]
BeanFactoryAwareFunctionRegistry : Function type: java.util.function.Consumer<org.springframework.messaging.Message<MyDto>>
BeanFactoryAwareFunctionRegistry : Raw type of value: GenericMessage [payload=byte[288], ...snip...}] is class MyDto
BeanFactoryAwareFunctionRegistry : Converted from Message: MyDto(otherDto=null)
BeanFactoryAwareFunctionRegistry : Converted input value MyDto(otherDto=null)
MyOwnListener : Received MyDto(otherDto=null)
Which version are you using? I just tested it with Boot 2.4.4 and cloud 2020.0.2, and it works fine...
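import java.util.function.Consumer;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;

@SpringBootApplication
public class So66990612Application {

    public static void main(String[] args) {
        SpringApplication.run(So66990612Application.class, args);
    }

    // Consumes the whole Message, so both the payload and the headers are printed.
    @Bean
    Consumer<Message<Foo>> input() {
        return System.out::println;
    }

    // Sends one JSON test message to the input-in-0 exchange on startup.
    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            template.convertAndSend("input-in-0", "x", "{\"bar\":\"baz\"}",
                    msg -> {
                        msg.getMessageProperties().setContentType("application/json");
                        return msg;
                    });
        };
    }

    public static class Foo {

        private String bar;

        public String getBar() {
            return this.bar;
        }

        public void setBar(String bar) {
            this.bar = bar;
        }

        @Override
        public String toString() {
            return "Foo [bar=" + this.bar + "]";
        }
    }
}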
GenericMessage [payload=Foo [bar=baz], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=input-in-0, amqp_deliveryTag=1, deliveryAttempt=1, amqp_consumerQueue=input-in-0.anonymous.Uh_s89lKRnKeJ3ls991pXA, amqp_redelivered=false, amqp_receivedRoutingKey=x, amqp_contentEncoding=UTF-8, id=1a848cf6-3f85-c017-fa70-d52f43c0fc67, amqp_consumerTag=amq.ctag-w6ZyXBGOtC-q7rCY8Jy-gA, sourceData=(Body:'{"bar":"baz"}' MessageProperties [headers={}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=input-in-0, receivedRoutingKey=x, deliveryTag=1, consumerTag=amq.ctag-w6ZyXBGOtC-q7rCY8Jy-gA, consumerQueue=input-in-0.anonymous.Uh_s89lKRnKeJ3ls991pXA]), contentType=application/json, timestamp=1617888626028}]
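Once the consumer receives a Message<Foo> like the one printed above, individual headers can be read through getHeaders(). The following is only a rough sketch of that step, not part of the tested application: the class name HeaderAccessSketch and the describe helper are made up for illustration, the message is built by hand with MessageBuilder instead of arriving through a binder, and it assumes spring-messaging is on the classpath. The header names contentType and amqp_receivedRoutingKey are taken from the output above.

```java
import java.util.function.Consumer;

import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;

public class HeaderAccessSketch {

    // Payload type mirroring Foo from the answer above.
    public static class Foo {
        private String bar;

        public String getBar() {
            return this.bar;
        }

        public void setBar(String bar) {
            this.bar = bar;
        }
    }

    // Hypothetical helper: summarizes the payload plus two headers of interest.
    public static String describe(Message<Foo> message) {
        MessageHeaders headers = message.getHeaders();
        // MessageHeaders.CONTENT_TYPE is the constant for the "contentType" key.
        Object contentType = headers.get(MessageHeaders.CONTENT_TYPE);
        // The typed overload returns null when the header is absent.
        String routingKey = headers.get("amqp_receivedRoutingKey", String.class);
        return "bar=" + message.getPayload().getBar()
                + ", contentType=" + contentType
                + ", routingKey=" + routingKey;
    }

    // In a Spring Cloud Stream application this method would carry @Bean.
    public static Consumer<Message<Foo>> input() {
        return message -> System.out.println(describe(message));
    }

    public static void main(String[] args) {
        Foo foo = new Foo();
        foo.setBar("baz");
        // Hand-built message standing in for what the binder would deliver.
        Message<Foo> message = MessageBuilder.withPayload(foo)
                .setHeader(MessageHeaders.CONTENT_TYPE, "application/json")
                .setHeader("amqp_receivedRoutingKey", "x")
                .build();
        input().accept(message); // prints "bar=baz, contentType=application/json, routingKey=x"
    }
}
```

In a real application, the body of input() would go into the @Bean method and the binder would supply the message, so the hand-built message in main is only there to make the sketch runnable on its own.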