如何 return Mono.subscribe() 的 Mono<ServerResponse>(作为副作用?)?

How to return a Mono<ServerResponse> (as a side effect?) of Mono.subscribe()?

我有以下代码,"works"...到目前为止。 "works",我的意思是 Flux<DemoPOJO>service.getAll() 编辑 return ,并且“hasElements().subscribe(this::foo)”导致 foo() 生成正确反映 Flux<DemoPOJO>有任何元素。

期望的最终状态是 return 一个 ServerResponse 对象,包装 Flux<DemoPOJO>, 反映returned Flux 为空还是"hasElements".

我的问题是 Mono.subscribe() return 有一个 reactor.core.Disposable,我想以某种方式到达 Mono<ServerResponse>。或者,我是"barking up the wrong tree"?

添加注意:我已经看到一些使用 Flux.flatMap() 的例子,但是如果 returned Flux 有很多元素(即,检查 hasElements() 似乎比潜在地平面映射所有元素要好得多。

@Component
public class DemoPOJOHandler {

    public static final String PATH_VAR_ID = "id";

    @Autowired
    private DemoPOJOService service;

    public Mono<ServerResponse> getAll(ServerRequest request) {
        Mono<ServerResponse> response = null;
        Flux<DemoPOJO>       entities = service.getAll();

        entities.hasElements().subscribe(this::foo);
        // just return something, for now
        return ServerResponse.ok().build();
    }

    private Mono<ServerRequest> foo(Boolean hasElements) {
        System.out.println("DEBUG >> Mono has elements -> " + hasElements);
        return Mono.empty();
    }
}

这是 DemoPOJOService 实现...

@Component
public class DemoPOJOService {

    @Autowired
    private DemoPOJORepo demoPOJORepo;

    public Flux<DemoPOJO> getAll() {
        return Flux.fromArray(demoPOJORepo.getAll());
    }

    // more implementation, omitted for brevity
}

而且,这是 DemoPOJORepo 实现...

@Component
public class DemoPOJORepo {

    private static final int NUM_OBJS =20;

    private static DemoPOJORepo demoRepo = null;

    private Map<Integer, DemoPOJO> demoPOJOMap;

    private DemoPOJORepo() {
        initMap();
    }

    public static DemoPOJORepo getInstance() {
        if (demoRepo == null) {
            demoRepo = new DemoPOJORepo();
        }
        return demoRepo;
    }

    public DemoPOJO[] getAll() {
        return demoPOJOMap.values().toArray(new DemoPOJO[demoPOJOMap.size()]);
    }

    // more implementation, omitted for brevity

    private void initMap() {
        demoPOJOMap = new TreeMap<Integer, DemoPOJO>();

        for(int ndx=1; ndx<( NUM_OBJS + 1 ); ndx++) {
            demoPOJOMap.put(ndx, new DemoPOJO(ndx, "foo_" + ndx, ndx+100));
        }
    }
}

首先,您不负责在控制器中订阅flux。从控制器中获取数据并 returning 它只是整个管道的一小部分。它基本上意味着您只需要提供业务逻辑,而框架会向您的数据添加其他转换。例如,它序列化响应,然后在完成后订阅它。

订阅业务代码中的流量意味着您启动另一个管道,它可能完全独立于从控制器return编辑的数据,但它恰好在那里被订阅。如果您拥有相同的 Flux 并在其他地方订阅它,结果将完全相同。

总结:你需要取 entities.hasElements() 的 return 值(即 Mono<Boolean>)并将 bool 包装到响应中:

 public Mono<ServerResponse> getAll(ServerRequest request) {
        Mono<ServerResponse> response = null;
        Flux<DemoPOJO>       entities = service.getAll();

        return entities.hasElements()
           .flatMap(this::foo);

    }

    private Mono<ServerResponse> foo(Boolean hasElements) {
        System.out.println("DEBUG >> Mono has elements -> " + hasElements);
        return ServerResponse.ok().syncBody(hasElements);
    }

所以,我真的很想 down-vote 之前的答案是 "non-responsive"(因为我没有订阅 Flux<DemoPOJO>entities”,但对于由 'entities.hasElements()' 产生的 Mono。并且,提供的 "solution" 没有解决我对能够 return 基于 having/not-having 内容 return 的适当 ServerResponse 的担忧。

然而,它确实让我相信 "right track" 可以解决我原来的问题,所以...

我修改后的DemoPOJOHandler如下。它似乎正确 return 一个 ServerResponse.ok() 包裹 'Flux' return 由 service.getAll() 编辑,或者 ServerResponse.noContent() 如果 Flux 是空的。

虽然这个 "works" 看起来比我以前拥有的要好得多,但非常感谢任何改进、评论或建议,因为我仍在努力研究 Reactor。

@Component
public class DemoPOJOHandler {

    public static final String PATH_VAR_ID = "id";

    @Autowired
    private DemoPOJOService service;

    public Mono<ServerResponse> getAll(ServerRequest request) {
        Flux<DemoPOJO> entities = service.getAll();

        return entities.hasElements().flatMap(hasElement -> {
            return hasElement ? ServerResponse.ok()
                                              .contentType(MediaType.APPLICATION_JSON)
                                              .body(entities, DemoPOJO.class)
                              : ServerResponse.noContent().build();
            });
    }
}

@SoCal 你的答案似乎可行,但它有一个缺点:getAll() 数据库调用进行了两次。

困难在于,您只能在开始接收数据后才能决定状态码。

但是由于您似乎并不真正需要正文的异步特性(您不是在流式传输各个元素而是生成一次性 JSON 响应),在这种情况下您可以收集整个结果集并将其映射到响应。

因此调用 DB,收集 Mono<List> 中的元素,map A) 如果列表为空则返回 404 空响应或 B) 返回 200 成功 JSON否则响应(注意 syncBody 的使用):

@Component
public class DemoPOJOHandler {

    public static final String PATH_VAR_ID = "id";

    @Autowired
    private DemoPOJOService service;

    public Mono<ServerResponse> getAll(ServerRequest request) {
        Flux<DemoPOJO> entities = service.getAll();
        Mono<List<DemoPOJO>> collected = entities.collectList();

        return collected.map(list -> list.isEmpty() ? 
            ServerResponse.noContent().build() :
            ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .syncBody(list)
        );
    }
}

旁注:我认为 ResponseEntity 是带注释的控制器的首选类型,而不是 ServerResponse,参见 https://docs.spring.io/spring/docs/current/spring-framework-reference/web-reactive.html#webflux-ann-responseentity