无法使用 MessageProperties 将 header 设置为 AMQP 消息

Not able to set header to AMQP Message using MessageProperties

下面的代码显示了我如何将 header 和消息类型设置为 AMQP 消息。

MessageProperties properties = new MessageProperties();
    properties.setHeader("KEY", "HOUSE");
    properties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
Message message = new Message("1234567;Branch A;SALES;3000.50;Pending approval".getBytes(), properties);
rabbitTemplate.sendAndReceive("", QUEUE_NAME, message);

在queue中发送消息后,消息被Transformer接收。

@Transformer(inputChannel = "inboundChannel", outputChannel = "toutboundChannel")
public Property buildProperty(Message<String> property){
    LOGGER.info("message received :: HEADERS: {}, PAYLOAD :{}", property.getHeaders(), property.getPayload());
....
}    

在日志中,缺少 header "KEY: HOUSE",甚至消息类型也不是 JSON,而是 "text/plain"。

LOGS:
[SimpleAsyncTaskExecutor-1] INFO com.demo.maven.spring.integration.endpoint.TransformerRequestBuilder - message received :: HEADERS: {amqp_receivedRoutingKey=mobile.queue, amqp_deliveryTag=2, amqp_replyTo=amq.rabbitmq.reply-to.g2dkABByYWJiaXRAbG9jYWxob3N0AAAW9QAAAAAD.tTIFOS2gsM7qIlGYaybfrg==, amqp_deliveryMode=PERSISTENT, amqp_redelivered=true, id=399dda4f-4ba1-7cf4-2310-03dbfbac82b6, contentType=text/plain, timestamp=1421649922840}, PAYLOAD :1234567;Branch A;SALES;3000.50;Pending approval

这会起作用,您必须正确构建消息属性。

MessageProperties properties = new MessageProperties();
    properties.builder()
    .contentType(MediaType.APPLICATION_JSON)
    //headers here
    .headers(Map<String, Object>)
    .build();
  1. MessagePropertiesBuilder class 就是为了那个。

  2. 默认情况下 Spring 集成 AMQP 入站端点(AmqpInboundChannelAdapterAmqpInboundGateway)仅映射标准 AMQP headers。这是 DefaultAmqpHeaderMapper 的默认行为。要接受任何 user-specofic header,您应该使用选项 setRequestHeaderNames("*")AmqpHeaderMapper (setHeaderMapper) 注入到该入站端点。或者提供所需自定义名称的完整列表 headers.

  3. 回复。 contentType=text/plain:我认为 AMQP 入站端点和 @Transformer(inputChannel = "inboundChannel" 之间的某些东西会覆盖从 AMQP contentType header 接收到的信息。因为 RabbitTemplate 不会这样做,所以如果您发送 Message 而不是任何其他 Object。请为消息接收者共享 org.springframework.integration 类别的 DEBUG 日志。当然我们需要那部分日志,当你收到消息直到@Transformer