Spring Boot WebClient stops sending requests
I am running a Spring Boot application that uses WebClient for both non-blocking and blocking HTTP requests. After the application runs for a while, all outgoing HTTP requests appear to get stuck.
The WebClient is used to send requests to multiple hosts, but as an example, here is how it is initialized and used to send requests to Telegram:
WebClient config:
@Bean
public ReactorClientHttpConnector httpClient() {
    HttpClient httpClient = HttpClient.create(ConnectionProvider.builder("connectionProvider").build())
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout)
            .responseTimeout(Duration.ofMillis(responseTimeout));
    return new ReactorClientHttpConnector(httpClient);
}
All the WebClients use the same ReactorClientHttpConnector.
Telegram client:
@Autowired
ReactorClientHttpConnector httpClient;

WebClient webClient;
RateLimiter rateLimiter;

@PostConstruct
public void init() {
    webClient = WebClient.builder()
            .clientConnector(httpClient)
            .baseUrl(telegramUrl)
            .build();
    rateLimiter = RateLimiter.of("telegram-rate-limiter",
            RateLimiterConfig.custom()
                    .limitRefreshPeriod(Duration.ofMinutes(1))
                    .limitForPeriod(20)
                    .build());
}
public void sendMessage(@PathVariable("token") String token, @RequestParam("chat_id") long chatId, @RequestParam("text") String message) {
    webClient.post().uri(String.format("/bot%s/sendMessage", token))
            .contentType(MediaType.APPLICATION_JSON)
            .body(BodyInserters.fromFormData("chat_id", String.valueOf(chatId))
                    .with("text", message))
            .retrieve()
            .bodyToMono(Void.class)
            .transformDeferred(RateLimiterOperator.of(rateLimiter))
            .block();
}
The RateLimiter is there to ensure the number of requests does not exceed the Telegram API limit of 20 per minute.
When the application starts up, all requests resolve normally as expected. But after some time, all requests appear to get stuck. The time it takes for this to happen can range from a few hours to a few days. It happens with requests to all the different hosts, and is easy to notice once the messages from the TelegramBot stop. Once the requests get stuck they are stuck indefinitely, and I have to restart the application to get it working again.
There are no exceptions in the logs that seem to have caused this. Since I maintain a queue for my Telegram messages, I can see the point at which the requests stopped, both from the number of messages in the queue steadily increasing and from errors in other processes that were waiting for their requests to resolve.
It seems the requests are not even being sent out, because the connect timeout and response timeout that I configured never take effect.
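As a diagnostic safety net, one could also put a timeout on the reactive pipeline itself, which fires even if the request never reaches the wire (a minimal sketch; the 30-second deadline is my own assumption, not a value from the original setup):

import java.time.Duration;
import java.util.concurrent.TimeoutException;

import org.springframework.web.reactive.function.client.WebClient;

import reactor.core.publisher.Mono;

public class TimeoutSafetyNet {

    // Sketch: an operator-level timeout fails the call loudly instead of
    // hanging forever, even when the request is stuck before it is written
    // to the socket (where the HttpClient connect/response timeouts never
    // get a chance to fire).
    public static Mono<Void> sendWithDeadline(WebClient webClient, String token) {
        return webClient.post()
                .uri(String.format("/bot%s/sendMessage", token))
                .retrieve()
                .bodyToMono(Void.class)
                .timeout(Duration.ofSeconds(30)) // deadline is an assumption
                .doOnError(TimeoutException.class,
                        e -> System.err.println("request did not complete within 30s"));
    }
}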
I had also previously tried setting the idle time to 0, but that did not solve the problem:
@Bean
public ReactorClientHttpConnector httpClient() {
    HttpClient httpClient = HttpClient.create(ConnectionProvider.builder("connectionProvider").maxConnections(1000).maxIdleTime(Duration.ofSeconds(0)).build())
    // HttpClient httpClient = HttpClient.newConnection()
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout)
            .responseTimeout(Duration.ofMillis(responseTimeout));
    return new ReactorClientHttpConnector(httpClient);
}
Update:
I enabled metrics and inspected them with Micrometer while the requests were stuck. Interestingly, it shows one total connection to Telegram, yet zero connections in the idle, pending, and active states:
reactor_netty_connection_provider_idle_connections{id="-1268283746",name="connectionProvider",remote_address="api.telegram.org:443",} 0.0
reactor_netty_connection_provider_pending_connections{id="-1268283746",name="connectionProvider",remote_address="api.telegram.org:443",} 0.0
reactor_netty_connection_provider_active_connections{id="-1268283746",name="connectionProvider",remote_address="api.telegram.org:443",} 0.0
reactor_netty_connection_provider_total_connections{id="-1268283746",name="connectionProvider",remote_address="api.telegram.org:443",} 1.0
Could the problem be this missing connection?
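For reference, this is roughly how those gauges get published (a sketch, assuming micrometer-core is on the classpath; reactor-netty then reports pool state to the Micrometer global registry):

import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;

public class PoolMetricsSetup {

    public static HttpClient meteredHttpClient() {
        // Any registry works; Spring Boot would normally provide one.
        Metrics.addRegistry(new SimpleMeterRegistry());

        // metrics(true) makes the pool emit the
        // reactor_netty_connection_provider_* gauges shown above.
        return HttpClient.create(
                ConnectionProvider.builder("connectionProvider")
                        .metrics(true)
                        .build());
    }
}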
Update 2:
I thought this might be related to another issue:
So I updated the HttpClient to:
@Bean
public ReactorClientHttpConnector httpClient() {
    HttpClient httpClient = HttpClient.create(ConnectionProvider.builder("connectionProvider").metrics(true).build())
            .doAfterResponseSuccess((r, c) -> c.dispose())
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout)
            .responseTimeout(Duration.ofMillis(responseTimeout));
    return new ReactorClientHttpConnector(httpClient);
}
But all that seemed to do was make the problem occur faster. As before, the active, pending, and idle connections do not add up to the total connection count. The total is always greater than the sum of the other three metrics.
Update 3:
I took a thread dump when the problem occurred. There were 74 threads in total, so I don't think the app is running out of threads.
The dump of the Telegram thread:
"TelegramBot" #20 daemon prio=5 os_prio=0 cpu=14.65ms elapsed=47154.24s tid=0x00007f6b28e73000 nid=0x1c waiting on condition [0x00007f6aed6fb000]
java.lang.Thread.State: WAITING (parking)
at jdk.internal.misc.Unsafe.park(java.base@11.0.13/Native Method)
- parking to wait for <0x00000000fa865c80> (a java.util.concurrent.CountDownLatch$Sync)
at java.util.concurrent.locks.LockSupport.park(java.base@11.0.13/LockSupport.java:194)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.13/AbstractQueuedSynchronizer.java:885)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@11.0.13/AbstractQueuedSynchronizer.java:1039)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@11.0.13/AbstractQueuedSynchronizer.java:1345)
at java.util.concurrent.CountDownLatch.await(java.base@11.0.13/CountDownLatch.java:232)
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:87)
at reactor.core.publisher.Mono.block(Mono.java:1707)
at com.moon.arbitrage.cm.feign.TelegramClient.sendMessage(TelegramClient.java:59)
at com.moon.arbitrage.cm.service.TelegramService.lambda$sendArbMessage$0(TelegramService.java:53)
at com.moon.arbitrage.cm.service.TelegramService$$Lambda$92/0x000000084070f840.run(Unknown Source)
at com.moon.arbitrage.cm.service.TelegramService.task(TelegramService.java:82)
at com.moon.arbitrage.cm.service.TelegramService$$Lambda$0/0x0000000840665040.run(Unknown Source)
at java.lang.Thread.run(java.base@11.0.13/Thread.java:829)
Locked ownable synchronizers:
- None
The reactor worker threads:
"reactor-http-epoll-1" #15 daemon prio=5 os_prio=0 cpu=810.44ms elapsed=47157.07s tid=0x00007f6b281c4000 nid=0x17 runnable [0x00007f6b0c46c000]
java.lang.Thread.State: RUNNABLE
at io.netty.channel.epoll.Native.epollWait0(Native Method)
at io.netty.channel.epoll.Native.epollWait(Native.java:177)
at io.netty.channel.epoll.EpollEventLoop.epollWait(EpollEventLoop.java:286)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:351)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(java.base@11.0.13/Thread.java:829)
Locked ownable synchronizers:
- None
"reactor-http-epoll-2" #16 daemon prio=5 os_prio=0 cpu=1312.16ms elapsed=47157.07s tid=0x00007f6b281c5000 nid=0x18 waiting on condition [0x00007f6b0c369000]
java.lang.Thread.State: WAITING (parking)
at jdk.internal.misc.Unsafe.park(java.base@11.0.13/Native Method)
- parking to wait for <0x00000000fa865948> (a java.util.concurrent.CompletableFuture$Signaller)
at java.util.concurrent.locks.LockSupport.park(java.base@11.0.13/LockSupport.java:194)
at java.util.concurrent.CompletableFuture$Signaller.block(java.base@11.0.13/CompletableFuture.java:1796)
at java.util.concurrent.ForkJoinPool.managedBlock(java.base@11.0.13/ForkJoinPool.java:3128)
at java.util.concurrent.CompletableFuture.waitingGet(java.base@11.0.13/CompletableFuture.java:1823)
at java.util.concurrent.CompletableFuture.get(java.base@11.0.13/CompletableFuture.java:1998)
at com.moon.arbitrage.cm.service.OrderService.reconcileOrder(OrderService.java:103)
at com.moon.arbitrage.cm.service.BotService$BotTask.lambda$task(BotService.java:383)
at com.moon.arbitrage.cm.service.BotService$BotTask$$Lambda$61/0x00000008400af440.accept(Unknown Source)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:171)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:539)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:295)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:159)
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:400)
at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:419)
at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:473)
at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:702)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1372)
at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1235)
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1284)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(java.base@11.0.13/Thread.java:829)
Locked ownable synchronizers:
- None
"reactor-http-epoll-3" #17 daemon prio=5 os_prio=0 cpu=171.84ms elapsed=47157.07s tid=0x00007f6b28beb000 nid=0x19 runnable [0x00007f6b0c26a000]
java.lang.Thread.State: RUNNABLE
at io.netty.channel.epoll.Native.epollWait0(Native Method)
at io.netty.channel.epoll.Native.epollWait(Native.java:177)
at io.netty.channel.epoll.EpollEventLoop.epollWait(EpollEventLoop.java:281)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:351)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(java.base@11.0.13/Thread.java:829)
Locked ownable synchronizers:
- None
"reactor-http-epoll-4" #18 daemon prio=5 os_prio=0 cpu=188.10ms elapsed=47157.07s tid=0x00007f6b28b7d800 nid=0x1a runnable [0x00007f6b0c169000]
java.lang.Thread.State: RUNNABLE
at io.netty.channel.epoll.Native.epollWait0(Native Method)
at io.netty.channel.epoll.Native.epollWait(Native.java:177)
at io.netty.channel.epoll.EpollEventLoop.epollWait(EpollEventLoop.java:281)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:351)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(java.base@11.0.13/Thread.java:829)
Locked ownable synchronizers:
- None
It seems one of them is blocked by another task (which isn't even from the Telegram service), but that shouldn't be a problem since the other three worker threads are runnable, right?
I would suggest taking a look in the direction of the RateLimiter.
It may not work as expected, depending on the number of requests your application performs over time.
From the RateLimiter Javadoc:
"It is important to note that the number of permits requested never affects the throttling of the request itself ... but it affects the throttling of the next request. I.e., if an expensive task arrives at an idle RateLimiter, it will be granted immediately, but it is the next request that will experience extra throttling, thus paying for the cost of the expensive task."
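One way to sanity-check the limiter in isolation (a minimal sketch against the Resilience4j API used in the question; the deliberately tight limits are assumptions for the experiment):

import java.time.Duration;

import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RateLimiterConfig;

public class RateLimiterProbe {

    public static void main(String[] args) {
        // Deliberately tight limits so throttling shows up within seconds.
        RateLimiter limiter = RateLimiter.of("probe",
                RateLimiterConfig.custom()
                        .limitRefreshPeriod(Duration.ofSeconds(10))
                        .limitForPeriod(2)
                        .timeoutDuration(Duration.ofSeconds(1)) // how long acquirePermission waits
                        .build());

        for (int i = 0; i < 5; i++) {
            long start = System.nanoTime();
            boolean permitted = limiter.acquirePermission();
            System.out.printf("call %d: permitted=%b after %d ms%n",
                    i, permitted, (System.nanoTime() - start) / 1_000_000);
        }
        // Expect: the first 2 calls pass immediately, the rest are rejected
        // after waiting ~1s, because the refresh period has not elapsed.
    }
}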
This discussion may also help:
github or github
I can imagine some throttling adding up in the RateLimiter, or some other effect; I would experiment with it and make sure it really works the way you intend.
Alternatively, consider using Spring @Scheduled to read from the queue, as sketched below.
You may want to spice it up with an embedded JMS for extra benefits (message persistence, etc.).
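A minimal sketch of that @Scheduled approach (class and method names are hypothetical; a 3-second fixed delay caps the rate at 20 sends per minute, so the RateLimiter becomes unnecessary):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component // requires @EnableScheduling on a configuration class
public class TelegramQueueDrainer {

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    public void enqueue(String message) {
        queue.offer(message);
    }

    // At most one send every 3 seconds, i.e. 20 per minute.
    @Scheduled(fixedDelay = 3000)
    public void drainOne() {
        String message = queue.poll();
        if (message != null) {
            // telegramClient.sendMessage(token, chatId, message) would go here
        }
    }
}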
Finally found and solved the problem. The problem was that I had a blocking task that was blocking the reactor threads. I only noticed it thanks to the thread dump. The blocking task was waiting on an event, so it could take a long time to resolve. Eventually, once all four reactor threads were blocked, all requests naturally got stuck with no threads left to process them.
In short: don't block your reactor threads.
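For illustration, one way to keep such work off the event loop (a sketch with hypothetical names; reconcile stands in for whatever call returned the CompletableFuture seen in the dump):

import java.util.concurrent.CompletableFuture;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class NonBlockingReconcile {

    // Hypothetical stand-in for the call that returns a CompletableFuture.
    static CompletableFuture<String> reconcile(String order) {
        return CompletableFuture.completedFuture(order);
    }

    // Bad: join()/get() inside a callback runs on a reactor-http thread
    // and parks the event loop, exactly as in the thread dump above.
    static Mono<String> blocking(Mono<String> orders) {
        return orders.doOnNext(o -> reconcile(o).join());
    }

    // Better: compose the future into the pipeline so nothing ever blocks.
    static Mono<String> composed(Mono<String> orders) {
        return orders.flatMap(o -> Mono.fromFuture(reconcile(o)));
    }

    // Alternative: if blocking is unavoidable, hop to boundedElastic first.
    static Mono<String> offloaded(Mono<String> orders) {
        return orders.publishOn(Schedulers.boundedElastic())
                .doOnNext(o -> reconcile(o).join()); // now on a worker thread
    }
}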
These problems are usually related to thread starvation on spring-boot; you should generate a thread dump with jstack while the hang is occurring and analyze it:
jstack <java pid> > ThreadDump.txt
The content of the generated ThreadDump.txt file will usually provide information about the blocked threads.
I have a very similar problem: a Spring Boot application (WebFlux) that uses WebClient to make API requests to the outside world. After starting my application, it gets stuck as soon as my service receives a POST request and uses WebClient to communicate with the other services. I took a thread dump, and the blocked thread does not say what it is waiting for:
"reactor-http-epoll-6@15467" daemon prio=5 tid=0xbe nid=NA waiting
java.lang.Thread.State: WAITING
at jdk.internal.misc.Unsafe.park(Unsafe.java:-1)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1796)
And my threads are not running out.
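When a dump alone doesn't reveal the culprit, BlockHound (the io.projectreactor.tools:blockhound agent) can flag blocking calls on event-loop threads the moment they happen; a minimal sketch:

import reactor.blockhound.BlockHound;

public class BlockHoundSetup {

    public static void main(String[] args) {
        // Install before any reactive pipeline runs, e.g. as the first
        // line of main(). Any blocking call on a non-blocking thread such
        // as reactor-http-epoll-* then fails with a clear stack trace.
        BlockHound.install();

        // SpringApplication.run(MyApp.class, args); // would follow here
    }
}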