WebFlux (Reactor) 中的文件处理
File handing in WebFlux (Reactor)
我正在开发一个全新的反应式项目,其中正在进行 lot 文件处理 IO。如果我以命令式阻塞方式编写 IO 代码,然后将它们包装在 Mono 中,将它们发布到 boundedElastic 调度程序上,就足够了吗? boundedElastic 池大小会限制并发操作数吗?
如果这不是正确的方法,您能否举例说明如何使用 Reactor 将字节写入文件?
Is it sufficient if I write the IO code in an imperative blocking manner and then wrap them in a Mono, publish them on boundedElastic scheduler?
这在某种程度上归结为意见 - 但不,对于反应性绿地项目恕我直言,肯定不理想。 boundedElastic()
调度程序非常适合在您 必须 时与阻塞 IO 进行交互,但是当存在真正的非阻塞解决方案时,它们并不是一个很好的替代品。 (有时这与文件处理有点争议,因为这取决于底层系统是否有可能异步执行 - 但现在通常是可能的。)
在你的情况下,我会考虑在响应式发布者中包装 AsynchronousFileChannel
。为此,您需要使用 create()
或 push()
,然后显式调用 sink
,但具体 如何 取决于你的用例。作为文件写入的 "simplest case",您可以可行地执行以下操作:
static Mono<Void> writeToFile(AsynchronousFileChannel channel, String content) {
return Mono.create(sink -> {
byte[] bytes = content.getBytes();
ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
buffer.put(bytes);
buffer.flip();
channel.write(buffer, 0, null, new CompletionHandler<>() {
@Override
public void completed(Integer result, Object attachment) {
sink.success();
}
@Override
public void failed(Throwable exc, Object attachment) {
sink.error(exc);
}
});
});
}
可以找到桥接两个 API 的更彻底/全面的示例 here - 几乎可以肯定还有其他示例。
我正在开发一个全新的反应式项目,其中正在进行 lot 文件处理 IO。如果我以命令式阻塞方式编写 IO 代码,然后将它们包装在 Mono 中,将它们发布到 boundedElastic 调度程序上,就足够了吗? boundedElastic 池大小会限制并发操作数吗?
如果这不是正确的方法,您能否举例说明如何使用 Reactor 将字节写入文件?
Is it sufficient if I write the IO code in an imperative blocking manner and then wrap them in a Mono, publish them on boundedElastic scheduler?
这在某种程度上归结为意见 - 但不,对于反应性绿地项目恕我直言,肯定不理想。 boundedElastic()
调度程序非常适合在您 必须 时与阻塞 IO 进行交互,但是当存在真正的非阻塞解决方案时,它们并不是一个很好的替代品。 (有时这与文件处理有点争议,因为这取决于底层系统是否有可能异步执行 - 但现在通常是可能的。)
在你的情况下,我会考虑在响应式发布者中包装 AsynchronousFileChannel
。为此,您需要使用 create()
或 push()
,然后显式调用 sink
,但具体 如何 取决于你的用例。作为文件写入的 "simplest case",您可以可行地执行以下操作:
static Mono<Void> writeToFile(AsynchronousFileChannel channel, String content) {
return Mono.create(sink -> {
byte[] bytes = content.getBytes();
ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
buffer.put(bytes);
buffer.flip();
channel.write(buffer, 0, null, new CompletionHandler<>() {
@Override
public void completed(Integer result, Object attachment) {
sink.success();
}
@Override
public void failed(Throwable exc, Object attachment) {
sink.error(exc);
}
});
});
}
可以找到桥接两个 API 的更彻底/全面的示例 here - 几乎可以肯定还有其他示例。