Spring WebFlux SSE server: how to send an error to the client and close the connection
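I have a Spring WebFlux application server that sends SSE to end users indefinitely once they connect. I want to add some kind of id validation that depends on requesting the id from a third-party service, which returns Mono<Boolean>. If that Mono contains false, I want to close the connection with an error.
The problem is that I can't return Mono.error() from the handleSse method, because it has to return Flux<ServerSentEvent<UserUpdateResponse>>.
How do I properly send an error to the user and close the connection afterwards?
Here is my code: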
@GetMapping("/sse")
public Flux<ServerSentEvent<UserUpdateResponse>> handleSse(String id) {
    // Stream only the updates that belong to the requested id.
    return usersSink.asFlux()
            .filter(update -> id.equals(update.getId()))
            .map(this::wrapIntoSse);
}

private ServerSentEvent<UserUpdateResponse> wrapIntoSse(UserUpdate userUpdate) {
    // The payload type has to match the declared ServerSentEvent<UserUpdateResponse>.
    return ServerSentEvent.builder(new UserUpdateResponse(userUpdate.getUserCode()))
            .event("user-update")
            .build();
}
After struggling with this for a while, I arrived at this solution:
@GetMapping("/sse")
public Flux<ServerSentEvent<UserUpdateResponse>> handleSse(@RequestParam(required = false) String id) {
    // Reject the request up front when no id was supplied.
    if (StringUtils.isBlank(id)) {
        throw new WebServiceRuntimeException(HttpStatus.BAD_REQUEST, "The \"id\" parameter is missing");
    }
    final Mono<Boolean> cached = cacheService.getgetUser(id);
    // Turn the single validation result into either the update stream or an error.
    return Flux.from(cached).flatMap(exists -> {
        if (exists) {
            return userUpdateSink.asFlux()
                    .filter(update -> id.equals(update.getgetUser()))
                    .map(this::wrapIntoSse);
        }
        return Flux.error(new WebServiceRuntimeException(HttpStatus.NOT_FOUND,
                "The specified \"user\" does not exist: " + id + "."));
    });
}
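
The handler above signals the error before the first event is written, so the failure can still be reported as an HTTP status (assuming the application's error handling maps WebServiceRuntimeException to one). Once events are flowing, the status has already been sent, and one possible alternative is to deliver the failure as a final named event and then complete the stream. Below is a minimal sketch of what such an event looks like in the text/event-stream format, using only the JDK. The class SseErrorFrame, the method frame, and the event name "user-error" are hypothetical names chosen for illustration, and payload lines are assumed to be separated by \n.

```java
class SseErrorFrame {

    /**
     * Formats one event in the text/event-stream wire format:
     * an optional "event:" line, one "data:" line per payload line,
     * and a blank line that terminates the event.
     */
    static String frame(String eventName, String data) {
        StringBuilder sb = new StringBuilder();
        if (eventName != null && !eventName.isEmpty()) {
            sb.append("event:").append(eventName).append('\n');
        }
        // A payload that contains newlines is split into several data: lines.
        for (String line : data.split("\n", -1)) {
            sb.append("data:").append(line).append('\n');
        }
        // The empty line marks the end of the event.
        sb.append('\n');
        return sb.toString();
    }

    public static void main(String[] args) {
        // "user-error" is a hypothetical event name, not part of the original code.
        System.out.print(frame("user-error", "The specified \"user\" does not exist: 42."));
    }
}
```

A browser EventSource would receive such an event through addEventListener("user-error", ...). The name "error" is avoided here because EventSource already dispatches an error event for connection failures.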