如何 return Mono.subscribe() 的 Mono<ServerResponse>(作为副作用?)?
How to return a Mono<ServerResponse> (as a side effect?) of Mono.subscribe()?
我有以下代码,"works"...到目前为止。 "works",我的意思是 Flux<DemoPOJO>
被 service.getAll()
编辑 return ,并且“hasElements().subscribe(this::foo)
”导致 foo()
生成正确反映 Flux<DemoPOJO>
有任何元素。
期望的最终状态是 return 一个 ServerResponse
对象,包装 Flux<DemoPOJO>
, 反映returned Flux 为空还是"hasElements".
我的问题是 Mono.subscribe()
return 有一个 reactor.core.Disposable
,我想以某种方式到达 Mono<ServerResponse>
。或者,我是"barking up the wrong tree"?
添加注意:我已经看到一些使用 Flux.flatMap()
的例子,但是如果 returned Flux 有很多元素(即,检查 hasElements()
似乎比潜在地平面映射所有元素要好得多。
@Component
public class DemoPOJOHandler {
public static final String PATH_VAR_ID = "id";
@Autowired
private DemoPOJOService service;
public Mono<ServerResponse> getAll(ServerRequest request) {
Mono<ServerResponse> response = null;
Flux<DemoPOJO> entities = service.getAll();
entities.hasElements().subscribe(this::foo);
// just return something, for now
return ServerResponse.ok().build();
}
private Mono<ServerRequest> foo(Boolean hasElements) {
System.out.println("DEBUG >> Mono has elements -> " + hasElements);
return Mono.empty();
}
}
这是 DemoPOJOService 实现...
@Component
public class DemoPOJOService {
@Autowired
private DemoPOJORepo demoPOJORepo;
public Flux<DemoPOJO> getAll() {
return Flux.fromArray(demoPOJORepo.getAll());
}
// more implementation, omitted for brevity
}
而且,这是 DemoPOJORepo 实现...
@Component
public class DemoPOJORepo {
private static final int NUM_OBJS =20;
private static DemoPOJORepo demoRepo = null;
private Map<Integer, DemoPOJO> demoPOJOMap;
private DemoPOJORepo() {
initMap();
}
public static DemoPOJORepo getInstance() {
if (demoRepo == null) {
demoRepo = new DemoPOJORepo();
}
return demoRepo;
}
public DemoPOJO[] getAll() {
return demoPOJOMap.values().toArray(new DemoPOJO[demoPOJOMap.size()]);
}
// more implementation, omitted for brevity
private void initMap() {
demoPOJOMap = new TreeMap<Integer, DemoPOJO>();
for(int ndx=1; ndx<( NUM_OBJS + 1 ); ndx++) {
demoPOJOMap.put(ndx, new DemoPOJO(ndx, "foo_" + ndx, ndx+100));
}
}
}
首先,您不负责在控制器中订阅flux。从控制器中获取数据并 returning 它只是整个管道的一小部分。它基本上意味着您只需要提供业务逻辑,而框架会向您的数据添加其他转换。例如,它序列化响应,然后在完成后订阅它。
订阅业务代码中的流量意味着您启动另一个管道,它可能完全独立于从控制器return编辑的数据,但它恰好在那里被订阅。如果您拥有相同的 Flux 并在其他地方订阅它,结果将完全相同。
总结:你需要取 entities.hasElements()
的 return 值(即 Mono<Boolean>
)并将 bool 包装到响应中:
public Mono<ServerResponse> getAll(ServerRequest request) {
Mono<ServerResponse> response = null;
Flux<DemoPOJO> entities = service.getAll();
return entities.hasElements()
.flatMap(this::foo);
}
private Mono<ServerResponse> foo(Boolean hasElements) {
System.out.println("DEBUG >> Mono has elements -> " + hasElements);
return ServerResponse.ok().syncBody(hasElements);
}
所以,我真的很想 down-vote 之前的答案是 "non-responsive"(因为我没有订阅 Flux<DemoPOJO>
“entities”,但对于由 'entities.hasElements()' 产生的 Mono。并且,提供的 "solution" 没有解决我对能够 return 基于 having/not-having 内容 return 的适当 ServerResponse 的担忧。
然而,它确实让我相信 "right track" 可以解决我原来的问题,所以...
我修改后的DemoPOJOHandler如下。它似乎正确 return 一个 ServerResponse.ok() 包裹 'Flux' return 由 service.getAll() 编辑,或者 ServerResponse.noContent() 如果 Flux 是空的。
虽然这个 "works" 看起来比我以前拥有的要好得多,但非常感谢任何改进、评论或建议,因为我仍在努力研究 Reactor。
@Component
public class DemoPOJOHandler {
public static final String PATH_VAR_ID = "id";
@Autowired
private DemoPOJOService service;
public Mono<ServerResponse> getAll(ServerRequest request) {
Flux<DemoPOJO> entities = service.getAll();
return entities.hasElements().flatMap(hasElement -> {
return hasElement ? ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.body(entities, DemoPOJO.class)
: ServerResponse.noContent().build();
});
}
}
@SoCal 你的答案似乎可行,但它有一个缺点:getAll()
数据库调用进行了两次。
困难在于,您只能在开始接收数据后才能决定状态码。
但是由于您似乎并不真正需要正文的异步特性(您不是在流式传输各个元素而是生成一次性 JSON 响应),在这种情况下您可以收集整个结果集并将其映射到响应。
因此调用 DB,收集 Mono<List>
中的元素,map
A) 如果列表为空则返回 404 空响应或 B) 返回 200 成功 JSON否则响应(注意 syncBody
的使用):
@Component
public class DemoPOJOHandler {
public static final String PATH_VAR_ID = "id";
@Autowired
private DemoPOJOService service;
public Mono<ServerResponse> getAll(ServerRequest request) {
Flux<DemoPOJO> entities = service.getAll();
Mono<List<DemoPOJO>> collected = entities.collectList();
return collected.map(list -> list.isEmpty() ?
ServerResponse.noContent().build() :
ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.syncBody(list)
);
}
}
旁注:我认为 ResponseEntity
是带注释的控制器的首选类型,而不是 ServerResponse
,参见 https://docs.spring.io/spring/docs/current/spring-framework-reference/web-reactive.html#webflux-ann-responseentity。
我有以下代码,"works"...到目前为止。 "works",我的意思是 Flux<DemoPOJO>
被 service.getAll()
编辑 return ,并且“hasElements().subscribe(this::foo)
”导致 foo()
生成正确反映 Flux<DemoPOJO>
有任何元素。
期望的最终状态是 return 一个 ServerResponse
对象,包装 Flux<DemoPOJO>
, 反映returned Flux 为空还是"hasElements".
我的问题是 Mono.subscribe()
return 有一个 reactor.core.Disposable
,我想以某种方式到达 Mono<ServerResponse>
。或者,我是"barking up the wrong tree"?
添加注意:我已经看到一些使用 Flux.flatMap()
的例子,但是如果 returned Flux 有很多元素(即,检查 hasElements()
似乎比潜在地平面映射所有元素要好得多。
@Component
public class DemoPOJOHandler {
public static final String PATH_VAR_ID = "id";
@Autowired
private DemoPOJOService service;
public Mono<ServerResponse> getAll(ServerRequest request) {
Mono<ServerResponse> response = null;
Flux<DemoPOJO> entities = service.getAll();
entities.hasElements().subscribe(this::foo);
// just return something, for now
return ServerResponse.ok().build();
}
private Mono<ServerRequest> foo(Boolean hasElements) {
System.out.println("DEBUG >> Mono has elements -> " + hasElements);
return Mono.empty();
}
}
这是 DemoPOJOService 实现...
@Component
public class DemoPOJOService {
@Autowired
private DemoPOJORepo demoPOJORepo;
public Flux<DemoPOJO> getAll() {
return Flux.fromArray(demoPOJORepo.getAll());
}
// more implementation, omitted for brevity
}
而且,这是 DemoPOJORepo 实现...
@Component
public class DemoPOJORepo {
private static final int NUM_OBJS =20;
private static DemoPOJORepo demoRepo = null;
private Map<Integer, DemoPOJO> demoPOJOMap;
private DemoPOJORepo() {
initMap();
}
public static DemoPOJORepo getInstance() {
if (demoRepo == null) {
demoRepo = new DemoPOJORepo();
}
return demoRepo;
}
public DemoPOJO[] getAll() {
return demoPOJOMap.values().toArray(new DemoPOJO[demoPOJOMap.size()]);
}
// more implementation, omitted for brevity
private void initMap() {
demoPOJOMap = new TreeMap<Integer, DemoPOJO>();
for(int ndx=1; ndx<( NUM_OBJS + 1 ); ndx++) {
demoPOJOMap.put(ndx, new DemoPOJO(ndx, "foo_" + ndx, ndx+100));
}
}
}
首先,您不负责在控制器中订阅flux。从控制器中获取数据并 returning 它只是整个管道的一小部分。它基本上意味着您只需要提供业务逻辑,而框架会向您的数据添加其他转换。例如,它序列化响应,然后在完成后订阅它。
订阅业务代码中的流量意味着您启动另一个管道,它可能完全独立于从控制器return编辑的数据,但它恰好在那里被订阅。如果您拥有相同的 Flux 并在其他地方订阅它,结果将完全相同。
总结:你需要取 entities.hasElements()
的 return 值(即 Mono<Boolean>
)并将 bool 包装到响应中:
public Mono<ServerResponse> getAll(ServerRequest request) {
Mono<ServerResponse> response = null;
Flux<DemoPOJO> entities = service.getAll();
return entities.hasElements()
.flatMap(this::foo);
}
private Mono<ServerResponse> foo(Boolean hasElements) {
System.out.println("DEBUG >> Mono has elements -> " + hasElements);
return ServerResponse.ok().syncBody(hasElements);
}
所以,我真的很想 down-vote 之前的答案是 "non-responsive"(因为我没有订阅 Flux<DemoPOJO>
“entities”,但对于由 'entities.hasElements()' 产生的 Mono。并且,提供的 "solution" 没有解决我对能够 return 基于 having/not-having 内容 return 的适当 ServerResponse 的担忧。
然而,它确实让我相信 "right track" 可以解决我原来的问题,所以...
我修改后的DemoPOJOHandler如下。它似乎正确 return 一个 ServerResponse.ok() 包裹 'Flux' return 由 service.getAll() 编辑,或者 ServerResponse.noContent() 如果 Flux 是空的。
虽然这个 "works" 看起来比我以前拥有的要好得多,但非常感谢任何改进、评论或建议,因为我仍在努力研究 Reactor。
@Component
public class DemoPOJOHandler {
public static final String PATH_VAR_ID = "id";
@Autowired
private DemoPOJOService service;
public Mono<ServerResponse> getAll(ServerRequest request) {
Flux<DemoPOJO> entities = service.getAll();
return entities.hasElements().flatMap(hasElement -> {
return hasElement ? ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.body(entities, DemoPOJO.class)
: ServerResponse.noContent().build();
});
}
}
@SoCal 你的答案似乎可行,但它有一个缺点:getAll()
数据库调用进行了两次。
困难在于,您只能在开始接收数据后才能决定状态码。
但是由于您似乎并不真正需要正文的异步特性(您不是在流式传输各个元素而是生成一次性 JSON 响应),在这种情况下您可以收集整个结果集并将其映射到响应。
因此调用 DB,收集 Mono<List>
中的元素,map
A) 如果列表为空则返回 404 空响应或 B) 返回 200 成功 JSON否则响应(注意 syncBody
的使用):
@Component
public class DemoPOJOHandler {
public static final String PATH_VAR_ID = "id";
@Autowired
private DemoPOJOService service;
public Mono<ServerResponse> getAll(ServerRequest request) {
Flux<DemoPOJO> entities = service.getAll();
Mono<List<DemoPOJO>> collected = entities.collectList();
return collected.map(list -> list.isEmpty() ?
ServerResponse.noContent().build() :
ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.syncBody(list)
);
}
}
旁注:我认为 ResponseEntity
是带注释的控制器的首选类型,而不是 ServerResponse
,参见 https://docs.spring.io/spring/docs/current/spring-framework-reference/web-reactive.html#webflux-ann-responseentity。