如何在地图中 return 带有长 运行 任务的 Mono?

How to return a Mono with a long running task inside map?

我想在 Mono 的 map 函数中转换数据:

long result = 0.0;

return Mono.just(result).map(value -> {
    // do some long running transformation here
    // and assign it to result (maybe 5 seconds task)
    // in our case a request:

    Mono<Result> resultObject = service.getResult();

    resultObject.subscribe(new Subscriber<Result>() {
        @Override
        public void onSubscribe(Subscription s) {
            System.out.println("subscribe: " + System.currentTimeMillis());

            s.request(1);
        }

        @Override
        public void onNext(Result result) {
            System.out.println("on next: " + System.currentTimeMillis());

            value = result.getValue(); // this is not 0.0
        }

        @Override
        public void onError(Throwable t) {
            System.out.println("error " + t);
        }

        @Override
        public void onComplete() {
             System.out.println("complete");
        }
    });

    return value;
});

如果我调用它,我总是得到 0.0 作为结果,所以它在 map 函数完成之前返回。对我来说,这没有多大意义。在返回之前我还应该如何转换我的结果?

编辑

我可以执行以下操作,但我认为这不是最佳解决方案:

final CountDownLatch latch = new CountDownLatch(1);
long result = 0.0;

return Mono.just(result).map(value -> {
    // do some long running transformation here
    // and assign it to result (maybe 5 seconds task)
    // in our case a request:

    Mono<Result> resultObject = service.getResult();

    resultObject.subscribe(new Subscriber<Result>() {
        @Override
        public void onSubscribe(Subscription s) {
            System.out.println("subscribe: " + System.currentTimeMillis());

            s.request(1);
        }

        @Override
        public void onNext(Result result) {
            System.out.println("on next: " + System.currentTimeMillis());

            value = result.getValue(); // this is not 0.0

            latch.countDown();
        }

        @Override
        public void onError(Throwable t) {
            System.out.println("error " + t);
        }

        @Override
        public void onComplete() {
             System.out.println("complete");
        }
    });

    try {
        latch.await();

        return value;
    } catch(InterruptedException e) {
        e.printStackTrace();

        return -1.0;
    }
});

这听起来和 flatMap 的用途完全一样:如果您的长 运行 任务是异步的并且可以表示为 Publisher<T> 那么它可以被 [=13 异步触发=].

请注意 Mono#flatMap(Function)3.0.x 中被称为 Mono#then(Function)

所以在 3.0.7 中:

Mono.just(param).then(p -> service.getResult(p));

在 3.1.0.M3 中:

Mono.just(param).flatMap(p -> service.getResult(p));

请注意,如果您不使用该值(服务没有参数),那么您可能可以简单地提供延续 Mono,使用 then(Mono)(这在两者中都有效3.0.x 和 3.1.x):

Mono.just(paramThatGetsIgnored).then(service.getResult());

(但在那种情况下,Mono.just(...) 的起点并不是非常相关)