Java 中的多线程,跟踪成功和失败的任务
Multi threading in Java, track success & failure tasks
我正在尝试 运行 在 Spring 中每 5 分钟安排一次异步方法,以在每个 运行 结束时使用 100 threads.At 来处理 1000 个任务需要弄清楚有多少任务失败和成功。
我尝试使用以下示例代码使用 Completable future,但我面临 2 个主要问题。
- 如果出现一些异常,则计划重新启动而不完成 运行。
- 如何在 run.I 之后获取 success/failure 任务编号 想在最后打印成功任务:[1,2,4,5] 失败任务:[9,10,7, 8]
//ScheduledTask
public void processTask(){
List<CompletableFuture<Integer>> futures=new ArrayList<>();
for(int I=0;i<300;i++){
futures.add(service.performTask(i));
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
}
//MyAsyncService
@Async
public CompletableFuture<Integer> performTask(int i){
try{
Thread.sleep(1000);
final Thread currentThread=Thread.currentThread();
final String oldName = currentThread.getName();
**currentThread.setName(oldName+"-"+i);**
int test=(int) (i+10)/RandomNumber(0,10); // generate random number between 0 to 10 and divide I+10 by that to fail some tasks randomly.
return CompletableFuture.completeFuture(i);
}catch(Exception e){
CompletableFuture<Integer> f = new CompletableFuture<>();
f.completeExceptionally(e);
return f;
}
}
//MyAsyncConfig
public Executor getAsyncExecutor() {
final ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setThreadNamePrefix("async-thread-");
threadPoolTaskExecutor.setCorePoolSize(100);
threadPoolTaskExecutor.setMaxPoolSize(300);
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
如果任何任务失败,对 CompleteableFuture 上的 join() 的调用将抛出异常,因此这很可能是问题所在。
不要调用 join,而是使用 whenComplete 方法尝试类似这样的操作来获取成功/失败的计数:
public void processTask(){
List<CompletableFuture<Integer>> futures = new ArrayList<>();
List<Integer> success = new ArrayList<>();
List<Integer> failures = new ArrayList<>();
for (int i = 0; i < 300; i++) {
int rec = i;
futures.add(CompletableFuture.supplyAsync(() -> performTask(rec)).whenComplete((a, ex) -> {
synchronized (success) {
if (ex == null)
success.add(rec);
else
failures.add(rec);
}
}));
}
for (CompletableFuture<Integer> f : futures) {
try {
f.join();
} catch (CompletionException ex) {
// ignore
}
}
System.out.println("Successes = " + success);
System.out.println("Failures = " + failures);
}
public Integer performTask(int i) {
try {
Thread.sleep(10);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
int test = (int) (i + 10) / RandomUtils.nextInt(0, 10);
return i;
}
编辑:报告失败次数时出现问题,我已修复。
编辑 #2:我错误地开始 whenComplete 等待完成后再继续。那是不正确的,代码已被调整。
编辑 #3:post 中所写的方法 performTask 实际上并不异步执行任何工作。我有 re-written 它是异步的。
嗯,我想我们还需要join来等待计划任务的整套CompletableFutures完成:)
另外,为了确保定时任务完成,我们需要使用 exceptionally() 或 handle() 来捕获异常。
public void processTask() {
List<CompletableFuture<Integer>> futures=new ArrayList<>();
List<Integer> successes = new ArrayList<>();
List<Integer> failures = new ArrayList<>();
for (int i = 0; i < 10; i++) {
int record = i;
CompletableFuture<Integer> integerCompletableFuture = performTask(i).handle(((integer, throwable) -> {
if (throwable != null) {
failures.add(record);
} else {
successes.add(record);
}
return integer;
}));
futures.add(integerCompletableFuture);
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
log.info("successes: {}", successes);
log.info("failures: {}", failures);
}
这将打印如下内容:
[main] INFO rxjava.CompletableFutureConcurrency - successes: [0, 1, 3, 4, 5, 6, 7, 9]
[main] INFO rxjava.CompletableFutureConcurrency - failures: [2, 8]
如果您不需要成功记录,也可以这样。
public void processTask(){
List<CompletableFuture<Integer>> futures=new ArrayList<>();
List<Integer> failures = new ArrayList<>();
for (int i = 0; i < 10; i++) {
int record = i;
CompletableFuture<Integer> integerCompletableFuture = performTask(i)
.exceptionally(throwable -> {
failures.add(record);
return record;
});
futures.add(integerCompletableFuture);
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
log.info("failures: {}", failures);
这将打印如下内容:
[main] INFO rxjava.CompletableFutureConcurrency - failures: [2, 8]
PS:这里我用的是int record = i
。你可以在不知道我为什么这样做的情况下玩 ;)
更新于 26.03.2022
performTask 的异步执行由 @Async
注释处理。正如 @user3134221 所问,如果我们想使用我们自己的执行器,我们可以使用 supplyAsync
of CompletableFuture
.
我在这里做了一个demo(抱歉,仓促完成,不够完美:/),你可以看看Example of how to run performTask in different threads
为了回答为什么重复使用同一个线程的评论,我认为以下代码是负责的,因为它同步而不是异步地完成未来。
for(int I=0;i<300;i++){
futures.add(service.performTask(i));
}
performTask方法returns一个完成的未来,调用者等待响应。因此,一切都是顺序执行的,而不是在多个线程中并行执行的。
相反,在将 performTask o/p 添加到期货集合时尝试使用 supplyAsync。
我正在尝试 运行 在 Spring 中每 5 分钟安排一次异步方法,以在每个 运行 结束时使用 100 threads.At 来处理 1000 个任务需要弄清楚有多少任务失败和成功。 我尝试使用以下示例代码使用 Completable future,但我面临 2 个主要问题。
- 如果出现一些异常,则计划重新启动而不完成 运行。
- 如何在 run.I 之后获取 success/failure 任务编号 想在最后打印成功任务:[1,2,4,5] 失败任务:[9,10,7, 8]
//ScheduledTask
public void processTask(){
List<CompletableFuture<Integer>> futures=new ArrayList<>();
for(int I=0;i<300;i++){
futures.add(service.performTask(i));
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
}
//MyAsyncService
@Async
public CompletableFuture<Integer> performTask(int i){
try{
Thread.sleep(1000);
final Thread currentThread=Thread.currentThread();
final String oldName = currentThread.getName();
**currentThread.setName(oldName+"-"+i);**
int test=(int) (i+10)/RandomNumber(0,10); // generate random number between 0 to 10 and divide I+10 by that to fail some tasks randomly.
return CompletableFuture.completeFuture(i);
}catch(Exception e){
CompletableFuture<Integer> f = new CompletableFuture<>();
f.completeExceptionally(e);
return f;
}
}
//MyAsyncConfig
public Executor getAsyncExecutor() {
final ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setThreadNamePrefix("async-thread-");
threadPoolTaskExecutor.setCorePoolSize(100);
threadPoolTaskExecutor.setMaxPoolSize(300);
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
如果任何任务失败,对 CompleteableFuture 上的 join() 的调用将抛出异常,因此这很可能是问题所在。
不要调用 join,而是使用 whenComplete 方法尝试类似这样的操作来获取成功/失败的计数:
public void processTask(){
List<CompletableFuture<Integer>> futures = new ArrayList<>();
List<Integer> success = new ArrayList<>();
List<Integer> failures = new ArrayList<>();
for (int i = 0; i < 300; i++) {
int rec = i;
futures.add(CompletableFuture.supplyAsync(() -> performTask(rec)).whenComplete((a, ex) -> {
synchronized (success) {
if (ex == null)
success.add(rec);
else
failures.add(rec);
}
}));
}
for (CompletableFuture<Integer> f : futures) {
try {
f.join();
} catch (CompletionException ex) {
// ignore
}
}
System.out.println("Successes = " + success);
System.out.println("Failures = " + failures);
}
public Integer performTask(int i) {
try {
Thread.sleep(10);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
int test = (int) (i + 10) / RandomUtils.nextInt(0, 10);
return i;
}
编辑:报告失败次数时出现问题,我已修复。
编辑 #2:我错误地开始 whenComplete 等待完成后再继续。那是不正确的,代码已被调整。
编辑 #3:post 中所写的方法 performTask 实际上并不异步执行任何工作。我有 re-written 它是异步的。
嗯,我想我们还需要join来等待计划任务的整套CompletableFutures完成:)
另外,为了确保定时任务完成,我们需要使用 exceptionally() 或 handle() 来捕获异常。
public void processTask() {
List<CompletableFuture<Integer>> futures=new ArrayList<>();
List<Integer> successes = new ArrayList<>();
List<Integer> failures = new ArrayList<>();
for (int i = 0; i < 10; i++) {
int record = i;
CompletableFuture<Integer> integerCompletableFuture = performTask(i).handle(((integer, throwable) -> {
if (throwable != null) {
failures.add(record);
} else {
successes.add(record);
}
return integer;
}));
futures.add(integerCompletableFuture);
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
log.info("successes: {}", successes);
log.info("failures: {}", failures);
}
这将打印如下内容:
[main] INFO rxjava.CompletableFutureConcurrency - successes: [0, 1, 3, 4, 5, 6, 7, 9]
[main] INFO rxjava.CompletableFutureConcurrency - failures: [2, 8]
如果您不需要成功记录,也可以这样。
public void processTask(){
List<CompletableFuture<Integer>> futures=new ArrayList<>();
List<Integer> failures = new ArrayList<>();
for (int i = 0; i < 10; i++) {
int record = i;
CompletableFuture<Integer> integerCompletableFuture = performTask(i)
.exceptionally(throwable -> {
failures.add(record);
return record;
});
futures.add(integerCompletableFuture);
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
log.info("failures: {}", failures);
这将打印如下内容:
[main] INFO rxjava.CompletableFutureConcurrency - failures: [2, 8]
PS:这里我用的是int record = i
。你可以在不知道我为什么这样做的情况下玩 ;)
更新于 26.03.2022
performTask 的异步执行由 @Async
注释处理。正如 @user3134221 所问,如果我们想使用我们自己的执行器,我们可以使用 supplyAsync
of CompletableFuture
.
我在这里做了一个demo(抱歉,仓促完成,不够完美:/),你可以看看Example of how to run performTask in different threads
为了回答为什么重复使用同一个线程的评论,我认为以下代码是负责的,因为它同步而不是异步地完成未来。
for(int I=0;i<300;i++){
futures.add(service.performTask(i));
}
performTask方法returns一个完成的未来,调用者等待响应。因此,一切都是顺序执行的,而不是在多个线程中并行执行的。
相反,在将 performTask o/p 添加到期货集合时尝试使用 supplyAsync。