completableFuture whenComplete 奇怪的行为

completableFuture whenComplete strange behavior

我在使用 completableFutures 时遇到一些奇怪的行为 请参阅下面的代码片段,以便于更好地理解。 我遍历消息列表并在每条消息上调用 handleMessage。 handleMessage 方法首先调用 getDataContentIdByService 最后调用 mappingService.process 方法。

但问题是一旦 getDataContentIdByService 方法处理完成,在 mappingService.process 方法执行完成之前,调用返回到 whenComplete 阶段。

我想要的是 getDataContentIdByService 和 mappingService.process 方法都应该按顺序完成执行,然后调用 whenComplete 阶段。

我的代码有问题吗..或者?有人可以帮忙吗?


Completablefuture.allOf(messages.getList().stream()
        .filter(this::msgOkOrLog)
        .filter(this::notOnDenyList)
        .map(msg -> handleMessage(msg, messages.getTrackingIdentifier(), messages.getMessageType()))
        .toArray(CompletableFuture<?>[]::new))
      .whenComplete((input, exception) -> countErrorsInContainer(messages));

handleMessage 函数


    protected CompletableFuture<Void> handleMessage(InternalProxyMessage message,
        TrackingIdentifier containerTid, MessageType messageType) {
    return getDataContentIdByService(message)
                       .thenAccept(message::setContentId)
                       .thenAccept(  
              mappingService.process(message)
                      .exceptionally(
                              ex -> {
                                  throw new InternalServerErrorException("Unmanaged Error", ex);
                                }
                              })));
    }

您的示例可以大大简化:

public static void main(String[] args) {

    CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "initiated");
    CompletableFuture<Void> cf2 = cf1.thenAccept(x -> {
        CompletableFuture<String> response = composed(x);
        return;
    });
    System.out.println("Is it done : " + cf2.isDone());
    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(3));

}

private static CompletableFuture<String> composed(String s) {
    return CompletableFuture.supplyAsync(() -> {
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
        System.out.println("composed is actually finished");
        return "done";
    });
}

cf1完成时,会调用thenAccept;但当然,由于 composed 在另一个线程(即 CompletableFuture::supplyAsync)中被调用,因此 return 语句将在完成之前被命中。 运行 上面的例子显示:

Is it done : true
composed is actually finished

另一方面,如果您将代码更改为:

CompletableFuture<String> cf2 = cf1.thenCompose(x -> {
     CompletableFuture<String> response = composed(x);
     return response;
});

您现在返回 composed 的结果。所以当cf1完成后,thenCompose就会被调用;所以cf2只有当内在的未来完成时才会完成。