在请求之间共享应用程序线程池

Share Application Thread Pool among Requests

我有一个基于 Spring 的应用程序,它有一个 REST 服务,它导入一个 CSV 文件,解析它的内容,并在逐条记录处理后将数据(再次逐条记录)存储到数据库。将数据存储到数据库非常耗时,因为文件可能会增长到数千条记录。

现在我想到了多线程——将数据的处理和存储委托给线程。 好主意我想,但后来我想通了:等等 - 可以有多个用户同时导入文件,所以创建线程池每个请求有几百个 thead可能不是个好主意.

是否可以为每个应用程序创建一个包含 1000 个线程的线程池,并在到达该应用程序的所有请求之间共享该线程池 - 是的。但是如何限制每个线程占用的线程数?

我想限制每个应用程序的线程数,以免服务器过载,并且我想限制每个请求占用的线程数,以免一个线程消耗所有可用资源(尤其是线程) 并饿死所有其他可能出现的请求...

有什么想法、想法吗?

我们可以使用信号量来限制访问特定资源的并发线程数。java.util.concurrent.Semaphore在下面的例子中,我们将实现一个简单的登录队列来限制系统中的用户数:

class LoginQueueUsingSemaphore {

    private Semaphore semaphore;

    public LoginQueueUsingSemaphore(int slotLimit) {
        semaphore = new Semaphore(slotLimit);
    }

    boolean tryLogin() {
        return semaphore.tryAcquire();
    }

    void logout() {
        semaphore.release();
    }

    int availableSlots() {
        return semaphore.availablePermits();
    }

}

注意我们是如何使用以下方法的:

tryAcquire()

return 如果许可证立即可用则为真,否则获取它 return 为假,但 acquire() 会获取许可证并在许可证可用之前阻塞。

release()

发放许可证

availablePermits()

return 当前可用许可证数量

PS: 使用的示例来自 site

重新表述您的问题

这是一个相当复杂的情况。我会尝试重新措辞,看看我是否正确理解了您想要的行为。

您有多个应用程序可能会接收要在中央数据库上进行的批量更新。这些更新来自 CSV 文件,每批可以包含数千条记录。您希望并行处理这些更新,但是:

  1. 您想限制每个应用程序使用的线程数
  2. 如果并行处理多个批次,您想保证单个批次的进度

第一个有缺陷的提案

要限制每个应用程序使用的线程数,您可以为每个应用程序使用类似 fixed-size executor service 的东西。通过为每个 ExecutorService 的底层线程池提供适当的大小,您可以保证单个应用程序不会饿死其他应用程序。

然后可以将单个记录作为单个任务提交给执行程序服务。如果单个应用程序正在处理多个批次,来自这些批次的单个记录将被放入执行器服务中的单个队列中。来自不同批次的记录将交错,因为它们混合在执行程序服务的单个队列中。

此解决方案的问题在于它不能保证所有批次都同时处理。假设您有一个包含 4 个线程的池来支持执行程序服务。如果一大批记录被提交给执行者服务,所有 4 个线程都会开始处理这些记录。现在,如果第二批进来,它将在第一批之后添加到队列中,这意味着 4 个线程将在处理第二批之前处理第一批的所有记录。这很好,因为 4 个线程一直处于忙碌状态,但这不是您想要的行为。在这种情况下,您希望池中至少有一个线程开始处理第二批记录,对吗?

可能的解决方案?

我认为您可以实现类似于固定线程执行器服务的东西来解决您的特定问题。这是我会做的。

您可以创建一个线程池(以下简称 "worker threads")来处理来自多个队列的单个记录。一个队列对应一批记录。当需要处理新批次时,创建一个新队列并将其插入到工作线程正在从中获取记录进行处理的队列环中。将您的记录放入其中,以便线程可以在另一端处理它们,并在您的批次完成时从环中删除队列(队列为空,您没有更多记录可放入该批次)。所有队列都保持在一个环中,这样每个线程都可以遵循以下例程:

  1. 处理队列中的一条记录
  2. 移动到下一个队列(比如右边的队列)
  3. 从第 1 步开始重复

使用这样的方案,您可以确定无论有多少批次来来去去,它们都会取得进展,即使批次的数量多于处理它们的线程数。如果当前只有一个正在处理,工作线程也将能够专注于单个批次。

我建议您使用 ConcurrentLinkedQueue 之类的东西来管理您的批次。显然,在实现这种机制时存在许多编程陷阱。

  • 当队列从环中移除时,工作线程对队列的并发访问,
  • 当没有队列和记录要处理时如何处理工作线程),
  • 当工作人员检查队列中的记录但没有要处理的记录时,您会怎么做(跳到下一个队列?)
  • 当只有一个队列但所有记录都已处理并且工作人员正在等待工作时(阻塞地等待队列中的记录到来?如果批处理实际完成怎么办)

鉴于你的问题的复杂性,你似乎很有经验,我认为你应该能够弄清楚这一点。如果没有,我希望在 Whosebug 上看到您提出的更多问题!