如何在地图中 return 带有长 运行 任务的 Mono?
How to return a Mono with a long running task inside map?
我想在 Mono 的 map 函数中转换数据:
long result = 0.0;
return Mono.just(result).map(value -> {
// do some long running transformation here
// and assign it to result (maybe 5 seconds task)
// in our case a request:
Mono<Result> resultObject = service.getResult();
resultObject.subscribe(new Subscriber<Result>() {
@Override
public void onSubscribe(Subscription s) {
System.out.println("subscribe: " + System.currentTimeMillis());
s.request(1);
}
@Override
public void onNext(Result result) {
System.out.println("on next: " + System.currentTimeMillis());
value = result.getValue(); // this is not 0.0
}
@Override
public void onError(Throwable t) {
System.out.println("error " + t);
}
@Override
public void onComplete() {
System.out.println("complete");
}
});
return value;
});
如果我调用它,我总是得到 0.0
作为结果,所以它在 map 函数完成之前返回。对我来说,这没有多大意义。在返回之前我还应该如何转换我的结果?
编辑
我可以执行以下操作,但我认为这不是最佳解决方案:
final CountDownLatch latch = new CountDownLatch(1);
long result = 0.0;
return Mono.just(result).map(value -> {
// do some long running transformation here
// and assign it to result (maybe 5 seconds task)
// in our case a request:
Mono<Result> resultObject = service.getResult();
resultObject.subscribe(new Subscriber<Result>() {
@Override
public void onSubscribe(Subscription s) {
System.out.println("subscribe: " + System.currentTimeMillis());
s.request(1);
}
@Override
public void onNext(Result result) {
System.out.println("on next: " + System.currentTimeMillis());
value = result.getValue(); // this is not 0.0
latch.countDown();
}
@Override
public void onError(Throwable t) {
System.out.println("error " + t);
}
@Override
public void onComplete() {
System.out.println("complete");
}
});
try {
latch.await();
return value;
} catch(InterruptedException e) {
e.printStackTrace();
return -1.0;
}
});
这听起来和 flatMap
的用途完全一样:如果您的长 运行 任务是异步的并且可以表示为 Publisher<T>
那么它可以被 [=13 异步触发=].
请注意 Mono#flatMap(Function)
在 3.0.x
中被称为 Mono#then(Function)
。
所以在 3.0.7 中:
Mono.just(param).then(p -> service.getResult(p));
在 3.1.0.M3 中:
Mono.just(param).flatMap(p -> service.getResult(p));
请注意,如果您不使用该值(服务没有参数),那么您可能可以简单地提供延续 Mono
,使用 then(Mono)
(这在两者中都有效3.0.x 和 3.1.x):
Mono.just(paramThatGetsIgnored).then(service.getResult());
(但在那种情况下,Mono.just(...)
的起点并不是非常相关)
我想在 Mono 的 map 函数中转换数据:
long result = 0.0;
return Mono.just(result).map(value -> {
// do some long running transformation here
// and assign it to result (maybe 5 seconds task)
// in our case a request:
Mono<Result> resultObject = service.getResult();
resultObject.subscribe(new Subscriber<Result>() {
@Override
public void onSubscribe(Subscription s) {
System.out.println("subscribe: " + System.currentTimeMillis());
s.request(1);
}
@Override
public void onNext(Result result) {
System.out.println("on next: " + System.currentTimeMillis());
value = result.getValue(); // this is not 0.0
}
@Override
public void onError(Throwable t) {
System.out.println("error " + t);
}
@Override
public void onComplete() {
System.out.println("complete");
}
});
return value;
});
如果我调用它,我总是得到 0.0
作为结果,所以它在 map 函数完成之前返回。对我来说,这没有多大意义。在返回之前我还应该如何转换我的结果?
编辑
我可以执行以下操作,但我认为这不是最佳解决方案:
final CountDownLatch latch = new CountDownLatch(1);
long result = 0.0;
return Mono.just(result).map(value -> {
// do some long running transformation here
// and assign it to result (maybe 5 seconds task)
// in our case a request:
Mono<Result> resultObject = service.getResult();
resultObject.subscribe(new Subscriber<Result>() {
@Override
public void onSubscribe(Subscription s) {
System.out.println("subscribe: " + System.currentTimeMillis());
s.request(1);
}
@Override
public void onNext(Result result) {
System.out.println("on next: " + System.currentTimeMillis());
value = result.getValue(); // this is not 0.0
latch.countDown();
}
@Override
public void onError(Throwable t) {
System.out.println("error " + t);
}
@Override
public void onComplete() {
System.out.println("complete");
}
});
try {
latch.await();
return value;
} catch(InterruptedException e) {
e.printStackTrace();
return -1.0;
}
});
这听起来和 flatMap
的用途完全一样:如果您的长 运行 任务是异步的并且可以表示为 Publisher<T>
那么它可以被 [=13 异步触发=].
请注意 Mono#flatMap(Function)
在 3.0.x
中被称为 Mono#then(Function)
。
所以在 3.0.7 中:
Mono.just(param).then(p -> service.getResult(p));
在 3.1.0.M3 中:
Mono.just(param).flatMap(p -> service.getResult(p));
请注意,如果您不使用该值(服务没有参数),那么您可能可以简单地提供延续 Mono
,使用 then(Mono)
(这在两者中都有效3.0.x 和 3.1.x):
Mono.just(paramThatGetsIgnored).then(service.getResult());
(但在那种情况下,Mono.just(...)
的起点并不是非常相关)