Java GRPC 生成了许多新线程(Minecraft Spigot 插件)

Java GRPC spawns to many new threads (Minecraft Spigot Plugin)

我正在编写一个通过 GRPC 发送和接收消息的 Spigot 插件。 我的问题是,在查看时间后服务器崩溃,因为该进程无法为新线程打开更多文件处理。 从查看一些线程转储来看,GRPC 似乎每秒生成多个线程而从未关闭它们。 我用来接收消息流的代码如下所示:

public class GRPCMessageReceiver extends BukkitRunnable {

ChatSyncGrpc.ChatSyncBlockingStub chatSync;
LockedQueue lockedQueue = new LockedQueue();
Iterator<ChatMessage> receivingIterator;

public GRPCMessageReceiver(ChatSyncGrpc.ChatSyncBlockingStub chatSync, LockedQueue queue){
    lockedQueue = queue;
    this.chatSync = chatSync;
}

@Override
public void run() {
    if (receivingIterator == null) receivingIterator = chatSync.beginReceive(Empty.newBuilder().build());
    try {
        if (receivingIterator.hasNext()) {
            ChatMessage message = receivingIterator.next();
            lockedQueue.Put(message);
            message.
        }
    } catch (io.grpc.StatusRuntimeException e) {
        receivingIterator = chatSync.beginReceive(Empty.newBuilder().build());
        Bukkit.getLogger().info("Chat Sync Connection Failed created new one!");
    }
}

} 所有线程都有相同的堆栈跟踪:

  java.lang.Thread.State: WAITING
  at jdk.internal.misc.Unsafe.park(Unsafe.java:-1)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:211)
  at io.grpc.stub.ClientCalls$ThreadlessExecutor.waitAndDrain(ClientCalls.java:731)
  at io.grpc.stub.ClientCalls$BlockingResponseStream.waitForNext(ClientCalls.java:622)
  at io.grpc.stub.ClientCalls$BlockingResponseStream.hasNext(ClientCalls.java:643)
  at HausSheepPlugin.GRPCMessageReceiver.run(GRPCMessageReceiver.java:26)
  at org.bukkit.craftbukkit.v1_16_R2.scheduler.CraftTask.run(CraftTask.java:81)
  at org.bukkit.craftbukkit.v1_16_R2.scheduler.CraftAsyncTask.run(CraftAsyncTask.java:54)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
  at java.lang.Thread.run(Thread.java:832)

这是我的问题还是 GRPC 的问题?

hasNext() 等待元素到达或流关闭。

如果一个元素还没有到达,它将一直等待直到另一个元素到达。当许多调用这样做时,等待的线程多于元素,因此线程不断堆积。

所以,或多或少:您不应该以这种方式构建您对流的接收。要么使用 async API 和 StreamObserver —— 这似乎无论如何都符合你的代码的需要 —— 或者让 run() 消耗整个迭代器,而不是试图只消耗一个元素。