创建Flux.fromIterable后如何运行onClose操作?

How to run onClose operation after creating Flux.fromIterable?

假设我们需要根据 Closeable 资源的内容创建一个 Flux。 为清楚起见,假设有一个 BufferedReader 要转换为 Flux<String>

BufferedReader reader = createReader("my_resource_path");
Flux<String> flux = Flux.fromIterable(() -> iteratorOfLines(reader));

让我们假设 iteratorOfLines 产生一个有限的项目集。

我正在寻找一种方法来关闭 BufferedReader,当 Flux 已经消耗了它的所有数据或者由于某种原因不需要剩余数据(即订阅被中止)。

有构造函数reactor.core.publisher.FluxIterable(Iterable iterable, Runnable onClose),但是:

  1. 似乎无法从 reactor
  2. 的 public API 到达(甚至可传递)
  3. 我怀疑它是否有用,因为它没有涵盖 Flux 在获取可迭代对象中的最后一项之前停止的情况。

Flux.fromIterable 发布最后一项后 cleaning/closing 资源的正确处理方式是什么?

可能有比 fromIterable 更好的方法来做类似的事情,所以欢迎所有选择。

对于等效的资源尝试,您可以使用 using

    Flux.using(
            //Set up resource
            () -> createReader("my_resource_path"),
            //Create flux from resource
            reader -> Flux.fromIterable(iteratorOfLines(reader)),
            //Perform action (cleanup/close) 
            //when resource completes/errors/cancelled
            reader -> {
                try{
                    reader.close();
                }catch(IOException e){
                    throw Exceptions.propagate(e);
                }
            }
    );