RxJS Observable with Subject,通过计时器轮询和 combineLatest 不会触发

RxJS Observable with Subject, polling via timer and combineLatest does not fire

我写了一个函数来对 API 进行轮询,它也可以进行分页。为此,分页是使用 Subject Observable 完成的,轮询是使用计时器方法完成的(我也尝试了间隔,结果相同)。

这是我的代码:

  getItems(pagination: Subject<Pagination>): Observable<ListResult<Item>> {
    let params: URLSearchParams = new URLSearchParams();

    return Observable
      .timer(0, 5000)
      .combineLatest(
        pagination,
        (timer, pagination) => pagination
      )
      .startWith({offset: 0, limit: 3})
      .switchMap(pagination => {
        params.set('skip', pagination.offset.toString());
        params.set('limit', pagination.limit.toString());
        return this.authHttp.get(`${environment.apiBase}/items`, {search: params})
      })
      .map(response => response.json() as ListResult<Item>)
      .catch(this.handleError);
  }

预期的行为是: HTTP 请求每 5 秒以及当用户更改页面时触发。

事情是这样的: 第一个 HTTP 请求被触发,但是在使用分页之前没有其他请求被发送到服务器。 第一次使用分页后,轮询也开始工作了。

这是我第一次使用 Observables,所以我很确定我错过了一些东西,但我看不出它可能是什么。

我也尝试过这种方法(可能是在 startWith 中缺少定时器计数器),但它并没有改变任何东西。

[...]
  .combineLatest(
    pagination
  )
  .startWith([0, {offset: 0, limit: 3}])
[...]

确保您遵循以下步骤

正在创建可保存的数据

你有一个 observable verable 并且你正在通过 observable.next(value);

推送数据

获取主题或 behourSubject 中的数据

在您想要获取数据的地方声明一个 Subject 类型变量并通过订阅它来访问它。

private val: Subject = YourObservable;
private uiValue: string;

val.sunscribe((val) => {
   uiVal = val;
});

现在您可以在任何地方使用 uiVal,它会在每个时间间隔内更新。

您可以通过 console.log 来分别测试这两个部分,以确保一切正常 file

combineLatest() 运算符要求所有源 Observable 至少发出一项。

您的演示只发出一个请求,因为您使用的是 .startWith()combineLatest() 永远不会发出,因为 pagination 是一个 Subject,它可能永远不会发出任何项目。

所以一个选择是移动 .startWith():

.combineLatest(
  pagination.startWith({offset: 0, limit: 3}),
  (timer, pagination) => pagination
)

但也许这对您没有多大帮助,因为您忽略了来自 timer() 的所有项目,而您只使用了 pagination。因此,也许您可​​以仅使用 merge() 从其中一个来源刷新列表。然后 timer() 独立增加偏移量。

Observable
  .timer(0, 1000)
  .map(i => { return {offset: i*3, limit: 3}})
  .merge(pagination.startWith({offset: 0, limit: 3}))
  .switchMap(pagination => {
    return Observable.of(pagination).delay(200)
  })
  .subscribe(val => console.log(val));