超时无法完成的未来
Completable Future with Timeout not working
我是新来的 Completable Future。我正在尝试为元素列表(它们是参数)调用并行方法,然后组合结果以创建最终响应。我还尝试设置 50 毫秒的超时,这样如果调用在 50 毫秒内没有 return,我将 return 默认值。
到目前为止我已经试过了:
{
List<ItemGroup> result = Collections.synchronizedList(Lists.newArrayList());
try {
List<CompletableFuture> completableFutures = response.getItemGroupList().stream()
.map(inPutItemGroup ->
CompletableFuture.runAsync(() -> {
final ItemGroup itemGroup = getUpdatedItemGroup(inPutItemGroup); //call which I am tryin to make parallel
// this is thread safe
if (null != itemGroup) {
result.add(itemGroup); //output of the call
}
}, executorService).acceptEither(timeoutAfter(50, TimeUnit.MILLISECONDS),inPutItemGroup)) //this line throws error
.collect(Collectors.toList());
// this will wait till all threads are completed
CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[completableFutures.size()]))
.join();
} catch (final Throwable t) {
final String errorMsg = String.format("Exception occurred while rexecuting parallel call");
log.error(errorMsg, e);
result = response.getItemGroupList(); //default value - return the input value if error
}
Response finalResponse = Response.builder()
.itemGroupList(result)
.build();
}
private <T> CompletableFuture<T> timeoutAfter(final long timeout, final TimeUnit unit) {
CompletableFuture<T> result = new CompletableFuture<T>();
//Threadpool with 1 thread for scheduling a future that completes after a timeout
ScheduledExecutorService delayer = Executors.newScheduledThreadPool(1);
String message = String.format("Process timed out after %s %s", timeout, unit.name().toLowerCase());
delayer.schedule(() -> result.completeExceptionally(new TimeoutException(message)), timeout, unit);
return result;
}
但我一直收到错误提示:
error: incompatible types: ItemGroup cannot be converted to Consumer<? super Void>
[javac] itemGroup))
incompatible types: inference variable T has incompatible bounds
[javac] .collect(Collectors.toList());
[javac] ^
[javac] equality constraints: CompletableFuture
[javac] lower bounds: Object
[javac] where T is a type-variable:
有人可以告诉我我做错了什么吗?如果我走错了方向,请纠正我。
谢谢。
acceptEither
的签名如下所示:
public CompletableFuture<Void> acceptEither(
CompletionStage<? extends T> other,
Consumer<? super T> action
) {
抛出错误的行如下所示:
.acceptEither(
timeoutAfter(50, TimeUnit.MILLISECONDS),
inPutItemGroup
)
所以您看到您尝试将 ItemGroup
作为 Consumer<? super T>
传递,其中 T
被推断为 Void
,因此您得到了预期的错误:
error: incompatible types: ItemGroup cannot be converted to Consumer<? super Void>
而不是
acceptEither(timeoutAfter(50, TimeUnit.MILLISECONDS), inPutItemGroup))
你需要
applyToEither(timeoutAfter(50, TimeUnit.MILLISECONDS), x -> inPutItemGroup)
编译代码。 “accept”是一个使用一个值而不返回一个新值的动作,“apply”是一个产生一个新值的动作。
但是,还是有逻辑错误。 timeoutAfter
返回的未来将 异常完成 ,因此依赖阶段也将异常完成,无需评估函数,因此这种链接方法不适合将异常替换为默认值。
更糟糕的是,修复此问题会创建一个新的未来,该未来将由任一源未来完成,但这不会影响在其中一个源未来中执行的 result.add(itemGroup)
操作。在您的代码中,生成的未来仅用于等待完成,而不用于评估结果。因此,当您的超时结束时,您将停止等待完成,而仍然可能有后台线程修改列表。
正确的逻辑是将获取值的步骤分开,超时时可以用默认值取代该值,并将结果(获取的值或默认值)添加到结果列表的步骤。然后,您可以等待所有 add
操作完成。超时时,可能仍在进行 getUpdatedItemGroup
评估(无法停止它们的执行),但它们的结果将被忽略,因此不会影响结果列表。
同样值得指出的是,为每个列表元素创建一个新的 ScheduledExecutorService
(使用后不关闭,更糟糕的是)不是正确的方法。
// result must be effectively final
List<ItemGroup> result = Collections.synchronizedList(new ArrayList<>());
List<ItemGroup> endResult = result;
ScheduledExecutorService delayer = Executors.newScheduledThreadPool(1);
try {
CompletableFuture<?>[] completableFutures = response.getItemGroupList().stream()
.map(inPutItemGroup ->
timeoutAfter(delayer, 50, TimeUnit.MILLISECONDS,
CompletableFuture.supplyAsync(
() -> getUpdatedItemGroup(inPutItemGroup), executorService),
inPutItemGroup)
.thenAccept(itemGroup -> {
// this is thread safe, but questionable,
// e.g. the result list order is not maintained
if(null != itemGroup) result.add(itemGroup);
})
)
.toArray(CompletableFuture<?>[]::new);
// this will wait till all threads are completed
CompletableFuture.allOf(completableFutures).join();
} catch(final Throwable t) {
String errorMsg = String.format("Exception occurred while executing parallel call");
log.error(errorMsg, e);
endResult = response.getItemGroupList();
}
finally {
delayer.shutdown();
}
Response finalResponse = Response.builder()
.itemGroupList(endResult)
.build();
private <T> CompletableFuture<T> timeoutAfter(ScheduledExecutorService es,
long timeout, TimeUnit unit, CompletableFuture<T> f, T value) {
es.schedule(() -> f.complete(value), timeout, unit);
return f;
}
此处,supplyAsync
生成一个 CompletableFuture
,它将提供 getUpdatedItemGroup
评估的结果。 timeoutAfter
调用将安排在超时后使用默认值完成,而不创建新的未来,然后,通过 thenAccept
链接的依赖操作会将结果值添加到 result
列表.
注意synchronizedList
允许从多个线程添加元素,但是从多个线程添加将导致不可预测的顺序,与源列表的顺序无关。
我是新来的 Completable Future。我正在尝试为元素列表(它们是参数)调用并行方法,然后组合结果以创建最终响应。我还尝试设置 50 毫秒的超时,这样如果调用在 50 毫秒内没有 return,我将 return 默认值。
到目前为止我已经试过了:
{
List<ItemGroup> result = Collections.synchronizedList(Lists.newArrayList());
try {
List<CompletableFuture> completableFutures = response.getItemGroupList().stream()
.map(inPutItemGroup ->
CompletableFuture.runAsync(() -> {
final ItemGroup itemGroup = getUpdatedItemGroup(inPutItemGroup); //call which I am tryin to make parallel
// this is thread safe
if (null != itemGroup) {
result.add(itemGroup); //output of the call
}
}, executorService).acceptEither(timeoutAfter(50, TimeUnit.MILLISECONDS),inPutItemGroup)) //this line throws error
.collect(Collectors.toList());
// this will wait till all threads are completed
CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[completableFutures.size()]))
.join();
} catch (final Throwable t) {
final String errorMsg = String.format("Exception occurred while rexecuting parallel call");
log.error(errorMsg, e);
result = response.getItemGroupList(); //default value - return the input value if error
}
Response finalResponse = Response.builder()
.itemGroupList(result)
.build();
}
private <T> CompletableFuture<T> timeoutAfter(final long timeout, final TimeUnit unit) {
CompletableFuture<T> result = new CompletableFuture<T>();
//Threadpool with 1 thread for scheduling a future that completes after a timeout
ScheduledExecutorService delayer = Executors.newScheduledThreadPool(1);
String message = String.format("Process timed out after %s %s", timeout, unit.name().toLowerCase());
delayer.schedule(() -> result.completeExceptionally(new TimeoutException(message)), timeout, unit);
return result;
}
但我一直收到错误提示:
error: incompatible types: ItemGroup cannot be converted to Consumer<? super Void>
[javac] itemGroup))
incompatible types: inference variable T has incompatible bounds
[javac] .collect(Collectors.toList());
[javac] ^
[javac] equality constraints: CompletableFuture
[javac] lower bounds: Object
[javac] where T is a type-variable:
有人可以告诉我我做错了什么吗?如果我走错了方向,请纠正我。
谢谢。
acceptEither
的签名如下所示:
public CompletableFuture<Void> acceptEither(
CompletionStage<? extends T> other,
Consumer<? super T> action
) {
抛出错误的行如下所示:
.acceptEither(
timeoutAfter(50, TimeUnit.MILLISECONDS),
inPutItemGroup
)
所以您看到您尝试将 ItemGroup
作为 Consumer<? super T>
传递,其中 T
被推断为 Void
,因此您得到了预期的错误:
error: incompatible types: ItemGroup cannot be converted to Consumer<? super Void>
而不是
acceptEither(timeoutAfter(50, TimeUnit.MILLISECONDS), inPutItemGroup))
你需要
applyToEither(timeoutAfter(50, TimeUnit.MILLISECONDS), x -> inPutItemGroup)
编译代码。 “accept”是一个使用一个值而不返回一个新值的动作,“apply”是一个产生一个新值的动作。
但是,还是有逻辑错误。 timeoutAfter
返回的未来将 异常完成 ,因此依赖阶段也将异常完成,无需评估函数,因此这种链接方法不适合将异常替换为默认值。
更糟糕的是,修复此问题会创建一个新的未来,该未来将由任一源未来完成,但这不会影响在其中一个源未来中执行的 result.add(itemGroup)
操作。在您的代码中,生成的未来仅用于等待完成,而不用于评估结果。因此,当您的超时结束时,您将停止等待完成,而仍然可能有后台线程修改列表。
正确的逻辑是将获取值的步骤分开,超时时可以用默认值取代该值,并将结果(获取的值或默认值)添加到结果列表的步骤。然后,您可以等待所有 add
操作完成。超时时,可能仍在进行 getUpdatedItemGroup
评估(无法停止它们的执行),但它们的结果将被忽略,因此不会影响结果列表。
同样值得指出的是,为每个列表元素创建一个新的 ScheduledExecutorService
(使用后不关闭,更糟糕的是)不是正确的方法。
// result must be effectively final
List<ItemGroup> result = Collections.synchronizedList(new ArrayList<>());
List<ItemGroup> endResult = result;
ScheduledExecutorService delayer = Executors.newScheduledThreadPool(1);
try {
CompletableFuture<?>[] completableFutures = response.getItemGroupList().stream()
.map(inPutItemGroup ->
timeoutAfter(delayer, 50, TimeUnit.MILLISECONDS,
CompletableFuture.supplyAsync(
() -> getUpdatedItemGroup(inPutItemGroup), executorService),
inPutItemGroup)
.thenAccept(itemGroup -> {
// this is thread safe, but questionable,
// e.g. the result list order is not maintained
if(null != itemGroup) result.add(itemGroup);
})
)
.toArray(CompletableFuture<?>[]::new);
// this will wait till all threads are completed
CompletableFuture.allOf(completableFutures).join();
} catch(final Throwable t) {
String errorMsg = String.format("Exception occurred while executing parallel call");
log.error(errorMsg, e);
endResult = response.getItemGroupList();
}
finally {
delayer.shutdown();
}
Response finalResponse = Response.builder()
.itemGroupList(endResult)
.build();
private <T> CompletableFuture<T> timeoutAfter(ScheduledExecutorService es,
long timeout, TimeUnit unit, CompletableFuture<T> f, T value) {
es.schedule(() -> f.complete(value), timeout, unit);
return f;
}
此处,supplyAsync
生成一个 CompletableFuture
,它将提供 getUpdatedItemGroup
评估的结果。 timeoutAfter
调用将安排在超时后使用默认值完成,而不创建新的未来,然后,通过 thenAccept
链接的依赖操作会将结果值添加到 result
列表.
注意synchronizedList
允许从多个线程添加元素,但是从多个线程添加将导致不可预测的顺序,与源列表的顺序无关。