How can one return a value directly from a Uni<T> without converting it into a CompletableFuture?
For context, my class has two methods that both return a Uni, and the first depends on the second as follows:
public Uni<String> greeting(String name) {
    log.info("\n\n");
    log.info("\t`greeting(String name)` Executing on Thread {}", Thread.currentThread().getName());
    return Uni
            .createFrom()
            .item(name) // synchronous here; if the value came from an I/O call you would pass a supplier instead, ref README.md#Links.1
            .emitOn(emitExecutor)
            .onItem()
            .transform(parameter -> {
                log.debug("`(p)>Transform` invoked on Thread {}", Thread.currentThread().getName());
                assert Thread.currentThread().getName().equals(threadName);
                try {
                    // Blocks the current thread until ioSimulation completes
                    return ioSimulation(parameter, Thread.currentThread().getName()).subscribeAsCompletionStage().get();
                } catch (InterruptedException | ExecutionException e) {
                    log.error("failed to execute ioSimulation due to {}", e.getMessage());
                    throw new RuntimeException("failed to communicate with client: " + e.getMessage(), e);
                }
            }).onFailure()
            .retry()
            .atMost(2);
}
public Uni<String> ioSimulation(String param, String threadName) {
    log.debug("`ioSimulation(String param)` Executing on Thread {}", Thread.currentThread().getName());
    assert Thread.currentThread().getName().equals(threadName);
    return MockServer.client
            .getAbs("http://localhost:80")
            .addQueryParam("name", param)
            .send()
            .onItem().transform(response -> {
                if (response.statusCode() == 200) {
                    return response.bodyAsString();
                } else {
                    throw new IllegalStateException(response.bodyAsString());
                }
            });
}
Now, in the first method, greeting(String name), I have to call subscribeAsCompletionStage().get() to return the String value; otherwise the return type would be Uni<Uni<String>>.
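My questions are as follows:
- Is there a way to return the actual result of the second method, ioSimulation (the exact value wrapped in a Uni), without using subscribeAsCompletionStage().get()? If so, how?
- Is there a better way to do this, for example by refactoring the existing methods?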
If anyone is interested, the full code can be found here
Thanks to @Ladicek: there is an out-of-the-box way to handle a function that returns an asynchronous Uni<T> inside a transform step. The solution is:
public Uni<String> greeting(String name) {
    log.info("\n\n");
    log.info("\t`greeting(String name)` Executing on Thread {}", Thread.currentThread().getName());
    return Uni
            .createFrom()
            .item(name) // synchronous here; if the value came from an I/O call you would pass a supplier instead, ref README.md#Links.1
            .emitOn(emitExecutor)
            .onItem()
            .transformToUni(parameter -> ioSimulation(parameter, Thread.currentThread().getName()))
            .onFailure()
            .retry()
            .atMost(2);
}
public Uni<String> ioSimulation(String param, String threadName) {
    log.debug("`ioSimulation(String param)` Executing on Thread {}", Thread.currentThread().getName());
    assert Thread.currentThread().getName().equals(threadName);
    return MockServer.client
            .getAbs("http://localhost:80")
            .addQueryParam("name", param)
            .send()
            .onItem().transform(response -> {
                if (response.statusCode() == 200) {
                    return response.bodyAsString();
                } else {
                    throw new IllegalStateException(response.bodyAsString());
                }
            });
}
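transformToUni asynchronously transforms the received item into a new Uni<T> and flattens it, so the method returns Uni<String> rather than Uni<Uni<String>>.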
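To make the flattening easier to see without a Mutiny dependency, here is a minimal sketch that uses only the JDK. It is an analogy, not Mutiny code: thenApply plays the role of transform (the result is nested) and thenCompose plays the role of transformToUni (the result is flat). The class name FlattenSketch and the simplified one-argument ioSimulation are assumptions made up for this illustration.

```java
import java.util.concurrent.CompletableFuture;

public class FlattenSketch {

    // Hypothetical stand-in for ioSimulation: an asynchronous call that yields a String.
    static CompletableFuture<String> ioSimulation(String param) {
        return CompletableFuture.supplyAsync(() -> "Hello " + param);
    }

    // Analogous to onItem().transform(...): the mapper returns an async value,
    // so the outer type ends up wrapping another async value.
    static CompletableFuture<CompletableFuture<String>> nested(String name) {
        return CompletableFuture.completedFuture(name)
                .thenApply(FlattenSketch::ioSimulation);
    }

    // Analogous to onItem().transformToUni(...): the inner async value is
    // flattened into the outer one, and nothing blocks while building the chain.
    static CompletableFuture<String> flattened(String name) {
        return CompletableFuture.completedFuture(name)
                .thenCompose(FlattenSketch::ioSimulation);
    }

    public static void main(String[] args) {
        // join() is used here only to print the result in a demo.
        System.out.println(flattened("Mutiny").join());
    }
}
```

In both libraries the point is the same: when the mapping function is itself asynchronous, use the composing operator instead of blocking inside the mapping operator.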
Update:
Another approach (the one I was looking for) is to use callbacks.
public Uni<String> callbackGreeting(String name) {
    log.info("\n\n");
    log.info("\t`callbackGreeting(String name)` Executing on Thread {}", Thread.currentThread().getName());
    return Uni
            .createFrom()
            .item(name) // synchronous here; if the value came from an I/O call you would pass a supplier instead, ref README.md#Links.1
            .emitOn(emitExecutor)
            .onItem()
            .transformToUni(param -> {
                return Uni.createFrom().emitter(em -> {
                    ioSimulation(param, Thread.currentThread().getName())
                            .subscribe()
                            // ioSimulation already emits the response body as a String, so pass it straight to the emitter
                            .with(success -> em.complete(success),
                                    exception -> {
                                        log.info("the following error occurred before sending to em.complete {}", exception.getMessage());
                                        em.fail(exception);
                                    });
                });
            })
            .onFailure()
            .retry()
            .atMost(2)
            .map(item -> "Operation completed with item " + item);
}
This example is for demonstration purposes only; I know it is not the best solution. The full code is here