使用 RxJava 延迟获取分页对象
Lazy fetching of paginated objects using RxJava
我几乎被 RxJava 卖掉了,它是 Retrofit 的完美伴侣,但我在迁移代码时遇到了一种常见模式:为了节省带宽,我想延迟获取(分页)对象根据需要从我的网络服务,而我的列表视图(或回收视图)正在使用反应式编程滚动。
我以前的代码完美地完成了这项工作,但响应式编程似乎值得一试。
听 listview/recyclerview 滚动(和其他无聊的东西)不是问题并且使用 Retrofit 很容易获得 Observable:
@GET("/api/messages")
Observable<List<Message>> getMessages(@Path("offset") int offset, @Path("limit") int limit);
我只是想不出在反应式编程中使用的模式。
Concat
operator seems a good starting point, along with ConnectableObservable
at some point to defer emission and maybe flatMap
,但是如何呢?
编辑:
这是我当前(天真的)解决方案:
public interface Paged<T> {
boolean isLoading();
void cancel();
void next(int count);
void next(int count, Scheduler scheduler);
Observable<List<T>> asObservable();
boolean hasCompleted();
int position();
}
以及我使用主题的实现:
public abstract class SimplePaged<T> implements Paged<T> {
final PublishSubject<List<T>> subject = PublishSubject.create();
private volatile boolean loading;
private volatile int offset;
private Subscription subscription;
@Override
public boolean isLoading() {
return loading;
}
@Override
public synchronized void cancel() {
if(subscription != null && !subscription.isUnsubscribed())
subscription.unsubscribe();
if(!hasCompleted())
subject.onCompleted();
subscription = null;
loading = false;
}
@Override
public void next(int count) {
next(count, null);
}
@Override
public synchronized void next(int count, Scheduler scheduler) {
if (isLoading())
throw new IllegalStateException("you can't call next() before onNext()");
if(hasCompleted())
throw new IllegalStateException("you can't call next() after onCompleted()");
loading = true;
Observable<List<T>> obs = onNextPage(offset, count).single();
if(scheduler != null)
obs = obs.subscribeOn(scheduler); // BEWARE! onNext/onError/onComplete will happens on that scheduler!
subscription = obs.subscribe(this::onNext, this::onError, this::onComplete);
}
@Override
public Observable<List<T>> asObservable() {
return subject.asObservable();
}
@Override
public boolean hasCompleted() {
return subject.hasCompleted();
}
@Override
public int position() {
return offset;
}
/* Warning: functions below may be called from another thread */
protected synchronized void onNext(List<T> items) {
if (items != null)
offset += items.size();
loading = false;
if (items == null || items.size() == 0)
subject.onCompleted();
else
subject.onNext(items);
}
protected synchronized void onError(Throwable t) {
loading = false;
subject.onError(t);
}
protected synchronized void onComplete() {
loading = false;
}
abstract protected Observable<List<T>> onNextPage(int offset, int count);
}
这是处理响应式分页的几种可能方法之一。假设我们有一个方法 getNextPageTrigger
,当滚动侦听器(或任何输入)想要加载新页面时,returns 和 Observable
会发出一些事件对象。在现实生活中,它可能会有 debounce
运算符,但除此之外,我们将确保仅在加载最新页面后触发它。
我们还定义了一种方法来从列表中解包消息:
Observable<Message> getPage(final int page) {
return service.getMessages(page * PAGE_SIZE, PAGE_SIZE)
.flatMap(messageList -> Observable.from(messageList));
}
然后我们可以进行实际的抓取逻辑:
// Start with the first page.
getPage(0)
// Add on each incremental future page.
.concatWith(Observable.range(1, Integer.MAX_VALUE)
// Uses a little trick to get the next page to wait for a signal to load.
// By ignoring all actual elements emitted and casting, the trigger must
// complete before the actual page request will be made.
.concatMap(page -> getNextPageTrigger().limit(1)
.ignoreElements()
.cast(Message.class)
.concatWith(getPage(page))) // Then subscribe, etc..
这仍然遗漏了一些可能很重要的事情:
1 - 这显然不知道何时停止获取额外的页面,这意味着一旦它到达终点,取决于服务器 returns,它可能每次都不断遇到错误或空结果滚动被触发。解决此问题的方法取决于您如何向客户端发出没有更多页面可加载的信号。
2 - 如果您需要错误重试,我建议查看 retryWhen
运算符。否则,常见的网络错误可能会导致页面加载错误传播。
我几乎被 RxJava 卖掉了,它是 Retrofit 的完美伴侣,但我在迁移代码时遇到了一种常见模式:为了节省带宽,我想延迟获取(分页)对象根据需要从我的网络服务,而我的列表视图(或回收视图)正在使用反应式编程滚动。
我以前的代码完美地完成了这项工作,但响应式编程似乎值得一试。
听 listview/recyclerview 滚动(和其他无聊的东西)不是问题并且使用 Retrofit 很容易获得 Observable:
@GET("/api/messages")
Observable<List<Message>> getMessages(@Path("offset") int offset, @Path("limit") int limit);
我只是想不出在反应式编程中使用的模式。
Concat
operator seems a good starting point, along with ConnectableObservable
at some point to defer emission and maybe flatMap
,但是如何呢?
编辑:
这是我当前(天真的)解决方案:
public interface Paged<T> {
boolean isLoading();
void cancel();
void next(int count);
void next(int count, Scheduler scheduler);
Observable<List<T>> asObservable();
boolean hasCompleted();
int position();
}
以及我使用主题的实现:
public abstract class SimplePaged<T> implements Paged<T> {
final PublishSubject<List<T>> subject = PublishSubject.create();
private volatile boolean loading;
private volatile int offset;
private Subscription subscription;
@Override
public boolean isLoading() {
return loading;
}
@Override
public synchronized void cancel() {
if(subscription != null && !subscription.isUnsubscribed())
subscription.unsubscribe();
if(!hasCompleted())
subject.onCompleted();
subscription = null;
loading = false;
}
@Override
public void next(int count) {
next(count, null);
}
@Override
public synchronized void next(int count, Scheduler scheduler) {
if (isLoading())
throw new IllegalStateException("you can't call next() before onNext()");
if(hasCompleted())
throw new IllegalStateException("you can't call next() after onCompleted()");
loading = true;
Observable<List<T>> obs = onNextPage(offset, count).single();
if(scheduler != null)
obs = obs.subscribeOn(scheduler); // BEWARE! onNext/onError/onComplete will happens on that scheduler!
subscription = obs.subscribe(this::onNext, this::onError, this::onComplete);
}
@Override
public Observable<List<T>> asObservable() {
return subject.asObservable();
}
@Override
public boolean hasCompleted() {
return subject.hasCompleted();
}
@Override
public int position() {
return offset;
}
/* Warning: functions below may be called from another thread */
protected synchronized void onNext(List<T> items) {
if (items != null)
offset += items.size();
loading = false;
if (items == null || items.size() == 0)
subject.onCompleted();
else
subject.onNext(items);
}
protected synchronized void onError(Throwable t) {
loading = false;
subject.onError(t);
}
protected synchronized void onComplete() {
loading = false;
}
abstract protected Observable<List<T>> onNextPage(int offset, int count);
}
这是处理响应式分页的几种可能方法之一。假设我们有一个方法 getNextPageTrigger
,当滚动侦听器(或任何输入)想要加载新页面时,returns 和 Observable
会发出一些事件对象。在现实生活中,它可能会有 debounce
运算符,但除此之外,我们将确保仅在加载最新页面后触发它。
我们还定义了一种方法来从列表中解包消息:
Observable<Message> getPage(final int page) {
return service.getMessages(page * PAGE_SIZE, PAGE_SIZE)
.flatMap(messageList -> Observable.from(messageList));
}
然后我们可以进行实际的抓取逻辑:
// Start with the first page.
getPage(0)
// Add on each incremental future page.
.concatWith(Observable.range(1, Integer.MAX_VALUE)
// Uses a little trick to get the next page to wait for a signal to load.
// By ignoring all actual elements emitted and casting, the trigger must
// complete before the actual page request will be made.
.concatMap(page -> getNextPageTrigger().limit(1)
.ignoreElements()
.cast(Message.class)
.concatWith(getPage(page))) // Then subscribe, etc..
这仍然遗漏了一些可能很重要的事情:
1 - 这显然不知道何时停止获取额外的页面,这意味着一旦它到达终点,取决于服务器 returns,它可能每次都不断遇到错误或空结果滚动被触发。解决此问题的方法取决于您如何向客户端发出没有更多页面可加载的信号。
2 - 如果您需要错误重试,我建议查看 retryWhen
运算符。否则,常见的网络错误可能会导致页面加载错误传播。