RxJs 不确定的可观察调用
RxJs indefinite observable calls
我需要从服务器检索无限量的数据。那应该按以下方式进行:
- 发送初始请求
- 取回一部分数据,告诉服务器一切正常,我可以获取更多数据
- 重复步骤 2 和 3,直到我收到一个特定值,这意味着没有更多数据
我如何使用可观察对象来做到这一点?
现在我只能想到使用一些函数的递归可观察调用。
const send = execSend() {
this.send(message).subscribe(resp => {
if (resp === 'end') {
subscriber.next(byteArr.join(''));
console.log('finished');
subscriber.complete();
} else {
byteArr.push(resp);
execSend();
}
});
}();
像这样:
let todo = true;
interval(100).pipe(
takeWhile(()=>todo),
concatMap(()=>getStuff())
).subscribe(data => {
todo = !isFinished(data);
});
未测试,但您可以试试这个模式
exec=()=>http.get(....)
exec().pipe(
expand((resp)=>exec()),
takeWhile(resp=>resp !== 'end'),
scan((acc,curr)=>acc.concat(curr),[])
).subscribe()
以下应该复制您的想法:1 个事件在完成前发出。
import { of } from 'rxjs';
import { expand, takeWhile, reduce } from 'rxjs/operators';
let count = 0;
const FINISH = "finished";
const limit = 5;
const send$ = () => of(count++ < limit ? "sent" : FINISH);
const expander$ = send$().pipe(
expand(resp => send$()),
takeWhile(resp => resp !== FINISH),
reduce((acc, val) => acc ? acc + val : val, null)
);
const subscribe = expander$.subscribe(console.log);
您可以在 this blitz
中看到它正在运行
我需要从服务器检索无限量的数据。那应该按以下方式进行:
- 发送初始请求
- 取回一部分数据,告诉服务器一切正常,我可以获取更多数据
- 重复步骤 2 和 3,直到我收到一个特定值,这意味着没有更多数据
我如何使用可观察对象来做到这一点?
现在我只能想到使用一些函数的递归可观察调用。
const send = execSend() {
this.send(message).subscribe(resp => {
if (resp === 'end') {
subscriber.next(byteArr.join(''));
console.log('finished');
subscriber.complete();
} else {
byteArr.push(resp);
execSend();
}
});
}();
像这样:
let todo = true;
interval(100).pipe(
takeWhile(()=>todo),
concatMap(()=>getStuff())
).subscribe(data => {
todo = !isFinished(data);
});
未测试,但您可以试试这个模式
exec=()=>http.get(....)
exec().pipe(
expand((resp)=>exec()),
takeWhile(resp=>resp !== 'end'),
scan((acc,curr)=>acc.concat(curr),[])
).subscribe()
以下应该复制您的想法:1 个事件在完成前发出。
import { of } from 'rxjs';
import { expand, takeWhile, reduce } from 'rxjs/operators';
let count = 0;
const FINISH = "finished";
const limit = 5;
const send$ = () => of(count++ < limit ? "sent" : FINISH);
const expander$ = send$().pipe(
expand(resp => send$()),
takeWhile(resp => resp !== FINISH),
reduce((acc, val) => acc ? acc + val : val, null)
);
const subscribe = expander$.subscribe(console.log);
您可以在 this blitz
中看到它正在运行