Java 8 multithreading: How can I achieve parallelism along with a timeout for individual threads?

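Summary of what I want to achieve:

I want to execute N tasks in parallel such that no individual task runs for more than two seconds (a task that does can be marked as failed). As output, I want to return the output of the successful tasks and a failed status for the failed tasks. Also, a timeout in one task should not cause a circuit break, i.e. the execution of the other tasks should not stop.


Note: I am restricted to Java 8.

I referred to this article for parallel processing. I am doing parallel processing similar to the example given in that article: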

public void parallelProcessing() {
    try {
        ExecutorService executorService = Executors.newWorkStealingPool(10);

        List<CompletableFuture<Integer>> futuresList = new ArrayList<CompletableFuture<Integer>>();
        futuresList.add(CompletableFuture.supplyAsync(()->(addFun1(10, 5)), executorService));
        futuresList.add(CompletableFuture.supplyAsync(()->(subFun1(10, 5)), executorService));
        futuresList.add(CompletableFuture.supplyAsync(()->(mulFun1(10, 5)), executorService));

        CompletableFuture<Void> allFutures = CompletableFuture.allOf(futuresList.toArray(new CompletableFuture[futuresList.size()]));
        CompletableFuture<List<Integer>> allCompletableFuture = allFutures.thenApply(future -> futuresList.stream().map(completableFuture -> completableFuture.join())
                .collect(Collectors.toList()));
        CompletableFuture<List<Integer>> completableFuture = allCompletableFuture.toCompletableFuture();
        List<Integer> finalList = (List<Integer>) completableFuture.get();
    } catch (Exception ex) {

    }
}


public static Integer addFun1(int a, int b) {
    System.out.println(Thread.currentThread().getName());

    for (int i = 0; i < 10; i++) {

        System.out.print(Thread.currentThread().getName() + i);
    }

    return a + b;
}

public static Integer subFun1(int a, int b) {

    System.out.println(Thread.currentThread().getName());

    for (int i = 0; i < 10; i++) {

        System.out.print(Thread.currentThread().getName() + i);
    }

    return a - b;
}


public static Integer mulFun1(int a, int b) {

    System.out.println(Thread.currentThread().getName());

    for (int i = 0; i < 10; i++) {

        System.out.print(Thread.currentThread().getName() + i);
    }

    return a * b;
}

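This works fine. But I want to set a timeout for each individual thread. I know I can use the overloaded get function in the last line. But that would set the timeout for the combined futures, right? For example, if I want no individual thread to be blocked for more than 2 seconds and I set a 2-second timeout in the last line, it will be a combined timeout, right?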

get(long timeout, TimeUnit unit)

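Here is the end result I want to achieve:

Suppose there are five threads, four complete on time and one times out (because it runs for more than two seconds). In this case, I want to send the output of the four threads and an error for the fifth thread in the result.

My input/output format is as follows:

Sample input: List<Input>, where each item runs in a separate thread and each input has a uniqueIdentifier.

Sample output: List<Output> such that: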

Output :{
    uniqueIdentifier: // Same as input to map for which input this output was generated
    result: success/fail // This Field I want to add. Currently it's not there
    data: {
        // From output, e.g., addFun1 and subFun1
    }
}

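The semantics of what you want to achieve matter a lot. On one hand, you say you want a Java 8 alternative to orTimeout; on the other hand, you imply that you want to abandon the execution of a certain CompletableFuture if it goes beyond some threshold.

These are very different things, because the documentation of orTimeout says: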

Exceptionally completes this CompletableFuture with a TimeoutException if not otherwise completed before the given timeout.

So something like this:

CompletableFuture<Integer> addAsy =
    supplyAsync(() -> addFun1(10,5), executorService)
    .orTimeout(5, TimeUnit.MILLISECONDS);

will result in a CompletableFuture that completes exceptionally (assuming addFun1 takes more than 5 ms). At the same time, this:

CompletableFuture<Void> allFutures = CompletableFuture
        .allOf(futuresList.toArray(new CompletableFuture[0]));

as the documentation of allOf states:

... If any of the given CompletableFutures complete exceptionally, then the returned CompletableFuture also does so...

means that allFutures is a CompletableFuture that also completes exceptionally (because addAsy does).

Now, because you have this:

    CompletableFuture<List<Integer>> allCompletableFuture = allFutures.thenApply(future -> {
        return futuresList.stream().map(CompletableFuture::join)
            .collect(Collectors.toList());
    });

Again, the documentation of thenApply says:

Returns a new CompletionStage that, when this stage completes normally, is executed with this stage's result as the argument to the supplied function.

Your allFutures did not complete normally, so the function passed to thenApply is never even called.
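To make that concrete, here is a minimal sketch that reproduces the effect on Java 8. The class name AllOfFailureDemo and the manually failed future are assumptions for illustration only; the failed future just simulates what a timeout would do to a slow task.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

class AllOfFailureDemo {

    // Returns true if the function passed to thenApply was invoked, false otherwise.
    static boolean thenApplyRanAfterFailure() {
        CompletableFuture<Integer> ok = CompletableFuture.completedFuture(15);
        CompletableFuture<Integer> timedOut = new CompletableFuture<>();
        // Simulate what a timeout would do to a slow task (assumption for the demo).
        timedOut.completeExceptionally(new TimeoutException("simulated"));

        AtomicBoolean called = new AtomicBoolean(false);
        CompletableFuture.allOf(ok, timedOut)
                .thenApply(ignored -> {
                    called.set(true); // skipped, because allOf completed exceptionally
                    return "collected";
                });

        return called.get();
    }

    public static void main(String[] args) {
        System.out.println("thenApply function ran: " + thenApplyRanAfterFailure());
    }
}
```

A single failed future is enough to skip the collecting step, which is why the successful results appear to be lost as well.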

So you need to understand what you want to achieve. For a backport of orTimeout, you can start here.
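If it helps, this is roughly what such a backport could look like on Java 8. It is only a sketch and not the JDK implementation: the class TimeoutBackport, the static orTimeout helper and the outcome helper are names made up for this example. It fails the future with a TimeoutException from a scheduler thread; it does not stop the thread that is still computing the value.

```java
import java.util.concurrent.*;

class TimeoutBackport {

    // Single daemon thread that only fires timeouts (assumed setup for this sketch).
    private static final ScheduledExecutorService SCHEDULER =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "timeout-scheduler");
                t.setDaemon(true);
                return t;
            });

    // Completes the given future exceptionally if it is not done within the timeout.
    static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit unit) {
        ScheduledFuture<?> timer = SCHEDULER.schedule(
                () -> future.completeExceptionally(new TimeoutException()), timeout, unit);
        // Once the future is done either way, the pending timer is no longer needed.
        future.whenComplete((value, error) -> timer.cancel(false));
        return future;
    }

    // Demo helper: waits for the future and reports how it ended.
    static String outcome(CompletableFuture<?> future) {
        try {
            return "ok:" + future.join();
        } catch (CompletionException e) {
            return "failed:" + e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> slow = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return 15;
        });
        System.out.println(outcome(orTimeout(slow, 100, TimeUnit.MILLISECONDS)));
        System.out.println(outcome(orTimeout(CompletableFuture.supplyAsync(() -> 5), 2, TimeUnit.SECONDS)));
    }
}
```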


You still need some kind of backport of orTimeout. I will use the method as if it already existed.

static void parallelProcessing() throws Exception {

    ExecutorService executorService = Executors.newFixedThreadPool(10);

    List<CompletableFuture<Integer>> futuresList = new ArrayList<>();
    futuresList.add(CompletableFuture.supplyAsync(() -> addFun1(10,5), executorService).orTimeout(2, TimeUnit.SECONDS));
    futuresList.add(CompletableFuture.supplyAsync(() -> subFun1(10,5), executorService));
    futuresList.add(CompletableFuture.supplyAsync(() -> mulFun1(10,5), executorService));


    CompletableFuture<Void> all = CompletableFuture.allOf(futuresList.toArray(new CompletableFuture[0]));
    Map<Boolean, List<CompletableFuture<Integer>>> map =
            all.thenApply(x -> both(futuresList)).exceptionally(x -> both(futuresList)).get();

    List<CompletableFuture<Integer>> failed = map.get(Boolean.TRUE);
    List<CompletableFuture<Integer>> ok = map.get(Boolean.FALSE);

    System.out.println("failed = " + failed.size());
    System.out.println("ok = " + ok.size());

}

private static Map<Boolean, List<CompletableFuture<Integer>>> both(
        List<CompletableFuture<Integer>> futuresList) {
    return futuresList.stream().collect(Collectors.partitioningBy(
                    CompletableFuture::isCompletedExceptionally
            ));
}
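If you need a per-task success/fail entry rather than two partitions, one option is to convert every future with handle before passing it to allOf, so that allOf can no longer fail. The following is a sketch under assumptions: TaskResult is a hypothetical holder loosely modelled on the Output from the question, and the timed-out future is simulated by completing it exceptionally by hand.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

class PerTaskResults {

    // Hypothetical result holder (not part of the original code).
    static final class TaskResult {
        final String uniqueIdentifier;
        final boolean success;
        final Integer data; // null when the task failed

        TaskResult(String uniqueIdentifier, boolean success, Integer data) {
            this.uniqueIdentifier = uniqueIdentifier;
            this.success = success;
            this.data = data;
        }

        @Override
        public String toString() {
            return uniqueIdentifier + ":" + (success ? "success:" + data : "fail");
        }
    }

    // Turns a future that may fail into one that always completes normally.
    static CompletableFuture<TaskResult> toResult(String id, CompletableFuture<Integer> future) {
        return future.handle((value, error) ->
                error == null ? new TaskResult(id, true, value) : new TaskResult(id, false, null));
    }

    // Waits for all wrapped futures; handle has already absorbed any errors.
    static List<TaskResult> collect(List<CompletableFuture<TaskResult>> wrapped) {
        CompletableFuture.allOf(wrapped.toArray(new CompletableFuture[0])).join();
        return wrapped.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> timedOut = new CompletableFuture<>();
        timedOut.completeExceptionally(new TimeoutException("simulated timeout"));

        List<CompletableFuture<TaskResult>> wrapped = new ArrayList<>();
        wrapped.add(toResult("add", CompletableFuture.supplyAsync(() -> 10 + 5)));
        wrapped.add(toResult("sub", timedOut));
        System.out.println(collect(wrapped));
    }
}
```

The results stay in submission order, so each entry can be matched back to its input by the identifier.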

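Assuming you want 10 threads running and want a return value, you can use the Callable<Boolean> interface, submit it to an ExecutorService, and then use Future.get() to obtain the result, returned as a Boolean.

Here is a usage example.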

final int NUM_THREADS = 10;
List<Boolean> results = new ArrayList<Boolean>();
List<Callable<Boolean>> callables = new ArrayList<Callable<Boolean>>();

for(int i=0; i<NUM_THREADS; ++i)
{
    callables.add(new Callable<Boolean>()
    {
        public Boolean call()
        {
            // Add your task here
            return isTaskCompleted;
        }
    });
}

ExecutorService executorService = Executors.newFixedThreadPool(NUM_THREADS); // Run 10 threads

for(Callable<Boolean> callable:callables)
{
    Future<Boolean> future = executorService.submit(callable);
    try
    {
        // Note: get() blocks here before the next task is submitted,
        // so with this loop the tasks effectively run one after another
        results.add(future.get(2, TimeUnit.SECONDS)); // Timeout 2 seconds and add the result
    }
    catch(Exception ex)
    {
        results.add(false); // Set result to false if the task times out (TimeoutException) or fails
    }
}
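As a possible variation that starts all tasks before waiting, ExecutorService.invokeAll with a timeout can be used. This is a sketch, not the code from the answer above: the class InvokeAllWithTimeout and the method runAll are made-up names, and the pool is sized to one thread per task so that the overall timeout of invokeAll acts as a per-task limit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

class InvokeAllWithTimeout {

    // Runs all tasks in parallel; a task that fails or does not finish in time yields false.
    static List<Boolean> runAll(List<Callable<Boolean>> tasks, long timeout, TimeUnit unit) {
        // One thread per task, so every task starts immediately (assumption of this sketch).
        ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, tasks.size()));
        List<Boolean> results = new ArrayList<>();
        try {
            // invokeAll returns when all tasks are done or the timeout expires;
            // tasks that have not completed by then are cancelled.
            for (Future<Boolean> future : executor.invokeAll(tasks, timeout, unit)) {
                try {
                    results.add(future.get()); // does not block: every future is done or cancelled
                } catch (CancellationException | ExecutionException e) {
                    results.add(false); // timed out (cancelled) or threw an exception
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for tasks", e);
        } finally {
            executor.shutdownNow();
        }
        return results;
    }

    public static void main(String[] args) {
        List<Callable<Boolean>> tasks = new ArrayList<>();
        tasks.add(() -> { Thread.sleep(100); return true; });  // finishes in time
        tasks.add(() -> { Thread.sleep(5000); return true; }); // exceeds the limit
        System.out.println(runAll(tasks, 2, TimeUnit.SECONDS));
    }
}
```

The returned list has the same order as the input list, so results can be matched to tasks by index.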

If you want to learn more about these classes, you can read this book: O'Reilly - Learning Java, Chapter 9: Threads.

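Below is a single-file MRE (paste the entire code into RunParallelTasks.java and run it). It is a prototype of the structure I suggested in my comment, intended to achieve the desired functionality in a simple way: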

import java.util.Optional;

public class RunParallelTasks {

    public static void main(String[] args) {

        new Thread(()->{
            long duration = 3000;
            Callback<Long> cb = new LongTask(duration);
            Output<Long> output = new TaskExecuter<Long>().work(cb);
            System.out.println( output);
        }).start();

        new Thread(()->{
            long duration = 300;
            Callback<Long> cb = new LongTask(duration);
            Output<Long> output = new TaskExecuter<Long>().work(cb);
            System.out.println( output);
        }).start();

        new Thread(()->{
            long duration = 4000;
            Callback<Long> cb = new LongTask(duration);
            Output<Long> output = new TaskExecuter<Long>().work(cb);
            System.out.println( output);
        }).start();

        new Thread(()->{
            long duration = 1000;
            Callback<Long> cb = new LongTask(duration);
            Output<Long> output = new TaskExecuter<Long>().work(cb);
            System.out.println( output);
        }).start();

    }
}

class TaskExecuter<T>{

    private static final long TIMEOUT = 2000;//millis
    private T value = null;
    public Output<T> work(Callback<T> call){

        Thread t = new Thread(()->{
            value = call.work();
        });
        t.start();

        try {
            t.join(TIMEOUT);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }

        return new Output<>(t.getId(), value == null ?  Optional.empty() : Optional.of(value)) ;
    }
}

interface Callback<T> {
    T work();
}

class LongTask implements Callback<Long>{

    private final long durationInMillis;

    public LongTask(long durationInMillis) {
        this.durationInMillis = durationInMillis;
    }

    @Override
    public Long work() {
        try {
            Thread.sleep(durationInMillis);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
        return durationInMillis;
    }
}

class Output<T> {

    private final long id;
    private boolean success = false;
    private T data;

    public Output(long id, Optional<T> op) {
        this.id = id;
        if(op.isPresent()) {
            data = op.get();
            success = true;
        }
    }
    
    //todo add getters 
    
    @Override
    public String toString() {
        return "task "+ id+ (success ? " Completed, returned "+data : " Failed" );
    }
}

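I think that, in order to cancel a task when its execution takes too long, you need two tasks:

  1. The task itself, which performs the computation
  2. Another task, which cancels the first one if it takes too long

This is inspired by my answer here; at least for now I have not come up with anything better.

Suppose this is Output: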

public class Output {

    private final String uniqueIdentifier;
    private final boolean result;
    private final Object data;

    //all arguments constructor and getters

    @Override
    public String toString() {
        return "Output{" +
                "uniqueIdentifier='" + uniqueIdentifier + '\'' +
                ", result=" + result +
                ", data=" + data +
                '}';
    }
}

For simplicity, I will use only the add integers task from your example, wrapped in a Supplier.

public class AddIntegerTask implements Supplier<Integer> {

    private static final long NANOSECONDS_IN_SECOND = 1_000_000_000;

    private final String uniqueIdentifier;
    private final boolean tooLong;
    private final int a;
    private final int b;

    public AddIntegerTask(boolean tooLong, int a, int b) {
        this.uniqueIdentifier = UUID.randomUUID().toString();
        this.tooLong = tooLong;
        this.a = a;
        this.b = b;
    }

    @Override
    public Integer get() {
        long nanoseconds = this.tooLong ? 3 * NANOSECONDS_IN_SECOND : NANOSECONDS_IN_SECOND;
        long start = System.nanoTime();
        long toEnd = start + nanoseconds;
        //simulate long execution
        while (System.nanoTime() <= toEnd) {
            //check for interruption at crucial points
            boolean interrupted = Thread.currentThread().isInterrupted();
            if (interrupted) {
                //custom exception extending RuntimeException
                throw new TooLongExecutionException();
            }
        }
        return this.a + this.b;
    }

    public String getUniqueIdentifier() {
        return this.uniqueIdentifier;
    }
}

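The most important thing here is that you need to check whether the current thread has been interrupted at crucial points in your own implementation.

The cancel task is very simple: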

public class CancelTask implements Runnable {

    private final Future<?> future;

    public CancelTask(Future<?> future) {
        this.future = future;
    }

    @Override
    public void run() {
        this.future.cancel(true);
    }
}

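The cancellation of the Future is wrapped in a Runnable so that it can be scheduled to execute after an appropriate delay.

The Runnable that wraps the result in an Output and is submitted for execution: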

public class MyRunnable implements Runnable {

    private final Map<String, Output> outputMap;
    private final AddIntegerTask calcFunction;
    private final CountDownLatch latch;

    public MyRunnable(Map<String, Output> outputMap, AddIntegerTask calcFunction, CountDownLatch latch) {
        this.outputMap = outputMap;
        this.calcFunction = calcFunction;
        this.latch = latch;
    }

    @Override
    public void run() {
        String uniqueIdentifier = this.calcFunction.getUniqueIdentifier();
        Output output;
        try {
            Integer result = this.calcFunction.get();
            output = new Output(uniqueIdentifier, true, result);
        } catch (TooLongExecutionException exc) {
            output = new Output(uniqueIdentifier, false, null);
        }
        this.outputMap.replace(uniqueIdentifier, output);
        this.latch.countDown();
    }
}

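A few things to note here. CountDownLatch: it looks like you know the number of tasks in advance, so forcing the main thread to wait until all tasks are done is a good option. TooLongExecutionException is a custom exception extending RuntimeException. If the job completes, the output is marked as successful and carries the result; if the job is interrupted, it is marked as unsuccessful and carries no result.

And a main to combine and test all of this: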

public class CancelingMain {

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10);

        int taskCount = 6;
        CountDownLatch latch = new CountDownLatch(taskCount);
        long start = System.nanoTime();
        Map<String, Output> outputMap = new LinkedHashMap<>();
        for (int i = 1; i <= taskCount; i++) {
            boolean tooLong = i % 2 == 0;
            AddIntegerTask task = new AddIntegerTask(tooLong, 10, 7);
            outputMap.put(task.getUniqueIdentifier(), null);

            MyRunnable runnable = new MyRunnable(outputMap, task, latch);
            Future<?> future = executorService.submit(runnable);
            //schedule cancel task to run once, 2 seconds after scheduling
            executorService.schedule(new CancelTask(future), 2, TimeUnit.SECONDS);
        }
        latch.await();
        System.out.println("execution took - " + (System.nanoTime() - start) / 1_000_000_000D);
        executorService.shutdown();
        outputMap.values().forEach(System.out::println);
    }
}

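I am using a LinkedHashMap to preserve the submission order of the tasks.

This is a fleshed-out version of what I suggested in a comment on the question. The idea is to wrap the call to get(long timeout, TimeUnit unit) in another future. I encapsulated the required logic in a BetterFuture class, which delegates to a CompletableFuture under the hood.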

import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.runAsync;
import static java.util.stream.Stream.concat;

import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Stream;

public class BetterFuture<T> {
    private final CompletableFuture<T> delegate;

    private BetterFuture(CompletableFuture<T> delegate) {
        this.delegate = delegate;
    }

    public static <T> BetterFuture<T> completed(T value) {
        return new BetterFuture<>(completedFuture(value));
    }

    public static <T> BetterFuture<T> future(Executor executor, Callable<T> callable) {
        CompletableFuture<T> delegate = new CompletableFuture<T>();

        runAsync(() -> {
            try {
                delegate.complete(callable.call());
            } catch (Throwable e) {
                delegate.completeExceptionally(e);
            }
        }, executor);

        return new BetterFuture<>(delegate);
    }

    public static <T> BetterFuture<Optional<T>> future(Executor executor, Callable<T> callable, Duration timeout) {
        return future(executor, () -> future(executor, callable).get(timeout));
    }

    public <R> BetterFuture<R> map(Function<T, R> fn) {
        return new BetterFuture<>(delegate.thenApply(fn));
    }

    public <R> BetterFuture<R> andThen(Function<T, BetterFuture<R>> fn) {
        return new BetterFuture<>(
            delegate.thenCompose(value -> fn.apply(value).delegate));
    }

    public static <T> BetterFuture<Stream<T>> collect(Stream<BetterFuture<T>> futures) {
        return futures
            .map(future -> future.map(Stream::of))
            .reduce(
                BetterFuture.completed(Stream.empty()),
                (future1, future2) ->
                future1
                    .andThen(stream1 ->
                future2
                    .map(stream2 ->
                concat(stream1, stream2)))
            );
    }

    public T get() throws ExecutionException, InterruptedException {
        return delegate.get();
    }

    public Optional<T> get(Duration timeout) throws ExecutionException, InterruptedException {
        try {
            return Optional.of(delegate.get(timeout.toMillis(), TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            return Optional.empty();
        }
    }

}

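Most methods simply delegate to the underlying CompletableFuture without adding much extra functionality.

To start an asynchronous task with a timeout, use the method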

<T> BetterFuture<Optional<T>> future(Executor executor, Callable<T> callable, Duration timeout) 

If a timeout occurs, it completes with an empty Optional; otherwise it completes with an Optional of T.

Additionally, the method

public static <T> BetterFuture<Stream<T>> collect(Stream<BetterFuture<T>> futures) 

provides a convenient way to collect a stream of futures into a future of a stream of the same type:

Stream<BetterFuture<Optional<String>>> futures = ...
BetterFuture<Stream<Optional<String>>> futureStream = BetterFuture.collect(futures);

Here is a complete example in which the first future times out and the second one completes successfully:

@Test
public void timeoutTest() throws ExecutionException, InterruptedException {
    ExecutorService executor = Executors.newCachedThreadPool();

    BetterFuture<Optional<String>> fa = BetterFuture.future(executor, () -> {
        Thread.sleep(3000);
        return "a";
    }, Duration.ofSeconds(2));

    BetterFuture<Optional<String>> fb = BetterFuture.future(executor, () -> {
        Thread.sleep(1000);
        return "b";
    }, Duration.ofSeconds(2));

    Stream<BetterFuture<Optional<String>>> futures = Stream.of(fa, fb);
    BetterFuture<Stream<Optional<String>>> c = BetterFuture.collect(futures);

    System.out.println(c.get().collect(Collectors.toList()));
}

When run, it prints

[Optional.empty, Optional[b]]

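One last note: when a timeout occurs, the implementation does nothing to the running thread. That is, it only times out but does not interrupt the running thread. The thread will keep running in the background until it completes naturally.

It all depends on whether the tasks/computations you run in parallel can actually be cancelled. Remember that the Java runtime is not an operating system, and you cannot forcibly kill a thread the way you can kill a process.

So, if you want to interrupt long-running computations, you have to write them in such a way that they periodically check whether they should stop executing. For code that waits on something else (sleeping, synchronizing on other threads, etc.) the strategy is completely different: you can interrupt those threads and the code receives an InterruptedException, which can be used to really stop the code with much less cooperation from the code itself.

I have prepared a small example here to show you the difference: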

package examples.stackoverflow.q71322315;

import java.util.concurrent.*;

public class Q71322315 {
    public static final long COUNTER = 10000000000L;
    public static final boolean SLEEP = false;

    private static final ExecutorService taskExec = Executors.newCachedThreadPool();

    public static void timedRun(Runnable r, long timeout, TimeUnit unit) throws InterruptedException {
        Future<?> task = taskExec.submit(r);
        try {
            task.get(timeout, unit);
            System.out.println("completed");
        } catch (TimeoutException e) {
            // task will be cancelled below
            System.out.println("timeout");
        } catch (ExecutionException e) {
            System.out.println("exex");
            // exception thrown in task; rethrow
            throw new RuntimeException(e.getCause());
        } finally {
            // Harmless if task already completed
            task.cancel(true); // interrupt if running
        }
    }

    public static void main(String[] args) throws InterruptedException {
        timedRun(new Task(SLEEP), 2000, TimeUnit.MILLISECONDS);
        taskExec.shutdown();
        System.out.println("finish");
    }
    private static class Task implements Runnable {

        private final boolean sleep;
        private Task(boolean sleep) {
            this.sleep = sleep;
        }
        @Override
        public void run() {
            try {
                if (sleep) {
                    Thread.sleep(5000L);
                } else {
                    longRunningMethod(COUNTER);
                }
                System.out.println("Success");
            } catch (Exception e) {
                e.printStackTrace();
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        private void longRunningMethod(long counter) {
            for (long y = 0; y < counter; y++) {
                Math.sqrt(y);
            }
        }
    }
}

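The example is based on some sample code from the already mentioned Java Concurrency in Practice, "7.10 Cancelling a task using Future".

The code above performs a long-running computation that does not care about any interruption. (You may have to increase the value of COUNTER; just add zeros at the end until the whole method takes more than 2 seconds.)

You will see that you first get the "timeout" message, indicating that the task did not complete within the required timeout. But the code continues to run and also prints "finish" and "Success".

When you flip the SLEEP constant to true, it uses an interruptible call to Thread.sleep(), and the output will not contain the "Success" message.


Once you have managed to build a cancellable/interruptible computation, you can set up multiple threads that each execute timedRun in parallel, so that the tasks are started in parallel and interrupted after the timeout.

This does not yet include collecting the results, but instead of printing completed and timeout to System.out you can collect the results or count the timed-out tasks.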

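(If you want to use that code in production, please clean it up very thoroughly; it has some big smells that should never end up in any production-ready code ;-)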
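The idea of running several timed tasks in parallel and collecting their results could be sketched as follows. This is an illustration under assumptions, not a drop-in solution: ParallelTimedRun and runAll are hypothetical names, an empty Optional stands for "timed out or failed", and the tasks are assumed to react to interruption (here they simply sleep).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.*;

class ParallelTimedRun {

    // Starts all tasks, then waits for each one until a shared deadline and cancels stragglers.
    static <T> List<Optional<T>> runAll(List<Callable<T>> tasks, long timeout, TimeUnit unit) {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            long deadline = System.nanoTime() + unit.toNanos(timeout);
            List<Future<T>> futures = new ArrayList<>();
            for (Callable<T> task : tasks) {
                futures.add(pool.submit(task)); // submit everything first so tasks run in parallel
            }
            List<Optional<T>> results = new ArrayList<>();
            for (Future<T> future : futures) {
                try {
                    long remaining = Math.max(0, deadline - System.nanoTime());
                    results.add(Optional.ofNullable(future.get(remaining, TimeUnit.NANOSECONDS)));
                } catch (TimeoutException | ExecutionException e) {
                    results.add(Optional.empty()); // too slow, or the task threw
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    results.add(Optional.empty());
                } finally {
                    future.cancel(true); // harmless if already done, interrupts if still running
                }
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Callable<String>> tasks = new ArrayList<>();
        tasks.add(() -> { Thread.sleep(100); return "fast"; });
        tasks.add(() -> { Thread.sleep(5000); return "slow"; });
        System.out.println(runAll(tasks, 2, TimeUnit.SECONDS));
    }
}
```

A CPU-bound task that never checks its interrupt flag would still be reported as empty here, but its thread would keep running in the background, exactly as described above.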

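We had a similar requirement: we needed to catch the timeout of each thread and ignore its result. Java 8 does not have this built in. This is one of the ways we implemented it: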

List<CompletableFuture<?>> futures = new ArrayList<>();
List<Object> results = new ArrayList<>(); // It can be anything you collect
futures.add(asyncService.fetchMethod()
    .acceptEither(
        timeoutAfter(timeout, TimeUnit.SECONDS),
        results::add)
    .handle(
        (result, ex) -> {
            // Handle the timeout exception
            results.add(...);
            return result;
        }));
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

private <T> CompletableFuture<T> timeoutAfter(long timeout, TimeUnit unit) {
    CompletableFuture<T> result = new CompletableFuture<>();
    // We need a separate executor here
    scheduledExecutor.schedule(
        () -> result.completeExceptionally(new TimeoutException()), timeout, unit);
    return result;
}
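For reference, a self-contained variant of the same racing idea might look like the sketch below. The names EitherTimeout and withTimeout and the status strings are assumptions for illustration; it uses applyToEither instead of acceptEither so that the value can be returned. It relies on the dependent stage following whichever future completes first, which is what happens in practice when the timeout fires while the task is still pending, although the CompletionStage documentation makes no general guarantee for "either" stages when only one side fails.

```java
import java.util.concurrent.*;

class EitherTimeout {

    // Separate daemon scheduler used only to fire timeouts (assumed setup).
    private static final ScheduledExecutorService SCHEDULER =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "either-timeout");
                t.setDaemon(true);
                return t;
            });

    // A future that never yields a value and fails with TimeoutException after the delay.
    static <T> CompletableFuture<T> timeoutAfter(long timeout, TimeUnit unit) {
        CompletableFuture<T> result = new CompletableFuture<>();
        SCHEDULER.schedule(() -> result.completeExceptionally(new TimeoutException()), timeout, unit);
        return result;
    }

    // Races the task against the timeout and maps the outcome to a plain status string.
    static CompletableFuture<String> withTimeout(CompletableFuture<Integer> task, long timeout, TimeUnit unit) {
        return task
                .applyToEither(EitherTimeout.<Integer>timeoutAfter(timeout, unit), value -> "success:" + value)
                .handle((text, error) -> error == null ? text : "fail");
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> slow = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return 50;
        });
        CompletableFuture<Integer> fast = CompletableFuture.supplyAsync(() -> 10 + 5);
        System.out.println(withTimeout(slow, 500, TimeUnit.MILLISECONDS).join());
        System.out.println(withTimeout(fast, 2, TimeUnit.SECONDS).join());
    }
}
```

As with the other future-based approaches, the slow task is only ignored, not interrupted.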