simpMessagingTemplate convertAndSendToUser 大量等待线程阻塞其他功能

simpMessagingTemplate convertAndSendToUser lot of waiting threads blocking other functionality

我们在应用程序中使用了 Stomp、SpringBoot 和 WebSockets。服务器应用程序正在执行以下操作: 1)生成要推送给用户的消息, 2) 接受 WebSocket 连接和 3) 将消息推送到 ActiveMQ stomp broker。线程转储显示大量与 simpMessagingTemplate convertAndSendToUser API 调用关联的等待线程。

应用程序的两个实例 运行 在云中。此应用程序使用 simpMessagingTemplate convertAndSendToUser API 生成消息并推送到 ActiveMQ stomp 代理(运行 单独)。

我们使用 Gatling 来模拟用户 WebSocket 连接以进行负载测试。 Gatling 在单独的实例上运行。该应用程序适用于 2000 个用户连接。一旦我们将用户增加到 4000,我们就会看到消息生成线程停止。不过,用户可以毫无问题地连接到同一台服务器。

如果我们对 simpMessagingTemplate convertAndSendToUser API 调用进行评论,那么一切都会正常工作(生成消息和新的 WebSocket 连接)。所以我们怀疑 convertAndSendToUser API.

的问题

Threaddump 堆栈跟踪如下:

"ForkJoinPool-1-worker-440" #477 daemon prio=5 os_prio=0 tid=0x00007f0c541c2800 nid=0x2a47 sleeping[0x00007f08e6371000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
 at java.lang.Thread.sleep(Native Method)
 at reactor.util.concurrent.WaitStrategy$Sleeping.waitFor(WaitStrategy.java:319)
 at reactor.core.publisher.MonoProcessor.block(MonoProcessor.java:211)
 at reactor.core.publisher.MonoProcessor.block(MonoProcessor.java:176)
 at org.springframework.messaging.tcp.reactor.AbstractMonoToListenableFutureAdapter.get(AbstractMonoToListenableFutureAdapter.java:73)
 at org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler$SystemStompConnectionHandler.forward(StompBrokerRelayMessageHandler.java:980)
 at org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler.handleMessageInternal(StompBrokerRelayMessageHandler.java:549)
 at org.springframework.messaging.simp.broker.AbstractBrokerMessageHandler.handleMessage(AbstractBrokerMessageHandler.java:234)
 at org.springframework.messaging.support.ExecutorSubscribableChannel$SendTask.run(ExecutorSubscribableChannel.java:138)
 at org.springframework.messaging.support.ExecutorSubscribableChannel.sendInternal(ExecutorSubscribableChannel.java:94)
 at org.springframework.messaging.support.AbstractMessageChannel.send(AbstractMessageChannel.java:119)
 at org.springframework.messaging.support.AbstractMessageChannel.send(AbstractMessageChannel.java:105)
 at org.springframework.messaging.simp.SimpMessagingTemplate.sendInternal(SimpMessagingTemplate.java:187)
 at org.springframework.messaging.simp.SimpMessagingTemplate.doSend(SimpMessagingTemplate.java:162)
 at org.springframework.messaging.simp.SimpMessagingTemplate.doSend(SimpMessagingTemplate.java:48)
 at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
 at org.springframework.messaging.simp.user.UserDestinationMessageHandler.handleMessage(UserDestinationMessageHandler.java:227)
 at org.springframework.messaging.support.ExecutorSubscribableChannel$SendTask.run(ExecutorSubscribableChannel.java:138)
 at org.springframework.messaging.support.ExecutorSubscribableChannel.sendInternal(ExecutorSubscribableChannel.java:94)
 at org.springframework.messaging.support.AbstractMessageChannel.send(AbstractMessageChannel.java:119)
 at org.springframework.messaging.simp.SimpMessagingTemplate.sendInternal(SimpMessagingTemplate.java:187)
 at org.springframework.messaging.simp.SimpMessagingTemplate.doSend(SimpMessagingTemplate.java:162)
 at org.springframework.messaging.simp.SimpMessagingTemplate.doSend(SimpMessagingTemplate.java:48)
 at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
 at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:150)
 at org.springframework.messaging.simp.SimpMessagingTemplate.convertAndSendToUser(SimpMessagingTemplate.java:229)
 at org.springframework.messaging.simp.SimpMessagingTemplate.convertAndSendToUser(SimpMessagingTemplate.java:218)
 at org.springframework.messaging.simp.SimpMessagingTemplate.convertAndSendToUser(SimpMessagingTemplate.java:204)
 at com.mypackage.PushMessageManager.lambda$sendMyMessage(PushMessageManager.java:77)
 at com.mypackage.PushMessageManager$$Lambda3/1850582969.accept(Unknown Source)
 at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
 at java.util.stream.ReferencePipeline.accept(ReferencePipeline.java:175)
 at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
 at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
 at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
 at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
 at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
 at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:401)
 at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734)
 at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
 at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174)
 at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
 at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
 at com.mypackage.PushMessageManager.sendMyMessage(PushMessageManager.java:74)
 at com.mypackage.PushMessageManager.lambda$processPushMessage[=10=](PushMessageManager.java:61)
 at com.mypackage.PushMessageManager$$Lambda4/624459498.run(Unknown Source)
 at nl.talsmasoftware.context.functions.RunnableWithContext.run(RunnableWithContext.java:42)
 at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at nl.talsmasoftware.context.executors.ContextAwareExecutorService.call(ContextAwareExecutorService.java:59)
 at nl.talsmasoftware.context.delegation.RunnableAdapter.run(RunnableAdapter.java:44)
 at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
 at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
 at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
 at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
 at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

   Locked ownable synchronizers:
 - None

步骤如下w.r.t示意图:

  1. Gatling JMS 发布者以每分钟 20000 条消息的速度将 JMS 消息推送到 Active MQ 代理。请注意,这些消息并非只针对一位用户。它基于 WebSocket 用户连接进行分发。
  2. 我们的应用程序有一个 JMS 侦听器来接收这些消息。我们是 运行 说应用程序的 2 个实例,因此有两个 JMS 侦听器来处理此消息。
  3. 应用程序收到 JMS 消息后,它会检查缓存中的会话信息以识别连接的用户,并使用 simpMessagingTemplate convertAndSendToUser API simpMessagingTemplate.convertAndSendToUser(sessionId, "/queue/abc”,有效载荷)。请注意,当用户首次连接到应用程序时,sessionId 存储在分布式缓存中。所以这些是有效的会话 ID。
  4. ActiveMQ stomp broker 然后将这些消息传播到各个用户 stomp 队列。
  5. Gatling WebSocket 客户端(每个客户端有 2000 个用户连接)应通过 WebSocket 连接接收这些消息。
  6. 客户端连接和订阅看起来像这样

    stompClient.connect({'username': $("#userName").val()}, function (frame) { 设置连接(真); subscription = stompClient.subscribe('/user/queue/abc', function (message) { showData(JSON.parse(message.body)); },headers = {'loginusername': $("#userName").val()}); });

因此,每个用户只能收到发给他们的消息,而不是所有消息。这就是我们在通过 WebSocket 连接时将用户连接到各个队列并使用 convertAndSendToUser 将消息推送到特定会话的原因。后端 JMS 发布者确保消息以循环方式发布给用户。

为了回答您有关识别瓶颈的问题,如果我们连接 2000 个用户,一切正常。但是当我们添加更多用户时,我们发现应用程序的 JMS 侦听器无法侦听后端 Gatling JMS 负载生成器每分钟发送的 20000 条消息。 ActiveMQ JMS 队列深度因此而增加。

为了确保瓶颈是 convertAndSendToUser API 我们所做的是评论 API 调用。如果我们这样做,我们能够连接 ~13k WebSocket 连接,并且后端 JMS 侦听器也能够每分钟消息消耗所有 20000 条消息。

希望这能澄清您的一些问题。 更新 下面给出了显示 simpMessagingTemplate.convertAndSendToUser API 的异步调用的代码片段。这里 RepositoryUtil.executor() 是我们自己的执行器对象包装器。

    public CompletableFuture<Void> processPushMessage(String userName, String payload) {
    return ContextAwareCompletableFuture.runAsync(() -> {
        sendABCMessage(payload, userName);
    }, RepositoryUtil.executor());
}

public void sendABCMessage(@Payload String payload, String username) {
    ArrayList<UserProfiles> userProfiles = (ArrayList<UserProfiles>) cacheService.getValue(username);
    if (Objects.nonNull(userProfiles) && userProfiles.size() > 0) {
      userProfiles.parallelStream()
          .filter(userProfiles1 -> ("/user/queue/abc".equalsIgnoreCase(userProfiles1.getSubscribeMapping()) && username.equals(userProfiles1.getUserName())))
          .forEach(userProfiles1 -> {              simpMessagingTemplate.convertAndSendToUser(userProfiles1.getSessionId(), "/queue/abc", payload);
          });
    } else {
      LOGGER.info("sendABCMessage userProfiles is null. Payload: {}", payload);
    }
}

The application works fine for 2000 user connections with a load of 20,000 messages per minute. Once we increase the users to 4000 we see that the message generation thread stops.

如果您向 ActiveMQ 推送 20,000 条消息并且每条消息有 1,000 个订阅者,这意味着 20,000,000 条消息 (1,000 * 20,000) 被发布回 WebSocket 客户端。因此,尝试确定流经的消息总量并了解瓶颈在哪里(服务器向 ActiveMQ 转发消息、ActiveMQ 处理消息或服务器向 WebSocket 客户端发布消息)。

对于 20,000 条消息,它们是从单个线程生成的,还是从大量不同的线程发送的,例如作为处理来自 WebSocket 客户端或 REST HTTP 调用的消息的结果?如果是后者,则可能是太多线程试图同时将消息转发给代理,您可能必须应用某种速率限制。

归根结底,您需要了解总体容量、瓶颈所在以及应用某些速率限制的位置。

我们可以通过移至 /user/topic 而不是 /user/queue 来解决问题。我们现在能够处理来自后端和 8k 网络套接字用户连接的每分钟约 35k 条消息。