顺序调用 CompletableFutures

Sequential call CompletableFutures

我有无限的承诺队列 (completablefuture) 作为输入。 目标是 运行 一个接一个地承诺,直到结果满足条件并停止处理,并且 return 来自当前承诺的结果。

我的迭代解决方案如下所示:

volatile boolean shouldKeepReading = true;
....
CompletableFuture<Integer> result = promisesQueue.poll().get();

while (shouldKeepReading) {
      result = result.thenCompose(res -> {
        if (conditionPass(res)) {
          shouldKeepReading = false;
          return CompletableFuture.completedFuture(0));
        } else {
          if (shouldKeepReading) {
            return promisesQueue.poll().get();
          } else {
            return CompletableFuture.completedFuture(0));
          }
        }
      });
  1. 我使用带有可变标志的无限循环来控制处理。 volatile 保证内存对所有读者可见。一旦满足条件,控制标志将设置为 false 以停止处理。
  2. 我在阅读下一项之前进行了双重检查。
if (shouldKeepReading) {
            return promisesQueue.poll().get();

代码似乎工作正常但注意到这里不需要 volatile 关键字,它不会改变处理。为什么 ?我错过了什么吗? 您看到该代码有什么问题吗?

HotSpot JVM 相当保守。很容易将其他线程进行的写入操作看成是其他不相关的具有更强内存保证的读取和写入操作的副作用。

例如,在您的情况下,thenCompose 检查未来的完成状态,而函数的特定实现调用者将更改完成状态。即使状态为“未完成”,这似乎也能达到预期的效果,在这种情况下,没有正式的 happens-before 关系,或者在下一个链上实际调用 thenApply 时future 也没有建立 happens-before 关系,因为它是一个不同的变量。

换句话说,它似乎可以在没有 volatile 的情况下使用此 JVM 实现,但不能保证,因此您永远不应依赖这种行为。

更糟糕的是,即使使用 volatile,您的代码也不能保证正常工作。
您的代码的基本形状是

CompletableFuture<Integer> result = …

while (shouldKeepReading) {
  result = result.thenCompose(…);
}

这意味着只要初始未来尚未完成,此循环就可以链接任意数量的依赖操作,直到依赖链的完成设法赶上。这个循环引起的系统负载甚至可能阻止链赶上,直到遇到 OutOfMemoryError.

只要完成链设法赶上,您就不会注意到差异,因为一旦 shouldKeepReading 变为 false,所有链式操作的计算结果都是相同的,即零.

由于原始 future 源自 promisesQueue.poll().get() 范围之外,我们可以通过插入一个小延迟来模拟更高的工作量。然后,添加一个计数器以查看最终结果没有说明什么,例如

AtomicInteger chainedOps = new AtomicInteger();

CompletableFuture<Integer> result = promisesQueue.poll().get();
result = result.whenCompleteAsync(
    (x,y) -> LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2)));

while(shouldKeepReading) {
    result = result.thenCompose(res -> {
        chainedOps.incrementAndGet();
        if(conditionPass(res)) {
            shouldKeepReading = false;
            return CompletableFuture.completedFuture(0);
        } else {
            if (shouldKeepReading) {
                return promisesQueue.poll().get();
            } else {
                return CompletableFuture.completedFuture(0);
            }
        }
    });
}
result.join();
System.out.println(chainedOps.get() + " chained ops");

在我的机器上,循环很容易链接超过五百万个动作,即使第一个 conditionPass returns true 也是如此。

解决方法很简单。既不使用标志变量也不使用循环

result = result.thenCompose(new Function<Integer, CompletionStage<Integer>>() {
    @Override
    public CompletionStage<Integer> apply(Integer res) {
        // for testing, do chainedOps.incrementAndGet();
        return conditionPass(res)? CompletableFuture.completedFuture(0):
            promisesQueue.poll().get().thenCompose(this);
      }
});

仅当条件未满足时才调用 thenCompose,因此绝不会链接不必要的操作。由于它要求函数本身通过 thenCompose(this) 链接,lambda 必须由匿名内部 class 替换。如果你不喜欢这样,你可以求助于递归解决方案

CompletableFuture<Integer> retryPoll() {
    CompletableFuture<Integer> result = promisesQueue.poll().get();
    return result.thenComposeAsync(res ->
        conditionPass(res)? CompletableFuture.completedFuture(0): retryPoll());
}

这里非常简单,因为重试不依赖于先前评估的结果(否则您需要引入参数),而是依赖于 promisesQueue.poll().get() 对程序状态所做的更改。

此方法使用 thenComposeAsync 来避免深度递归,如果有大量已完成的期货结果被 conditionPass 拒绝。如果您确定 conditionPass 会在相当少量的重试后成功,您可以将 thenComposeAsync 更改为 thenCompose