递归可观察
Recursive observable
我正在使用 RxJs,我必须建立一个轮询机制来从服务器检索更新。
我需要每秒发出一个请求,解析更新,发出它并记住它的 id,因为我需要它来请求下一组更新,例如 getUpdate(lastId + 1)
。
第一部分很简单,所以我只使用 interval
和 mergeMap
let lastId = 0
const updates = Rx.Observable.interval(1000)
.map(() => lastId)
.mergeMap((offset) => getUpdates(offset + 1))
我正在收集这样的标识符:
updates.pluck('update_id').scan(Math.max, 0).subscribe(val => lastId = val)
但是这个解决方案不是纯反应式的,我正在寻找省略 "global" 变量使用的方法。
如何改进代码,同时仍然能够 return observable 仅包含对调用者的更新?
更新。
getUpdates(id) 的服务器响应如下所示:
[
{ update_id: 1, payload: { ... } },
{ update_id: 3, payload: { ... } },
{ update_id: 2, payload: { ... } }
]
它可能包含任何顺序的 0 到 Infinity 更新
是这样的吗?请注意,这是一个无限流,因为没有条件可以中止;你一个都没给。
// Just returns the ID as the update_id.
const fakeResponse = id => {
return [{ update_id: id }];
};
// Fakes the actual HTTP call with a network delay.
const getUpdates = id => Rx.Observable.of(null).delay(250).map(() => fakeResponse(id));
// Start with update_id = 0, then recursively call with the last
// returned ID incremented by 1.
// The actual emissions on this stream will be the full server responses.
const updates$ = getUpdates(0)
.expand(response => Rx.Observable.of(null)
.delay(1000)
.switchMap(() => {
const highestId = Math.max(...response.map(update => update.update_id));
return getUpdates(highestId + 1);
})
)
updates$.take(5).subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script>
要定义流的终止,您可能希望在末尾钩入 switchMap
;使用 response
的任何 属性 有条件地 return Observable.empty()
而不是再次调用 getUpdates
。
我正在使用 RxJs,我必须建立一个轮询机制来从服务器检索更新。
我需要每秒发出一个请求,解析更新,发出它并记住它的 id,因为我需要它来请求下一组更新,例如 getUpdate(lastId + 1)
。
第一部分很简单,所以我只使用 interval
和 mergeMap
let lastId = 0
const updates = Rx.Observable.interval(1000)
.map(() => lastId)
.mergeMap((offset) => getUpdates(offset + 1))
我正在收集这样的标识符:
updates.pluck('update_id').scan(Math.max, 0).subscribe(val => lastId = val)
但是这个解决方案不是纯反应式的,我正在寻找省略 "global" 变量使用的方法。
如何改进代码,同时仍然能够 return observable 仅包含对调用者的更新?
更新。
getUpdates(id) 的服务器响应如下所示:
[
{ update_id: 1, payload: { ... } },
{ update_id: 3, payload: { ... } },
{ update_id: 2, payload: { ... } }
]
它可能包含任何顺序的 0 到 Infinity 更新
是这样的吗?请注意,这是一个无限流,因为没有条件可以中止;你一个都没给。
// Just returns the ID as the update_id.
const fakeResponse = id => {
return [{ update_id: id }];
};
// Fakes the actual HTTP call with a network delay.
const getUpdates = id => Rx.Observable.of(null).delay(250).map(() => fakeResponse(id));
// Start with update_id = 0, then recursively call with the last
// returned ID incremented by 1.
// The actual emissions on this stream will be the full server responses.
const updates$ = getUpdates(0)
.expand(response => Rx.Observable.of(null)
.delay(1000)
.switchMap(() => {
const highestId = Math.max(...response.map(update => update.update_id));
return getUpdates(highestId + 1);
})
)
updates$.take(5).subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script>
要定义流的终止,您可能希望在末尾钩入 switchMap
;使用 response
的任何 属性 有条件地 return Observable.empty()
而不是再次调用 getUpdates
。