RxJS Observable with Subject,通过计时器轮询和 combineLatest 不会触发
RxJS Observable with Subject, polling via timer and combineLatest does not fire
我写了一个函数来对 API 进行轮询,它也可以进行分页。为此,分页是使用 Subject Observable 完成的,轮询是使用计时器方法完成的(我也尝试了间隔,结果相同)。
这是我的代码:
getItems(pagination: Subject<Pagination>): Observable<ListResult<Item>> {
let params: URLSearchParams = new URLSearchParams();
return Observable
.timer(0, 5000)
.combineLatest(
pagination,
(timer, pagination) => pagination
)
.startWith({offset: 0, limit: 3})
.switchMap(pagination => {
params.set('skip', pagination.offset.toString());
params.set('limit', pagination.limit.toString());
return this.authHttp.get(`${environment.apiBase}/items`, {search: params})
})
.map(response => response.json() as ListResult<Item>)
.catch(this.handleError);
}
预期的行为是:
HTTP 请求每 5 秒以及当用户更改页面时触发。
事情是这样的:
第一个 HTTP 请求被触发,但是在使用分页之前没有其他请求被发送到服务器。
第一次使用分页后,轮询也开始工作了。
这是我第一次使用 Observables,所以我很确定我错过了一些东西,但我看不出它可能是什么。
我也尝试过这种方法(可能是在 startWith 中缺少定时器计数器),但它并没有改变任何东西。
[...]
.combineLatest(
pagination
)
.startWith([0, {offset: 0, limit: 3}])
[...]
确保您遵循以下步骤
正在创建可保存的数据
你有一个 observable verable 并且你正在通过 observable.next(value);
推送数据
获取主题或 behourSubject 中的数据
在您想要获取数据的地方声明一个 Subject 类型变量并通过订阅它来访问它。
private val: Subject = YourObservable;
private uiValue: string;
val.sunscribe((val) => {
uiVal = val;
});
现在您可以在任何地方使用 uiVal,它会在每个时间间隔内更新。
您可以通过 console.log
来分别测试这两个部分,以确保一切正常 file
combineLatest()
运算符要求所有源 Observable 至少发出一项。
您的演示只发出一个请求,因为您使用的是 .startWith()
。 combineLatest()
永远不会发出,因为 pagination
是一个 Subject
,它可能永远不会发出任何项目。
所以一个选择是移动 .startWith()
:
.combineLatest(
pagination.startWith({offset: 0, limit: 3}),
(timer, pagination) => pagination
)
但也许这对您没有多大帮助,因为您忽略了来自 timer()
的所有项目,而您只使用了 pagination
。因此,也许您可以仅使用 merge()
从其中一个来源刷新列表。然后 timer()
独立增加偏移量。
Observable
.timer(0, 1000)
.map(i => { return {offset: i*3, limit: 3}})
.merge(pagination.startWith({offset: 0, limit: 3}))
.switchMap(pagination => {
return Observable.of(pagination).delay(200)
})
.subscribe(val => console.log(val));
我写了一个函数来对 API 进行轮询,它也可以进行分页。为此,分页是使用 Subject Observable 完成的,轮询是使用计时器方法完成的(我也尝试了间隔,结果相同)。
这是我的代码:
getItems(pagination: Subject<Pagination>): Observable<ListResult<Item>> {
let params: URLSearchParams = new URLSearchParams();
return Observable
.timer(0, 5000)
.combineLatest(
pagination,
(timer, pagination) => pagination
)
.startWith({offset: 0, limit: 3})
.switchMap(pagination => {
params.set('skip', pagination.offset.toString());
params.set('limit', pagination.limit.toString());
return this.authHttp.get(`${environment.apiBase}/items`, {search: params})
})
.map(response => response.json() as ListResult<Item>)
.catch(this.handleError);
}
预期的行为是: HTTP 请求每 5 秒以及当用户更改页面时触发。
事情是这样的: 第一个 HTTP 请求被触发,但是在使用分页之前没有其他请求被发送到服务器。 第一次使用分页后,轮询也开始工作了。
这是我第一次使用 Observables,所以我很确定我错过了一些东西,但我看不出它可能是什么。
我也尝试过这种方法(可能是在 startWith 中缺少定时器计数器),但它并没有改变任何东西。
[...]
.combineLatest(
pagination
)
.startWith([0, {offset: 0, limit: 3}])
[...]
确保您遵循以下步骤
正在创建可保存的数据
你有一个 observable verable 并且你正在通过 observable.next(value);
获取主题或 behourSubject 中的数据
在您想要获取数据的地方声明一个 Subject 类型变量并通过订阅它来访问它。
private val: Subject = YourObservable;
private uiValue: string;
val.sunscribe((val) => {
uiVal = val;
});
现在您可以在任何地方使用 uiVal,它会在每个时间间隔内更新。
您可以通过 console.log
来分别测试这两个部分,以确保一切正常 file
combineLatest()
运算符要求所有源 Observable 至少发出一项。
您的演示只发出一个请求,因为您使用的是 .startWith()
。 combineLatest()
永远不会发出,因为 pagination
是一个 Subject
,它可能永远不会发出任何项目。
所以一个选择是移动 .startWith()
:
.combineLatest(
pagination.startWith({offset: 0, limit: 3}),
(timer, pagination) => pagination
)
但也许这对您没有多大帮助,因为您忽略了来自 timer()
的所有项目,而您只使用了 pagination
。因此,也许您可以仅使用 merge()
从其中一个来源刷新列表。然后 timer()
独立增加偏移量。
Observable
.timer(0, 1000)
.map(i => { return {offset: i*3, limit: 3}})
.merge(pagination.startWith({offset: 0, limit: 3}))
.switchMap(pagination => {
return Observable.of(pagination).delay(200)
})
.subscribe(val => console.log(val));