在许多数据库记录上实现 Java 线程的有效方法

An effective way to implement Threads in Java on many DB records

我正处于 Java 实验系统的开发阶段,它有一个沉重的 MySQL 数据库,包含数千条记录,每条记录都需要执行一些操作,并且并行

我很清楚如何使用 Java 线程,但我不知道 best/efficient 在来自数据库的大量记录上使用它的方法是什么。

假设我们查看以下数据库 table:

Table technicians
    ID          NUMBER
    DISTRICT_CODE       NUMBER NOT NULL
    EVENT_START_DATE    DATE NOT NULL
    EVENT_END_DATE      DATE NOT NULL
    INCHARGE        NUMBER NOT NULL
    EFFECTIVE_FROM      DATE DEFAULT SYSDATE NOT NULL
    EFFECTIVE_TO        DATE
    STATUS          NUMBER NOT NULL

然后我们将执行以下提取:

SELECT * FROM technicians WHERE INCHARGE = 23;

现在,我正在认真考虑是否将提取的信息放入List(例如ArrayList)或其他数据结构中,(注意每次提取包含大约4000条记录,并且每3秒发生一次一遍又一遍)以及如何为每条记录分别实现 Thread。

提出的天真的想法是,在查询数据库并接收信息后,循环遍历每个记录条目(sql.hasNext () 例如)和 运行 ThreadPoolExecutor每个记录上的对象,但我倾向于相信有更有效和更快的方法。

欢迎提出任何建议

编辑:我看到有人提出了关于对每条记录要采取的行动的问题,所以我会尽力回答。

对于每一行,我们将为每个字段 运行 几个不同的 API 以确保其答案类型的正确性(例如正确、不正确、正确但值很短等)等等。

对我来说重要的是要注意每个操作都是针对系统外部的 API 发生的(位于不同的远程服务器上),因此有时对于单个字段,多次调用将制作不同的 API,因此高功率和并行工作很重要。

例如:

对于 INCHARGE 字段 - 我们会将值发送到外部 API 来源,该来源将检查数据,如果信息正确,我们将再次将该字段发送到另一个 API我们将获得相关信息。

您似乎希望每三秒处理一次数据库中的某些行。每次,您要查询大约四千行。这些行中的每一行都需要单独处理,而不考虑 table 中的其他行。听起来您没有更新行,而是通过调用其他服务(例如进行 Web 服务调用)来发送行的数据。

是,使用执行器服务

因此,将数据加载到内存中,因为容量似乎很小。定义一个 class 来保存每一行的数据。由于我们主要使用此 class 来透明地通信数据和 immutable,因此将 class 定义为 record.

record Technician ( int id , LocalDate eventStart , … ) {}

在循环查询结果集时实例化这些 Technician 对象。

对于每个 Technician 对象,传递给实现 Callable 的 class 的构造函数。该 class 的 run 方法定义了您在处理该行的数据、传递给 Web 服务调用等方面需要做的工作。

一个 Callable return 一个值。让我们定义另一条记录来表示 success/failure 和记录的 ID。

record TechnicianProcessingResult ( int id , boolean succeeded ) {}

将该记录设为我们 Callable 的类型。

class ProcessTechnicianTask implements Callable< TechnicianProcessingResult > {

    private final Technician technician ;

    ProcessTechnicianTask( Technician t ) { // Constructor.
        this.technician = t ;
    }
 
    public TechnicianProcessingResult call() {
        System.out.println( "Processing technician Id " + this.technician.id );
        …
        return new TechnicianProcessingResult( this.technician.id , true ) ;
        … or …
        return new TechnicianProcessingResult( this.technician.id , false ) ;
    }
}

为每个 Technician 对象实例化一个任务,您为从数据库中检索的每一行实例化了该对象。收集任务。

List< ProcessTechnicianTask > tasks = new ArrayList<>() ;
…
tasks.add( new ProcessTechnicianTask( nthTechnician ) ) ;

将该任务集合提交给您已经建立的执行程序服务。通常指定几乎与 CPU 个可用内核一样多的线程。

ExecutorService executorService = Executors.newFixedThreadPool( 5 ) ;
…
List< Future< TechnicianProcessingResult > > futures = executorService.invokeAll( tasks , 3 , TimeUnit.SECONDS ) ;

注意 time-out 参数,以防万一出现问题并且您的任务需要太多时间才能完成。

检查期货列表以查看它们是否 done, and if any were canceled,并检查其结果对象。

您想每三秒重复一次。所以也创建一个single-threadedScheduledExecutorService。安排一个重复任务,一个 RunnableCallable,完成上述数据库查询工作,实例化 Technician 个对象,将每个对象分配给一个 ProcessTechnicianTask 对象,所有这些都提交给我们其他执行者服务。

请务必正常关闭您的执行程序服务对象。否则他们的后台线程池可能会 运行 无限期地继续,就像一个僵尸 ‍♂️。请参阅 Java 文档中提供的样板代码。

所有这些都已在 Stack Overflow 上多次提及。所以搜索以了解更多信息。

您似乎已经想到了这种方法。但是您想知道是否有“更有效和更快的方法”。不,我看不出有什么更好的办法。您的瓶颈是对 API 进行网络调用,大概是网络服务调用。与等待网络调用返回响应的线程相比,创建记录对象、收集它们并将它们提交给执行程序服务将非常快。

Loom 项目

在您的场景中可能会显着提高性能的一件事是 virtual threads and structured concurrency promised by Project Loom

在当前 Java 中,每个 Java 线程直接映射到主机 OS 线程。在 Web 服务调用期间,您的几个线程中的每一个都将处于空闲状态,停止执行直到这些调用 return。这些线程是重量级的,所以我们不能有很多。

在 Loom 项目中,许多虚拟线程映射到每个主机 OS 线程。这些虚拟线程是轻量级的,所以我们可以有几千,甚至几百万。当虚拟线程阻塞时,例如等待您的 Web 服务调用 return,该虚拟线程将从主机 OS 线程“停放”/“卸载”,以便另一个虚拟线程可以使用主机线。因此,其他虚拟线程可以在前一个虚拟线程等待其对 return.

的 Web 服务调用时完成工作

在您的情况下,一次可以放置更多的 Web 服务调用,而不是一次只放置几个。您机器上的 CPU 核心保持忙碌得多,在更短的时间内处理更多的行。您可能会看到 multiple-fold 处理时间有所缩短。

Loom 项目尚未完成,但正在积极开发中。实验版本是 available now,基于 early-access Java 19。请参阅文章、访谈和团队成员(包括 Ron Pressler 和 Alan Bateman)的演讲。

更好的设计:

不要将 cron 或事件用于 可能 需要比分配的时间间隔更长的时间才能完成的重复任务。

相反,有一个单独的程序运行所有 4000 个项目(需要多少时间),然后重新开始。

进一步评论:

使用 multi-processing 还是 multi-threading 可能并不重要。

如果是线程,请记住 MySQL 连接不是 thread-safe;您必须为每个线程使用单独的连接。

虽然 MySQL 可以轻松处理 4000 个 空闲 连接,但它不能很好地处理 4000 个查询 同时活动 。 (通常 100 对它来说是一个挑战。通常,解决方案是加快查询速度。)

将 4000 拆分为多个线程可能没有用。拥有比 CPU 核心更多的线程很少有用。他们以新的方式相互绊倒——OS 如何处理协调。

拆分 4000 会使这个额外的过程复杂化 -- 您可能需要一个主程序来监视其子程序以查看何时该重新开始。或者(可能更好)有 20 个线程,每个线程执行特定的 200 个任务。