Java 使用 EJB 定时器服务的多线程
Java multithreading with EJB Timer Service
我需要每分钟从数据库中提取一批消息(大约 20 条消息)并需要同时处理它们。我正在使用 EJB 计时器服务(调度程序)每分钟从数据库中获取消息。
基本上我需要每分钟挑选20-30条消息,处理完这些我需要发送一些邮件。处理消息涉及的数据库操作很少。
能否建议我如何使用 java.concurrent 包中的执行程序服务框架以及如何每分钟提交这些消息?
您好,这是一个使用 Java 的 ExecutorService、CountDownLatch 和 CompletableFuture 的基本示例。这个例子只是给你指出正确的方向,绝不是完美的,它使用了很多 Java8 的东西(我假设你正在使用 Java8)。此外,我没有使用 EJB Timer 的东西,而是使用 ScheduledExecutorService,但我猜你可以轻松地交换它们。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
public class BatchMessageProcessingExample {
private static final int BATCH_SIZE = 20;
//Having 20 here may not entirely benefit you. Chosing this number depends on a lot of stuff.
// Its usually better to go with total number of cores you have
private final ExecutorService pool = Executors.newFixedThreadPool(BATCH_SIZE);
private final ScheduledExecutorService databasePool = Executors.newScheduledThreadPool(1);
public void schedule() {
databasePool.scheduleWithFixedDelay(() -> runBatchProcess(), 0, 1, TimeUnit.MINUTES); //Schedule the database work to execute every minute
}
private void runBatchProcess() {
List<Message> taskFromDbFetch = getMessagesFromDb(); //Get stuff from the db
CountDownLatch countDownLatch = new CountDownLatch(taskFromDbFetch.size()); //Create a latch having same size as the list
List<Task> taskList = taskFromDbFetch.stream().map(x -> new Task(countDownLatch, x)).collect(Collectors.toList()); // Create tasks using the messages and the countdown latch
taskList.forEach(pool::execute); //Submit them all in pool
CompletableFuture.runAsync(() -> sendEmailAfterCompletion(countDownLatch)); //Send an email out from a separate thread
}
private void sendEmailAfterCompletion(CountDownLatch countDownLatch) {
try {
countDownLatch.await();//Await on the latch for the batch tasks to complete
sendEmail();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void sendEmail() {
System.out.println("Sending out an email.");
}
private List<Message> getMessagesFromDb() { //Get your messages from db here
List<Message> messages = new ArrayList<>();
for(int i = 0; i < BATCH_SIZE; i++) {
final int taskNumber = i;
messages.add(() -> System.out.println("I am a db message number " + taskNumber));
}
return messages;
}
class Task implements Runnable {
private final CountDownLatch countDownLatch;
private final Message message;
public Task(CountDownLatch countDownLatch, Message message) {
this.countDownLatch = countDownLatch;
this.message = message;
}
@Override
public void run() {
message.process(); //Process the message
countDownLatch.countDown(); //Countdown the latch
}
}
interface Message {
void process();
}
public static void main(String[] args) {
new BatchMessageProcessingExample().schedule();
}
}
我需要每分钟从数据库中提取一批消息(大约 20 条消息)并需要同时处理它们。我正在使用 EJB 计时器服务(调度程序)每分钟从数据库中获取消息。
基本上我需要每分钟挑选20-30条消息,处理完这些我需要发送一些邮件。处理消息涉及的数据库操作很少。
能否建议我如何使用 java.concurrent 包中的执行程序服务框架以及如何每分钟提交这些消息?
您好,这是一个使用 Java 的 ExecutorService、CountDownLatch 和 CompletableFuture 的基本示例。这个例子只是给你指出正确的方向,绝不是完美的,它使用了很多 Java8 的东西(我假设你正在使用 Java8)。此外,我没有使用 EJB Timer 的东西,而是使用 ScheduledExecutorService,但我猜你可以轻松地交换它们。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
public class BatchMessageProcessingExample {
private static final int BATCH_SIZE = 20;
//Having 20 here may not entirely benefit you. Chosing this number depends on a lot of stuff.
// Its usually better to go with total number of cores you have
private final ExecutorService pool = Executors.newFixedThreadPool(BATCH_SIZE);
private final ScheduledExecutorService databasePool = Executors.newScheduledThreadPool(1);
public void schedule() {
databasePool.scheduleWithFixedDelay(() -> runBatchProcess(), 0, 1, TimeUnit.MINUTES); //Schedule the database work to execute every minute
}
private void runBatchProcess() {
List<Message> taskFromDbFetch = getMessagesFromDb(); //Get stuff from the db
CountDownLatch countDownLatch = new CountDownLatch(taskFromDbFetch.size()); //Create a latch having same size as the list
List<Task> taskList = taskFromDbFetch.stream().map(x -> new Task(countDownLatch, x)).collect(Collectors.toList()); // Create tasks using the messages and the countdown latch
taskList.forEach(pool::execute); //Submit them all in pool
CompletableFuture.runAsync(() -> sendEmailAfterCompletion(countDownLatch)); //Send an email out from a separate thread
}
private void sendEmailAfterCompletion(CountDownLatch countDownLatch) {
try {
countDownLatch.await();//Await on the latch for the batch tasks to complete
sendEmail();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void sendEmail() {
System.out.println("Sending out an email.");
}
private List<Message> getMessagesFromDb() { //Get your messages from db here
List<Message> messages = new ArrayList<>();
for(int i = 0; i < BATCH_SIZE; i++) {
final int taskNumber = i;
messages.add(() -> System.out.println("I am a db message number " + taskNumber));
}
return messages;
}
class Task implements Runnable {
private final CountDownLatch countDownLatch;
private final Message message;
public Task(CountDownLatch countDownLatch, Message message) {
this.countDownLatch = countDownLatch;
this.message = message;
}
@Override
public void run() {
message.process(); //Process the message
countDownLatch.countDown(); //Countdown the latch
}
}
interface Message {
void process();
}
public static void main(String[] args) {
new BatchMessageProcessingExample().schedule();
}
}