block()/blockFirst()/blockLast() 在调用 bodyToMono AFTER exchange() 时出现阻塞错误
block()/blockFirst()/blockLast() are blocking error when calling bodyToMono AFTER exchange()
我正在尝试使用Webflux将生成的文件流式传输到另一个位置,但是,如果生成文件运行出错,api returns成功,但是在生成文件而不是文件本身时使用 DTO 详细说明错误。这是使用非常老旧且设计不佳的 api,因此请原谅使用 post 和 api 设计。
来自 api 调用 (exchange()) 的响应是 ClientResponse。从这里我可以使用 bodyToMono 转换为 ByteArrayResource,它可以流式传输到文件,或者,如果在创建文件时出错,那么我也可以使用 bodyToMono 转换为 DTO。但是,我似乎不能做或取决于 ClientResponse.header 的内容。
在 运行 时间我得到了一个由
引起的 IllegalStateException
block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-client-epoll-12
我认为我的问题是我不能在同一个函数链中调用 block() 两次。
我的代码片段是这样的:
webClient.post()
.uri(uriBuilder -> uriBuilder.path("/file/")
.queryParams(params).build())
.exchange()
.doOnSuccess(cr -> {
if (MediaType.APPLICATION_JSON_UTF8.equals(cr.headers().contentType().get())) {
NoPayloadResponseDto dto = cr.bodyToMono(NoPayloadResponseDto.class).block();
createErrorFile(dto);
}
else {
ByteArrayResource bAr = cr.bodyToMono(ByteArrayResource.class).block();
createSpreadsheet(bAr);
}
}
)
.block();
基本上我想根据 header 中定义的 MediaType 以不同方式处理 ClientResponse。
这可能吗?
首先,有几件事可以帮助您理解解决此用例的代码片段。
- 您永远不应该在 returns 反应类型的方法中调用阻塞方法;您将阻塞应用程序的几个线程之一,这对应用程序非常不利
- 无论如何从 Reactor 3.2 开始,blocking within a reactive pipeline throws an error
- 如评论中所建议的,调用
subscribe
也不是一个好主意。它或多或少类似于在单独的线程中将作业作为任务启动。完成后你会得到一个回调(subscribe
方法可以被赋予 lambda 表达式),但实际上你正在将当前管道与该任务解耦。在这种情况下,在您有机会读取完整的响应正文并将其写入文件之前,可以关闭客户端 HTTP 响应并清理资源
- 如果您不想在内存中缓冲整个响应,Spring 提供
DataBuffer
(想想可以合并的 ByteBuffer 实例)。
- 如果您正在实施的方法本身是阻塞的(例如返回
void
),例如在测试用例中,您可以调用 block。
这是您可以用来执行此操作的代码片段:
Mono<Void> fileWritten = WebClient.create().post()
.uri(uriBuilder -> uriBuilder.path("/file/").build())
.exchange()
.flatMap(response -> {
if (MediaType.APPLICATION_JSON_UTF8.equals(response.headers().contentType().get())) {
Mono<NoPayloadResponseDto> dto = response.bodyToMono(NoPayloadResponseDto.class);
return createErrorFile(dto);
}
else {
Flux<DataBuffer> body = response.bodyToFlux(DataBuffer.class);
return createSpreadsheet(body);
}
});
// Once you get that Mono, you should give plug it into an existing
// reactive pipeline, or call block on it, depending on the situation
如您所见,我们没有在任何地方阻塞,处理 I/O 的方法正在返回 Mono<Void>
,这是 done(error)
回调的反应式等价物,它在事情发生时发出信号已完成,如果发生错误。
因为我不确定 createErrorFile
方法应该做什么,所以我提供了 createSpreadsheet
的示例,它只是将正文字节写入文件。请注意,由于数据缓冲区可能 recycled/pooled,我们需要在完成后释放它们。
private Mono<Void> createSpreadsheet(Flux<DataBuffer> body) {
try {
Path file = //...
WritableByteChannel channel = Files.newByteChannel(file, StandardOpenOption.WRITE);
return DataBufferUtils.write(body, channel).map(DataBufferUtils::release).then();
} catch (IOException exc) {
return Mono.error(exc);
}
}
通过此实现,您的应用程序将在给定时间在内存中保存几个 DataBuffer
实例(出于性能原因,反应式运算符会预取值)并且将以反应式方式写入字节。
RestResultMessage message= createWebClient()
.get()
.uri(uri)
.exchange()
.map(clientResponse -> {
//delegation
ClientResponseWrapper wrapper = new
ClientResponseWrapper(clientResponse);
return Mono.just(wrapper);
})
.block() //wait until request is not done
.map(result -> {
//convert to any data
if (!result.statusCode().isError()){
//extract the result from request
return create(RestResultMessage.Result.success, result.bodyToMono(String.class).block());}
} else {
return create(RestResultMessage.Result.error, result.statusCode().name());
}
})
.block();
[更新 2021/10/19]
toProcessor()
现已弃用。
考虑使用
myMono.toFuture().get();
正如投票最多的答案中所述,永远不应阻止。就我而言,这是唯一的选择,因为我们在一段命令式代码中使用反应式库。阻止可以通过 wrapping the mono in a processor:
myMono.toProcessor().block()
要在服务器请求池外执行客户端请求,请使用myWebClientMono.share().block();
我正在尝试使用Webflux将生成的文件流式传输到另一个位置,但是,如果生成文件运行出错,api returns成功,但是在生成文件而不是文件本身时使用 DTO 详细说明错误。这是使用非常老旧且设计不佳的 api,因此请原谅使用 post 和 api 设计。
来自 api 调用 (exchange()) 的响应是 ClientResponse。从这里我可以使用 bodyToMono 转换为 ByteArrayResource,它可以流式传输到文件,或者,如果在创建文件时出错,那么我也可以使用 bodyToMono 转换为 DTO。但是,我似乎不能做或取决于 ClientResponse.header 的内容。
在 运行 时间我得到了一个由
引起的 IllegalStateExceptionblock()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-client-epoll-12
我认为我的问题是我不能在同一个函数链中调用 block() 两次。
我的代码片段是这样的:
webClient.post()
.uri(uriBuilder -> uriBuilder.path("/file/")
.queryParams(params).build())
.exchange()
.doOnSuccess(cr -> {
if (MediaType.APPLICATION_JSON_UTF8.equals(cr.headers().contentType().get())) {
NoPayloadResponseDto dto = cr.bodyToMono(NoPayloadResponseDto.class).block();
createErrorFile(dto);
}
else {
ByteArrayResource bAr = cr.bodyToMono(ByteArrayResource.class).block();
createSpreadsheet(bAr);
}
}
)
.block();
基本上我想根据 header 中定义的 MediaType 以不同方式处理 ClientResponse。
这可能吗?
首先,有几件事可以帮助您理解解决此用例的代码片段。
- 您永远不应该在 returns 反应类型的方法中调用阻塞方法;您将阻塞应用程序的几个线程之一,这对应用程序非常不利
- 无论如何从 Reactor 3.2 开始,blocking within a reactive pipeline throws an error
- 如评论中所建议的,调用
subscribe
也不是一个好主意。它或多或少类似于在单独的线程中将作业作为任务启动。完成后你会得到一个回调(subscribe
方法可以被赋予 lambda 表达式),但实际上你正在将当前管道与该任务解耦。在这种情况下,在您有机会读取完整的响应正文并将其写入文件之前,可以关闭客户端 HTTP 响应并清理资源 - 如果您不想在内存中缓冲整个响应,Spring 提供
DataBuffer
(想想可以合并的 ByteBuffer 实例)。 - 如果您正在实施的方法本身是阻塞的(例如返回
void
),例如在测试用例中,您可以调用 block。
这是您可以用来执行此操作的代码片段:
Mono<Void> fileWritten = WebClient.create().post()
.uri(uriBuilder -> uriBuilder.path("/file/").build())
.exchange()
.flatMap(response -> {
if (MediaType.APPLICATION_JSON_UTF8.equals(response.headers().contentType().get())) {
Mono<NoPayloadResponseDto> dto = response.bodyToMono(NoPayloadResponseDto.class);
return createErrorFile(dto);
}
else {
Flux<DataBuffer> body = response.bodyToFlux(DataBuffer.class);
return createSpreadsheet(body);
}
});
// Once you get that Mono, you should give plug it into an existing
// reactive pipeline, or call block on it, depending on the situation
如您所见,我们没有在任何地方阻塞,处理 I/O 的方法正在返回 Mono<Void>
,这是 done(error)
回调的反应式等价物,它在事情发生时发出信号已完成,如果发生错误。
因为我不确定 createErrorFile
方法应该做什么,所以我提供了 createSpreadsheet
的示例,它只是将正文字节写入文件。请注意,由于数据缓冲区可能 recycled/pooled,我们需要在完成后释放它们。
private Mono<Void> createSpreadsheet(Flux<DataBuffer> body) {
try {
Path file = //...
WritableByteChannel channel = Files.newByteChannel(file, StandardOpenOption.WRITE);
return DataBufferUtils.write(body, channel).map(DataBufferUtils::release).then();
} catch (IOException exc) {
return Mono.error(exc);
}
}
通过此实现,您的应用程序将在给定时间在内存中保存几个 DataBuffer
实例(出于性能原因,反应式运算符会预取值)并且将以反应式方式写入字节。
RestResultMessage message= createWebClient()
.get()
.uri(uri)
.exchange()
.map(clientResponse -> {
//delegation
ClientResponseWrapper wrapper = new
ClientResponseWrapper(clientResponse);
return Mono.just(wrapper);
})
.block() //wait until request is not done
.map(result -> {
//convert to any data
if (!result.statusCode().isError()){
//extract the result from request
return create(RestResultMessage.Result.success, result.bodyToMono(String.class).block());}
} else {
return create(RestResultMessage.Result.error, result.statusCode().name());
}
})
.block();
[更新 2021/10/19]
toProcessor()
现已弃用。
考虑使用
myMono.toFuture().get();
正如投票最多的答案中所述,永远不应阻止。就我而言,这是唯一的选择,因为我们在一段命令式代码中使用反应式库。阻止可以通过 wrapping the mono in a processor:
myMono.toProcessor().block()
要在服务器请求池外执行客户端请求,请使用myWebClientMono.share().block();