ListenableFuture回调执行顺序

ListenableFuture callback execution order

Guava 的 ListenableFuture 库提供了一种为未来任务添加回调的机制。这是按如下方式完成的:

ListenableFuture<MyClass> future = myExecutor.submit(myCallable);
Futures.addCallback(future, new FutureCallback<MyClass>() {
    @Override
    public void onSuccess(@Nullable MyClass myClass) {
      doSomething(myClass);
    }

    @Override
    public void onFailure(Throwable t) {
      printWarning(t);
    }}, myCallbackExecutor);
}

您可以通过调用 get 函数等待 ListenableFuture 完成。例如:

MyClass myClass = future.get();

我的问题是,在 get 终止之前,某个未来的所有回调都保证 运行 吗? IE。如果未来有许多回调执行者注册了很多回调,所有的回调都会在 get returns 之前完成吗?

编辑

我的用例是,我将构建器传递给许多 classes。每个 class 填充构建器的一个字段。我希望异步填充所有字段,因为每个字段都需要外部查询来生成该字段的数据。我希望呼叫我的 asyncPopulateBuilder 的用户收到一个 Future,她可以在该 get 上呼叫 get 并确保所有字段都已填充。我想的方法如下:

final Builder b;
ListenableFuture<MyClass> future = myExecutor.submit(myCallable);
Futures.addCallback(future, new FutureCallback<MyClass>() {
  @Override
  public void onSuccess(@Nullable MyClass myClass) {
    b.setMyClass(myClass);
  }

  @Override
  public void onFailure(Throwable t) {
    printWarning(t);
  }}, myCallbackExecutor);
}
// Do the same thing for all other fields.

在这种情况下,在填充所有字段之前,推荐的阻止方式是什么?

不保证在 get return 秒之前回调 运行。更多内容见下文。

至于如何解决这个用例,我建议将每个字段数据的查询变成一个单独的 Future,将它们与 allAsList+transform 结合起来,然后采取对此采取行动。 (我们可能有一天会提供 a shortcut for the "combine" step。)

ListenableFuture<MyClass> future = myExecutor.submit(myCallable);

final ListenableFuture<Foo> foo =
    Futures.transform(
        future,
        new Function<MyClass, Foo>() { ... },
        myCallbackExecutor);
final ListenableFuture<Bar> bar = ...;
final ListenableFuture<Baz> baz = ...;

ListenableFuture<?> allAvailable = Futures.allAsList(foo, bar, baz);
ListenableFuture<?> allSet = Futures.transform(
    allAvailable, 
    new Function<Object, Object>() {
      @Override
      public Object apply(Object ignored) {
        // Use getUnchecked, since we know they already succeeded:
        builder.setFoo(Futures.getUnchecked(foo));
        builder.setFoo(Futures.getUnchecked(bar));
        builder.setFoo(Futures.getUnchecked(baz));
        return null;
      }
    }
};

现在用户可以调用 allSet.get() 等待填充。

(或者您可能希望 allSet 成为 Future<Builder> 以便用户获得对构建器的引用。或者您可能不需要完整的 Future,只有一个 CountDownLatch,您可以在其中使用 addCallback 而不是 transform,并在回调结束时对闩锁进行倒计时。)

这种方法还可以简化错误处理。


回复:"Do callbacks run before get?"

首先,我很确定我们不会在规范中的任何地方保证这一点,所以感谢您的询问而不是仅仅去争取它:) 如果您最终想要依赖当前实现的某些行为,请 file an issue 以便我们可以添加文档和测试。

其次,如果我从字面上理解你的问题,你所要求的是不可能的:如果 get() 等待所有听众完成,那么任何调用 get() 的听众都会挂!

你的问题的一个稍微宽松的版本是 "Will all the listeners at least start before get() returns?" 结果证明这也是不可能的:假设我将两个监听器附加到同一个 Future 上 运行 directExecutor()。两个听众都简单地调用 get() 和 return。其中一位听众必须先 运行。当它调用 get() 时,它将挂起,因为第二个侦听器尚未启动——在第一个侦听器完成之前也不能。 (更一般地说,依赖任何给定的 Executor 来迅速执行任务可能是危险的。)

一个更宽松的版本是 "Will the Future at least call submit() for each of the listeners before get() returns?" 但这最终会在我刚才描述的相同场景中出现问题:在 directExecutor() 运行 上调用 submit(firstListener) s 任务和调用 get(),在第二个侦听器启动之前无法完成,在第一个侦听器完成之前无法完成。

如果有的话,听起来更有可能 get() return 任何听众执行之前。但是由于线程调度的不可预测性,我们也不能依赖它。 (再说一次:它没有记录在案,所以请不要依赖它,除非你要求它被记录在案!)

final Builder b;
CountDownLatch latch = new CountDownLatch(1);
ListenableFuture<MyClass> future = myExecutor.submit(myCallable);

Futures.addCallback(future, new FutureCallback<MyClass>() {
 @Override
 public void onSuccess(@Nullable MyClass myClass) {
 b.setMyClass(myClass);
 latch.countDown();
}
 @Override
 public void onFailure(Throwable t) {
 printWarning(t);
 latch.countDown();
}, myCallbackExecutor);

try {
        latch.await();
    } catch (InterruptedException e) {
        LOG.error("something InterruptedException", e);
    } finally {
        myCallbackExecutor.shutdown();
    }

编辑

代码的灵感来自@Chris Povirk

(Or maybe you want for allSet to be a Future so that the user is handed a reference to the builder. Or maybe you don't need a full-on Future at all, only a CountDownLatch, in which you could use addCallback instead of transform and count down the latch at the end of the callback.) This approach may also simplify error handling.