RxJava轮询+手动刷新
RxJava polling + manual refresh
我有一个列表想要每分钟刷新一次。
例如这里的用户列表:https://github.com/android10/Android-CleanArchitecture/blob/master/domain/src/main/java/com/fernandocejas/android10/sample/domain/interactor/GetUserList.java
我使用 repeatWhen 添加定期刷新:
public Observable<List<User>> buildUseCaseObservable(Void unused) {
return this.userRepository
.users()
.repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
return objectObservable.delay(1, TimeUnit.MINUTES);
}
});
}
这样很好用,每分钟调用一次onNext。
但是如果我想立即刷新这个列表(因为用户的操作或因为通知),我不知道如何执行。
我应该 cancel/dispose observable 并重新启动一个新的吗?
谢谢
如果您不关心在其他可观察对象之一发出数据后调整刷新时间,您可以执行以下操作:
// Specific example of a user manually requesting
val request = Observable.create<String> { emitter ->
refresh.setOnClickListener {
emitter.onNext("Click Request")
}
}
.observeOn(Schedulers.io())
.flatMap {
userRepository.users()
}
// Refresh based off of your original work, could use something like interval as well
val interval = userRepository.users()
.subscribeOn(Schedulers.io())
.repeatWhen { objectObservable ->
objectObservable.delay(1, TimeUnit.MINUTES)
}
// Combine them so that both emissions are received you can even add on another source
Observable.merge(request,interval)
.observeOn(AndroidSchedulers.mainThread())
.subscribe({
contents.text = it.toString()
}, {
contents.text = it.toString()
},{
println(contents.text)
})
这样就不用每次都dispose重新订阅了
根据您的代码,我了解到用户列表是在订阅后生成和发出的。
这里有一些我能想到的解决方案,而不是取消订阅并重新订阅您想立即响应的事件:
不使用 repeatWhen
运算符,而是使用 interval
creation operator 结合 flatMap
每分钟调用对新 Observable 的订阅,并使用 merge
运算符添加对您感兴趣的其他事件的反应。像这样:
@Test
public void intervalObservableAndImmediateReaction() throws InterruptedException {
Observable<String> obs = Observable.interval(1, TimeUnit.SECONDS)
.cast(Object.class)
.mergeWith(
Observable.just("mockedUserClick")
.delay(500, TimeUnit.MILLISECONDS))
.flatMap(
timeOrClick -> Observable.just("Generated upon subscription")
);
obs.subscribe(System.out::println);
Thread.currentThread().sleep(3000); //to see the prints before ending the test
}
或根据您的需要调整(但原理是一样的):
Observable.interval(1, TimeUnit.MINUTES)
.mergeWith(RxView.clicks(buttonView))
.flatMap(timeOrClick -> this.userRepository.users());
您可以像以前一样使用 flatMap
运算符,即使在保持当前实现工作并且不合并到间隔的情况下 - 只需保持工作代码在程序链的另一个区域它到您选择的RxBinding:
RxView.touches(yourViewVariable)
.flatMatp(motionEvent -> this.userRepository.users())
.subscribe(theObserver);
请注意,在此解决方案中,订阅是独立于两个可观察对象完成的。如果你使用不同的观察者,或者管理一个主题或那条线上的东西,你可能会过得更好。我 运行 的一个小测试显示一个订阅者处理订阅 2 个不同的 observables 没有问题(在 Rxjava1 中 - 还没有签入 Rxjava2),但我觉得很不稳定。
我有一个列表想要每分钟刷新一次。 例如这里的用户列表:https://github.com/android10/Android-CleanArchitecture/blob/master/domain/src/main/java/com/fernandocejas/android10/sample/domain/interactor/GetUserList.java
我使用 repeatWhen 添加定期刷新:
public Observable<List<User>> buildUseCaseObservable(Void unused) {
return this.userRepository
.users()
.repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
return objectObservable.delay(1, TimeUnit.MINUTES);
}
});
}
这样很好用,每分钟调用一次onNext。 但是如果我想立即刷新这个列表(因为用户的操作或因为通知),我不知道如何执行。
我应该 cancel/dispose observable 并重新启动一个新的吗? 谢谢
如果您不关心在其他可观察对象之一发出数据后调整刷新时间,您可以执行以下操作:
// Specific example of a user manually requesting
val request = Observable.create<String> { emitter ->
refresh.setOnClickListener {
emitter.onNext("Click Request")
}
}
.observeOn(Schedulers.io())
.flatMap {
userRepository.users()
}
// Refresh based off of your original work, could use something like interval as well
val interval = userRepository.users()
.subscribeOn(Schedulers.io())
.repeatWhen { objectObservable ->
objectObservable.delay(1, TimeUnit.MINUTES)
}
// Combine them so that both emissions are received you can even add on another source
Observable.merge(request,interval)
.observeOn(AndroidSchedulers.mainThread())
.subscribe({
contents.text = it.toString()
}, {
contents.text = it.toString()
},{
println(contents.text)
})
这样就不用每次都dispose重新订阅了
根据您的代码,我了解到用户列表是在订阅后生成和发出的。
这里有一些我能想到的解决方案,而不是取消订阅并重新订阅您想立即响应的事件:
不使用
repeatWhen
运算符,而是使用interval
creation operator 结合flatMap
每分钟调用对新 Observable 的订阅,并使用merge
运算符添加对您感兴趣的其他事件的反应。像这样:@Test public void intervalObservableAndImmediateReaction() throws InterruptedException { Observable<String> obs = Observable.interval(1, TimeUnit.SECONDS) .cast(Object.class) .mergeWith( Observable.just("mockedUserClick") .delay(500, TimeUnit.MILLISECONDS)) .flatMap( timeOrClick -> Observable.just("Generated upon subscription") ); obs.subscribe(System.out::println); Thread.currentThread().sleep(3000); //to see the prints before ending the test }
或根据您的需要调整(但原理是一样的):
Observable.interval(1, TimeUnit.MINUTES) .mergeWith(RxView.clicks(buttonView)) .flatMap(timeOrClick -> this.userRepository.users());
您可以像以前一样使用
flatMap
运算符,即使在保持当前实现工作并且不合并到间隔的情况下 - 只需保持工作代码在程序链的另一个区域它到您选择的RxBinding:RxView.touches(yourViewVariable) .flatMatp(motionEvent -> this.userRepository.users()) .subscribe(theObserver);
请注意,在此解决方案中,订阅是独立于两个可观察对象完成的。如果你使用不同的观察者,或者管理一个主题或那条线上的东西,你可能会过得更好。我 运行 的一个小测试显示一个订阅者处理订阅 2 个不同的 observables 没有问题(在 Rxjava1 中 - 还没有签入 Rxjava2),但我觉得很不稳定。