来自 Callable 的 CompletableFuture?
CompletableFuture from Callable?
今天我用 Java 8 中的 "new" CompletableFuture
进行了实验,当我没有找到 runAsync(Callable)
方法时发现自己很困惑。
我可以自己做,如下所示,但为什么这个(对我来说非常明显和有用的实用方法)丢失了? 我是不是漏了什么?
public static <T> CompletableFuture<T> asFuture(Callable<? extends T> callable, Executor executor) {
CompletableFuture<T> future = new CompletableFuture<>();
executor.execute(() -> {
try {
future.complete(callable.call());
} catch (Throwable t) {
future.completeExceptionally(t);
}
});
return future;
}
你应该使用 supplyAsync(Supplier<U>)
一般来说,lambda 表达式和检查异常不能很好地协同工作,CompletableFuture
通过设计避免检查异常。虽然在你的情况下应该没问题。
相关话题:
http://cs.oswego.edu/pipermail/concurrency-interest/2012-December/010486.html
http://cs.oswego.edu/pipermail/concurrency-interest/2014-August/012911.html
对于那些需要问题中提供的功能的人:为了理想地反映现有 CompletableFuture.runAsync
和 CompletableFuture.supplyAsync
函数的行为,我对其进行了一些更改。
// Java 8 (Java 9+ version below)
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;
/**
* {@link CompletableFuture} utils.
*
* @author stonar96
*
* @see CompletableFuture
*/
public final class CompletableFutureUtils {
/**
* Delegates the given Callable to
* {@link CompletableFuture#supplyAsync(Supplier)}, handles checked exceptions
* accordingly to unchecked exceptions and associates a new CompletableFuture.
*
* @param callable a function returning the value to be used to complete the
* returned CompletableFuture
* @param <U> the function's return type
* @return the new associated CompletableFuture
* @see CompletableFuture#supplyAsync(Supplier)
*/
public static <U> CompletableFuture<U> callAsync(Callable<U> callable) {
return completeAsync(new CompletableFuture<>(), callable);
}
/**
* Delegates the given Callable to
* {@link CompletableFuture#supplyAsync(Supplier)}, handles checked exceptions
* accordingly to unchecked exceptions and associates the given
* CompletableFuture.
*
* @param result the CompletableFuture to be associated
* @param callable a function returning the value to be used to complete the
* returned CompletableFuture
* @param <T> the function's return type
* @return the given associated CompletableFuture
* @see CompletableFuture#supplyAsync(Supplier)
*/
public static <T> CompletableFuture<T> completeAsync(CompletableFuture<T> result, Callable<? extends T> callable) {
if (result == null) {
throw new NullPointerException();
}
CompletableFuture<T> delegate = CompletableFuture.supplyAsync(callable == null ? null : () -> {
try {
return callable.call();
} catch (Throwable t) {
if (t instanceof Error) {
throw (Error) t;
}
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
}
result.completeExceptionally(t);
}
return null;
});
if (delegate == null) {
return null;
}
result.whenComplete((v, t) -> {
if (t == null) {
delegate.complete(v);
return;
}
delegate.completeExceptionally(t);
});
delegate.whenComplete((v, t) -> {
if (t == null) {
result.complete(v);
return;
}
result.completeExceptionally(t);
});
return result;
}
/**
* Delegates the given Callable and Executor to
* {@link CompletableFuture#supplyAsync(Supplier, Executor)}, handles checked
* exceptions accordingly to unchecked exceptions and associates a new
* CompletableFuture.
*
* @param callable a function returning the value to be used to complete the
* returned CompletableFuture
* @param executor the executor to use for asynchronous execution
* @param <U> the function's return type
* @return the new associated CompletableFuture
* @see CompletableFuture#supplyAsync(Supplier, Executor)
*/
public static <U> CompletableFuture<U> callAsync(Callable<U> callable, Executor executor) {
return completeAsync(new CompletableFuture<>(), callable, executor);
}
/**
* Delegates the given Callable and Executor to
* {@link CompletableFuture#supplyAsync(Supplier, Executor)}, handles checked
* exceptions accordingly to unchecked exceptions and associates the given
* CompletableFuture.
*
* @param result the CompletableFuture to be associated
* @param callable a function returning the value to be used to complete the
* returned CompletableFuture
* @param executor the executor to use for asynchronous execution
* @param <T> the function's return type
* @return the given associated CompletableFuture
* @see CompletableFuture#supplyAsync(Supplier, Executor)
*/
public static <T> CompletableFuture<T> completeAsync(CompletableFuture<T> result, Callable<? extends T> callable, Executor executor) {
if (result == null) {
throw new NullPointerException();
}
CompletableFuture<T> delegate = CompletableFuture.supplyAsync(callable == null ? null : () -> {
try {
return callable.call();
} catch (Throwable t) {
if (t instanceof Error) {
throw (Error) t;
}
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
}
result.completeExceptionally(t);
}
return null;
}, executor);
if (delegate == null) {
return null;
}
result.whenComplete((v, t) -> {
if (t == null) {
delegate.complete(v);
return;
}
delegate.completeExceptionally(t);
});
delegate.whenComplete((v, t) -> {
if (t == null) {
result.complete(v);
return;
}
result.completeExceptionally(t);
});
return result;
}
private CompletableFutureUtils() {
throw new AssertionError("CompletableFutureUtils cannot be instantiated");
}
}
// Java 9+
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;
/**
* {@link CompletableFuture} utils.
*
* @author stonar96
*
* @see CompletableFuture
*/
public final class CompletableFutureUtils {
/**
* Delegates the given Callable to
* {@link CompletableFuture#completeAsync(Supplier)} using a new
* CompletableFuture and handles checked exceptions accordingly to unchecked
* exceptions.
*
* @param callable a function returning the value to be used to complete the
* returned CompletableFuture
* @param <U> the function's return type
* @return the new CompletableFuture
* @see CompletableFuture#completeAsync(Supplier)
*/
public static <U> CompletableFuture<U> callAsync(Callable<U> callable) {
return completeAsync(new CompletableFuture<>(), callable);
}
/**
* Delegates the given Callable to
* {@link CompletableFuture#completeAsync(Supplier)} using the given
* CompletableFuture and handles checked exceptions accordingly to unchecked
* exceptions.
*
* @param result the CompletableFuture to be used
* @param callable a function returning the value to be used to complete the
* returned CompletableFuture
* @param <T> the function's return type
* @return the given CompletableFuture
* @see CompletableFuture#completeAsync(Supplier)
*/
public static <T> CompletableFuture<T> completeAsync(CompletableFuture<T> result, Callable<? extends T> callable) {
return result.completeAsync(callable == null ? null : () -> {
try {
return callable.call();
} catch (Throwable t) {
if (t instanceof Error) {
throw (Error) t;
}
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
}
result.completeExceptionally(t);
}
return null;
});
}
/**
* Delegates the given Callable and Executor to
* {@link CompletableFuture#completeAsync(Supplier, Executor)} using a new
* CompletableFuture and handles checked exceptions accordingly to unchecked
* exceptions.
*
* @param callable a function returning the value to be used to complete the
* returned CompletableFuture
* @param executor the executor to use for asynchronous execution
* @param <U> the function's return type
* @return the new CompletableFuture
* @see CompletableFuture#completeAsync(Supplier, Executor)
*/
public static <U> CompletableFuture<U> callAsync(Callable<U> callable, Executor executor) {
return completeAsync(new CompletableFuture<>(), callable, executor);
}
/**
* Delegates the given Callable and Executor to
* {@link CompletableFuture#completeAsync(Supplier, Executor)} using the given
* CompletableFuture and handles checked exceptions accordingly to unchecked
* exceptions.
*
* @param result the CompletableFuture to be used
* @param callable a function returning the value to be used to complete the
* returned CompletableFuture
* @param executor the executor to use for asynchronous execution
* @param <T> the function's return type
* @return the given CompletableFuture
* @see CompletableFuture#completeAsync(Supplier, Executor)
*/
public static <T> CompletableFuture<T> completeAsync(CompletableFuture<T> result, Callable<? extends T> callable, Executor executor) {
return result.completeAsync(callable == null ? null : () -> {
try {
return callable.call();
} catch (Throwable t) {
if (t instanceof Error) {
throw (Error) t;
}
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
}
result.completeExceptionally(t);
}
return null;
}, executor);
}
private CompletableFutureUtils() {
throw new AssertionError("CompletableFutureUtils cannot be instantiated");
}
}
如您所见,一切都按原样委托,除了必须处理的已检查异常。
今天我用 Java 8 中的 "new" CompletableFuture
进行了实验,当我没有找到 runAsync(Callable)
方法时发现自己很困惑。
我可以自己做,如下所示,但为什么这个(对我来说非常明显和有用的实用方法)丢失了? 我是不是漏了什么?
public static <T> CompletableFuture<T> asFuture(Callable<? extends T> callable, Executor executor) {
CompletableFuture<T> future = new CompletableFuture<>();
executor.execute(() -> {
try {
future.complete(callable.call());
} catch (Throwable t) {
future.completeExceptionally(t);
}
});
return future;
}
你应该使用 supplyAsync(Supplier<U>)
一般来说,lambda 表达式和检查异常不能很好地协同工作,CompletableFuture
通过设计避免检查异常。虽然在你的情况下应该没问题。
相关话题:
http://cs.oswego.edu/pipermail/concurrency-interest/2012-December/010486.html
http://cs.oswego.edu/pipermail/concurrency-interest/2014-August/012911.html
对于那些需要问题中提供的功能的人:为了理想地反映现有 CompletableFuture.runAsync
和 CompletableFuture.supplyAsync
函数的行为,我对其进行了一些更改。
// Java 8 (Java 9+ version below)
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;
/**
* {@link CompletableFuture} utils.
*
* @author stonar96
*
* @see CompletableFuture
*/
public final class CompletableFutureUtils {
/**
* Delegates the given Callable to
* {@link CompletableFuture#supplyAsync(Supplier)}, handles checked exceptions
* accordingly to unchecked exceptions and associates a new CompletableFuture.
*
* @param callable a function returning the value to be used to complete the
* returned CompletableFuture
* @param <U> the function's return type
* @return the new associated CompletableFuture
* @see CompletableFuture#supplyAsync(Supplier)
*/
public static <U> CompletableFuture<U> callAsync(Callable<U> callable) {
return completeAsync(new CompletableFuture<>(), callable);
}
/**
* Delegates the given Callable to
* {@link CompletableFuture#supplyAsync(Supplier)}, handles checked exceptions
* accordingly to unchecked exceptions and associates the given
* CompletableFuture.
*
* @param result the CompletableFuture to be associated
* @param callable a function returning the value to be used to complete the
* returned CompletableFuture
* @param <T> the function's return type
* @return the given associated CompletableFuture
* @see CompletableFuture#supplyAsync(Supplier)
*/
public static <T> CompletableFuture<T> completeAsync(CompletableFuture<T> result, Callable<? extends T> callable) {
if (result == null) {
throw new NullPointerException();
}
CompletableFuture<T> delegate = CompletableFuture.supplyAsync(callable == null ? null : () -> {
try {
return callable.call();
} catch (Throwable t) {
if (t instanceof Error) {
throw (Error) t;
}
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
}
result.completeExceptionally(t);
}
return null;
});
if (delegate == null) {
return null;
}
result.whenComplete((v, t) -> {
if (t == null) {
delegate.complete(v);
return;
}
delegate.completeExceptionally(t);
});
delegate.whenComplete((v, t) -> {
if (t == null) {
result.complete(v);
return;
}
result.completeExceptionally(t);
});
return result;
}
/**
* Delegates the given Callable and Executor to
* {@link CompletableFuture#supplyAsync(Supplier, Executor)}, handles checked
* exceptions accordingly to unchecked exceptions and associates a new
* CompletableFuture.
*
* @param callable a function returning the value to be used to complete the
* returned CompletableFuture
* @param executor the executor to use for asynchronous execution
* @param <U> the function's return type
* @return the new associated CompletableFuture
* @see CompletableFuture#supplyAsync(Supplier, Executor)
*/
public static <U> CompletableFuture<U> callAsync(Callable<U> callable, Executor executor) {
return completeAsync(new CompletableFuture<>(), callable, executor);
}
/**
* Delegates the given Callable and Executor to
* {@link CompletableFuture#supplyAsync(Supplier, Executor)}, handles checked
* exceptions accordingly to unchecked exceptions and associates the given
* CompletableFuture.
*
* @param result the CompletableFuture to be associated
* @param callable a function returning the value to be used to complete the
* returned CompletableFuture
* @param executor the executor to use for asynchronous execution
* @param <T> the function's return type
* @return the given associated CompletableFuture
* @see CompletableFuture#supplyAsync(Supplier, Executor)
*/
public static <T> CompletableFuture<T> completeAsync(CompletableFuture<T> result, Callable<? extends T> callable, Executor executor) {
if (result == null) {
throw new NullPointerException();
}
CompletableFuture<T> delegate = CompletableFuture.supplyAsync(callable == null ? null : () -> {
try {
return callable.call();
} catch (Throwable t) {
if (t instanceof Error) {
throw (Error) t;
}
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
}
result.completeExceptionally(t);
}
return null;
}, executor);
if (delegate == null) {
return null;
}
result.whenComplete((v, t) -> {
if (t == null) {
delegate.complete(v);
return;
}
delegate.completeExceptionally(t);
});
delegate.whenComplete((v, t) -> {
if (t == null) {
result.complete(v);
return;
}
result.completeExceptionally(t);
});
return result;
}
private CompletableFutureUtils() {
throw new AssertionError("CompletableFutureUtils cannot be instantiated");
}
}
// Java 9+
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;
/**
* {@link CompletableFuture} utils.
*
* @author stonar96
*
* @see CompletableFuture
*/
public final class CompletableFutureUtils {
/**
* Delegates the given Callable to
* {@link CompletableFuture#completeAsync(Supplier)} using a new
* CompletableFuture and handles checked exceptions accordingly to unchecked
* exceptions.
*
* @param callable a function returning the value to be used to complete the
* returned CompletableFuture
* @param <U> the function's return type
* @return the new CompletableFuture
* @see CompletableFuture#completeAsync(Supplier)
*/
public static <U> CompletableFuture<U> callAsync(Callable<U> callable) {
return completeAsync(new CompletableFuture<>(), callable);
}
/**
* Delegates the given Callable to
* {@link CompletableFuture#completeAsync(Supplier)} using the given
* CompletableFuture and handles checked exceptions accordingly to unchecked
* exceptions.
*
* @param result the CompletableFuture to be used
* @param callable a function returning the value to be used to complete the
* returned CompletableFuture
* @param <T> the function's return type
* @return the given CompletableFuture
* @see CompletableFuture#completeAsync(Supplier)
*/
public static <T> CompletableFuture<T> completeAsync(CompletableFuture<T> result, Callable<? extends T> callable) {
return result.completeAsync(callable == null ? null : () -> {
try {
return callable.call();
} catch (Throwable t) {
if (t instanceof Error) {
throw (Error) t;
}
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
}
result.completeExceptionally(t);
}
return null;
});
}
/**
* Delegates the given Callable and Executor to
* {@link CompletableFuture#completeAsync(Supplier, Executor)} using a new
* CompletableFuture and handles checked exceptions accordingly to unchecked
* exceptions.
*
* @param callable a function returning the value to be used to complete the
* returned CompletableFuture
* @param executor the executor to use for asynchronous execution
* @param <U> the function's return type
* @return the new CompletableFuture
* @see CompletableFuture#completeAsync(Supplier, Executor)
*/
public static <U> CompletableFuture<U> callAsync(Callable<U> callable, Executor executor) {
return completeAsync(new CompletableFuture<>(), callable, executor);
}
/**
* Delegates the given Callable and Executor to
* {@link CompletableFuture#completeAsync(Supplier, Executor)} using the given
* CompletableFuture and handles checked exceptions accordingly to unchecked
* exceptions.
*
* @param result the CompletableFuture to be used
* @param callable a function returning the value to be used to complete the
* returned CompletableFuture
* @param executor the executor to use for asynchronous execution
* @param <T> the function's return type
* @return the given CompletableFuture
* @see CompletableFuture#completeAsync(Supplier, Executor)
*/
public static <T> CompletableFuture<T> completeAsync(CompletableFuture<T> result, Callable<? extends T> callable, Executor executor) {
return result.completeAsync(callable == null ? null : () -> {
try {
return callable.call();
} catch (Throwable t) {
if (t instanceof Error) {
throw (Error) t;
}
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
}
result.completeExceptionally(t);
}
return null;
}, executor);
}
private CompletableFutureUtils() {
throw new AssertionError("CompletableFutureUtils cannot be instantiated");
}
}
如您所见,一切都按原样委托,除了必须处理的已检查异常。