PubSubInboundChannelAdapter stops receiving messages after the 4th message
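I have created a simplified example that reproduces my real problem.
My example receives a message from Google Pub/Sub, logs it, and sends an acknowledgement back to Pub/Sub.
Configuration: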
@Slf4j
@Configuration
public class MyConfig implements FlowSupport {

    private final AppProperties properties;

    public MyConfig(AppProperties properties) {
        this.properties = properties;
    }

    @Bean
    public JacksonFactory jacksonFactory() {
        return JacksonFactory.getDefaultInstance();
    }

    @Bean
    public MessageChannel bucketNotificationChannel() {
        return MessageChannels.direct("input-notification-channel").get();
    }

    @Bean
    public PubSubInboundChannelAdapter messageChannelAdapter(PubSubTemplate template) {
        var adapter = new PubSubInboundChannelAdapter(template, properties.getBucketTopicSubscription());
        adapter.setOutputChannel(bucketNotificationChannel());
        adapter.setErrorChannel(errorChannel());
        adapter.setAckMode(AckMode.MANUAL);
        adapter.setPayloadType(Notification.class);
        return adapter;
    }

    @Bean
    @Override
    public MessageChannel idempotentDiscardChannel() {
        return MessageChannels.direct("idempotent-discard-channel").get();
    }

    @Bean
    public MessageChannel errorChannel() {
        return MessageChannels.direct("global-error-channel").get();
    }

    @Bean
    @Override
    public ConcurrentMetadataStore idempotencyStore() {
        return new SimpleMetadataStore();
    }

    @Bean
    public IntegrationFlow bucketNotificationFlow(
            EmptyNotificationHandler handler,
            IntegrationFlow acknowledgementFlow
    ) {
        return flow -> flow.channel(bucketNotificationChannel())
                .handle(handler)
                .log(INFO, "Handler finished", m -> {
                    return "got" + m;
                }).gateway(acknowledgementFlow);
    }

    @Bean
    public IntegrationFlow acknowledgementFlow(PubSubAckHandler handler) {
        return flow -> flow.log(DEBUG, "acknowledgementFlow", m -> "Handling acknowledgement for message: " + m)
                .handle(handler);
    }
}
Notification handler:
@Component
@Slf4j
public class EmptyNotificationHandler implements GenericHandler<Notification> {

    private final ResourceLoader loader;

    public EmptyNotificationHandler(ResourceLoader loader) {
        this.loader = loader;
    }

    @Override
    public Resource handle(Notification payload, MessageHeaders headers) {
        try {
            return new Resource() {
                @Override
                public boolean exists() {
                    return false;
                }
                ...
            };
        } catch (Exception e) {
            log.error("Error occurred:", e);
            return null;
        }
    }
}
AckHandler:
@Component
public class MyPubSubAckHandler implements MessageHandler {

    private final ConcurrentMetadataStore idempotencyStore;

    public MyPubSubAckHandler(ConcurrentMetadataStore idempotencyStore, MeterRegistry meterRegistry) {
        this.idempotencyStore = idempotencyStore;
    }

    @Override
    public void handleMessage(@NonNull Message<?> message) throws MessagingException {
        Message<?> targetMessage = MessageUtils.unwrap(message);
        var pubSubMessage = getOriginalMessage(targetMessage);
        if (pubSubMessage == null) {
            removeFromIdempotentStore(targetMessage);
            return;
        }
        var generation = targetMessage.getHeaders().get(OBJECT_GENERATION_HEADER);
        if (message instanceof ErrorMessage || message.getPayload() instanceof Throwable) {
            pubSubMessage.nack().addCallback(
                    v -> {
                        removeFromIdempotentStore(targetMessage);
                        log.error("Error message was nacked - {}", generation);
                    },
                    e -> {
                        removeFromIdempotentStore(targetMessage);
                        log.error("Failed to nack message {}", generation, e);
                    }
            );
        } else {
            pubSubMessage.ack().addCallback(
                    v -> {
                        removeFromIdempotentStore(targetMessage);
                        log.info("Acknowledged message - {}", generation);
                    },
                    e -> {
                        removeFromIdempotentStore(targetMessage);
                        log.error("Failed to acknowledge message - {}", generation, e);
                    }
            );
        }
    }

    @SuppressWarnings({"RedundantSuppression", "unchecked"}) //IDEMPOTENCY_HEADER has Set<String> underneath
    private void removeFromIdempotentStore(Message<?> targetMessage) {
        Optional.ofNullable(targetMessage.getHeaders().get(IDEMPOTENCY_HEADER, Set.class)).stream()
                .flatMap(Collection::stream)
                .forEach(key -> idempotencyStore.remove(String.valueOf(key)));
    }
}
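When I send the first message, everything works: I see the message in the logs and the ack is sent to Pub/Sub. I also see that the number of unacknowledged messages on the GCP subscription page is 0.
But after a few successful messages, my application simply stops receiving messages. I spent a lot of time debugging and was able to find the following:
Several threads hang on this line: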
org.springframework.messaging.core.GenericMessagingTemplate.TemporaryReplyChannel#receive(long):314
this.replyLatch.await();
Thread dump:
"gcp-pubsub-subscriber1@7980" prio=5 tid=0x1e nid=NA waiting
java.lang.Thread.State: WAITING
at jdk.internal.misc.Unsafe.park(Unsafe.java:-1)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:885)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1039)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1345)
at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:232)
at org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel.receive(GenericMessagingTemplate.java:314)
at org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel.receive(GenericMessagingTemplate.java:306)
at org.springframework.messaging.core.GenericMessagingTemplate.doReceive(GenericMessagingTemplate.java:207)
at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:240)
at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessagingTemplate.sendAndReceive(AbstractMessagingTemplate.java:46)
at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:97)
at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:503)
at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceiveMessage(MessagingGatewaySupport.java:474)
at org.springframework.integration.gateway.GatewayProxyFactoryBean.sendOrSendAndReceive(GatewayProxyFactoryBean.java:573)
at org.springframework.integration.gateway.GatewayProxyFactoryBean.invokeGatewayMethod(GatewayProxyFactoryBean.java:508)
at org.springframework.integration.gateway.GatewayProxyFactoryBean.doInvoke(GatewayProxyFactoryBean.java:478)
at org.springframework.integration.gateway.GatewayProxyFactoryBean.invoke(GatewayProxyFactoryBean.java:468)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)
at com.sun.proxy.$Proxy110.exchange(Unknown Source:-1)
at org.springframework.integration.gateway.GatewayMessageHandler.handleRequestMessage(GatewayMessageHandler.java:88)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:127)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:170)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:444)
at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:318)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:266)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:229)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:133)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:170)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:198)
at org.springframework.cloud.gcp.pubsub.integration.inbound.PubSubInboundChannelAdapter.consumeMessage(PubSubInboundChannelAdapter.java:148)
at org.springframework.cloud.gcp.pubsub.integration.inbound.PubSubInboundChannelAdapter$$Lambda9.600858818.accept(Unknown Source:-1)
at org.springframework.cloud.gcp.pubsub.core.subscriber.PubSubSubscriberTemplate.lambda$subscribeAndConvert(PubSubSubscriberTemplate.java:152)
at org.springframework.cloud.gcp.pubsub.core.subscriber.PubSubSubscriberTemplate$$Lambda0.1495761010.receiveMessage(Unknown Source:-1)
at com.google.cloud.pubsub.v1.MessageDispatcher.run(MessageDispatcher.java:379)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
at java.util.concurrent.FutureTask.run(FutureTask.java:-1)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.lang.Thread.run(Thread.java:834)
I suspect this is the cause, because inside com.google.cloud.pubsub.v1.MessageDispatcher#processOutstandingMessage the line:
executor.execute(deliverMessageTask);
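is executed, but deliverMessageTask is never run by the executor's threads.
To me it looks like a bug in the library, but it may be a misuse of the library. Either way, I am looking for any solution or workaround to avoid this problem.
Library versions:
I use:
spring-boot 2.2.0.RELEASE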
springCloudVersion = "Greenwich.SR3"
com.google.cloud:google-cloud-pubsub:1.98.0
P.S.
I know I can increase the thread pool size like this:
spring:
  cloud:
    gcp:
      pubsub:
        enabled: true
        subscriber:
          executor-threads: 100
But I don't think that is a good idea.
Your problem is here:
.gateway(acknowledgementFlow);
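That means request-reply, and the framework cannot guess that your acknowledgementFlow is a one-way flow. I see from your MyPubSubAckHandler implementation that handleMessage() returns void. So the gateway waits for a reply, but the actual sub-flow never returns any result. The thread waiting for the reply therefore gets stuck, and once every subscriber thread is stuck this way your application stops consuming messages.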
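The mechanism can be reproduced without Spring or Pub/Sub. The following is only a sketch under stated assumptions: the class `ReplyStarvationSketch` and its `deliver` method are hypothetical names, the fixed pool stands in for the subscriber's executor, and the latch that nobody counts down stands in for the reply the gateway is waiting for. The pool size of 4 is chosen to mirror the "4th message" in the title and is not taken from the library's source.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ReplyStarvationSketch {

    /**
     * Hands `messages` tasks to a pool of `poolSize` threads. Each task counts the message
     * as received and then waits for a reply that never arrives.
     * A negative replyTimeoutMillis means "wait forever"; otherwise the wait is bounded.
     * Returns how many messages were picked up within observeMillis.
     */
    static int deliver(int poolSize, int messages, long replyTimeoutMillis, long observeMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger received = new AtomicInteger();
        for (int i = 0; i < messages; i++) {
            pool.execute(() -> {
                received.incrementAndGet();
                // Stand-in for the temporary reply channel: nobody ever counts this down.
                CountDownLatch replyLatch = new CountDownLatch(1);
                try {
                    if (replyTimeoutMillis < 0) {
                        replyLatch.await(); // parks the worker thread indefinitely
                    } else {
                        replyLatch.await(replyTimeoutMillis, TimeUnit.MILLISECONDS);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(observeMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        int result = received.get();
        pool.shutdownNow(); // interrupt workers that are still parked on the latch
        return result;
    }

    public static void main(String[] args) {
        System.out.println("unbounded wait: " + deliver(4, 10, -1, 500));
        System.out.println("bounded wait:   " + deliver(4, 10, 0, 500));
    }
}
```

With an unbounded wait, the four workers each take one message and then park, so the remaining six messages should never be picked up. With a bounded wait, every worker is released and all ten messages should get through.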
One solution is to set GatewayEndpointSpec.replyTimeout() to 0. That way your void sub-flow will not block the main flow while it waits for a potential reply.
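As a rough illustration of that first option, the gateway can be given an endpoint configurer. This is a sketch rather than a drop-in replacement: the class name `GatewayTimeoutSketch` is hypothetical, the channel is injected as a parameter instead of calling the bean method as in the question, the `.handle(handler)` and `.log(...)` steps are left out for brevity, and it assumes the `gateway(IntegrationFlow, Consumer<GatewayEndpointSpec>)` overload of the Spring Integration Java DSL is available in the version in use.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.messaging.MessageChannel;

@Configuration
public class GatewayTimeoutSketch {

    @Bean
    public IntegrationFlow bucketNotificationFlow(
            MessageChannel bucketNotificationChannel,
            IntegrationFlow acknowledgementFlow
    ) {
        return flow -> flow.channel(bucketNotificationChannel)
                // The question's handle(...) and log(...) steps would go here.
                // replyTimeout(0L): do not wait for a reply the void sub-flow never sends.
                .gateway(acknowledgementFlow, g -> g.replyTimeout(0L));
    }
}
```

Because the channels in the question are direct channels, the sub-flow still runs to completion on the calling thread before the gateway checks for a reply; only the wait afterwards is skipped.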
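Another way is to not use gateway() at all and put the sub-flow's content directly into the main flow. It looks like you really don't expect a reply, so this should work for you: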
return flow -> flow.channel(bucketNotificationChannel())
        .handle(handler)
        .log(INFO, "Handler finished", m -> {
            return "got" + m;
        })
        .log(DEBUG, "acknowledgementFlow", m -> "Handling acknowledgement for message: " + m)
        .handle(pubSubAckHandler);