观察可关闭资源在订阅者为 removed/disposed 时关闭
Observing closeable resources to be closed when the subscriber is removed/disposed
我正在开发一个小型子系统,它使用 RxJava 2 集成了两个简单的组件。
这两个组件以简单的客户端-服务器方式工作,其中第一个组件生成可观察的数据,在后台打开资源。
该资源不会暴露给第二个组件。
此外,只要可观察对象在使用中,它就必须打开,但是可观察对象无法确定何时关闭。
用代码来说,一个示例实现是这样的:
private Disposable disposable;
public void onCreate() {
final Maybe<Object> maybeResource = Maybe.defer(() -> {
System.out.println("open");
// here is the resource under the hood, it is encapsulated in the observable and never gets exposed
final Closeable resource = () -> { };
return Maybe.just(resource)
.doOnDispose(() -> {
// this "destructor" is never called, resulting in a resource leak
System.out.println("close");
resource.close();
})
// arbitrary data, does not represent the data I'm working with, but it hides the resource away
.map(closeable -> new Object());
});
disposable = maybeResource.subscribe(data -> System.out.println("process: " + data));
}
public void onUserWorflow() {
// ...
System.out.println("... ... ...");
// ...
}
public void onDestroy() {
disposable.dispose();
}
我期望得到的输出是:
open
process: <...>
... ... ...
close <-- this is never produced
但最后一行 close
从未生成,因为 doOnDispose
方法未被调用并且无法像我认为的那样工作。
因此资源永远不会被释放。
还有 Maybe.using
做类似的事情,但它不允许“跨越”“用户工作流程”。
是否有 RxJava/RxJava 2 方法允许管理在处理订阅者时关闭的“可关闭”资源?
我猜你需要使用 Observable.create()
而不是 Maybe。
类似的东西:
final Observable<Object> resourceObservable = Observable.create<Object> {(emitter ->
// do you staff
emitter.onNext(new Object()); //to make observable emit something
emitter.setCancellable (
System.out.println("close");
resource.close();
)
);
disposable = resourceObservable.subscribe(data -> System.out.println("process: " + data));
我正在开发一个小型子系统,它使用 RxJava 2 集成了两个简单的组件。 这两个组件以简单的客户端-服务器方式工作,其中第一个组件生成可观察的数据,在后台打开资源。 该资源不会暴露给第二个组件。 此外,只要可观察对象在使用中,它就必须打开,但是可观察对象无法确定何时关闭。 用代码来说,一个示例实现是这样的:
private Disposable disposable;
public void onCreate() {
final Maybe<Object> maybeResource = Maybe.defer(() -> {
System.out.println("open");
// here is the resource under the hood, it is encapsulated in the observable and never gets exposed
final Closeable resource = () -> { };
return Maybe.just(resource)
.doOnDispose(() -> {
// this "destructor" is never called, resulting in a resource leak
System.out.println("close");
resource.close();
})
// arbitrary data, does not represent the data I'm working with, but it hides the resource away
.map(closeable -> new Object());
});
disposable = maybeResource.subscribe(data -> System.out.println("process: " + data));
}
public void onUserWorflow() {
// ...
System.out.println("... ... ...");
// ...
}
public void onDestroy() {
disposable.dispose();
}
我期望得到的输出是:
open
process: <...>
... ... ...
close <-- this is never produced
但最后一行 close
从未生成,因为 doOnDispose
方法未被调用并且无法像我认为的那样工作。
因此资源永远不会被释放。
还有 Maybe.using
做类似的事情,但它不允许“跨越”“用户工作流程”。
是否有 RxJava/RxJava 2 方法允许管理在处理订阅者时关闭的“可关闭”资源?
我猜你需要使用 Observable.create()
而不是 Maybe。
类似的东西:
final Observable<Object> resourceObservable = Observable.create<Object> {(emitter ->
// do you staff
emitter.onNext(new Object()); //to make observable emit something
emitter.setCancellable (
System.out.println("close");
resource.close();
)
);
disposable = resourceObservable.subscribe(data -> System.out.println("process: " + data));