RxJava 数据流与 SqlBrite 和 Retrofit
RxJava data flow with SqlBrite and Retrofit
在我的 Android 应用程序中,我使用的是域级存储库接口,该接口由使用 SqlBrite 和网络 api 以及 Retrofit observables 实现的本地数据库支持。所以我在 Repository 中有方法 getDomains(): Observable<List<Domain>>
,在我的 Retrofit 和 SqlBrite 中有两个相应的方法。
我不想连接或合并这两个可观察对象。我希望我的存储库仅从 SqlBrite 获取数据,并且由于 SqlBrite returns QueryObservable 会在每次基础数据更改时触发 onNext()
,我可以独立地 运行 我的网络请求并将结果存储到 SqlBrite 和让我的 Observable 更新为从网络获取并存储到数据库数据。
所以我尝试按如下方式实现存储库的 getDomains()
方法:
fun getDomains(): Observable<List<Domain>> {
return db.getDomains()
.doOnSubscribe {
networkClient.getDomains()
.doOnNext { db.putDomains(it) }
.onErrorReturn{ emptyList() }
.subscribe()
}
}
但是这样的话客户端每次都要订阅,每次都会发起网络请求,这样就不太好了。我考虑过其他 do...
运算符将请求移到那里,但是 doOnCompleted()
在 QueryObservable 的情况下永远不会被调用,直到我在某个地方调用 toBlocking()
,我不会,doOnEach()
也不好,因为它每次从数据库中提取项目时都会发出请求。
我还尝试使用 replay()
运算符,但尽管在这种情况下缓存了 Observable,但订阅会发生并导致网络请求。
那么,如何以所需的方式组合这两个 Observable?
如果您的问题是您每次想要获取数据时都必须订阅您的观察者,您可以使用 relay,它永远不会取消订阅观察者,因为它没有实现 onComplete
/**
* Relay is just an observable which subscribe an observer, but it wont unsubscribe once emit the items. So the pipeline keep open
* It should return 1,2,3,4,5 for first observer and just 3, 4, 5 fot the second observer since default relay emit last emitted item,
* and all the next items passed to the pipeline.
*/
@Test
public void testRelay() throws InterruptedException {
BehaviorRelay<String> relay = BehaviorRelay.create("default");
relay.subscribe(result -> System.out.println("Observer1:" + result));
relay.call("1");
relay.call("2");
relay.call("3");
relay.subscribe(result -> System.out.println("Observer2:" + result));
relay.call("4");
relay.call("5");
}
这里还有一个例子https://github.com/politrons/reactive/blob/master/src/test/java/rx/relay/Relay.java
好的,这取决于您的具体用例:即假设您要显示本地数据库中的最新数据,并且不时通过在后台执行网络请求来更新数据库。
也许有更好的方法,但也许你可以这样做
fun <T> createDataAwareObservable(databaseQuery: Observable<T>): Observable<T> =
stateDeterminer.getState().flatMap {
when (it) {
State.UP_TO_DATE -> databaseQuery // Nothing to do, data is up to date so observable can be returned directly
State.NO_DATA ->
networkClient.getDomains() // no data so first do the network call
.flatMap { db.save(it) } // save network call result in database
.flatMap { databaseQuery } // continue with original observable
State.SYNC_IN_BACKGROUND -> {
// Execute sync in background
networkClient.getDomains()
.flatMap { db.save(it) }
.observeOn(backgroundSyncScheduler)
.subscribeOn(backgroundSyncScheduler)
.subscribe({}, { Timber.e(it, "Error when starting background sync") }, {})
// Continue with original observable in parallel, network call will then update database and thanks to sqlbrite databaseQuery will be update automatically
databaseQuery
}
}
}
因此,最后您创建了 SQLBrite Observable (QueryObservable) 并将其传递给 createDataAwareObservable()
函数。如果这里没有数据,它将确保它从网络加载数据,否则它将检查数据是否应该在后台更新(将其保存到数据库中,然后自动更新 SQLBrite QueryObservable)或者数据是否是最新。
基本上你可以这样使用它:
createDataAwareObservable( db.getAllDomains() ).subscribe(...)
因此,作为此 createDataAwareObservable()
的用户,您总是会在作为参数传入时得到相同的类型 Observable<T>
。所以从本质上讲,您似乎总是订阅 db.getAllDomains()
...
在我的 Android 应用程序中,我使用的是域级存储库接口,该接口由使用 SqlBrite 和网络 api 以及 Retrofit observables 实现的本地数据库支持。所以我在 Repository 中有方法 getDomains(): Observable<List<Domain>>
,在我的 Retrofit 和 SqlBrite 中有两个相应的方法。
我不想连接或合并这两个可观察对象。我希望我的存储库仅从 SqlBrite 获取数据,并且由于 SqlBrite returns QueryObservable 会在每次基础数据更改时触发 onNext()
,我可以独立地 运行 我的网络请求并将结果存储到 SqlBrite 和让我的 Observable 更新为从网络获取并存储到数据库数据。
所以我尝试按如下方式实现存储库的 getDomains()
方法:
fun getDomains(): Observable<List<Domain>> {
return db.getDomains()
.doOnSubscribe {
networkClient.getDomains()
.doOnNext { db.putDomains(it) }
.onErrorReturn{ emptyList() }
.subscribe()
}
}
但是这样的话客户端每次都要订阅,每次都会发起网络请求,这样就不太好了。我考虑过其他 do...
运算符将请求移到那里,但是 doOnCompleted()
在 QueryObservable 的情况下永远不会被调用,直到我在某个地方调用 toBlocking()
,我不会,doOnEach()
也不好,因为它每次从数据库中提取项目时都会发出请求。
我还尝试使用 replay()
运算符,但尽管在这种情况下缓存了 Observable,但订阅会发生并导致网络请求。
那么,如何以所需的方式组合这两个 Observable?
如果您的问题是您每次想要获取数据时都必须订阅您的观察者,您可以使用 relay,它永远不会取消订阅观察者,因为它没有实现 onComplete
/**
* Relay is just an observable which subscribe an observer, but it wont unsubscribe once emit the items. So the pipeline keep open
* It should return 1,2,3,4,5 for first observer and just 3, 4, 5 fot the second observer since default relay emit last emitted item,
* and all the next items passed to the pipeline.
*/
@Test
public void testRelay() throws InterruptedException {
BehaviorRelay<String> relay = BehaviorRelay.create("default");
relay.subscribe(result -> System.out.println("Observer1:" + result));
relay.call("1");
relay.call("2");
relay.call("3");
relay.subscribe(result -> System.out.println("Observer2:" + result));
relay.call("4");
relay.call("5");
}
这里还有一个例子https://github.com/politrons/reactive/blob/master/src/test/java/rx/relay/Relay.java
好的,这取决于您的具体用例:即假设您要显示本地数据库中的最新数据,并且不时通过在后台执行网络请求来更新数据库。
也许有更好的方法,但也许你可以这样做
fun <T> createDataAwareObservable(databaseQuery: Observable<T>): Observable<T> =
stateDeterminer.getState().flatMap {
when (it) {
State.UP_TO_DATE -> databaseQuery // Nothing to do, data is up to date so observable can be returned directly
State.NO_DATA ->
networkClient.getDomains() // no data so first do the network call
.flatMap { db.save(it) } // save network call result in database
.flatMap { databaseQuery } // continue with original observable
State.SYNC_IN_BACKGROUND -> {
// Execute sync in background
networkClient.getDomains()
.flatMap { db.save(it) }
.observeOn(backgroundSyncScheduler)
.subscribeOn(backgroundSyncScheduler)
.subscribe({}, { Timber.e(it, "Error when starting background sync") }, {})
// Continue with original observable in parallel, network call will then update database and thanks to sqlbrite databaseQuery will be update automatically
databaseQuery
}
}
}
因此,最后您创建了 SQLBrite Observable (QueryObservable) 并将其传递给 createDataAwareObservable()
函数。如果这里没有数据,它将确保它从网络加载数据,否则它将检查数据是否应该在后台更新(将其保存到数据库中,然后自动更新 SQLBrite QueryObservable)或者数据是否是最新。
基本上你可以这样使用它:
createDataAwareObservable( db.getAllDomains() ).subscribe(...)
因此,作为此 createDataAwareObservable()
的用户,您总是会在作为参数传入时得到相同的类型 Observable<T>
。所以从本质上讲,您似乎总是订阅 db.getAllDomains()
...