重复请求直到满足条件和 return 个中间值
Repeat request until condition is met and return intermediate values
我正在尝试编写一个(通用)函数 run<ID, ENTITY>(…): Observable<ENTITY>
,它采用以下参数:
- 一个函数
init: () => Observable<ID>
,它是启动后端进程的初始化请求。
- 一个函数
status: (id: ID) => Observable<ENTITY>
,它获取生成的 ID 并在后端查询它的状态。
- 函数
repeat: (status: ENTITY) => boolean
确定是否必须重复 status
请求。
- 两个整数值
initialDelay
和 repeatDelay
。
所以 run
应该执行 init
,然后等待 initialDelay
秒。从现在开始它应该 运行 status
每 repeatDelay
秒直到 repeat()
returns false
.
但是,有两件重要的事情需要解决:
如果 status
比 repeatDelay
花费的时间更长,则 repeatDelay
应该仅在 status
发出其值时才开始计算以避免竞争条件
- 调用
status
发出的中间值也必须发送给调用者。
除了我提到的最后一件事,下面的(不是很漂亮)版本做了所有的事情:它在重试之前不等待网络响应status
。
run<ID, ENTITY>(…): Observable<ENTITY> {
let finished = false;
return init().mergeMap(id => {
return Observable.timer(initialDelay, repeatDelay)
.switchMap(() => {
if (finished) return Observable.of(null);
return status(id);
})
.takeWhile(response => {
if (repeat(response)) return true;
if (finished) return false;
finished = true;
return true;
});
});
}
我的第二个版本是这个,除了一个细节之外,它再次适用于所有细节:不会发出 status
调用的中间值,但我确实需要它们在调用者中显示进度:
run<ID, ENTITY>(…): Observable<ENTITY> {
const loop = id => {
return status(id).switchMap(response => {
return repeat(response)
? Observable.timer(repeatDelay).switchMap(() => loop(id))
: Observable.of(response);
});
};
return init()
.mergeMap(id => Observable.timer(initialDelay).switchMap(() => loop(id)));
}
不可否认,后一个也有点笨拙。我确信 rxjs 可以以更简洁的方式解决这个问题(更重要的是,完全解决它),但我似乎无法弄清楚如何。
repeatWhen
运算符本身看起来很诱人,但它只提供一个无效的 onComplete
通知流,因此您不能在没有外部帮助的情况下根据值决定重复。在这里,BehaviorSubject
可能是您最好的选择:
function run(/* ... */): Observable<ENTITY> {
const last_value$ = new BehaviorSubject();
const delayed$ = init()
.delay(initialDelay)
.flatMap(id =>
status(id).repeatWhen(completions =>
completions.delay(repeatDelay)
.takeWhile(_ => repeat(last_value$.getValue()))
)
).share();
delayed$.subscribe(last_value$);
return delayed$;
}
此处,repeatWhen
仅当上次请求的最后一个值是表示重复的状态时才重新订阅请求的冷源。
警告:当通知程序(传递给 repeatWhen
的函数)执行和 BehaviorSubject 之间的 repeatDelay
较小时,可能存在竞争条件风险收到相应的状态。对于给定的观察者,我们保证所有 onNext
通知都发生在 onCompleted
通知之前,但这里我们的 BehaviorSubject
和 repeatWhen
是分开的。乍一看 the source,onNext
通知似乎直接通过 repeatWhen
传递。我怀疑 concat
也是如此,但我不确定。
所以我想出了这个似乎可行的方法。它使用 expand
创建无限重试序列,并使用 takeWhile
确定需要多长时间。
可以通过编写带有谓词函数的自定义运算符 takeUntil
来删除 takeWhile
黑客攻击。这目前不存在,请参阅 rxjs#2420。
Fiddle: https://jsfiddle.net/2mhcvnog/1/
run<ID, ENTITY>(...): Observable<ENTITY> {
/* This little hack is needed to also emit the final item in the takeWhile() loop. */
let finished = false;
const delayStatus = (id, delay) => {
return Observable.of(null)
.delay(delay)
.switchMap(() => status(id))
.map(status => [id, status]);
};
return init()
.mergeMap(id => delayStatus(id, initialDelay))
.expand(([id]) => {
if (finished) {
return Observable.of([id, null]);
}
return delayStatus(id, repeatDelay);
})
.takeWhile(([_, status]) => {
if (repeat(status)) {
return true;
}
if (finished) {
return false;
}
finished = true;
return true;
})
.map(([_, status]) => status);
}
Update:Observable 原生支持 expand
的递归,也显示在@IngoBürk 的回答中。这让我们可以更简洁地编写递归:
function run<ENTITY>(/* ... */): Observable<ENTITY> {
return init().delay(initialDelay).flatMap(id =>
status(id).expand(s =>
repeat(s) ? Observable.of(null).delay(repeatDelay).flatMap(_ => status(id)) : Observable.empty()
)
)
}
如果递归可以接受,那么你还可以做的更简洁:
function run(/* ... */): Observable<ENTITY> {
function recurse(id: number): Observable<ENTITY> {
const status$ = status(id).share();
const tail$ = status$.delay(repeatDelay)
.flatMap(status => repeat(status) ? recurse(id, repeatDelay) : Observable.empty());
return status$.merge(tail$);
}
return init().delay(initialDelay).flatMap(id => recurse(id));
}
尝试the fiddle。
我正在尝试编写一个(通用)函数 run<ID, ENTITY>(…): Observable<ENTITY>
,它采用以下参数:
- 一个函数
init: () => Observable<ID>
,它是启动后端进程的初始化请求。 - 一个函数
status: (id: ID) => Observable<ENTITY>
,它获取生成的 ID 并在后端查询它的状态。 - 函数
repeat: (status: ENTITY) => boolean
确定是否必须重复status
请求。 - 两个整数值
initialDelay
和repeatDelay
。
所以 run
应该执行 init
,然后等待 initialDelay
秒。从现在开始它应该 运行 status
每 repeatDelay
秒直到 repeat()
returns false
.
但是,有两件重要的事情需要解决:
-
如果
repeatDelay
应该仅在status
发出其值时才开始计算以避免竞争条件- 调用
status
发出的中间值也必须发送给调用者。
status
比 repeatDelay
花费的时间更长,则 除了我提到的最后一件事,下面的(不是很漂亮)版本做了所有的事情:它在重试之前不等待网络响应status
。
run<ID, ENTITY>(…): Observable<ENTITY> {
let finished = false;
return init().mergeMap(id => {
return Observable.timer(initialDelay, repeatDelay)
.switchMap(() => {
if (finished) return Observable.of(null);
return status(id);
})
.takeWhile(response => {
if (repeat(response)) return true;
if (finished) return false;
finished = true;
return true;
});
});
}
我的第二个版本是这个,除了一个细节之外,它再次适用于所有细节:不会发出 status
调用的中间值,但我确实需要它们在调用者中显示进度:
run<ID, ENTITY>(…): Observable<ENTITY> {
const loop = id => {
return status(id).switchMap(response => {
return repeat(response)
? Observable.timer(repeatDelay).switchMap(() => loop(id))
: Observable.of(response);
});
};
return init()
.mergeMap(id => Observable.timer(initialDelay).switchMap(() => loop(id)));
}
不可否认,后一个也有点笨拙。我确信 rxjs 可以以更简洁的方式解决这个问题(更重要的是,完全解决它),但我似乎无法弄清楚如何。
repeatWhen
运算符本身看起来很诱人,但它只提供一个无效的 onComplete
通知流,因此您不能在没有外部帮助的情况下根据值决定重复。在这里,BehaviorSubject
可能是您最好的选择:
function run(/* ... */): Observable<ENTITY> {
const last_value$ = new BehaviorSubject();
const delayed$ = init()
.delay(initialDelay)
.flatMap(id =>
status(id).repeatWhen(completions =>
completions.delay(repeatDelay)
.takeWhile(_ => repeat(last_value$.getValue()))
)
).share();
delayed$.subscribe(last_value$);
return delayed$;
}
此处,repeatWhen
仅当上次请求的最后一个值是表示重复的状态时才重新订阅请求的冷源。
警告:当通知程序(传递给 repeatWhen
的函数)执行和 BehaviorSubject 之间的 repeatDelay
较小时,可能存在竞争条件风险收到相应的状态。对于给定的观察者,我们保证所有 onNext
通知都发生在 onCompleted
通知之前,但这里我们的 BehaviorSubject
和 repeatWhen
是分开的。乍一看 the source,onNext
通知似乎直接通过 repeatWhen
传递。我怀疑 concat
也是如此,但我不确定。
所以我想出了这个似乎可行的方法。它使用 expand
创建无限重试序列,并使用 takeWhile
确定需要多长时间。
可以通过编写带有谓词函数的自定义运算符 takeUntil
来删除 takeWhile
黑客攻击。这目前不存在,请参阅 rxjs#2420。
Fiddle: https://jsfiddle.net/2mhcvnog/1/
run<ID, ENTITY>(...): Observable<ENTITY> {
/* This little hack is needed to also emit the final item in the takeWhile() loop. */
let finished = false;
const delayStatus = (id, delay) => {
return Observable.of(null)
.delay(delay)
.switchMap(() => status(id))
.map(status => [id, status]);
};
return init()
.mergeMap(id => delayStatus(id, initialDelay))
.expand(([id]) => {
if (finished) {
return Observable.of([id, null]);
}
return delayStatus(id, repeatDelay);
})
.takeWhile(([_, status]) => {
if (repeat(status)) {
return true;
}
if (finished) {
return false;
}
finished = true;
return true;
})
.map(([_, status]) => status);
}
Update:Observable 原生支持 expand
的递归,也显示在@IngoBürk 的回答中。这让我们可以更简洁地编写递归:
function run<ENTITY>(/* ... */): Observable<ENTITY> {
return init().delay(initialDelay).flatMap(id =>
status(id).expand(s =>
repeat(s) ? Observable.of(null).delay(repeatDelay).flatMap(_ => status(id)) : Observable.empty()
)
)
}
如果递归可以接受,那么你还可以做的更简洁:
function run(/* ... */): Observable<ENTITY> {
function recurse(id: number): Observable<ENTITY> {
const status$ = status(id).share();
const tail$ = status$.delay(repeatDelay)
.flatMap(status => repeat(status) ? recurse(id, repeatDelay) : Observable.empty());
return status$.merge(tail$);
}
return init().delay(initialDelay).flatMap(id => recurse(id));
}
尝试the fiddle。