Spring 集成 AMQP - 消息偶尔未确认,需要超时?
Spring Integration AMQP - message unacked occasionally, need a timeout?
我有一个入站 RabbitMQ 通道适配器,每天成功处理 3000 条消息,但偶尔我会在 RabbitMQ 管理控制台中看到未确认的消息数为 1。这似乎仍然如此。
我确实有一个重试建议链,可以重试 3 次,然后通过死信路由键移动到 DLQ,这对大多数异常都有效。
unacked 在过去几周发生了两次,其中一次我能够进行线程转储并看到 int-http:outbound-gateway 调用卡在等待 http 响应 getStatusCode ()
我在 int-amqp:inbound-channel-adapter 上有一个 receive-timeout="59000",我希望它能在超过超时的任何地方使线程超时?
我现在注意到 int-http:outbound-gateway 上有一个 reply-timeout 属性,我应该设置它吗?
有什么想法吗?
<int-amqp:inbound-channel-adapter id="amqpInCdbEvents" channel="eventsAMQPChannel" channel-transacted="true" transaction-manager="transactionManager"
queue-names="internal.events.queue" connection-factory="connectionFactory"
receive-timeout="59000" concurrent-consumers="${eventsAMQPChannel.concurrent-consumers}"
advice-chain="retryChain" auto-startup="false" />
<int:channel id="eventsAMQPChannel" />
<!-- CHAIN of processing for Asynch Processing of Events from intermediate Queue -->
<int:chain id="routeEventChain" input-channel="eventsAMQPChannel">
<int:json-to-object-transformer type="xx.xx.xx.json.Event" object-mapper="springJacksonObjectMapper"/>
<int:header-enricher>
<int:header name="originalPayload" expression="payload" overwrite="true"/>
<int:header name="message_id" expression="payload.id" overwrite="true"/>
</int:header-enricher>
<int:router expression="payload.eventType">
<int:mapping value="VALUE" channel="valueEventChannel"/>
<int:mapping value="SWAP" channel="swapEventChannel"/>
</int:router>
</int:chain>
<int:channel id="valueEventChannel" />
<int:channel id="swapEventChannel" />
<int:chain id="valueEventChain" input-channel="valueEventChannel" output-channel="nullChannel">
<int:transformer ref="syncValuationTransformer" />
<int:object-to-json-transformer object-mapper="springJacksonObjectMapper" />
<int:header-enricher>
<int:header name="contentType" value="application/json;charset=UTF-8" overwrite="true"/>
</int:header-enricher>
<int-http:outbound-gateway id="httpOutboundGatewayValuationServiceFinalValuation"
expected-response-type="java.lang.String"
http-method="POST" charset="UTF-8"
extract-request-payload="true"
url="${value.service.uri}/value"/>
</int:chain>
reply-timeout
是将回复发送到回复通道时的超时(如果它可以阻塞 - 例如,一个已满的有界队列通道)。
int-http:outbound-gateway call was stuck waiting for a http response getStatusCode()
您在可以配置到出站适配器的 ClientHttpRequestFactory
上设置了客户端超时 (readtimeout
)...
/**
* Create a new instance of the {@link RestTemplate} based on the given {@link ClientHttpRequestFactory}.
* @param requestFactory HTTP request factory to use
* @see org.springframework.http.client.SimpleClientHttpRequestFactory
* @see org.springframework.http.client.HttpComponentsClientHttpRequestFactory
*/
public RestTemplate(ClientHttpRequestFactory requestFactory) {
this();
setRequestFactory(requestFactory);
}
我有一个入站 RabbitMQ 通道适配器,每天成功处理 3000 条消息,但偶尔我会在 RabbitMQ 管理控制台中看到未确认的消息数为 1。这似乎仍然如此。
我确实有一个重试建议链,可以重试 3 次,然后通过死信路由键移动到 DLQ,这对大多数异常都有效。
unacked 在过去几周发生了两次,其中一次我能够进行线程转储并看到 int-http:outbound-gateway 调用卡在等待 http 响应 getStatusCode ()
我在 int-amqp:inbound-channel-adapter 上有一个 receive-timeout="59000",我希望它能在超过超时的任何地方使线程超时?
我现在注意到 int-http:outbound-gateway 上有一个 reply-timeout 属性,我应该设置它吗?
有什么想法吗?
<int-amqp:inbound-channel-adapter id="amqpInCdbEvents" channel="eventsAMQPChannel" channel-transacted="true" transaction-manager="transactionManager"
queue-names="internal.events.queue" connection-factory="connectionFactory"
receive-timeout="59000" concurrent-consumers="${eventsAMQPChannel.concurrent-consumers}"
advice-chain="retryChain" auto-startup="false" />
<int:channel id="eventsAMQPChannel" />
<!-- CHAIN of processing for Asynch Processing of Events from intermediate Queue -->
<int:chain id="routeEventChain" input-channel="eventsAMQPChannel">
<int:json-to-object-transformer type="xx.xx.xx.json.Event" object-mapper="springJacksonObjectMapper"/>
<int:header-enricher>
<int:header name="originalPayload" expression="payload" overwrite="true"/>
<int:header name="message_id" expression="payload.id" overwrite="true"/>
</int:header-enricher>
<int:router expression="payload.eventType">
<int:mapping value="VALUE" channel="valueEventChannel"/>
<int:mapping value="SWAP" channel="swapEventChannel"/>
</int:router>
</int:chain>
<int:channel id="valueEventChannel" />
<int:channel id="swapEventChannel" />
<int:chain id="valueEventChain" input-channel="valueEventChannel" output-channel="nullChannel">
<int:transformer ref="syncValuationTransformer" />
<int:object-to-json-transformer object-mapper="springJacksonObjectMapper" />
<int:header-enricher>
<int:header name="contentType" value="application/json;charset=UTF-8" overwrite="true"/>
</int:header-enricher>
<int-http:outbound-gateway id="httpOutboundGatewayValuationServiceFinalValuation"
expected-response-type="java.lang.String"
http-method="POST" charset="UTF-8"
extract-request-payload="true"
url="${value.service.uri}/value"/>
</int:chain>
reply-timeout
是将回复发送到回复通道时的超时(如果它可以阻塞 - 例如,一个已满的有界队列通道)。
int-http:outbound-gateway call was stuck waiting for a http response getStatusCode()
您在可以配置到出站适配器的 ClientHttpRequestFactory
上设置了客户端超时 (readtimeout
)...
/**
* Create a new instance of the {@link RestTemplate} based on the given {@link ClientHttpRequestFactory}.
* @param requestFactory HTTP request factory to use
* @see org.springframework.http.client.SimpleClientHttpRequestFactory
* @see org.springframework.http.client.HttpComponentsClientHttpRequestFactory
*/
public RestTemplate(ClientHttpRequestFactory requestFactory) {
this();
setRequestFactory(requestFactory);
}