使用 rxjs5 在打字稿中构建无限滚动列表
Building an infinite scrolling list in typescript with rxjs5
我正在尝试使用 TypeScript 和 rxjs 构建一个无限滚动列表。也就是说,我希望应用程序从后端获取几页结果,然后在用户滚动到底部附近时获取更多结果。
我有一个使用 Observable.prototype.expand()
构建的 Observable,它将给我所有结果,最终从服务器获取所有页面。但是由于 Observable 的性质,我不能暂停这个过程。一旦我订阅,它不可避免地会尽快获取所有结果。我需要一个不同的解决方案,在那里我可以以我需要的速度从结果流中提取。
事情变得更加复杂,因为我无法直接从 API 获取第二页,每一页都包含我获取下一页所需的信息。回复如下所示:
interface GraphApiResponse {
data?: any[];
paging?: {
cursors: {
before: string,
after: string
next?: string,
previous?: string
可迭代的 Promises
我的第一个想法是使用 Promises 的 Iterable,但是我不知道我会得到多少结果,迫使我构建一个无限的 Iterable<Promise<Response?>>
,其 Promises 将在之后全部解析为 undefined
某一点。但是,由于它是无限的,我无法正常迭代它(它会用 Promises 填充整个可用内存),实际上在结果处于那种形式时使用结果意味着在前一个的 resolve 函数中获得每个 Promise。
在谷歌搜索这个问题时我发现 as well as a GitHub issue on rxjs backpressure,两者都包含 Ben Lesh 的代码片段,显然可以用来给 Observable 添加背压,遗憾的是,无论我尝试什么,我都无法得到source Observable 发射它的值比它生成它们的速度慢,它们总是在某个地方被缓冲,这意味着无论如何网络请求都会发生。
// this behavior subject is basically your "give me the next batch" mechanism.
// in this example, we're going to make 5 async requests back to back before requesting more.
const BATCH_SIZE = 5;
const requests = new BehaviorSubject(BATCH_SIZE); // start by requesting five items
// for every request, pump out a stream of events that represent how many you have left to fulfill
requests.flatMap((count) => Observable.range(0, count).map(n => count - n - 1))
// then concat map that into an observable of what you want to control with backpressure
// you might have some parameterization here you need to handle, this example is simplified
// handle side effects with a `do` block
.concatMap(() => getSomeObservableOfDataHere().do(stuffWithIt), (remaining) => remaining)
// narrow it down to when there are no more left to request,
// and pump another batch request into the BehaviorSubject
.filter(remaining => remaining === 0)
来自 Whosebug:
// start with 5 values
const controller = new Rx.BehaviorSubject(5);
// some observable source, in this case, an interval.
const source = Rx.Observable.interval(100)
const controlled = controller.flatMap(
// map your count into a set of values
(count) => source.take(count),
// additional mapping for metadata about when the block is done
(count, value, _, index) => {
return { value: value, done: count - index === 1 };
// when the block is done, request 5 more.
.do(({done}) => done && controller.next(5))
// we only care about the value for output
.map(({value}) => value);
// start our subscription
controlled.subscribe(x => {
我可能错了,但在我看来,一旦我订阅了一个 Observable,它就会尽可能快地产生它的值,没有办法减慢它的速度,所以这可能不是一个解决方案。
使用 ixjs
似乎 ixjs is meant to be a solution to my problem, however that repository has not been updated in a long time. There apparently is a reimplementation in TypeScript,但是这似乎处于开发初期并且没有很好地记录 jet。
我在网上搜索了 TypeScript 中无限滚动列表的实现(Angular)。我当前的方法是拥有一个服务,该服务提供一个对象,可用于获取所有结果。然后我有一个显示它们的组件。备选方案似乎是 doing the checking for scroll position right in the service that queries the backend, or having the component fetch a new Observable from the backend service when the user scrolls.
这两种解决方案都会迫使我混合使用目前整齐分开的代码。我更希望有服务 return 的东西,我可以直接输入组件,而组件不必知道网络请求,或者服务不必知道滚动位置。
根本就不要使用 expand
您可以简单地构建一个 Subject
,您可以在其上分派异步映射到新结果的滚动操作。这是这个想法的粗略草图(w/o 假设一个框架,甚至 运行 通过打字稿编译器):
class InfiniteScroll<T> {
private subject = new Rx.Subject<{}>();
readonly data = subject
.flatmap(_ => this.getMoreData())
constructor(private getMoreData: () => Promise<T[]>) { }
onScrollDown() {
了解一些 cycle.js
我建议您改用 mergeScan
类似于 expand
运算符,因为它将前一个请求的数据作为累加器返回,但与 expand
不同的是它不会继续 运行 直到时间结束。
基本上假设您有一个接受请求的函数 makeRequest(params)
和一个 returns 一个最终解析为响应的 Observable
和一个表示滚动事件的流,我们将调用 fetchMore$
// This is abstracted as a simple "fetch" concept but in reality should
// be hooked up to your scroll handler, properly debounced etc.
// Make the request
(acc, _) => makeRequest(acc.paging.next ? acc.paging.cursors.after : ''),
{paging: {}}, // Initial request body
1 // Maximum concurrency, i.e. how many requests can be in flight at once
.subscribe(data => {/*Do something with the data*/});
我将并发设置为 1,因为虽然您可能有多个请求正在运行,但目前无法保证顺序,因此如果用户滚动得非常快,结果可能是 acc 不同步,而并发性为 1 时,数据将始终有序。
我正在尝试使用 TypeScript 和 rxjs 构建一个无限滚动列表。也就是说,我希望应用程序从后端获取几页结果,然后在用户滚动到底部附近时获取更多结果。
我有一个使用 Observable.prototype.expand()
构建的 Observable,它将给我所有结果,最终从服务器获取所有页面。但是由于 Observable 的性质,我不能暂停这个过程。一旦我订阅,它不可避免地会尽快获取所有结果。我需要一个不同的解决方案,在那里我可以以我需要的速度从结果流中提取。
事情变得更加复杂,因为我无法直接从 API 获取第二页,每一页都包含我获取下一页所需的信息。回复如下所示:
interface GraphApiResponse {
data?: any[];
paging?: {
cursors: {
before: string,
after: string
next?: string,
previous?: string
可迭代的 Promises
我的第一个想法是使用 Promises 的 Iterable,但是我不知道我会得到多少结果,迫使我构建一个无限的 Iterable<Promise<Response?>>
,其 Promises 将在之后全部解析为 undefined
某一点。但是,由于它是无限的,我无法正常迭代它(它会用 Promises 填充整个可用内存),实际上在结果处于那种形式时使用结果意味着在前一个的 resolve 函数中获得每个 Promise。
// this behavior subject is basically your "give me the next batch" mechanism.
// in this example, we're going to make 5 async requests back to back before requesting more.
const BATCH_SIZE = 5;
const requests = new BehaviorSubject(BATCH_SIZE); // start by requesting five items
// for every request, pump out a stream of events that represent how many you have left to fulfill
requests.flatMap((count) => Observable.range(0, count).map(n => count - n - 1))
// then concat map that into an observable of what you want to control with backpressure
// you might have some parameterization here you need to handle, this example is simplified
// handle side effects with a `do` block
.concatMap(() => getSomeObservableOfDataHere().do(stuffWithIt), (remaining) => remaining)
// narrow it down to when there are no more left to request,
// and pump another batch request into the BehaviorSubject
.filter(remaining => remaining === 0)
来自 Whosebug:
// start with 5 values
const controller = new Rx.BehaviorSubject(5);
// some observable source, in this case, an interval.
const source = Rx.Observable.interval(100)
const controlled = controller.flatMap(
// map your count into a set of values
(count) => source.take(count),
// additional mapping for metadata about when the block is done
(count, value, _, index) => {
return { value: value, done: count - index === 1 };
// when the block is done, request 5 more.
.do(({done}) => done && controller.next(5))
// we only care about the value for output
.map(({value}) => value);
// start our subscription
controlled.subscribe(x => {
我可能错了,但在我看来,一旦我订阅了一个 Observable,它就会尽可能快地产生它的值,没有办法减慢它的速度,所以这可能不是一个解决方案。
使用 ixjs
似乎 ixjs is meant to be a solution to my problem, however that repository has not been updated in a long time. There apparently is a reimplementation in TypeScript,但是这似乎处于开发初期并且没有很好地记录 jet。
我在网上搜索了 TypeScript 中无限滚动列表的实现(Angular)。我当前的方法是拥有一个服务,该服务提供一个对象,可用于获取所有结果。然后我有一个显示它们的组件。备选方案似乎是 doing the checking for scroll position right in the service that queries the backend, or having the component fetch a new Observable from the backend service when the user scrolls.
这两种解决方案都会迫使我混合使用目前整齐分开的代码。我更希望有服务 return 的东西,我可以直接输入组件,而组件不必知道网络请求,或者服务不必知道滚动位置。
根本就不要使用 expand
您可以简单地构建一个 Subject
,您可以在其上分派异步映射到新结果的滚动操作。这是这个想法的粗略草图(w/o 假设一个框架,甚至 运行 通过打字稿编译器):
class InfiniteScroll<T> {
private subject = new Rx.Subject<{}>();
readonly data = subject
.flatmap(_ => this.getMoreData())
constructor(private getMoreData: () => Promise<T[]>) { }
onScrollDown() {
了解一些 cycle.js
我建议您改用 mergeScan
类似于 expand
运算符,因为它将前一个请求的数据作为累加器返回,但与 expand
不同的是它不会继续 运行 直到时间结束。
基本上假设您有一个接受请求的函数 makeRequest(params)
和一个 returns 一个最终解析为响应的 Observable
和一个表示滚动事件的流,我们将调用 fetchMore$
// This is abstracted as a simple "fetch" concept but in reality should
// be hooked up to your scroll handler, properly debounced etc.
// Make the request
(acc, _) => makeRequest(acc.paging.next ? acc.paging.cursors.after : ''),
{paging: {}}, // Initial request body
1 // Maximum concurrency, i.e. how many requests can be in flight at once
.subscribe(data => {/*Do something with the data*/});
我将并发设置为 1,因为虽然您可能有多个请求正在运行,但目前无法保证顺序,因此如果用户滚动得非常快,结果可能是 acc 不同步,而并发性为 1 时,数据将始终有序。