如何正确关闭可变数量的流?

How to properly close a variable amount of streams?

我正在创建必须并行(或可能并行)访问的多个流。当资源量在编译时固定时,我知道如何进行 try-with-resources,但是如果资源量由参数确定怎么办?

我有这样的东西:

private static void foo(String path, String... files) throws IOException {
    @SuppressWarnings("unchecked")
    Stream<String>[] streams = new Stream[files.length];

    try {
        for (int i = 0; i < files.length; i++) {
            final String file = files[i];
            streams[i] = Files.lines(Paths.get(path, file))
                .onClose(() -> System.out.println("Closed " + file));
        }

        // do something with streams
        Stream.of(streams)
            .parallel()
            .flatMap(x -> x)
            .distinct()
            .sorted()
            .limit(10)
            .forEach(System.out::println);
    }
    finally {
        for (Stream<String> s : streams) {
            if (s != null) {
                s.close();
            }
        }
    }
}

您可以编写复合 AutoCloseable 来管理动态数量的 AutoCloseable:

import java.util.ArrayList;
import java.util.List;

public class CompositeAutoclosable<T extends AutoCloseable> implements AutoCloseable {
    private final List<T> components= new ArrayList<>();

    public void addComponent(T component) { components.add(component); }

    public List<T> getComponents() { return components; }

    @Override
    public void close() throws Exception {
        Exception e = null;
        for (T component : components) {
            try { component.close(); }
            catch (Exception closeException) {
                if (e == null) { e = closeException; }
                else { e.addSuppressed(closeException); }
            }
        }
        if (e != null) { throw e; }
    }
}

您可以在您的方法中使用它:

private static void foo(String path, String... files) throws Exception {
    try (CompositeAutoclosable<Stream<String>> streams 
            = new CompositeAutoclosable<Stream<String>>()) {
        for (int i = 0; i < files.length; i++) {
            final String file = files[i];
            streams.addComponent(Files.lines(Paths.get(path, file))
                .onClose(() -> System.out.println("Closed " + file)));
        }
        streams.getComponents().stream()
            .parallel()
            .flatMap(x -> x)
            .distinct()
            .sorted()
            .limit(10)
            .forEach(System.out::println);
    }
}

documentation of Stream.flatMap 说:

Each mapped stream is closed after its contents have been placed into this stream.

换句话说,对于流的普通关闭,不需要额外的操作。但是,由于只有处理过的流是关闭的,所以你不应该在不知道它们稍后是否被流处理的情况下急切地创建流:

private static void foo(String path, String... files) throws IOException {
    Arrays.stream(files).flatMap(file-> {
              try { return Files.lines(Paths.get(path, file))
                    .onClose(() -> System.out.println("Closed " + file)); }
              catch(IOException ex) { throw new UncheckedIOException(ex); } })
          .parallel()
          .distinct()
          .sorted()
          .limit(10)
          .forEachOrdered(System.out::println);
}

通过在 flatMap 中创建子流,可以保证每个子流仅在流要处理它时才创建。因此,此解决方案将关闭所有子流,即使在 try-with-resource 语句中没有外部 Stream