运行 100,000 个并发进程

Running 100,000 processes concurrently

我正在模拟一个银行系统,其中我有 100,000 笔交易到 运行。每种类型的交易都实现了运行nable,我有各种类型的交易可以发生。

transactions 是一个 Runnable 数组。

理想情况下,以下代码可以解决我的问题:

for (Transaction transaction : transactions) {
    new Thread(transaction).start();
}

但是,当尝试启动 100,000 个线程时,显然会出现 java.lang.OutOfMemoryError: unable to create new native thread

所以接下来我尝试实现一个 ExecutorService 来创建一个线程池来管理我的 100,000 个 运行nables。

ExecutorService service;
int cpus = Runtime.getRuntime().availableProcessors();
// cpus == 8 in my case
service = Executors.newFixedThreadPool(cpus);

for (Transaction transaction : transactions) {
    service.execute(transaction);
}

尝试这种方法时,long 处理 "hog" JVM。例如,一种类型的交易需要 30 - 60 秒才能执行。在分析应用程序时,在长事务发生时不允许其他线程 运行。

在这种情况下,线程 6 在其处理完成之前不允许任何其他线程 运行。

所以我的问题是:如何才能尽可能快地 运行 100,000 笔交易而不 运行 进入内存问题?如果 ExecutorService 是答案,那么我如何才能阻止非常长的事务占用 JVM 并允许其他事务同时 运行?

编辑:

我故意强制某些类型的事务发生 30 - 60 秒,以确保我的线程程序正常工作。每笔交易锁定一个账户,共有 10 个账户。这是我占用 JVM 的方法:(由 run() 调用)

public void makeTransaction() {
    synchronized(account) {
        long timeStarted = System.nanoTime();
        long timeToEnd = timeStarted + nanos;

        this.view = new BatchView(transactionNumber, account.getId());

        this.displayView();

        while(true) {
            if(System.nanoTime() % 1000000000 == 0) {
                System.out.println("batch | " + account.getId());
            }

            if(System.nanoTime() >= timeToEnd) {
                break;
            }
        }
    }
}

每次此交易得到 运行,只有一个帐户被锁定,剩下 9 个应该可以处理。 为什么 JVM 不再处理任何线程,而是挂起直到这个长事务完成?

这是一个 link 项目的缩小版本来演示问题:project

When profiling the application, no other threads are being allowed to run while the long transaction takes place.

很可能,此任务正在使用单线程资源。即 ti 的编写方式会阻止并发使用。

How can I run 100,000 transactions as fast as possible without running into memory problems?

如果交易是 CPU 绑定的,您应该有一个与您拥有的 CPU 数量大致相同的池。

如果事务依赖于数据库,您应该考虑对它们进行批处理以更有效地利用数据库。

If ExecutorService is the answer, then how can I stop very long transactions from hogging the JVM and allow other transactions to run concurrently?

使交易更短。如果您有一个运行时间超过几毫秒的任务,您应该弄清楚为什么要花这么长时间。我首先要看看 network/IO 是如何使用和分析任务的。大多数交易(如果您有大量交易)应该在 0.01 秒左右或更理想。

您应该仔细考虑如何使用共享资源。如果您的任务过多地使用相同的资源,您可能会发现多线程并不快,甚至更慢。

重要的是要根据您的硬件计算可以为您并行处理事务的工作线程数。可用于调整线程池大小的公式很少

对于CPU绑定的应用程序

N * U or (N+1)*U

对于 IO 绑定应用程序

N * U * (1+W/C)

在哪里 N - 处理器数量 U - 目标 CPU 利用率 W - 等待时间 C - 计算时间

例如,如果您的应用程序使用了 50% CPU 并且您有 8 个内核。然后 CPU 绑定应用程序以实现高效的多线程,你有

8 * (0.5) = 4

如果您有 4 个线程,那么您的所有内核都将高效处理。这改变了一些支持 hyperthreading

的公猪

如果您在笔记本电脑甚至 16 核台式机上执行,则很难在单独的线程中执行 100,000 次调用。您将需要一个网格或一组服务器来最佳地执行此操作。

但是,您仍然可以通过在 callback 中执行任何事务操作来扩展它。您的吞吐量可以增加。

您的应用程序的问题是很快所有线程都会为同一个帐户分配一个事务,然后除一个线程外的所有线程都必须等待。你可以在下面的屏幕截图中看到这一点,我暂停了应用程序。线程 pool-1-thread-3 当前正在处理 ID 为 19 的 Account 对象的事务(此 ID 不是您的帐户 ID,而是 Eclipse 分配的唯一对象 ID),所有其他线程都在等待同一锁帐户对象。 account对象就是你id为9的那个。

为什么会这样?在事务 853 中,一个线程启动第一个 long 运行 事务(针对帐户 9)。其他线程继续处理其他事务。但是,当任何线程到达账户 9 的另一笔交易时,它必须停止并等待。交易857、861、862也是账户9的,每个都阻塞一个线程,所以此时我的线程都阻塞了(在我的四核上)。

如何解决这个问题?这取决于您的用例。

如果在您的实际程序中保证只要帐户 X 有另一笔交易 运行 就不会有给定帐户 X 的传入交易,则您无需更改任何内容。

如果您的帐户数量与线程数量相比非常多,则问题不太可能出现,因此您可能会决定忍受它。

如果你的账户数量相对较少(假设可能少于一百个左右),你应该(如彼得所说)每个账户有一个(无休止的-运行)线程,每个线程都有它的自己的事务队列。这可能会更有效,因为线程不需要 "fight" 通过共享队列。

另一种解决方案是实施某种形式的 "work stealing"。这意味着每当一个线程被阻塞时,它就会寻找其他的工作来做。要实现这一点,您首先需要能够检查线程是否可以获得给定帐户的锁定。在 Java 中使用 synchronized 这是不可能的,所以你需要像 ReentrantLock.tryLock(). You also need to be able to directly access the transaction queue from each thread, so I guess you cannot use ExecutorService here but need to implement transaction handling yourself (using a LinkedBlockingQueue).

这样的东西

现在每个线程都会在循环中轮询队列中的事务。首先,它尝试使用 tryLock() 获取相应帐户的锁。如果失败,将事务添加到(特定于线程的)列表,从队列中获取下一个事务,然后尝试这个事务,直到找到可以处理的事务。事务完成后,首先在列表中查找现在可能处理的事务,然后再从全局队列中拉出另一个事务。代码大致如下:

public BlockingQueue<Transaction> queue = ...; // the global queue for all threads

public void run() {
   LinkedList<Transaction> myTransactions = new LinkedList<>();
   while (true) {
     Transaction t = queue.take();
     while (!t.getLock().tryLock()) {
        myTransactions.add(t);
     }
     try {
       // here we hold the lock for t
       t.makeTransaction();
     } finally {
       t.getLock().unlock();
     }

     Iterator<Transaction> iter = myTransactions.iterator();
     while (iter.hasNext()) {
       t = iter.next();
       if (t.getLock().tryLock()) {
         try {
           t.makeTransaction();
         } finally {
           t.getLock().unlock();
         }
         iter.remove();
       }
     }
   }
 }

请注意,这至少还有以下您可能需要解决的问题:

  • 当线程在 queue.take() 中挂起时,它不会检查其列表中的事务是否可用。因此,如果有一段时间 queue 为空(例如在处理结束时),则可能有交易卡在列表中未被处理。
  • 如果某些线程持有大量锁,则其余线程可能会处理大量它们现在无法处理的事务,因此它们只会填满它们的本地列表,耗尽全局队列。释放锁时,许多事务可能已从全局队列中删除,从而在线程可以完成的工作之间造成不平衡(一些线程可能空闲,而其他线程仍在处理长期积压的事务)。

一个更简单的替代方法可能是 put() 如果您无法为它们获取锁,则将事务放入队列(最后),但这会使它们以非常任意的顺序执行(这可能会发生上面的解决方案也是,但可能不是那么极端。

编辑: 更好的解决方案可能是将队列附加到每个帐户而不是特定于线程的列表。然后,只要发现该帐户被阻止,线程就会将交易添加到相应帐户的队列中。当线程完成账户 X 的交易时,它应该首先查看账户 X 的队列,如果有任何交易已添加到那里,然后再查看全局列表。