从享元可自行移除的资源

Self removable resource from flyweight

我正在处理具有不同资源的应用程序,我需要关闭这些资源,同时使用反应流。

我有基于 flyweight 模式的工厂,它保留对对象的引用,并且它们实现了 AutoCloseable 接口。问题是我在 Autocloseable class 中使用 close(), 这是我的问题:删除对工厂内封闭资源的引用的最佳解决方案是什么?我是否可以抛出某种事件并在工厂中捕获它​​,或者在每次可以关闭资源的操作之后我是否应该遍历引用映射并删除已关闭的资源?

为了更好的上下文: 我正在使用 reactivex Observable,它会发出目录事件(创建、删除 file/directory),并且在每个订阅者取消订阅后,我将关闭我正在使用的 WatchService。

编辑#1

这是我的工厂 class 的样子:

public final class Factory {

    private final ConcurrentHashMap<String, ReactiveStream> reactiveStreams = new ConcurrentHashMap<>();

    public ReactiveStream getReactiveStream(Path path) throws IOException {

        ReactiveStream stream = reactiveStreams.get(path.toString());

        if (stream != null) return stream;

        stream = new ReactiveStream(path);
        reactiveStreams.put(path.toString(), stream);

        return stream;

    }

}

这是我的 ReactiveStream class 的样子:

public class ReactiveStream implements AutoCloseable {

    (...)
    private WatchService service;
    private Observable<Event> observable;

    public Observable<Event> getObservable() throws IOException {

        (...) // where i create observable

        return observable;
    }

    (...)

    @Override
    public void close() throws IOException {
        service.close();
    }
}

如你所见,我有一个工厂,它保留对 ReactiveStream class 的引用,它在可观察后自行关闭,将不再被订阅(我使用 doOnUnsubscribe(() -> close()) 在 observable 上使用 share() 之前,所以当没有订阅者时,将调用 doOnUnsubscribe)。

我的问题是,在关闭 ReactiveStream 之后,如何从 Factory 中删除对关闭的 ReactiveStream 的引用?

编辑#2

observable = Observable.fromCallable(new EventObtainer()).flatMap(Observable::from).subscribeOn(Schedulers.io()).repeat().doOnUnsubscribe(() -> {
                try {
                    close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }).share();

以下是我创建可观察对象的方式。 EventObtainer 嵌套在 ReactiveStream 中 class,ReactiveStream 使用 WatchService,每个订阅者停止订阅后需要关闭它。

今天我的同事告诉我解决这个问题的最佳方案。所以我创建了界面:

@FunctionalInterface
public interface CustomClosable {

    void onClosing();

}

并在Constructor中为ReactiveStream添加该接口的引用

另外,现在我正在调用 onClosing.onClosing() 需要关闭资源的地方。

感谢工厂 class 负责声明在其资源关闭后应该做什么的操作,而且我没有循环依赖,所以我的 ReactiveStream class 可以多次重复使用。