为什么 Vertx Event Bus 在高负载下会阻塞?
Why does Vertx Event Bus block under high load?
我正在尝试像这样(使用 Hazelcast 集群)通过 Vertx 事件总线发送大量消息而不阻塞:
EventBus eb = vertx.eventBus();
for (int i = 0; i < 100; i++) {
vertx.setPeriodic(1, num -> {
eb.send("clusteredEndpoint", "ping");
});
}
当计时器数量较少时它工作正常但在大约 100 个计时器时我收到此错误。
我想知道如何在不阻塞的情况下扩展到 100K events/s(作为参考,我写了一个可能超过这个数字的 Vertx WebSocket 测试)。
如果不可能,我想了解是什么阻塞了 - 看起来是这个 class 中的东西:https://github.com/eclipse-vertx/vert.x/blob/master/src/main/java/io/vertx/core/eventbus/impl/clustered/Serializer.java
供参考 - 此代码不会阻塞 - 即使有 1000 个计时器:
HttpClient client = vertx.createHttpClient();
client.webSocket(8080, "localhost", "/", res -> {
for (int i = 0; i < 1000; i++) {
vertx.setPeriodic(1, num -> {
res.result().writeTextMessage("ping");
});
}
});
});
Dec 15, 2020 10:54:38 AM io.vertx.core.impl.BlockedThreadChecker
WARNING: Thread Thread[vert.x-eventloop-thread-1,5,main] has been
blocked for 36794 ms, time limit is 2000 ms
io.vertx.core.VertxException: Thread blocked at
io.vertx.core.impl.future.FutureImpl.addListener(FutureImpl.java:140)
at
io.vertx.core.impl.future.PromiseImpl.addListener(PromiseImpl.java:23)
at
io.vertx.core.impl.future.FutureImpl.onComplete(FutureImpl.java:133)
at
io.vertx.core.impl.future.PromiseImpl.onComplete(PromiseImpl.java:23)
at
io.vertx.core.spi.cluster.impl.selector.Selectors.withSelector(Selectors.java:48)
at
io.vertx.core.spi.cluster.impl.DefaultNodeSelector.selectForSend(DefaultNodeSelector.java:42)
at
io.vertx.core.eventbus.impl.clustered.ClusteredEventBus$$Lambda65/195695453.accept(Unknown
Source) at
io.vertx.core.eventbus.impl.clustered.Serializer$SerializerQueue$SerializedTask.process(Serializer.java:147)
at
io.vertx.core.eventbus.impl.clustered.Serializer$SerializerQueue.checkPending(Serializer.java:94)
at
io.vertx.core.eventbus.impl.clustered.Serializer$SerializerQueue.add(Serializer.java:114)
at
io.vertx.core.eventbus.impl.clustered.Serializer.queue(Serializer.java:65)
at
io.vertx.core.eventbus.impl.clustered.ClusteredEventBus.sendOrPub(ClusteredEventBus.java:172)
at
io.vertx.core.eventbus.impl.OutboundDeliveryContext.next(OutboundDeliveryContext.java:127)
at
io.vertx.core.eventbus.impl.EventBusImpl.sendOrPubInternal(EventBusImpl.java:394)
at
io.vertx.core.eventbus.impl.EventBusImpl.sendOrPubInternal(EventBusImpl.java:400)
at
io.vertx.core.eventbus.impl.EventBusImpl.send(EventBusImpl.java:103)
at
io.vertx.core.eventbus.impl.EventBusImpl.send(EventBusImpl.java:97)
at io.vertx.example.EBtestClient.lambda$start[=12=](EBtestClient.java:22)
at
io.vertx.example.EBtestClient$$Lambda56/1487417027.handle(Unknown
Source) at
io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:939)
at
io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:910)
at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:52)
at io.vertx.core.impl.ContextImpl.emit(ContextImpl.java:294) at
io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:24) at
io.vertx.core.impl.AbstractContext.emit(AbstractContext.java:49) at
io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:24) at
io.vertx.core.impl.VertxImpl$InternalTimerHandler.run(VertxImpl.java:933)
at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
at
io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:176)
at
io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500) at
io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:989)
at
io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74)
at
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
首先,您将 运行 在同一个线程上执行 100 个任务,因为 Vert.x 具有线程关联性。如果你想避免这种情况,运行 它们在不同的 Verticle 上。但是,我仍然认为你没有 100 个 CPU,所以会有很多争用。
将它们全部设置为每 1 毫秒执行一次意味着它们需要以某种方式每次在 10 微秒内完成,其中包括网络代码,因为您使用的是集群 EventBus。
所以,这是测试的编写方式,而不是 Vert.x 在做什么。
如果您真的想测试这种负载(我们这里说的是 100K rps),请将您的请求分散到多台机器上。
不过,我不确定 Hazelcast 是为处理这种负载而构建的。
如果您想知道真正阻塞的是什么,我猜是这部分代码:
由于我没有现成的集群 Vert.x 设置,所以我无法确认我的假设是否正确。
这是我进一步调查后的分析:
当使用 Vertx 事件总线进行远程通信时,一旦消费者不知所措,它就会停止响应。这会导致生产者阻塞,我捕获了 3 条不同的阻塞消息(见下文)。在阻止警告之后有这个警告:
WARNING: No pong from server 2d1fb2ce-940f-4b60-bf60-39847f31bcaf -
will consider it dead
我的问题的答案是,它“为什么”阻塞并不重要,因为它已经死了(因为它达到了一定的限制)。
我很惊讶 Vert.x 没有更优雅地处理这个问题——比如可能抛出异常。
阻止错误 #1
线程在 io.vertx.core.impl.future.FutureImpl.addListener(FutureImpl.java:140) 在 io.vertx.core.impl.future.PromiseImpl.addListener(PromiseImpl.java:23) 在 io.vertx.core.impl.future.FutureImpl.onComplete(FutureImpl.java: 133) 在io.vertx.core.impl.future.PromiseImpl.onComplete(PromiseImpl.java:23) 在io.vertx.core.spi.cluster.impl.selector.Selectors.withSelector(Selectors.java:48) 在
阻止错误 #2
io.vertx.core.VertxException: 线程阻塞
在 java.nio.charset.CharsetEncoder.(CharsetEncoder.java:198)
在 java.nio.charset.CharsetEncoder.(CharsetEncoder.java:233)
在 sun.nio.cs.UTF_8$Encoder.(UTF_8.java:558)
在 sun.nio.cs.UTF_8$Encoder.(UTF_8.java:554)
在 sun.nio.cs.UTF_8.newEncoder(UTF_8.java:72)
阻止错误 #3
io.vertx.core.VertxException: 线程阻塞
在 io.vertx.core.eventbus.impl.clustered.ConnectionHolder.writeMessage(ConnectionHolder.java:93)
在 io.vertx.core.eventbus.impl.clustered.ClusteredEventBus.sendRemote(ClusteredEventBus.java:332)
在 io.vertx.core.eventbus.impl.clustered.ClusteredEventBus.sendToNode(ClusteredEventBus.java:283)
我正在尝试像这样(使用 Hazelcast 集群)通过 Vertx 事件总线发送大量消息而不阻塞:
EventBus eb = vertx.eventBus();
for (int i = 0; i < 100; i++) {
vertx.setPeriodic(1, num -> {
eb.send("clusteredEndpoint", "ping");
});
}
当计时器数量较少时它工作正常但在大约 100 个计时器时我收到此错误。
我想知道如何在不阻塞的情况下扩展到 100K events/s(作为参考,我写了一个可能超过这个数字的 Vertx WebSocket 测试)。
如果不可能,我想了解是什么阻塞了 - 看起来是这个 class 中的东西:https://github.com/eclipse-vertx/vert.x/blob/master/src/main/java/io/vertx/core/eventbus/impl/clustered/Serializer.java
供参考 - 此代码不会阻塞 - 即使有 1000 个计时器:
HttpClient client = vertx.createHttpClient();
client.webSocket(8080, "localhost", "/", res -> {
for (int i = 0; i < 1000; i++) {
vertx.setPeriodic(1, num -> {
res.result().writeTextMessage("ping");
});
}
});
});
Dec 15, 2020 10:54:38 AM io.vertx.core.impl.BlockedThreadChecker WARNING: Thread Thread[vert.x-eventloop-thread-1,5,main] has been blocked for 36794 ms, time limit is 2000 ms io.vertx.core.VertxException: Thread blocked at io.vertx.core.impl.future.FutureImpl.addListener(FutureImpl.java:140) at io.vertx.core.impl.future.PromiseImpl.addListener(PromiseImpl.java:23) at io.vertx.core.impl.future.FutureImpl.onComplete(FutureImpl.java:133) at io.vertx.core.impl.future.PromiseImpl.onComplete(PromiseImpl.java:23) at io.vertx.core.spi.cluster.impl.selector.Selectors.withSelector(Selectors.java:48) at io.vertx.core.spi.cluster.impl.DefaultNodeSelector.selectForSend(DefaultNodeSelector.java:42) at io.vertx.core.eventbus.impl.clustered.ClusteredEventBus$$Lambda65/195695453.accept(Unknown Source) at io.vertx.core.eventbus.impl.clustered.Serializer$SerializerQueue$SerializedTask.process(Serializer.java:147) at io.vertx.core.eventbus.impl.clustered.Serializer$SerializerQueue.checkPending(Serializer.java:94) at io.vertx.core.eventbus.impl.clustered.Serializer$SerializerQueue.add(Serializer.java:114) at io.vertx.core.eventbus.impl.clustered.Serializer.queue(Serializer.java:65) at io.vertx.core.eventbus.impl.clustered.ClusteredEventBus.sendOrPub(ClusteredEventBus.java:172) at io.vertx.core.eventbus.impl.OutboundDeliveryContext.next(OutboundDeliveryContext.java:127) at io.vertx.core.eventbus.impl.EventBusImpl.sendOrPubInternal(EventBusImpl.java:394) at io.vertx.core.eventbus.impl.EventBusImpl.sendOrPubInternal(EventBusImpl.java:400) at io.vertx.core.eventbus.impl.EventBusImpl.send(EventBusImpl.java:103) at io.vertx.core.eventbus.impl.EventBusImpl.send(EventBusImpl.java:97) at io.vertx.example.EBtestClient.lambda$start[=12=](EBtestClient.java:22) at io.vertx.example.EBtestClient$$Lambda56/1487417027.handle(Unknown Source) at io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:939) at io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:910) at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:52) at io.vertx.core.impl.ContextImpl.emit(ContextImpl.java:294) at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:24) at io.vertx.core.impl.AbstractContext.emit(AbstractContext.java:49) at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:24) at io.vertx.core.impl.VertxImpl$InternalTimerHandler.run(VertxImpl.java:933) at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98) at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:176) at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500) at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:989) at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(Thread.java:748)
首先,您将 运行 在同一个线程上执行 100 个任务,因为 Vert.x 具有线程关联性。如果你想避免这种情况,运行 它们在不同的 Verticle 上。但是,我仍然认为你没有 100 个 CPU,所以会有很多争用。
将它们全部设置为每 1 毫秒执行一次意味着它们需要以某种方式每次在 10 微秒内完成,其中包括网络代码,因为您使用的是集群 EventBus。
所以,这是测试的编写方式,而不是 Vert.x 在做什么。
如果您真的想测试这种负载(我们这里说的是 100K rps),请将您的请求分散到多台机器上。
不过,我不确定 Hazelcast 是为处理这种负载而构建的。
如果您想知道真正阻塞的是什么,我猜是这部分代码:
由于我没有现成的集群 Vert.x 设置,所以我无法确认我的假设是否正确。
这是我进一步调查后的分析:
当使用 Vertx 事件总线进行远程通信时,一旦消费者不知所措,它就会停止响应。这会导致生产者阻塞,我捕获了 3 条不同的阻塞消息(见下文)。在阻止警告之后有这个警告:
WARNING: No pong from server 2d1fb2ce-940f-4b60-bf60-39847f31bcaf - will consider it dead
我的问题的答案是,它“为什么”阻塞并不重要,因为它已经死了(因为它达到了一定的限制)。
我很惊讶 Vert.x 没有更优雅地处理这个问题——比如可能抛出异常。
阻止错误 #1
线程在 io.vertx.core.impl.future.FutureImpl.addListener(FutureImpl.java:140) 在 io.vertx.core.impl.future.PromiseImpl.addListener(PromiseImpl.java:23) 在 io.vertx.core.impl.future.FutureImpl.onComplete(FutureImpl.java: 133) 在io.vertx.core.impl.future.PromiseImpl.onComplete(PromiseImpl.java:23) 在io.vertx.core.spi.cluster.impl.selector.Selectors.withSelector(Selectors.java:48) 在
阻止错误 #2
io.vertx.core.VertxException: 线程阻塞 在 java.nio.charset.CharsetEncoder.(CharsetEncoder.java:198) 在 java.nio.charset.CharsetEncoder.(CharsetEncoder.java:233) 在 sun.nio.cs.UTF_8$Encoder.(UTF_8.java:558) 在 sun.nio.cs.UTF_8$Encoder.(UTF_8.java:554) 在 sun.nio.cs.UTF_8.newEncoder(UTF_8.java:72)
阻止错误 #3
io.vertx.core.VertxException: 线程阻塞 在 io.vertx.core.eventbus.impl.clustered.ConnectionHolder.writeMessage(ConnectionHolder.java:93) 在 io.vertx.core.eventbus.impl.clustered.ClusteredEventBus.sendRemote(ClusteredEventBus.java:332) 在 io.vertx.core.eventbus.impl.clustered.ClusteredEventBus.sendToNode(ClusteredEventBus.java:283)