创建Flux.fromIterable后如何运行onClose操作?
How to run onClose operation after creating Flux.fromIterable?
假设我们需要根据 Closeable
资源的内容创建一个 Flux
。
为清楚起见,假设有一个 BufferedReader
要转换为 Flux<String>
。
BufferedReader reader = createReader("my_resource_path");
Flux<String> flux = Flux.fromIterable(() -> iteratorOfLines(reader));
让我们假设 iteratorOfLines
产生一个有限的项目集。
我正在寻找一种方法来关闭 BufferedReader
,当 Flux
已经消耗了它的所有数据或者由于某种原因不需要剩余数据(即订阅被中止)。
有构造函数reactor.core.publisher.FluxIterable(Iterable iterable, Runnable onClose)
,但是:
- 似乎无法从 reactor
的 public API 到达(甚至可传递)
- 我怀疑它是否有用,因为它没有涵盖 Flux 在获取可迭代对象中的最后一项之前停止的情况。
Flux.fromIterable
发布最后一项后 cleaning/closing 资源的正确处理方式是什么?
可能有比 fromIterable
更好的方法来做类似的事情,所以欢迎所有选择。
对于等效的资源尝试,您可以使用 using
Flux.using(
//Set up resource
() -> createReader("my_resource_path"),
//Create flux from resource
reader -> Flux.fromIterable(iteratorOfLines(reader)),
//Perform action (cleanup/close)
//when resource completes/errors/cancelled
reader -> {
try{
reader.close();
}catch(IOException e){
throw Exceptions.propagate(e);
}
}
);
假设我们需要根据 Closeable
资源的内容创建一个 Flux
。
为清楚起见,假设有一个 BufferedReader
要转换为 Flux<String>
。
BufferedReader reader = createReader("my_resource_path");
Flux<String> flux = Flux.fromIterable(() -> iteratorOfLines(reader));
让我们假设 iteratorOfLines
产生一个有限的项目集。
我正在寻找一种方法来关闭 BufferedReader
,当 Flux
已经消耗了它的所有数据或者由于某种原因不需要剩余数据(即订阅被中止)。
有构造函数reactor.core.publisher.FluxIterable(Iterable iterable, Runnable onClose)
,但是:
- 似乎无法从 reactor 的 public API 到达(甚至可传递)
- 我怀疑它是否有用,因为它没有涵盖 Flux 在获取可迭代对象中的最后一项之前停止的情况。
Flux.fromIterable
发布最后一项后 cleaning/closing 资源的正确处理方式是什么?
可能有比 fromIterable
更好的方法来做类似的事情,所以欢迎所有选择。
对于等效的资源尝试,您可以使用 using
Flux.using(
//Set up resource
() -> createReader("my_resource_path"),
//Create flux from resource
reader -> Flux.fromIterable(iteratorOfLines(reader)),
//Perform action (cleanup/close)
//when resource completes/errors/cancelled
reader -> {
try{
reader.close();
}catch(IOException e){
throw Exceptions.propagate(e);
}
}
);