无法使用 MessageProperties 将 header 设置为 AMQP 消息
Not able to set header to AMQP Message using MessageProperties
下面的代码显示了我如何将 header 和消息类型设置为 AMQP 消息。
MessageProperties properties = new MessageProperties();
properties.setHeader("KEY", "HOUSE");
properties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
Message message = new Message("1234567;Branch A;SALES;3000.50;Pending approval".getBytes(), properties);
rabbitTemplate.sendAndReceive("", QUEUE_NAME, message);
在queue中发送消息后,消息被Transformer接收。
@Transformer(inputChannel = "inboundChannel", outputChannel = "toutboundChannel")
public Property buildProperty(Message<String> property){
LOGGER.info("message received :: HEADERS: {}, PAYLOAD :{}", property.getHeaders(), property.getPayload());
....
}
在日志中,缺少 header "KEY: HOUSE",甚至消息类型也不是 JSON,而是 "text/plain"。
LOGS:
[SimpleAsyncTaskExecutor-1] INFO com.demo.maven.spring.integration.endpoint.TransformerRequestBuilder - message received :: HEADERS: {amqp_receivedRoutingKey=mobile.queue, amqp_deliveryTag=2, amqp_replyTo=amq.rabbitmq.reply-to.g2dkABByYWJiaXRAbG9jYWxob3N0AAAW9QAAAAAD.tTIFOS2gsM7qIlGYaybfrg==, amqp_deliveryMode=PERSISTENT, amqp_redelivered=true, id=399dda4f-4ba1-7cf4-2310-03dbfbac82b6, contentType=text/plain, timestamp=1421649922840}, PAYLOAD :1234567;Branch A;SALES;3000.50;Pending approval
这会起作用,您必须正确构建消息属性。
MessageProperties properties = new MessageProperties();
properties.builder()
.contentType(MediaType.APPLICATION_JSON)
//headers here
.headers(Map<String, Object>)
.build();
MessagePropertiesBuilder
class 就是为了那个。
默认情况下 Spring 集成 AMQP 入站端点(AmqpInboundChannelAdapter
和 AmqpInboundGateway
)仅映射标准 AMQP headers。这是 DefaultAmqpHeaderMapper
的默认行为。要接受任何 user-specofic header,您应该使用选项 setRequestHeaderNames("*")
将 AmqpHeaderMapper
(setHeaderMapper
) 注入到该入站端点。或者提供所需自定义名称的完整列表 headers.
回复。 contentType=text/plain
:我认为 AMQP 入站端点和 @Transformer(inputChannel = "inboundChannel"
之间的某些东西会覆盖从 AMQP contentType
header 接收到的信息。因为 RabbitTemplate
不会这样做,所以如果您发送 Message
而不是任何其他 Object。请为消息接收者共享 org.springframework.integration
类别的 DEBUG
日志。当然我们需要那部分日志,当你收到消息直到@Transformer
下面的代码显示了我如何将 header 和消息类型设置为 AMQP 消息。
MessageProperties properties = new MessageProperties();
properties.setHeader("KEY", "HOUSE");
properties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
Message message = new Message("1234567;Branch A;SALES;3000.50;Pending approval".getBytes(), properties);
rabbitTemplate.sendAndReceive("", QUEUE_NAME, message);
在queue中发送消息后,消息被Transformer接收。
@Transformer(inputChannel = "inboundChannel", outputChannel = "toutboundChannel")
public Property buildProperty(Message<String> property){
LOGGER.info("message received :: HEADERS: {}, PAYLOAD :{}", property.getHeaders(), property.getPayload());
....
}
在日志中,缺少 header "KEY: HOUSE",甚至消息类型也不是 JSON,而是 "text/plain"。
LOGS:
[SimpleAsyncTaskExecutor-1] INFO com.demo.maven.spring.integration.endpoint.TransformerRequestBuilder - message received :: HEADERS: {amqp_receivedRoutingKey=mobile.queue, amqp_deliveryTag=2, amqp_replyTo=amq.rabbitmq.reply-to.g2dkABByYWJiaXRAbG9jYWxob3N0AAAW9QAAAAAD.tTIFOS2gsM7qIlGYaybfrg==, amqp_deliveryMode=PERSISTENT, amqp_redelivered=true, id=399dda4f-4ba1-7cf4-2310-03dbfbac82b6, contentType=text/plain, timestamp=1421649922840}, PAYLOAD :1234567;Branch A;SALES;3000.50;Pending approval
这会起作用,您必须正确构建消息属性。
MessageProperties properties = new MessageProperties();
properties.builder()
.contentType(MediaType.APPLICATION_JSON)
//headers here
.headers(Map<String, Object>)
.build();
MessagePropertiesBuilder
class 就是为了那个。默认情况下 Spring 集成 AMQP 入站端点(
AmqpInboundChannelAdapter
和AmqpInboundGateway
)仅映射标准 AMQP headers。这是DefaultAmqpHeaderMapper
的默认行为。要接受任何 user-specofic header,您应该使用选项setRequestHeaderNames("*")
将AmqpHeaderMapper
(setHeaderMapper
) 注入到该入站端点。或者提供所需自定义名称的完整列表 headers.回复。
contentType=text/plain
:我认为 AMQP 入站端点和@Transformer(inputChannel = "inboundChannel"
之间的某些东西会覆盖从 AMQPcontentType
header 接收到的信息。因为RabbitTemplate
不会这样做,所以如果您发送Message
而不是任何其他 Object。请为消息接收者共享org.springframework.integration
类别的DEBUG
日志。当然我们需要那部分日志,当你收到消息直到@Transformer