重复请求直到满足条件和 return 个中间值

Repeat request until condition is met and return intermediate values

我正在尝试编写一个(通用)函数 run<ID, ENTITY>(…): Observable<ENTITY>,它采用以下参数:

所以 run 应该执行 init,然后等待 initialDelay 秒。从现在开始它应该 运行 statusrepeatDelay 秒直到 repeat() returns false.

但是,有两件重要的事情需要解决:


除了我提到的最后一件事,下面的(不是很漂亮)版本做了所有的事情:它在重试之前不等待网络响应status

run<ID, ENTITY>(…): Observable<ENTITY> {
    let finished = false;
    return init().mergeMap(id => {
        return Observable.timer(initialDelay, repeatDelay)
            .switchMap(() => {
                if (finished) return Observable.of(null);
                return status(id);
            })
            .takeWhile(response => {
                if (repeat(response)) return true;
                if (finished) return false;

                finished = true;
                return true;
            });
    });
}

我的第二个版本是这个,除了一个细节之外,它再次适用于所有细节:不会发出 status 调用的中间值,但我确实需要它们在调用者中显示进度:

run<ID, ENTITY>(…): Observable<ENTITY> {
    const loop = id => {
        return status(id).switchMap(response => {
            return repeat(response)
                ? Observable.timer(repeatDelay).switchMap(() => loop(id))
                : Observable.of(response);
        });
    };

    return init()
        .mergeMap(id => Observable.timer(initialDelay).switchMap(() => loop(id)));
}

不可否认,后一个也有点笨拙。我确信 rxjs 可以以更简洁的方式解决这个问题(更重要的是,完全解决它),但我似乎无法弄清楚如何。

repeatWhen 运算符本身看起来很诱人,但它只提供一个无效的 onComplete 通知流,因此您不能在没有外部帮助的情况下根据值决定重复。在这里,BehaviorSubject 可能是您最好的选择:

function run(/* ... */): Observable<ENTITY> {
  const last_value$ = new BehaviorSubject();
  const delayed$ = init()
    .delay(initialDelay)
    .flatMap(id => 
      status(id).repeatWhen(completions => 
         completions.delay(repeatDelay)
                    .takeWhile(_ => repeat(last_value$.getValue()))
      )
    ).share();
  delayed$.subscribe(last_value$);
  return delayed$;
}

此处,repeatWhen仅当上次请求的最后一个值是表示重复的状态时才重新订阅请求的冷源。

Try the fiddle.

警告:当通知程序(传递给 repeatWhen 的函数)执行和 BehaviorSubject 之间的 repeatDelay 较小时,可能存在竞争条件风险收到相应的状态。对于给定的观察者,我们保证所有 onNext 通知都发生在 onCompleted 通知之前,但这里我们的 BehaviorSubjectrepeatWhen 是分开的。乍一看 the sourceonNext 通知似乎直接通过 repeatWhen 传递。我怀疑 concat也是如此,但我不确定。

所以我想出了这个似乎可行的方法。它使用 expand 创建无限重试序列,并使用 takeWhile 确定需要多长时间。

可以通过编写带有谓词函数的自定义运算符 takeUntil 来删除 takeWhile 黑客攻击。这目前不存在,请参阅 rxjs#2420

Fiddle: https://jsfiddle.net/2mhcvnog/1/

run<ID, ENTITY>(...): Observable<ENTITY> {
    /* This little hack is needed to also emit the final item in the takeWhile() loop. */
    let finished = false;

    const delayStatus = (id, delay) => {
        return Observable.of(null)
            .delay(delay)
            .switchMap(() => status(id))
            .map(status => [id, status]);
    };

    return init()
        .mergeMap(id => delayStatus(id, initialDelay))
        .expand(([id]) => {
            if (finished) {
                return Observable.of([id, null]);
            }

            return delayStatus(id, repeatDelay);
        })
        .takeWhile(([_, status]) => {
            if (repeat(status)) {
                return true;
            }

            if (finished) {
                return false;
            }

            finished = true;
            return true;
        })
        .map(([_, status]) => status);
}

Update:Observable 原生支持 expand 的递归,也显示在@IngoBürk 的回答中。这让我们可以更简洁地编写递归:

function run<ENTITY>(/* ... */): Observable<ENTITY> {
  return init().delay(initialDelay).flatMap(id =>
    status(id).expand(s => 
      repeat(s) ? Observable.of(null).delay(repeatDelay).flatMap(_ => status(id)) : Observable.empty()
    )
  )
}

Fiddle.


如果递归可以接受,那么你还可以做的更简洁:

function run(/* ... */): Observable<ENTITY> {
  function recurse(id: number): Observable<ENTITY> {
    const status$ = status(id).share();
    const tail$ = status$.delay(repeatDelay)
                         .flatMap(status => repeat(status) ? recurse(id, repeatDelay) : Observable.empty());
    return status$.merge(tail$);
  }
  return init().delay(initialDelay).flatMap(id => recurse(id));
}

尝试the fiddle