How to get a header from RabbitMQ with Spring Integration
To use a delayed exchange, I send messages to RabbitMQ through an "int:gateway" using this method:
void send(@Payload Notification request, @Header(MessageProperties.X_DELAY) Integer delay, @Header("routingKey") String routingKey);
I can see in RabbitMQ that the headers are shown correctly:
x-delay: -60000
However, when I receive this message from RabbitMQ, how can I get this header?
So far I receive the object that I previously sent as JSON, but if I try to get the header, I get an exception.
Sending:
integration.xml file:
<!-- Producing service -->
<int:gateway id="gateway" default-request-channel="producingChannel" service-interface="Gateway"/>
<!-- Producing service -->
<!-- Service => RabbitMQ (Producing) -->
<int:chain input-channel="producingChannel">
<int:object-to-json-transformer/>
<int-amqp:outbound-channel-adapter exchange-name="${queuing.notifications-exchange}" routing-key-expression="headers.routingKey" mapped-request-headers="*"/>
</int:chain>
<!-- Service => RabbitMQ (Producing) -->
Gateway in the Java file:
void send(@Payload Notification request, @Header(MessageProperties.X_DELAY) Integer delay, @Header("routingKey") String routingKey);
Receiving:
integration.xml file:
<!-- RabbitMQ => Service (Consuming) -->
<int-amqp:inbound-channel-adapter channel="consumingChannel" queue-names="${queuing.operator.queue}" concurrent-consumers="${queuing.concurrent-consumers}" prefetch-count="${queuing.prefetch-count}" mapped-request-headers="*" error-channel="errorChannel" />
<!-- RabbitMQ => Service (Consuming) -->
<!-- Routing -->
<int:chain input-channel="consumingChannel">
<int:json-to-object-transformer type="Notification"/>
<int:service-activator ref="workingService" method="processNotificationFromQueue"/>
</int:chain>
<!-- Routing -->
WorkingService in the Java file:
public void processNotificationFromQueue(Notification notification,
@Header(MessageProperties.X_DELAY) Integer delay) { ...
}
The exception is thrown here:
Caused by: java.lang.IllegalArgumentException: required header not available: x-delay
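You have to use AmqpHeaders.RECEIVED_DELAY instead of MessageProperties.X_DELAY on the receiving side.
Since you are correctly using mapped-request-headers="*", the default DefaultAmqpHeaderMapper maps it properly: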
Integer receivedDelay = amqpMessageProperties.getReceivedDelay();
if (receivedDelay != null) {
headers.put(AmqpHeaders.RECEIVED_DELAY, receivedDelay);
}
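To make the difference between the two header names concrete, here is a minimal plain-Java sketch that needs no Spring on the classpath. It only imitates the mapping step quoted above with an ordinary Map. The class name and both helper methods are hypothetical, and the two string constants are assumed to match the values of MessageProperties.X_DELAY and AmqpHeaders.RECEIVED_DELAY in Spring AMQP, so check them against the version you use.

```java
import java.util.HashMap;
import java.util.Map;

public class ReceivedDelayMappingSketch {

    // Assumed to mirror MessageProperties.X_DELAY (not taken from the post).
    static final String X_DELAY = "x-delay";

    // Assumed to mirror AmqpHeaders.RECEIVED_DELAY (not taken from the post).
    static final String RECEIVED_DELAY = "amqp_receivedDelay";

    // Hypothetical stand-in for the inbound mapping step quoted above:
    // the received delay is stored under RECEIVED_DELAY, never under X_DELAY.
    static Map<String, Object> mapInboundHeaders(Integer receivedDelay) {
        Map<String, Object> headers = new HashMap<>();
        if (receivedDelay != null) {
            headers.put(RECEIVED_DELAY, receivedDelay);
        }
        return headers;
    }

    // Hypothetical helper imitating a required header lookup:
    // it fails with the same message as in the question when the name is absent.
    static Integer requiredHeader(Map<String, Object> headers, String name) {
        Object value = headers.get(name);
        if (value == null) {
            throw new IllegalArgumentException("required header not available: " + name);
        }
        return (Integer) value;
    }

    public static void main(String[] args) {
        // Sample value taken from the x-delay shown in the question.
        Map<String, Object> headers = mapInboundHeaders(-60000);

        // Lookup by the received-delay name succeeds.
        System.out.println(requiredHeader(headers, RECEIVED_DELAY));

        // Lookup by the outbound name fails, as in the reported exception.
        try {
            requiredHeader(headers, X_DELAY);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In the real service, the equivalent change would be to annotate the parameter of processNotificationFromQueue with @Header(AmqpHeaders.RECEIVED_DELAY) instead of @Header(MessageProperties.X_DELAY).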