RxJs:从服务器请求列表,消耗值,当我们几乎没有值时重新请求

RxJs: request list from server, consume values, re-request when we're almost out of values

我正在从 REST api 中获取项目列表。用户通过单击与每个人进行交互,当只有几个未使用时,我想重复请求以获取更多项目。我正在尝试使用适当的 RxJs (5) 面向流的方法来做到这一点。

所以,像这样:

var userClick$ = Observable.fromEvent(button.nativeElement, 'click');

var needToExtend$ = new BehaviorSubject(1);

var list$ = needToExtend$
            .flatMap( () => this.http.get("http://myserver/get-list") )
            .flatMap( x => x['list'] );

var itemsUsed$ = userClick$.zip(list$, (click, item) => item);
itemsUsed$.subscribe( item => use(item) );

然后,在必要时触发重新加载:

list$.subscribe(
    if (list$.isEmpty()) {
        needToExtend$.next(1);
    }
)

这最后一点是错误的,手动重新触发似乎不是很好 "stream-oriented" 即使它确实按预期工作。有什么想法吗?

这类似于,但我无法对API返回的列表的长度做出假设,我想在列表完整之前重新请求消费。而且那里的解决方案感觉有点太聪明了。一定有更易读的方式吧?

这样的事情怎么样:

const LIST_LIMIT = 3;
userClick$ = Observable.fromEvent(button.nativeElement, 'click');
list$ = this.http.get("http://myserver/get-list").map(r => r.list);

clickCounter$ = this.userClick$.scan((acc: number, val) => acc + 1, 0);

getList$ = new BehaviorSubject([]);

this.getList$
    .switchMap(previousList => this.list$)
    .switchMap(list => this.clickCounter$, (list, clickCount) => { return {list, clickCount}; })
    .filter(({list, clickCount}) => clickCount >= list.length - LIST_LIMIT)
    .map(({list, clickCount}) => list)
    .subscribe(this.getList$);

这里的逻辑如果你定义一个列表 getter 流,以及一个触发它的信号。

首先,信号导致 switchMap 获取一个新列表,然后将其馈送到另一个重新订阅点击计数器的 switchmap。您将两个流的结果组合起来并将其提供给过滤器,它仅在点击次数大于或等于列表长度减去 3(或任何您想要的)时发出。然后信号被订阅到整个流,以便它自己重新触发。

编辑:最大的弱点是您需要在副作用而不是订阅或异步管道中设置列表值(用于显示)。您可以重新排列它并进行多播:

const LIST_LIMIT = 3;
userClick$ = Observable.fromEvent(button.nativeElement, 'click');
list$ = this.http.get("http://myserver/get-list").map(r => r.list);

clickCounter$: Observable<number> = this.userClick$.scan((acc: number, val) => acc + 1, 0).startWith(0);

getList$ = new BehaviorSubject([]);

refresh$ = this.getList$
        .switchMap(list => this.clickCounter$
                               .filter(clickCount => list.length <= clickCount + LIST_LIMIT)
                               .first(), 
            (list, clickCount) => list)
        .switchMap(previousList => this.list$)
        .multicast(() => this.getList$);

this.refresh$.connect();
this.refresh$.subscribe(e => console.log(e));

这种方式有一些优点,但可能会少一些"readable"。这些部分大部分是相同的,但是您先去柜台,然后切换到列表提取。然后你多播它以重新启动计数器。

我不清楚你是如何跟踪获取下一组项目的,所以我假设这是某种形式的寻呼来回答我的问题。我还假设您不知道项目的总数。

console.clear();
const pageSize = 5;
const pageBuffer = 2;
const data = [...Array(17).keys()]

function getData(page) {
  const begin = pageSize * page
  const end = begin + pageSize;
 return Rx.Observable.of(data.slice(begin, end));
}

const clicks = Rx.Observable.interval(400);

clicks
  .scan(count => ++count, 0)
  .do(() => console.log('click'))
  .map(count => {
    const page = Math.floor(count / pageSize) + 1;
    const total = page * pageSize;
    return { total, page, count }
  })
  .filter(x => x.total - pageBuffer === x.count)
  .startWith({ page: 0 })
  .switchMap(x => getData(x.page))
  .takeWhile(x => x.length > 0)
  .subscribe(
    x => { console.log('next: ', x); },
    x => { console.log('error: ', x); },
    () => { console.log('completed'); }
  );
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.3/Rx.min.js"></script>

解释如下:

  • Rx.Observable.interval(#):模拟客户端点击事件
  • .scan(...): 累计点击事件
  • .map(...):计算页面索引和潜在的总项目数(实际数量可能更少,但对我们的目的来说并不重要
  • .filter(...): 只有刚命中页面缓冲区才允许通过获取新的页面数据。
  • .startWith(...):获取首页,无需等待点击。 .scan 中页面计算的 +1 说明了这一点。
  • .switchMap(...):获取下一页数据。
  • .takeWhile(...): 保持流打开直到我们得到一个空列表。

因此它将获得一个初始页面,然后在点击次数进入指定缓冲区时获得一个新页面。一旦检索到所有项目(已知为空列表),它将完成。

有一件事我没有弄清楚如何在页面长度小于页面大小时完成列表。不确定这对您是否重要。