短路 CompletionStage 链

Short circuiting the chain of CompletionStage

我正在使用 Java 8,我有一个 CompletionStage 链,我正在尝试 运行.

我不想使用 join()get(),我想明确地完成 CompletionStage

我正在尝试 运行 两个数据库查询,第二个查询依赖于第一个查询的结果。我正在使用会话启动数据库事务,运行宁写查询 1,写查询 2,只有当两者都成功时我才想提交事务或回滚它。 事务和会话是 Neo4j java API https://neo4j.com/docs/api/java-driver/current/org/neo4j/driver/async/AsyncSession.html#writeTransactionAsync-org.neo4j.driver.async.AsyncTransactionWork-

的一部分

在 运行 两个查询之后 success/failure 我想关闭会话(标准数据库实践)

这是伪代码-

DB Session starts transaction
    run Write Query1
    run Write Query2
    
    if both are successful
       commit transaction
    else
       rollback transaction
close session

我想要实现的是,如果 query1/query2 失败,那么它应该只回滚事务并关闭会话。

如果查询 1 的结果不正确(小于某个阈值),查询 1 也可以抛出 CustomException。在这种情况下,它应该回滚事务。我正在为每个查询回滚 exceptionally 块中的事务。

快乐路径在下面的代码中运行良好,但是当我想抛出 CustomException 时,Query2 块没有被调用,甚至 Completable.allOf 也从未被调用。

CompletableFuture<String> firstFuture = new CompletableFuture();
CompletableFuture<String> secondFuture = new CompletableFuture();
CompletableFuture<String> lastFuture = new CompletableFuture();


//Lambda that executes transaction
TransactionWork<CompletionStage<String>> runTransactionWork = transaction -> {

     //Write Query1
       transaction.runAsync("DB WRITE QUERY1") //Running Write Query 1
              .thenCompose(someFunctionThatReturnsCompletionStage)
              .thenApply(val -> {
                     //throw CustomException if value less then threshold
                     if(val < threshold){
                         throw new CustomException("Incorrect value found");
                     }else{
                       //if value is correct then complete future
                       firstFuture.complete(val);
                     }
                  firstQuery.complete(val);
              }).exceptionally(error -> {
                        //Since failure occured in Query1 want to roll back
                        transaction.rollbackAsync();
                        firstFuture.completeExceptionally(error);
                        throw new RuntimeException("There has been an error in first query " + error.getMessage());
                  });

         //after the first write query is done then run the second write query
         firstFuture.thenCompose(val -> transaction.runAsync("DB Write QUERY 2"))
                   .thenCompose(someFunctionThatReturnsCompletionStage)
                   .thenApply(val -> {                      
                       //if value is correct then complete
                       secondFuture.complete(val);
                     }
                   }).exceptionally(error -> {
                        //Incase of failure in Query2 want to roll back
                        transaction.rollbackAsync();
                        secondFuture.completeExceptionally(error);
                        throw new RuntimeException("There has been an error in second query " + error.getMessage());
                  });


   //wait for both to complete and then complete the last future
   CompletableFuture.allOf(firstFuture, secondFuture)
                    .handle((empty, ex) -> {
                        if(ex != null){
                            lastFuture.completeExceptionally(ex);
                        }else{
                            //commit the transaction
                            transaction.commitAsync();
                            lastFuture.complete("OK");
                        }

                        return lastFuture;
                    });

            return lastFuture;
}

 //Create a database session
 Session session = driver.session();

 //runTransactionWork is lambda that has access to transaction
 session.writeTransactionAsync(runTransactionWork)
      .handle((val, err) -> {
         if(val != null){
            session.closeAsync();
            //send message to some broker about success
         }else{
            //fail logic 
         }
      });


如何实现短路异常以确保回滚事务并直接进入会话中的异常块。

这些是我对根据不同用例调用的代码块的观察,注意这些是基于我在代码中放置的调试点 -

  1. 快乐之路 - firstFuture(成功)-> secondFuture(成功)-> LastFuture(成功)-> 调用会话块成功(工作正常)
  2. 第一个 Future 失败 - firstFuture(因异常而失败)-> secondFuture(从未调用过)-> LastFuture(从未调用过)-> 会话块失败(从未调用过)
  3. 第二个 Future 失败 - firstFuture(成功)-> secondFuture(因异常而失败)-> LastFuture(从未调用)-> 会话块失败(从未调用)

我希望 #2 和 #3 也能正常工作,应该回滚相应的事务并关闭会话。

我的问题是,为什么 allOfhandle 中的例外部分在未来 completesExceptionally 之一时没有被调用?

当您抛出 CustomException 时,firstFuture 未完成。事实上,它什么也没有发生。因为没有完成(成功),所以这个:

firstFuture.thenCompose...

不会被执行。 thenCompose 的文档说:

When this stage completes normally, the given function is invoked with this stage's result as the argument...

既然不是这样,那段代码显然是不会被触发的。因此,secondFuture 也没有任何反应,因此 CompletableFuture::allOf 必须正好为零。可能是一个简化的例子会有所帮助:

public class CF {

  public static void main(String[] args) {
    CompletableFuture<Void> one = CompletableFuture.runAsync(CF::db1);
    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
    System.out.println(one.isCompletedExceptionally());

    CompletableFuture<Void> two = one.thenRun(CF::db2);

    System.out.println("first is done : " + FIRST_FUTURE.isDone());
    System.out.println("second is done : " + SECOND_FUTURE.isDone());
    CompletableFuture.allOf(FIRST_FUTURE, SECOND_FUTURE).thenRun(() -> {
      System.out.println("allOf");
    });
    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
  }

  private static final boolean FAIL = true;
  private static final CompletableFuture<String> FIRST_FUTURE = new CompletableFuture<>();
  private static final CompletableFuture<String> SECOND_FUTURE = new CompletableFuture<>();

  private static void db1() {
    if(FAIL) {
      throw new RuntimeException("failed one");
    } else {
      FIRST_FUTURE.complete("42");
    }
  }

  private static void db2() {
    System.out.println("Running");
    SECOND_FUTURE.complete("42");
  }

}

如果你运行这个,你会发现没有打印任何东西...


很遗憾,我不熟悉 Neo4j,但您很可能可以根据自己的需要调整此示例:

public class CF {

  public static void main(String[] args) {
    CompletableFuture<Void> one = CompletableFuture.runAsync(CF::db1);

    CompletableFuture<Void> terminal =
    one.whenComplete((ok, th) -> {
      if(th != null || FIRST_FUTURE.isCompletedExceptionally()) {
        // no need to schedule the second one, need to rollback whatever the first one did
        // transaction.rollbackAsync();
        System.out.println("rollback because first one failed");
        LAST_FUTURE.completeExceptionally(new RuntimeException("because first one failed"));
      } else {
        CompletableFuture<Void> two = CompletableFuture.runAsync(CF::db2);
        two.whenComplete((ok2, th2) -> {
          if(th2 != null || SECOND_FUTURE.isCompletedExceptionally()) {
            System.out.println("rollback because second one failed");
            // transaction.rollbackAsync();
            LAST_FUTURE.completeExceptionally(new RuntimeException("because second one failed"));
          } else {
            LAST_FUTURE.complete("OK");
          }
        });
      }
    });

    // simulate that someone will call this
    terminal.join();
    System.out.println(LAST_FUTURE.join());

  }

  private static final boolean FAIL_ONE = false;
  private static final boolean FAIL_TWO = true;
  private static final CompletableFuture<String> FIRST_FUTURE = new CompletableFuture<>();
  private static final CompletableFuture<String> SECOND_FUTURE = new CompletableFuture<>();
  private static final CompletableFuture<String> LAST_FUTURE = new CompletableFuture<>();

  private static void db1() {
    if(FAIL_ONE) {
      LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
      RuntimeException ex = new RuntimeException("failed one");;
      FIRST_FUTURE.completeExceptionally(ex);
    } else {
      FIRST_FUTURE.complete("42");
    }
  }

  private static void db2() {
    if(FAIL_TWO) {
      LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
      RuntimeException ex = new RuntimeException("failed one");;
      SECOND_FUTURE.completeExceptionally(ex);
    } else {
      SECOND_FUTURE.complete("42");
    }
  }

}