短路 CompletionStage 链
Short circuiting the chain of CompletionStage
我正在使用 Java 8,我有一个 CompletionStage 链,我正在尝试 运行.
我不想使用 join()
或 get()
,我想明确地完成 CompletionStage
。
我正在尝试 运行 两个数据库查询,第二个查询依赖于第一个查询的结果。我正在使用会话启动数据库事务,运行宁写查询 1,写查询 2,只有当两者都成功时我才想提交事务或回滚它。
事务和会话是 Neo4j java API https://neo4j.com/docs/api/java-driver/current/org/neo4j/driver/async/AsyncSession.html#writeTransactionAsync-org.neo4j.driver.async.AsyncTransactionWork-
的一部分
在 运行 两个查询之后 success/failure 我想关闭会话(标准数据库实践)
这是伪代码-
DB Session starts transaction
run Write Query1
run Write Query2
if both are successful
commit transaction
else
rollback transaction
close session
我想要实现的是,如果 query1/query2 失败,那么它应该只回滚事务并关闭会话。
如果查询 1 的结果不正确(小于某个阈值),查询 1 也可以抛出 CustomException
。在这种情况下,它应该回滚事务。我正在为每个查询回滚 exceptionally
块中的事务。
快乐路径在下面的代码中运行良好,但是当我想抛出 CustomException
时,Query2 块没有被调用,甚至 Completable.allOf
也从未被调用。
CompletableFuture<String> firstFuture = new CompletableFuture();
CompletableFuture<String> secondFuture = new CompletableFuture();
CompletableFuture<String> lastFuture = new CompletableFuture();
//Lambda that executes transaction
TransactionWork<CompletionStage<String>> runTransactionWork = transaction -> {
//Write Query1
transaction.runAsync("DB WRITE QUERY1") //Running Write Query 1
.thenCompose(someFunctionThatReturnsCompletionStage)
.thenApply(val -> {
//throw CustomException if value less then threshold
if(val < threshold){
throw new CustomException("Incorrect value found");
}else{
//if value is correct then complete future
firstFuture.complete(val);
}
firstQuery.complete(val);
}).exceptionally(error -> {
//Since failure occured in Query1 want to roll back
transaction.rollbackAsync();
firstFuture.completeExceptionally(error);
throw new RuntimeException("There has been an error in first query " + error.getMessage());
});
//after the first write query is done then run the second write query
firstFuture.thenCompose(val -> transaction.runAsync("DB Write QUERY 2"))
.thenCompose(someFunctionThatReturnsCompletionStage)
.thenApply(val -> {
//if value is correct then complete
secondFuture.complete(val);
}
}).exceptionally(error -> {
//Incase of failure in Query2 want to roll back
transaction.rollbackAsync();
secondFuture.completeExceptionally(error);
throw new RuntimeException("There has been an error in second query " + error.getMessage());
});
//wait for both to complete and then complete the last future
CompletableFuture.allOf(firstFuture, secondFuture)
.handle((empty, ex) -> {
if(ex != null){
lastFuture.completeExceptionally(ex);
}else{
//commit the transaction
transaction.commitAsync();
lastFuture.complete("OK");
}
return lastFuture;
});
return lastFuture;
}
//Create a database session
Session session = driver.session();
//runTransactionWork is lambda that has access to transaction
session.writeTransactionAsync(runTransactionWork)
.handle((val, err) -> {
if(val != null){
session.closeAsync();
//send message to some broker about success
}else{
//fail logic
}
});
如何实现短路异常以确保回滚事务并直接进入会话中的异常块。
这些是我对根据不同用例调用的代码块的观察,注意这些是基于我在代码中放置的调试点 -
- 快乐之路 - firstFuture(成功)-> secondFuture(成功)-> LastFuture(成功)-> 调用会话块成功(工作正常)
- 第一个 Future 失败 - firstFuture(因异常而失败)-> secondFuture(从未调用过)-> LastFuture(从未调用过)-> 会话块失败(从未调用过)
- 第二个 Future 失败 - firstFuture(成功)-> secondFuture(因异常而失败)-> LastFuture(从未调用)-> 会话块失败(从未调用)
我希望 #2 和 #3 也能正常工作,应该回滚相应的事务并关闭会话。
我的问题是,为什么 allOf
的 handle
中的例外部分在未来 completesExceptionally
之一时没有被调用?
当您抛出 CustomException
时,firstFuture
未完成。事实上,它什么也没有发生。因为没有完成(成功),所以这个:
firstFuture.thenCompose...
不会被执行。 thenCompose
的文档说:
When this stage completes normally, the given function is invoked with this stage's result as the argument...
既然不是这样,那段代码显然是不会被触发的。因此,secondFuture
也没有任何反应,因此 CompletableFuture::allOf
必须正好为零。可能是一个简化的例子会有所帮助:
public class CF {
public static void main(String[] args) {
CompletableFuture<Void> one = CompletableFuture.runAsync(CF::db1);
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
System.out.println(one.isCompletedExceptionally());
CompletableFuture<Void> two = one.thenRun(CF::db2);
System.out.println("first is done : " + FIRST_FUTURE.isDone());
System.out.println("second is done : " + SECOND_FUTURE.isDone());
CompletableFuture.allOf(FIRST_FUTURE, SECOND_FUTURE).thenRun(() -> {
System.out.println("allOf");
});
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
}
private static final boolean FAIL = true;
private static final CompletableFuture<String> FIRST_FUTURE = new CompletableFuture<>();
private static final CompletableFuture<String> SECOND_FUTURE = new CompletableFuture<>();
private static void db1() {
if(FAIL) {
throw new RuntimeException("failed one");
} else {
FIRST_FUTURE.complete("42");
}
}
private static void db2() {
System.out.println("Running");
SECOND_FUTURE.complete("42");
}
}
如果你运行这个,你会发现没有打印任何东西...
很遗憾,我不熟悉 Neo4j
,但您很可能可以根据自己的需要调整此示例:
public class CF {
public static void main(String[] args) {
CompletableFuture<Void> one = CompletableFuture.runAsync(CF::db1);
CompletableFuture<Void> terminal =
one.whenComplete((ok, th) -> {
if(th != null || FIRST_FUTURE.isCompletedExceptionally()) {
// no need to schedule the second one, need to rollback whatever the first one did
// transaction.rollbackAsync();
System.out.println("rollback because first one failed");
LAST_FUTURE.completeExceptionally(new RuntimeException("because first one failed"));
} else {
CompletableFuture<Void> two = CompletableFuture.runAsync(CF::db2);
two.whenComplete((ok2, th2) -> {
if(th2 != null || SECOND_FUTURE.isCompletedExceptionally()) {
System.out.println("rollback because second one failed");
// transaction.rollbackAsync();
LAST_FUTURE.completeExceptionally(new RuntimeException("because second one failed"));
} else {
LAST_FUTURE.complete("OK");
}
});
}
});
// simulate that someone will call this
terminal.join();
System.out.println(LAST_FUTURE.join());
}
private static final boolean FAIL_ONE = false;
private static final boolean FAIL_TWO = true;
private static final CompletableFuture<String> FIRST_FUTURE = new CompletableFuture<>();
private static final CompletableFuture<String> SECOND_FUTURE = new CompletableFuture<>();
private static final CompletableFuture<String> LAST_FUTURE = new CompletableFuture<>();
private static void db1() {
if(FAIL_ONE) {
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
RuntimeException ex = new RuntimeException("failed one");;
FIRST_FUTURE.completeExceptionally(ex);
} else {
FIRST_FUTURE.complete("42");
}
}
private static void db2() {
if(FAIL_TWO) {
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
RuntimeException ex = new RuntimeException("failed one");;
SECOND_FUTURE.completeExceptionally(ex);
} else {
SECOND_FUTURE.complete("42");
}
}
}
我正在使用 Java 8,我有一个 CompletionStage 链,我正在尝试 运行.
我不想使用 join()
或 get()
,我想明确地完成 CompletionStage
。
我正在尝试 运行 两个数据库查询,第二个查询依赖于第一个查询的结果。我正在使用会话启动数据库事务,运行宁写查询 1,写查询 2,只有当两者都成功时我才想提交事务或回滚它。 事务和会话是 Neo4j java API https://neo4j.com/docs/api/java-driver/current/org/neo4j/driver/async/AsyncSession.html#writeTransactionAsync-org.neo4j.driver.async.AsyncTransactionWork-
的一部分在 运行 两个查询之后 success/failure 我想关闭会话(标准数据库实践)
这是伪代码-
DB Session starts transaction
run Write Query1
run Write Query2
if both are successful
commit transaction
else
rollback transaction
close session
我想要实现的是,如果 query1/query2 失败,那么它应该只回滚事务并关闭会话。
如果查询 1 的结果不正确(小于某个阈值),查询 1 也可以抛出 CustomException
。在这种情况下,它应该回滚事务。我正在为每个查询回滚 exceptionally
块中的事务。
快乐路径在下面的代码中运行良好,但是当我想抛出 CustomException
时,Query2 块没有被调用,甚至 Completable.allOf
也从未被调用。
CompletableFuture<String> firstFuture = new CompletableFuture();
CompletableFuture<String> secondFuture = new CompletableFuture();
CompletableFuture<String> lastFuture = new CompletableFuture();
//Lambda that executes transaction
TransactionWork<CompletionStage<String>> runTransactionWork = transaction -> {
//Write Query1
transaction.runAsync("DB WRITE QUERY1") //Running Write Query 1
.thenCompose(someFunctionThatReturnsCompletionStage)
.thenApply(val -> {
//throw CustomException if value less then threshold
if(val < threshold){
throw new CustomException("Incorrect value found");
}else{
//if value is correct then complete future
firstFuture.complete(val);
}
firstQuery.complete(val);
}).exceptionally(error -> {
//Since failure occured in Query1 want to roll back
transaction.rollbackAsync();
firstFuture.completeExceptionally(error);
throw new RuntimeException("There has been an error in first query " + error.getMessage());
});
//after the first write query is done then run the second write query
firstFuture.thenCompose(val -> transaction.runAsync("DB Write QUERY 2"))
.thenCompose(someFunctionThatReturnsCompletionStage)
.thenApply(val -> {
//if value is correct then complete
secondFuture.complete(val);
}
}).exceptionally(error -> {
//Incase of failure in Query2 want to roll back
transaction.rollbackAsync();
secondFuture.completeExceptionally(error);
throw new RuntimeException("There has been an error in second query " + error.getMessage());
});
//wait for both to complete and then complete the last future
CompletableFuture.allOf(firstFuture, secondFuture)
.handle((empty, ex) -> {
if(ex != null){
lastFuture.completeExceptionally(ex);
}else{
//commit the transaction
transaction.commitAsync();
lastFuture.complete("OK");
}
return lastFuture;
});
return lastFuture;
}
//Create a database session
Session session = driver.session();
//runTransactionWork is lambda that has access to transaction
session.writeTransactionAsync(runTransactionWork)
.handle((val, err) -> {
if(val != null){
session.closeAsync();
//send message to some broker about success
}else{
//fail logic
}
});
如何实现短路异常以确保回滚事务并直接进入会话中的异常块。
这些是我对根据不同用例调用的代码块的观察,注意这些是基于我在代码中放置的调试点 -
- 快乐之路 - firstFuture(成功)-> secondFuture(成功)-> LastFuture(成功)-> 调用会话块成功(工作正常)
- 第一个 Future 失败 - firstFuture(因异常而失败)-> secondFuture(从未调用过)-> LastFuture(从未调用过)-> 会话块失败(从未调用过)
- 第二个 Future 失败 - firstFuture(成功)-> secondFuture(因异常而失败)-> LastFuture(从未调用)-> 会话块失败(从未调用)
我希望 #2 和 #3 也能正常工作,应该回滚相应的事务并关闭会话。
我的问题是,为什么 allOf
的 handle
中的例外部分在未来 completesExceptionally
之一时没有被调用?
当您抛出 CustomException
时,firstFuture
未完成。事实上,它什么也没有发生。因为没有完成(成功),所以这个:
firstFuture.thenCompose...
不会被执行。 thenCompose
的文档说:
When this stage completes normally, the given function is invoked with this stage's result as the argument...
既然不是这样,那段代码显然是不会被触发的。因此,secondFuture
也没有任何反应,因此 CompletableFuture::allOf
必须正好为零。可能是一个简化的例子会有所帮助:
public class CF {
public static void main(String[] args) {
CompletableFuture<Void> one = CompletableFuture.runAsync(CF::db1);
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
System.out.println(one.isCompletedExceptionally());
CompletableFuture<Void> two = one.thenRun(CF::db2);
System.out.println("first is done : " + FIRST_FUTURE.isDone());
System.out.println("second is done : " + SECOND_FUTURE.isDone());
CompletableFuture.allOf(FIRST_FUTURE, SECOND_FUTURE).thenRun(() -> {
System.out.println("allOf");
});
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
}
private static final boolean FAIL = true;
private static final CompletableFuture<String> FIRST_FUTURE = new CompletableFuture<>();
private static final CompletableFuture<String> SECOND_FUTURE = new CompletableFuture<>();
private static void db1() {
if(FAIL) {
throw new RuntimeException("failed one");
} else {
FIRST_FUTURE.complete("42");
}
}
private static void db2() {
System.out.println("Running");
SECOND_FUTURE.complete("42");
}
}
如果你运行这个,你会发现没有打印任何东西...
很遗憾,我不熟悉 Neo4j
,但您很可能可以根据自己的需要调整此示例:
public class CF {
public static void main(String[] args) {
CompletableFuture<Void> one = CompletableFuture.runAsync(CF::db1);
CompletableFuture<Void> terminal =
one.whenComplete((ok, th) -> {
if(th != null || FIRST_FUTURE.isCompletedExceptionally()) {
// no need to schedule the second one, need to rollback whatever the first one did
// transaction.rollbackAsync();
System.out.println("rollback because first one failed");
LAST_FUTURE.completeExceptionally(new RuntimeException("because first one failed"));
} else {
CompletableFuture<Void> two = CompletableFuture.runAsync(CF::db2);
two.whenComplete((ok2, th2) -> {
if(th2 != null || SECOND_FUTURE.isCompletedExceptionally()) {
System.out.println("rollback because second one failed");
// transaction.rollbackAsync();
LAST_FUTURE.completeExceptionally(new RuntimeException("because second one failed"));
} else {
LAST_FUTURE.complete("OK");
}
});
}
});
// simulate that someone will call this
terminal.join();
System.out.println(LAST_FUTURE.join());
}
private static final boolean FAIL_ONE = false;
private static final boolean FAIL_TWO = true;
private static final CompletableFuture<String> FIRST_FUTURE = new CompletableFuture<>();
private static final CompletableFuture<String> SECOND_FUTURE = new CompletableFuture<>();
private static final CompletableFuture<String> LAST_FUTURE = new CompletableFuture<>();
private static void db1() {
if(FAIL_ONE) {
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
RuntimeException ex = new RuntimeException("failed one");;
FIRST_FUTURE.completeExceptionally(ex);
} else {
FIRST_FUTURE.complete("42");
}
}
private static void db2() {
if(FAIL_TWO) {
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
RuntimeException ex = new RuntimeException("failed one");;
SECOND_FUTURE.completeExceptionally(ex);
} else {
SECOND_FUTURE.complete("42");
}
}
}