观察可关闭资源在订阅者为 removed/disposed 时关闭

Observing closeable resources to be closed when the subscriber is removed/disposed

我正在开发一个小型子系统,它使用 RxJava 2 集成了两个简单的组件。 这两个组件以简单的客户端-服务器方式工作,其中第一个组件生成可观察的数据,在后台打开资源。 该资源不会暴露给第二个组件。 此外,只要可观察对象在使用中,它就必须打开,但是可观察对象无法确定何时关闭。 用代码来说,一个示例实现是这样的:

private Disposable disposable;

public void onCreate() {
    final Maybe<Object> maybeResource = Maybe.defer(() -> {
        System.out.println("open");
        // here is the resource under the hood, it is encapsulated in the observable and never gets exposed
        final Closeable resource = () -> { };
        return Maybe.just(resource)
                .doOnDispose(() -> {
                    // this "destructor" is never called, resulting in a resource leak
                    System.out.println("close");
                    resource.close();
                })
                // arbitrary data, does not represent the data I'm working with, but it hides the resource away
                .map(closeable -> new Object());
    });
    disposable = maybeResource.subscribe(data -> System.out.println("process: " + data));
}

public void onUserWorflow() {
    // ...
    System.out.println("... ... ...");
    // ...
}

public void onDestroy() {
    disposable.dispose();
}

我期望得到的输出是:

open
process: <...>
... ... ...
close         <-- this is never produced

但最后一行 close 从未生成,因为 doOnDispose 方法未被调用并且无法像我认为的那样工作。 因此资源永远不会被释放。 还有 Maybe.using 做类似的事情,但它不允许“跨越”“用户工作流程”。

是否有 RxJava/RxJava 2 方法允许管理在处理订阅者时关闭的“可关闭”资源?

我猜你需要使用 Observable.create() 而不是 Maybe。 类似的东西:

final Observable<Object> resourceObservable = Observable.create<Object> {(emitter ->
        // do you staff
        emitter.onNext(new Object()); //to make observable emit something
        emitter.setCancellable ( 
           System.out.println("close");
           resource.close(); 
        )
    );

disposable = resourceObservable.subscribe(data -> System.out.println("process: " + data));