Project Reactor 超时处理
Project Reactor timeout handling
我有三个与 Project Reactor 相关的问题,我将在下面提出。从我拥有的代码开始(它将被简化以便更容易理解问题)。
Mono<Integer> doWithSession(Function<String, Mono<Integer>> callback, long timeout) {
return Mono.just("hello")
.compose(monostr -> monostr
.doOnSuccess(str -> System.out.println("Suppose I want to release session here after all")) //(1)
.doOnCancel(() -> System.out.println("cancelled")) //(2)
.then(callback::apply)
.timeoutMillis(timeout, Mono.error(new TimeoutException("Timeout after " + timeout)))
);
}
并测试:
@Test
public void testDoWithSession2() throws Exception {
Function<String, Mono<Integer>> fun1 = str -> Mono.fromCallable(() -> {
System.out.println("do some long timed work");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("work has completed");
return str.length();
});
StepVerifier.create(doWithSession(fun1,1000))
.verifyError(TimeoutException.class);
}
所以和问题:
- 如何中断
fun1
的调用并立即出现 return 错误? (也许我做错了什么,但看起来错误 returns 不是在超时之后而是在所有回调调用之后)
- 为什么
doOnSuccess
和doOnCancel
同时调用? (我预计将调用 (1) 或 (2) 但不会同时调用两者)
- 以及如何处理以下情况:
- 假设在代码中
Mono.just("hello")
正在获取连接;
- 在
callback
我正在做一些有联系的事情并得到一些结果(Mono<Integer>
在我的例子中);
- 最后(成功或失败)我想释放会话(我尝试在 (1) 中这样做)。
对于第一个问题,答案似乎是使用调度程序:
Mono<Integer> doWithSession(Function<String, Mono<Integer>> callback, long timeout) {
Scheduler single = Schedulers.single();
return Mono.just("hello")
.compose(monostr -> monostr
.publishOn(single) // use scheduler
.then(callback::apply)
.timeoutMillis(timeout, Mono.error(new TimeoutException("Timeout after " + timeout)))
);
}
第三题可以这样解决:
private Mono<Integer> doWithSession3(Function<String, Mono<Integer>> callback, long timeout) {
Scheduler single = Schedulers.single();
return Mono.just("hello")
.then(str -> Mono.just(str) // here wrapping our string to new Mono
.publishOn(single)
.then(callback::apply)
.timeoutMillis(timeout, Mono.error(new TimeoutException("Timeout after " + timeout)))
.doAfterTerminate((res, throwable) -> System.out.println("Do anything with your string" + str))
);
}
1) 如您所见,使用 .publishOn(Schedulers.single())
。这将确保可调用对象在另一个线程上被调用并且只阻塞所述线程。此外,它还允许取消可调用对象。
2) 链的顺序很重要。您将 .doOnSuccess
放在 compose
的开头(顺便说一下,您并不真正需要那个特定示例,除非您想提取该组合函数以供以后重用)。因此,这意味着它基本上从 Mono.just
获取通知,并在查询源时立即运行,甚至在您的处理发生之前......与 doOnCancel
相同。取消来自 timeout
触发...
3) 有一个工厂可以从资源中创建序列并确保资源被清理:Mono.using
。所以它看起来像这样:
public <T> Mono<T> doWithConnection(Function<String, Mono<T>> callback, long timeout) {
return Mono.using(
//the resource supplier:
() -> {
System.out.println("connection acquired");
return "hello";
},
//create a Mono out of the resource. On any termination, the resource is cleaned up
connection -> Mono.just(connection)
//the blocking callable needs own thread:
.publishOn(Schedulers.single())
//execute the callable and get result...
.then(callback::apply)
//...but cancel if it takes too long
.timeoutMillis(timeout)
//for demonstration we'll log when timeout triggers:
.doOnError(TimeoutException.class, e -> System.out.println("timed out")),
//the resource cleanup:
connection -> System.out.println("cleaned up " + connection));
}
returns 一个 Mono<T>
的可调用的 T 值。在生产代码中,您将订阅它来处理该值。测试中,StepVerifier.create()
会为您订阅
让我们用你的长 运行 任务来证明这一点,看看它输出什么:
@Test
public void testDoWithSession2() throws Exception {
Function<String, Mono<Integer>> fun1 = str -> Mono.fromCallable(() -> {
System.out.println("start some long timed work");
//for demonstration we'll print some clock ticks
for (int i = 1; i <= 5; i++) {
try {
Thread.sleep(1000);
System.out.println(i + "s...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("work has completed");
return str.length();
});
//let two ticks show up
StepVerifier.create(doWithConnection(fun1,2100))
.verifyError(TimeoutException.class);
}
这输出:
connection acquired
start some long timed work
1s...
2s...
timed out
cleaned up hello
如果我们将超时设置为超过 5000,我们将得到以下结果。 (存在断言错误,因为 StepVerifier 需要超时):
connection acquired
start some long timed work
1s...
2s...
3s...
4s...
5s...
work has completed
cleaned up hello
java.lang.AssertionError: expectation "expectError(Class)" failed (expected: onError(TimeoutException); actual: onNext(5)
我有三个与 Project Reactor 相关的问题,我将在下面提出。从我拥有的代码开始(它将被简化以便更容易理解问题)。
Mono<Integer> doWithSession(Function<String, Mono<Integer>> callback, long timeout) {
return Mono.just("hello")
.compose(monostr -> monostr
.doOnSuccess(str -> System.out.println("Suppose I want to release session here after all")) //(1)
.doOnCancel(() -> System.out.println("cancelled")) //(2)
.then(callback::apply)
.timeoutMillis(timeout, Mono.error(new TimeoutException("Timeout after " + timeout)))
);
}
并测试:
@Test
public void testDoWithSession2() throws Exception {
Function<String, Mono<Integer>> fun1 = str -> Mono.fromCallable(() -> {
System.out.println("do some long timed work");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("work has completed");
return str.length();
});
StepVerifier.create(doWithSession(fun1,1000))
.verifyError(TimeoutException.class);
}
所以和问题:
- 如何中断
fun1
的调用并立即出现 return 错误? (也许我做错了什么,但看起来错误 returns 不是在超时之后而是在所有回调调用之后) - 为什么
doOnSuccess
和doOnCancel
同时调用? (我预计将调用 (1) 或 (2) 但不会同时调用两者) - 以及如何处理以下情况:
- 假设在代码中
Mono.just("hello")
正在获取连接; - 在
callback
我正在做一些有联系的事情并得到一些结果(Mono<Integer>
在我的例子中); - 最后(成功或失败)我想释放会话(我尝试在 (1) 中这样做)。
- 假设在代码中
对于第一个问题,答案似乎是使用调度程序:
Mono<Integer> doWithSession(Function<String, Mono<Integer>> callback, long timeout) {
Scheduler single = Schedulers.single();
return Mono.just("hello")
.compose(monostr -> monostr
.publishOn(single) // use scheduler
.then(callback::apply)
.timeoutMillis(timeout, Mono.error(new TimeoutException("Timeout after " + timeout)))
);
}
第三题可以这样解决:
private Mono<Integer> doWithSession3(Function<String, Mono<Integer>> callback, long timeout) {
Scheduler single = Schedulers.single();
return Mono.just("hello")
.then(str -> Mono.just(str) // here wrapping our string to new Mono
.publishOn(single)
.then(callback::apply)
.timeoutMillis(timeout, Mono.error(new TimeoutException("Timeout after " + timeout)))
.doAfterTerminate((res, throwable) -> System.out.println("Do anything with your string" + str))
);
}
1) 如您所见,使用 .publishOn(Schedulers.single())
。这将确保可调用对象在另一个线程上被调用并且只阻塞所述线程。此外,它还允许取消可调用对象。
2) 链的顺序很重要。您将 .doOnSuccess
放在 compose
的开头(顺便说一下,您并不真正需要那个特定示例,除非您想提取该组合函数以供以后重用)。因此,这意味着它基本上从 Mono.just
获取通知,并在查询源时立即运行,甚至在您的处理发生之前......与 doOnCancel
相同。取消来自 timeout
触发...
3) 有一个工厂可以从资源中创建序列并确保资源被清理:Mono.using
。所以它看起来像这样:
public <T> Mono<T> doWithConnection(Function<String, Mono<T>> callback, long timeout) {
return Mono.using(
//the resource supplier:
() -> {
System.out.println("connection acquired");
return "hello";
},
//create a Mono out of the resource. On any termination, the resource is cleaned up
connection -> Mono.just(connection)
//the blocking callable needs own thread:
.publishOn(Schedulers.single())
//execute the callable and get result...
.then(callback::apply)
//...but cancel if it takes too long
.timeoutMillis(timeout)
//for demonstration we'll log when timeout triggers:
.doOnError(TimeoutException.class, e -> System.out.println("timed out")),
//the resource cleanup:
connection -> System.out.println("cleaned up " + connection));
}
returns 一个 Mono<T>
的可调用的 T 值。在生产代码中,您将订阅它来处理该值。测试中,StepVerifier.create()
会为您订阅
让我们用你的长 运行 任务来证明这一点,看看它输出什么:
@Test
public void testDoWithSession2() throws Exception {
Function<String, Mono<Integer>> fun1 = str -> Mono.fromCallable(() -> {
System.out.println("start some long timed work");
//for demonstration we'll print some clock ticks
for (int i = 1; i <= 5; i++) {
try {
Thread.sleep(1000);
System.out.println(i + "s...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("work has completed");
return str.length();
});
//let two ticks show up
StepVerifier.create(doWithConnection(fun1,2100))
.verifyError(TimeoutException.class);
}
这输出:
connection acquired
start some long timed work
1s...
2s...
timed out
cleaned up hello
如果我们将超时设置为超过 5000,我们将得到以下结果。 (存在断言错误,因为 StepVerifier 需要超时):
connection acquired
start some long timed work
1s...
2s...
3s...
4s...
5s...
work has completed
cleaned up hello
java.lang.AssertionError: expectation "expectError(Class)" failed (expected: onError(TimeoutException); actual: onNext(5)