Java 中的多线程,跟踪成功和失败的任务

Multi threading in Java, track success & failure tasks

我正在尝试 运行 在 Spring 中每 5 分钟安排一次异步方法,以在每个 运行 结束时使用 100 threads.At 来处理 1000 个任务需要弄清楚有多少任务失败和成功。 我尝试使用以下示例代码使用 Completable future,但我面临 2 个主要问题。

  1. 如果出现一些异常,则计划重新启动而不完成 运行。
  2. 如何在 run.I 之后获取 success/failure 任务编号 想在最后打印成功任务:[1,2,4,5] 失败任务:[9,10,7, 8]
//ScheduledTask
public void processTask(){
List<CompletableFuture<Integer>> futures=new ArrayList<>();
for(int I=0;i<300;i++){
 futures.add(service.performTask(i));
 }
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
}

//MyAsyncService

@Async

public CompletableFuture<Integer> performTask(int i){
try{
Thread.sleep(1000);
final Thread currentThread=Thread.currentThread();
final String oldName = currentThread.getName();
**currentThread.setName(oldName+"-"+i);**
int test=(int) (i+10)/RandomNumber(0,10); // generate random number between 0 to 10 and divide I+10 by that to fail some tasks randomly.
return CompletableFuture.completeFuture(i);
}catch(Exception e){
CompletableFuture<Integer> f = new CompletableFuture<>();
f.completeExceptionally(e);
return f;
}

}

//MyAsyncConfig

public Executor getAsyncExecutor() {
        final ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();

        threadPoolTaskExecutor.setThreadNamePrefix("async-thread-");
        threadPoolTaskExecutor.setCorePoolSize(100);
        threadPoolTaskExecutor.setMaxPoolSize(300);

        threadPoolTaskExecutor.initialize();

        return threadPoolTaskExecutor;
    }

如果任何任务失败,对 CompleteableFuture 上的 join() 的调用将抛出异常,因此这很可能是问题所在。

不要调用 join,而是使用 whenComplete 方法尝试类似这样的操作来获取成功/失败的计数:

    public void processTask(){
        List<CompletableFuture<Integer>> futures = new ArrayList<>();
        List<Integer> success = new ArrayList<>();
        List<Integer> failures = new ArrayList<>();

        for (int i = 0; i < 300; i++) {
            int rec = i;
            futures.add(CompletableFuture.supplyAsync(() -> performTask(rec)).whenComplete((a, ex) -> {
                synchronized (success) {
                    if (ex == null)
                        success.add(rec);
                    else
                        failures.add(rec);
                }
            }));
        }

        for (CompletableFuture<Integer> f : futures) {
            try {
                f.join();
            } catch (CompletionException ex) {
                // ignore
            }
        }

        System.out.println("Successes = " + success);
        System.out.println("Failures = " + failures);
    }

    public Integer performTask(int i) {
        try {
            Thread.sleep(10);
        } catch (InterruptedException ex) {
            throw new RuntimeException(ex);
        }

        int test = (int) (i + 10) / RandomUtils.nextInt(0, 10);
        return i;
    }

编辑:报告失败次数时出现问题,我已修复。

编辑 #2:我错误地开始 whenComplete 等待完成后再继续。那是不正确的,代码已被调整。

编辑 #3:post 中所写的方法 performTask 实际上并不异步执行任何工作。我有 re-written 它是异步的。

嗯,我想我们还需要join来等待计划任务的整套CompletableFutures完成:)

另外,为了确保定时任务完成,我们需要使用 exceptionally() 或 handle() 来捕获异常。

    public void processTask() {
        List<CompletableFuture<Integer>> futures=new ArrayList<>();
        List<Integer> successes = new ArrayList<>();
        List<Integer> failures = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            int record = i;
            CompletableFuture<Integer> integerCompletableFuture = performTask(i).handle(((integer, throwable) -> {
                if (throwable != null) {
                    failures.add(record);
                } else {
                    successes.add(record);
                }
                return integer;
            }));

            futures.add(integerCompletableFuture);
        }
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        log.info("successes: {}", successes);
        log.info("failures: {}", failures);
    }

这将打印如下内容:

[main] INFO rxjava.CompletableFutureConcurrency - successes: [0, 1, 3, 4, 5, 6, 7, 9]
[main] INFO rxjava.CompletableFutureConcurrency - failures: [2, 8]

如果您不需要成功记录,也可以这样。

public void processTask(){
        List<CompletableFuture<Integer>> futures=new ArrayList<>();
        List<Integer> failures = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            int record = i;
            CompletableFuture<Integer> integerCompletableFuture = performTask(i)
                    .exceptionally(throwable -> {
                         failures.add(record);
                         return record;
                    });
            futures.add(integerCompletableFuture);
        }
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        log.info("failures: {}", failures);

这将打印如下内容:

[main] INFO rxjava.CompletableFutureConcurrency - failures: [2, 8]

PS:这里我用的是int record = i。你可以在不知道我为什么这样做的情况下玩 ;)

更新于 26.03.2022

performTask 的异步执行由 @Async 注释处理。正如 @user3134221 所问,如果我们想使用我们自己的执行器,我们可以使用 supplyAsync of CompletableFuture.

CompletableFuture.supplyAsync

我在这里做了一个demo(抱歉,仓促完成,不够完美:/),你可以看看Example of how to run performTask in different threads

为了回答为什么重复使用同一个线程的评论,我认为以下代码是负责的,因为它同步而不是异步地完成未来。

for(int I=0;i<300;i++){
 futures.add(service.performTask(i));
 }

performTask方法returns一个完成的未来,调用者等待响应。因此,一切都是顺序执行的,而不是在多个线程中并行执行的。

相反,在将 performTask o/p 添加到期货集合时尝试使用 supplyAsync。