RxJs 不确定的可观察调用

RxJs indefinite observable calls

我需要从服务器检索无限量的数据。那应该按以下方式进行:

  1. 发送初始请求
  2. 取回一部分数据,告诉服务器一切正常,我可以获取更多数据
  3. 重复步骤 2 和 3,直到我收到一个特定值,这意味着没有更多数据

我如何使用可观察对象来做到这一点?

现在我只能想到使用一些函数的递归可观察调用。

const send = execSend() {
    this.send(message).subscribe(resp => {
        if (resp === 'end') {
            subscriber.next(byteArr.join(''));
            console.log('finished');
            subscriber.complete();
        } else {
            byteArr.push(resp);
            execSend();
        }
    });
}();

像这样:

let todo = true;
interval(100).pipe(
    takeWhile(()=>todo),
    concatMap(()=>getStuff())
).subscribe(data => {
  todo = !isFinished(data);
});

未测试,但您可以试试这个模式

exec=()=>http.get(....) 

exec().pipe(
  expand((resp)=>exec()),
  takeWhile(resp=>resp !== 'end'),
  scan((acc,curr)=>acc.concat(curr),[])
).subscribe()

以下应该复制您的想法:1 个事件在完成前发出。

import { of } from 'rxjs';
import { expand, takeWhile, reduce } from 'rxjs/operators';

let count = 0;
const FINISH = "finished";
const limit = 5;
const send$ = () => of(count++ < limit ? "sent" : FINISH);

const expander$ = send$().pipe(
  expand(resp => send$()),
  takeWhile(resp => resp !== FINISH),
  reduce((acc, val) => acc ? acc + val : val, null)
);

const subscribe = expander$.subscribe(console.log);

您可以在 this blitz

中看到它正在运行