ListenableFuture回调执行顺序
ListenableFuture callback execution order
Guava 的 ListenableFuture
库提供了一种为未来任务添加回调的机制。这是按如下方式完成的:
ListenableFuture<MyClass> future = myExecutor.submit(myCallable);
Futures.addCallback(future, new FutureCallback<MyClass>() {
@Override
public void onSuccess(@Nullable MyClass myClass) {
doSomething(myClass);
}
@Override
public void onFailure(Throwable t) {
printWarning(t);
}}, myCallbackExecutor);
}
您可以通过调用 get
函数等待 ListenableFuture
完成。例如:
MyClass myClass = future.get();
我的问题是,在 get
终止之前,某个未来的所有回调都保证 运行 吗? IE。如果未来有许多回调执行者注册了很多回调,所有的回调都会在 get
returns 之前完成吗?
编辑
我的用例是,我将构建器传递给许多 classes。每个 class 填充构建器的一个字段。我希望异步填充所有字段,因为每个字段都需要外部查询来生成该字段的数据。我希望呼叫我的 asyncPopulateBuilder
的用户收到一个 Future
,她可以在该 get
上呼叫 get
并确保所有字段都已填充。我想的方法如下:
final Builder b;
ListenableFuture<MyClass> future = myExecutor.submit(myCallable);
Futures.addCallback(future, new FutureCallback<MyClass>() {
@Override
public void onSuccess(@Nullable MyClass myClass) {
b.setMyClass(myClass);
}
@Override
public void onFailure(Throwable t) {
printWarning(t);
}}, myCallbackExecutor);
}
// Do the same thing for all other fields.
在这种情况下,在填充所有字段之前,推荐的阻止方式是什么?
不保证在 get
return 秒之前回调 运行。更多内容见下文。
至于如何解决这个用例,我建议将每个字段数据的查询变成一个单独的 Future
,将它们与 allAsList
+transform
结合起来,然后采取对此采取行动。 (我们可能有一天会提供 a shortcut for the "combine" step。)
ListenableFuture<MyClass> future = myExecutor.submit(myCallable);
final ListenableFuture<Foo> foo =
Futures.transform(
future,
new Function<MyClass, Foo>() { ... },
myCallbackExecutor);
final ListenableFuture<Bar> bar = ...;
final ListenableFuture<Baz> baz = ...;
ListenableFuture<?> allAvailable = Futures.allAsList(foo, bar, baz);
ListenableFuture<?> allSet = Futures.transform(
allAvailable,
new Function<Object, Object>() {
@Override
public Object apply(Object ignored) {
// Use getUnchecked, since we know they already succeeded:
builder.setFoo(Futures.getUnchecked(foo));
builder.setFoo(Futures.getUnchecked(bar));
builder.setFoo(Futures.getUnchecked(baz));
return null;
}
}
};
现在用户可以调用 allSet.get()
等待填充。
(或者您可能希望 allSet
成为 Future<Builder>
以便用户获得对构建器的引用。或者您可能不需要完整的 Future
,只有一个 CountDownLatch
,您可以在其中使用 addCallback
而不是 transform
,并在回调结束时对闩锁进行倒计时。)
这种方法还可以简化错误处理。
回复:"Do callbacks run before get
?"
首先,我很确定我们不会在规范中的任何地方保证这一点,所以感谢您的询问而不是仅仅去争取它:) 如果您最终想要依赖当前实现的某些行为,请 file an issue 以便我们可以添加文档和测试。
其次,如果我从字面上理解你的问题,你所要求的是不可能的:如果 get()
等待所有听众完成,那么任何调用 get()
的听众都会挂!
你的问题的一个稍微宽松的版本是 "Will all the listeners at least start before get()
returns?" 结果证明这也是不可能的:假设我将两个监听器附加到同一个 Future
上 运行 directExecutor()
。两个听众都简单地调用 get()
和 return。其中一位听众必须先 运行。当它调用 get()
时,它将挂起,因为第二个侦听器尚未启动——在第一个侦听器完成之前也不能。 (更一般地说,依赖任何给定的 Executor
来迅速执行任务可能是危险的。)
一个更宽松的版本是 "Will the Future
at least call submit()
for each of the listeners before get()
returns?" 但这最终会在我刚才描述的相同场景中出现问题:在 directExecutor()
运行 上调用 submit(firstListener)
s 任务和调用 get()
,在第二个侦听器启动之前无法完成,在第一个侦听器完成之前无法完成。
如果有的话,听起来更有可能 get()
return 在 任何听众执行之前。但是由于线程调度的不可预测性,我们也不能依赖它。 (再说一次:它没有记录在案,所以请不要依赖它,除非你要求它被记录在案!)
final Builder b;
CountDownLatch latch = new CountDownLatch(1);
ListenableFuture<MyClass> future = myExecutor.submit(myCallable);
Futures.addCallback(future, new FutureCallback<MyClass>() {
@Override
public void onSuccess(@Nullable MyClass myClass) {
b.setMyClass(myClass);
latch.countDown();
}
@Override
public void onFailure(Throwable t) {
printWarning(t);
latch.countDown();
}, myCallbackExecutor);
try {
latch.await();
} catch (InterruptedException e) {
LOG.error("something InterruptedException", e);
} finally {
myCallbackExecutor.shutdown();
}
编辑
代码的灵感来自@Chris Povirk
(Or maybe you want for allSet to be a Future so that the user is handed a reference to the builder. Or maybe you don't need a full-on Future at all, only a CountDownLatch, in which you could use addCallback instead of transform and count down the latch at the end of the callback.)
This approach may also simplify error handling.
Guava 的 ListenableFuture
库提供了一种为未来任务添加回调的机制。这是按如下方式完成的:
ListenableFuture<MyClass> future = myExecutor.submit(myCallable);
Futures.addCallback(future, new FutureCallback<MyClass>() {
@Override
public void onSuccess(@Nullable MyClass myClass) {
doSomething(myClass);
}
@Override
public void onFailure(Throwable t) {
printWarning(t);
}}, myCallbackExecutor);
}
您可以通过调用 get
函数等待 ListenableFuture
完成。例如:
MyClass myClass = future.get();
我的问题是,在 get
终止之前,某个未来的所有回调都保证 运行 吗? IE。如果未来有许多回调执行者注册了很多回调,所有的回调都会在 get
returns 之前完成吗?
编辑
我的用例是,我将构建器传递给许多 classes。每个 class 填充构建器的一个字段。我希望异步填充所有字段,因为每个字段都需要外部查询来生成该字段的数据。我希望呼叫我的 asyncPopulateBuilder
的用户收到一个 Future
,她可以在该 get
上呼叫 get
并确保所有字段都已填充。我想的方法如下:
final Builder b;
ListenableFuture<MyClass> future = myExecutor.submit(myCallable);
Futures.addCallback(future, new FutureCallback<MyClass>() {
@Override
public void onSuccess(@Nullable MyClass myClass) {
b.setMyClass(myClass);
}
@Override
public void onFailure(Throwable t) {
printWarning(t);
}}, myCallbackExecutor);
}
// Do the same thing for all other fields.
在这种情况下,在填充所有字段之前,推荐的阻止方式是什么?
不保证在 get
return 秒之前回调 运行。更多内容见下文。
至于如何解决这个用例,我建议将每个字段数据的查询变成一个单独的 Future
,将它们与 allAsList
+transform
结合起来,然后采取对此采取行动。 (我们可能有一天会提供 a shortcut for the "combine" step。)
ListenableFuture<MyClass> future = myExecutor.submit(myCallable);
final ListenableFuture<Foo> foo =
Futures.transform(
future,
new Function<MyClass, Foo>() { ... },
myCallbackExecutor);
final ListenableFuture<Bar> bar = ...;
final ListenableFuture<Baz> baz = ...;
ListenableFuture<?> allAvailable = Futures.allAsList(foo, bar, baz);
ListenableFuture<?> allSet = Futures.transform(
allAvailable,
new Function<Object, Object>() {
@Override
public Object apply(Object ignored) {
// Use getUnchecked, since we know they already succeeded:
builder.setFoo(Futures.getUnchecked(foo));
builder.setFoo(Futures.getUnchecked(bar));
builder.setFoo(Futures.getUnchecked(baz));
return null;
}
}
};
现在用户可以调用 allSet.get()
等待填充。
(或者您可能希望 allSet
成为 Future<Builder>
以便用户获得对构建器的引用。或者您可能不需要完整的 Future
,只有一个 CountDownLatch
,您可以在其中使用 addCallback
而不是 transform
,并在回调结束时对闩锁进行倒计时。)
这种方法还可以简化错误处理。
回复:"Do callbacks run before get
?"
首先,我很确定我们不会在规范中的任何地方保证这一点,所以感谢您的询问而不是仅仅去争取它:) 如果您最终想要依赖当前实现的某些行为,请 file an issue 以便我们可以添加文档和测试。
其次,如果我从字面上理解你的问题,你所要求的是不可能的:如果 get()
等待所有听众完成,那么任何调用 get()
的听众都会挂!
你的问题的一个稍微宽松的版本是 "Will all the listeners at least start before get()
returns?" 结果证明这也是不可能的:假设我将两个监听器附加到同一个 Future
上 运行 directExecutor()
。两个听众都简单地调用 get()
和 return。其中一位听众必须先 运行。当它调用 get()
时,它将挂起,因为第二个侦听器尚未启动——在第一个侦听器完成之前也不能。 (更一般地说,依赖任何给定的 Executor
来迅速执行任务可能是危险的。)
一个更宽松的版本是 "Will the Future
at least call submit()
for each of the listeners before get()
returns?" 但这最终会在我刚才描述的相同场景中出现问题:在 directExecutor()
运行 上调用 submit(firstListener)
s 任务和调用 get()
,在第二个侦听器启动之前无法完成,在第一个侦听器完成之前无法完成。
如果有的话,听起来更有可能 get()
return 在 任何听众执行之前。但是由于线程调度的不可预测性,我们也不能依赖它。 (再说一次:它没有记录在案,所以请不要依赖它,除非你要求它被记录在案!)
final Builder b;
CountDownLatch latch = new CountDownLatch(1);
ListenableFuture<MyClass> future = myExecutor.submit(myCallable);
Futures.addCallback(future, new FutureCallback<MyClass>() {
@Override
public void onSuccess(@Nullable MyClass myClass) {
b.setMyClass(myClass);
latch.countDown();
}
@Override
public void onFailure(Throwable t) {
printWarning(t);
latch.countDown();
}, myCallbackExecutor);
try {
latch.await();
} catch (InterruptedException e) {
LOG.error("something InterruptedException", e);
} finally {
myCallbackExecutor.shutdown();
}
编辑
代码的灵感来自@Chris Povirk
(Or maybe you want for allSet to be a Future so that the user is handed a reference to the builder. Or maybe you don't need a full-on Future at all, only a CountDownLatch, in which you could use addCallback instead of transform and count down the latch at the end of the callback.) This approach may also simplify error handling.