RXJava2 管理订阅
RXJava2 manage subscriptions
我需要澄清什么是使用 RxJava2 管理特定场景的最佳方法(整个应用程序结构都基于它):
在我的应用程序中,很多人可以对同一文档进行更改,因此我需要将每项更改传递给查看该文档的每个人。但是这个对象非常复杂和沉重,所以我需要在最后一个人关闭它时从内存中删除它。以及更多:文档可以是另一个文档的子文档,因此父文档中的每个更改都必须发送给所有子文档。
到目前为止我做了什么:我创建了一个管理器,所以每个文档请求都会收到它。每当有人需要处理文档时,我都会在地图中查看文档是否已经打开。如果不是,我创建一个 BaseDocument 实例,它从文档和 PublishSubject 接收数据以分发事件并添加到该地图。然后我在 PublishSubject 上订阅用户的 Observer 以获取更改。每当用户需要更改某些内容时,它会将更改发送到 BaseDocument,进行更改并通过 onNext() 将新版本发送给所有人。到目前为止一切顺利。
我的问题是我无法控制何时有人处置文档观察者,因此我无法控制何时不再需要该文档,因此我可以保留任何未保存的更改并销毁该对象。除了 "hasObservers()" 之外,我找不到任何订阅列表或类似的东西,而且我不想添加一个计时器来轮询它是否在所有完成后关闭。
我的 "miraculous answer" 将是在最后一个订阅者处理时调用的回调,所以我可以打扫房子并扔掉整个对象,但我找不到这样的东西。那么,如何管理订阅?
其中一种方法是计算订阅和处置的数量。如果数字为 0,则删除文档。它看起来像这样:
int numberOfSubscribers = 0;
...
public Observable<T> expose(){
return subject.asObservable()
.doOnSubscribe(()-> numberOfSubscribers++)
.doOnDispose(()-> {
numberOfSubscribers--;
if (numberOfSubscribers == 0){
//remove the object
}
});
当然你需要在这里添加对并发问题的支持(synchronized/atomic int),这只是一个草稿。
希望对您有所帮助:)
我需要澄清什么是使用 RxJava2 管理特定场景的最佳方法(整个应用程序结构都基于它):
在我的应用程序中,很多人可以对同一文档进行更改,因此我需要将每项更改传递给查看该文档的每个人。但是这个对象非常复杂和沉重,所以我需要在最后一个人关闭它时从内存中删除它。以及更多:文档可以是另一个文档的子文档,因此父文档中的每个更改都必须发送给所有子文档。
到目前为止我做了什么:我创建了一个管理器,所以每个文档请求都会收到它。每当有人需要处理文档时,我都会在地图中查看文档是否已经打开。如果不是,我创建一个 BaseDocument 实例,它从文档和 PublishSubject 接收数据以分发事件并添加到该地图。然后我在 PublishSubject 上订阅用户的 Observer 以获取更改。每当用户需要更改某些内容时,它会将更改发送到 BaseDocument,进行更改并通过 onNext() 将新版本发送给所有人。到目前为止一切顺利。
我的问题是我无法控制何时有人处置文档观察者,因此我无法控制何时不再需要该文档,因此我可以保留任何未保存的更改并销毁该对象。除了 "hasObservers()" 之外,我找不到任何订阅列表或类似的东西,而且我不想添加一个计时器来轮询它是否在所有完成后关闭。
我的 "miraculous answer" 将是在最后一个订阅者处理时调用的回调,所以我可以打扫房子并扔掉整个对象,但我找不到这样的东西。那么,如何管理订阅?
其中一种方法是计算订阅和处置的数量。如果数字为 0,则删除文档。它看起来像这样:
int numberOfSubscribers = 0;
...
public Observable<T> expose(){
return subject.asObservable()
.doOnSubscribe(()-> numberOfSubscribers++)
.doOnDispose(()-> {
numberOfSubscribers--;
if (numberOfSubscribers == 0){
//remove the object
}
});
当然你需要在这里添加对并发问题的支持(synchronized/atomic int),这只是一个草稿。
希望对您有所帮助:)