Spring Cloud Stream - how to handle downstream blocks?
During a planned outage of our Kafka cluster, we basically ran into the problem described in "How to specify timeout for sending message to RabbitMQ using Spring Cloud Stream?" (with Kafka instead of RabbitMQ, obviously).
From @GaryRussell's answer:
The channel sendTimeout only applies if the channel itself can block, e.g. a QueueChannel with a bounded queue that is currently full; the caller will block until either space becomes available in the queue, or the timeout occurs.
In this case, the block is downstream of the channel so the sendTimeout is irrelevant (in any case, it's a DirectChannel which can't block anyway; the subscribed handler is called directly on the calling thread).
The actual blocking you are seeing is most likely in the socket.write() in the rabbitmq client, which does not have a timeout and is not interruptible; there is nothing that can be done by the calling thread to "time out" the write.
The only possible solution I am aware of is to force close the rabbit connection by calling resetConnection() on the connection factory.
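To make the distinction in the quote concrete, here is a minimal plain-Java sketch. It does not use Spring Integration: the class ChannelTimeoutSketch and both methods are hypothetical stand-ins, with a bounded BlockingQueue playing the role of a QueueChannel and a direct handler call playing the role of a DirectChannel.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical illustration only; these are not Spring Integration classes.
class ChannelTimeoutSketch {

    /** Queue-backed send: the timeout bounds the wait for free capacity. */
    static boolean sendToQueue(BlockingQueue<String> queue, String message, long timeoutMs)
            throws InterruptedException {
        return queue.offer(message, timeoutMs, TimeUnit.MILLISECONDS);
    }

    /** Direct send: the handler runs on the caller's thread, so the timeout is never consulted. */
    static boolean sendDirect(Consumer<String> handler, String message, long timeoutMs) {
        handler.accept(message); // blocks for as long as the handler blocks
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        // First send fits into the queue, second one gives up once the timeout expires.
        System.out.println(sendToQueue(queue, "first", 100));
        System.out.println(sendToQueue(queue, "second", 100));
        // The timeout argument has no effect on a direct call.
        System.out.println(sendDirect(m -> { }, "third", 100));
    }
}
```

In the direct case any blocking happens inside the handler, which is why a timeout has to be enforced by the handler itself rather than by the channel.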
This explains well why the method in question (org.springframework.integration.channel.AbstractSubscribableChannel#doSend) does not take the timeout into account. However, it still seems a bit odd to me.
In spring-integration-kafka-3.2.1.RELEASE-sources.jar!/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandler.java:566 we can see what happens when sync behavior is requested:
565     if (this.sync) {
566         Long sendTimeout = this.sendTimeoutExpression.getValue(this.evaluationContext, message, Long.class);
567         if (sendTimeout == null || sendTimeout < 0) {
568             future.get();
569         }
570         else {
571             try {
572                 future.get(sendTimeout, TimeUnit.MILLISECONDS);
573             }
574             catch (TimeoutException te) {
575                 throw new MessageTimeoutException(message, "Timeout waiting for response from KafkaProducer", te);
576             }
577         }
578     }
This code takes the timeout into account. sendTimeoutExpression is assigned a default value:
private static final long DEFAULT_SEND_TIMEOUT = 10000;
private Expression sendTimeoutExpression = new ValueExpression<>(DEFAULT_SEND_TIMEOUT);
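The behavior of that branch can be reproduced with plain java.util.concurrent types. The sketch below is not the handler's code: SyncSendSketch and awaitSendResult are hypothetical names, and a string is returned where the real handler throws MessageTimeoutException. A future that never completes stands in for a broker that never acknowledges the send.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for the sync branch of the handler; not Spring code.
class SyncSendSketch {

    /** Waits for the send result using the same null/negative check as the quoted branch. */
    static String awaitSendResult(Future<String> future, Long sendTimeout)
            throws InterruptedException, ExecutionException {
        if (sendTimeout == null || sendTimeout < 0) {
            return future.get(); // unbounded wait
        }
        try {
            return future.get(sendTimeout, TimeUnit.MILLISECONDS); // bounded wait
        } catch (TimeoutException te) {
            // The real handler throws MessageTimeoutException at this point.
            return "timed out after " + sendTimeout + " ms";
        }
    }

    public static void main(String[] args) throws Exception {
        // Never completed: models a send that is never acknowledged.
        System.out.println(awaitSendResult(new CompletableFuture<>(), 100L));
        // Already completed: models an acknowledged send.
        System.out.println(awaitSendResult(CompletableFuture.completedFuture("ack"), 100L));
    }
}
```

A thread waiting in the bounded get() is reported as TIMED_WAITING in a thread dump, whereas the unbounded get() would show up as WAITING.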
However, our stack trace reveals something different:
"pool-1-thread-3" - Thread t@108
java.lang.Thread.State: TIMED_WAITING
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <4ebda621> (a org.springframework.util.concurrent.SettableListenableFuture$SettableTask)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:426)
at java.util.concurrent.FutureTask.get(FutureTask.java:204)
at org.springframework.util.concurrent.SettableListenableFuture.get(SettableListenableFuture.java:134)
* at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.processSendResult(KafkaProducerMessageHandler.java:572)
at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:414)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:134)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:69)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1035)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:69)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570)
The frame marked with * corresponds to the future.get(sendTimeout, TimeUnit.MILLISECONDS); call.
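Since the underlying client appears to support a timeout (as shown by the fact that the future.get() overload with a timeout is being called), how can it be set? The only two properties I could find in the binder reference are spring.cloud.stream.kafka.binder.healthTimeout and batchTimeout, and as far as I can tell neither of them affects this setting.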
Given that the KafkaProducerMessageHandler is constructed in org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.ProducerConfigurationMessageHandler, a private class, overriding a bean does not seem to be the recommended approach.
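It doesn't appear to be documented, but similar to the listener container customizer (https://docs.spring.io/spring-cloud-stream/docs/3.1.2/reference/html/spring-cloud-stream.html#_advanced_consumer_configuration), you can add a ProducerMessageHandlerCustomizer @Bean to set arbitrary properties on the message handler.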
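In newer versions of the handler, the timeout is always configured to be at least as large as ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, to avoid false negatives (the publication succeeding after the handler has already timed out).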
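A customizer along the lines described above might look like the following sketch. It assumes a Spring Cloud Stream version that provides ProducerMessageHandlerCustomizer and a Kafka binder whose outbound handler is a KafkaProducerMessageHandler; the class name ProducerTimeoutConfig, the bean name, and the 5000 ms value are made up for illustration.

```java
import org.springframework.cloud.stream.config.ProducerMessageHandlerCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.expression.ValueExpression;
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;

// Hypothetical configuration class; names and the timeout value are illustrative.
@Configuration
class ProducerTimeoutConfig {

    @Bean
    ProducerMessageHandlerCustomizer<KafkaProducerMessageHandler<?, ?>> sendTimeoutCustomizer() {
        // Called for each outbound handler the binder creates; destinationName is the target topic.
        return (handler, destinationName) ->
                // Same expression type the handler uses for its own default value.
                handler.setSendTimeoutExpression(new ValueExpression<>(5000L));
    }
}
```

The timeout only matters when the handler waits for the send result, that is, in the sync branch shown in the question. On newer handler versions a value below the producer's delivery timeout may be raised, as noted above.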