使用 rxjs5 在打字稿中构建无限滚动列表

Building an infinite scrolling list in typescript with rxjs5

我正在尝试使用 TypeScript 和 rxjs 构建一个无限滚动列表。也就是说,我希望应用程序从后端获取几页结果,然后在用户滚动到底部附近时获取更多结果。

我有一个使用 Observable.prototype.expand() 构建的 Observable,它将给我所有结果,最终从服务器获取所有页面。但是由于 Observable 的性质,我不能暂停这个过程。一旦我订阅,它不可避免地会尽快获取所有结果。我需要一个不同的解决方案,在那里我可以以我需要的速度从结果流中提取。

事情变得更加复杂,因为我无法直接从 API 获取第二页,每一页都包含我获取下一页所需的信息。回复如下所示:

interface GraphApiResponse {
    data?: any[];
    paging?: {
        cursors: {
            before: string,
            after: string
        },
        next?: string,
        previous?: string
    };
}

paging.next 的存在表示还有另一个页面,paging.cursors.after 用于实际检索它。

我似乎无法弄清楚如何在不弄乱的情况下实现它。然而,无限列表似乎是一个常见的问题,不太可能没有好的解决方案。我应该如何着手实施它,而又不会把事情搞得一团糟?

我尝试过的东西

可迭代的 Promises

我的第一个想法是使用 Promises 的 Iterable,但是我不知道我会得到多少结果,迫使我构建一个无限的 Iterable<Promise<Response?>>,其 Promises 将在之后全部解析为 undefined某一点。但是,由于它是无限的,我无法正常迭代它(它会用 Promises 填充整个可用内存),实际上在结果处于那种形式时使用结果意味着在前一个的 resolve 函数中获得每个 Promise。

这个解决方案似乎可行,但我写的每一行都变得更难读且更复杂。

将其与行为主体合并

在谷歌搜索这个问题时我发现 as well as a GitHub issue on rxjs backpressure,两者都包含 Ben Lesh 的代码片段,显然可以用来给 Observable 添加背压,遗憾的是,无论我尝试什么,我都无法得到source Observable 发射它的值比它生成它们的速度慢,它们总是在某个地方被缓冲,这意味着无论如何网络请求都会发生。

来自GitHub:

// this behavior subject is basically your "give me the next batch" mechanism.
// in this example, we're going to make 5 async requests back to back before requesting more.
const BATCH_SIZE = 5;
const requests = new BehaviorSubject(BATCH_SIZE); // start by requesting five items

// for every request, pump out a stream of events that represent how many you have left to fulfill
requests.flatMap((count) => Observable.range(0, count).map(n => count - n - 1))
  // then concat map that into an observable of what you want to control with backpressure
  // you might have some parameterization here you need to handle, this example is simplified
  // handle side effects with a `do` block
  .concatMap(() => getSomeObservableOfDataHere().do(stuffWithIt), (remaining) => remaining)
  // narrow it down to when there are no more left to request,
  // and pump another batch request into the BehaviorSubject
  .filter(remaining => remaining === 0)
  .mapTo(BATCH_SIZE)
  .subscribe(requests);

来自 Whosebug:

// start with 5 values
const controller = new Rx.BehaviorSubject(5);

// some observable source, in this case, an interval.
const source = Rx.Observable.interval(100)

const controlled = controller.flatMap(
      // map your count into a set of values
      (count) => source.take(count), 
      // additional mapping for metadata about when the block is done
      (count, value, _, index) => {
        return { value: value, done: count - index === 1 }; 
      })
      // when the block is done, request 5 more.
      .do(({done}) => done && controller.next(5))
      // we only care about the value for output
      .map(({value}) => value);


// start our subscription
controlled.subscribe(x => {
  console.log(x)
});

我可能错了,但在我看来,一旦我订阅了一个 Observable,它就会尽可能快地产生它的值,没有办法减慢它的速度,所以这可能不是一个解决方案。

使用 ixjs

似乎 ixjs is meant to be a solution to my problem, however that repository has not been updated in a long time. There apparently is a reimplementation in TypeScript,但是这似乎处于开发初期并且没有很好地记录 jet。

我宁愿不依赖一个很少有人使用的框架来解决一个实际上非常简单的问题。

重构应用程序

我在网上搜索了 TypeScript 中无限滚动列表的实现(Angular)。我当前的方法是拥有一个服务,该服务提供一个对象,可用于获取所有结果。然后我有一个显示它们的组件。备选方案似乎是 doing the checking for scroll position right in the service that queries the backend, or having the component fetch a new Observable from the backend service when the user scrolls.

这两种解决方案都会迫使我混合使用目前整齐分开的代码。我更希望有服务 return 的东西,我可以直接输入组件,而组件不必知道网络请求,或者服务不必知道滚动位置。

根本就不要使用 expand,因为那样的话您将被迫使用自定义调度程序,这会不必要地复杂化。

您可以简单地构建一个 Subject,您可以在其上分派异步映射到新结果的滚动操作。这是这个想法的粗略草图(w/o 假设一个框架,甚至 运行 通过打字稿编译器):

class InfiniteScroll<T> {
   private subject = new Rx.Subject<{}>();
   readonly data = subject
     .asObservable()
     .flatmap(_ => this.getMoreData())      

   constructor(private getMoreData: () => Promise<T[]>) { }

   onScrollDown() {
     subject.next({});
   }
}

了解一些 cycle.js,您可能会受益匪浅。

我建议您改用 mergeScan 运算符。看起来它可能很适合这里。

MergeScan 类似于 expand 运算符,因为它将前一个请求的数据作为累加器返回,但与 expand 不同的是它不会继续 运行 直到时间结束。

基本上假设您有一个接受请求的函数 makeRequest(params) 和一个 returns 一个最终解析为响应的 Observable 和一个表示滚动事件的流,我们将调用 fetchMore$,你可以像这样创建一个按需获取服务:

// This is abstracted as a simple "fetch" concept but in reality should
// be hooked up to your scroll handler, properly debounced etc.
this.fetchMore$
  .mergeScan(
    // Make the request
    (acc, _) => makeRequest(acc.paging.next ? acc.paging.cursors.after : ''),
    {paging: {}}, // Initial request body
    1 // Maximum concurrency, i.e. how many requests can be in flight at once 
  )
  .pluck('data')
  .subscribe(data => {/*Do something with the data*/});

我将并发设置为 1,因为虽然您可能有多个请求正在运行,但目前无法保证顺序,因此如果用户滚动得非常快,结果可能是 acc 不同步,而并发性为 1 时,数据将始终有序。