在 RXJS 中,如何使用 retryWhen 强制重新执行使用 bindNodeCallback 创建的 Observable?
In RXJS, how do I force re-execution of an Observable created with bindNodeCallback using retryWhen?
我有一个 Subject
接收对象,然后调用我用 bindNodeCallback
包装的函数。如果由于错误或结果返回错误而失败,我想使用 retryWhen
重试该函数的执行。
我尝试了几种不同的方法,但未能成功使绑定函数再次触发。
function fakeSend(
task: string,
cb: (err: Error | null, result?: boolean) => void
) {
console.log("fakesend", task);
setTimeout(() => {
const hasError = Math.random() < 0.5;
const res = Math.random() < 0.5;
console.log(hasError ? "hasError" : `responding with ${res}`);
if (hasError) {
return cb(new Error("error"));
}
return cb(null, res);
}, 100);
}
const boundSend = bindNodeCallback(fakeSend);
const subject = new Subject<string>();
subject.subscribe(
(task) => {
boundSend(task)
.pipe(
tap((status) => {
if (!status) {
throw new Error("Did not send");
}
return status;
}),
retryWhen((errs) =>
errs.pipe(
delay(1000),
tap((err) => console.log)
)
)
)
.subscribe({
next: console.log,
error: console.error,
complete: () => {
console.log("complete", task);
}
});
},
(error) => {
console.log("error in subject subscribe");
}
);
subject.next("test1");
subject.next("test2");
subject.next("test3");
subject.next("test4");
谢谢!
const { Subject, bindNodeCallback } = rxjs;
const { tap, retryWhen, delay } = rxjs.operators;
function fakeSend(
task,
cb
) {
console.log("fakesend", task);
setTimeout(() => {
const hasError = Math.random() < 0.5;
const res = Math.random() < 0.5;
console.log(hasError ? "hasError" : `responding with ${res}`);
if (hasError) {
return cb(new Error("error"));
}
return cb(null, res);
}, 100);
}
const boundSend = bindNodeCallback(fakeSend);
const subject = new Subject();
subject.subscribe(
(task) => {
boundSend(task)
.pipe(
tap((status) => {
console.log('tap');
if (!status) {
throw new Error("Did not send");
}
return status;
}),
retryWhen((errs) =>
errs.pipe(
delay(1000),
tap((err) => console.log)
)
)
)
.subscribe({
next: console.log,
error: console.error,
complete: () => {
console.log("complete", task);
}
});
},
(error) => {
console.log("error in subject subscribe");
}
);
subject.next("test1");
subject.next("test2");
subject.next("test3");
subject.next("test4");
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.6.2/rxjs.umd.min.js"></script>
编辑
我用这个工作了:
subject.subscribe(
(task) => {
of(task).pipe(
mergeMap((v) => boundSend(v)),
tap((val) => {
if (!val) throw new Error("Did not send");
}),
retryWhen((errs) => errs.pipe(delay(300)))
).subscribe();
});
但是,我不明白为什么这不起作用:
subject
.pipe(
mergeMap((task) => boundSend(task)),
tap((val) => {
if (!val) throw new Error("Did not send");
}),
retryWhen((errs) => errs.pipe(delay(300)))
)
.subscribe();
编辑2
嗯。我想这可以解释它:https://github.com/ReactiveX/rxjs/issues/1401
这里我以为是因为我是 RX noob :/
如果你想重试 callback-based,我认为你可以尝试使用 new Observable
构造函数而不是 bindNodeCallback
通过 retryWhen
运算符在出现错误时发挥作用。下面是一些细节。
首先,您可以使用 new Observable
构造函数从 callback-based 函数创建一个 Observable
function newSend(task: string) {
return new Observable(
(subscriber: Subscriber<string>): TeardownLogic => {
fakeSend(task, (err: Error | null, result?: string) => {
if (err) {
subscriber.error(err);
} else {
subscriber.next(result);
subscriber.complete();
}
});
}
);
}
函数 newSend
returns 一个 Observable,它在内部调用 fakeSend
并根据传递给 fakeSend
的回调接收到的参数发出结果或错误。
现在我们有办法创建一个 Observable 包装我们的 callback-based 函数,我们可以使用 mergeMap
创建结果流从通过 subject 生成的通知流中回调,像这样
subject
.pipe(
mergeMap((task) => {
return newSend(task);
})
)
.subscribe(
(data) => console.log("Notification", data),
(error) => {
console.log("Error in final subscription", error);
},
() => console.log("DONE")
);
subject.next("test1");
subject.next("test2");
subject.next("test3");
subject.next("test4");
subject.complete(); // added just to show that the complete function of the subscriber can be invoked
如果我们执行上面的代码,我们会看到 subject 上的订阅记录了所有对 fakeSend
的成功调用,直到遇到错误,此时它记录错误并终止(参见 this stackblitz)。
然后我们可以添加 retryWhen
运算符以在 fakeSend
回调中出现错误时实际重试,如下所示
subject
.pipe(
mergeMap((task) => {
return newSend(task).pipe(
retryWhen((errs) =>
errs.pipe(
delay(1000),
tap((err) => console.log("Retry", err.message))
)
)
);
})
)
至此我们的逻辑就完成了。任何时候调用 fakeSend
回调时出现错误,retryWhen
运算符都会确保使用相同的参数对 fakeSend
进行新的调用。完整代码见this stackblitz.
为什么它不适用于 bindNodeCallback
原因是 bindNodeCallback
的实现在内部使用了一个 AsyncSubject
来缓存最后的结果并在后续订阅中重放它。因此,如果 callback-based 函数出错并且 retryWhen
运算符再次订阅,那么订阅会再次生成相同的错误,从而开始无限循环。如果将 newSend(task)
替换为 boundSend(task)
(第 46 和 47 行)
,则可以看到 in the stackblitz
我有一个 Subject
接收对象,然后调用我用 bindNodeCallback
包装的函数。如果由于错误或结果返回错误而失败,我想使用 retryWhen
重试该函数的执行。
我尝试了几种不同的方法,但未能成功使绑定函数再次触发。
function fakeSend(
task: string,
cb: (err: Error | null, result?: boolean) => void
) {
console.log("fakesend", task);
setTimeout(() => {
const hasError = Math.random() < 0.5;
const res = Math.random() < 0.5;
console.log(hasError ? "hasError" : `responding with ${res}`);
if (hasError) {
return cb(new Error("error"));
}
return cb(null, res);
}, 100);
}
const boundSend = bindNodeCallback(fakeSend);
const subject = new Subject<string>();
subject.subscribe(
(task) => {
boundSend(task)
.pipe(
tap((status) => {
if (!status) {
throw new Error("Did not send");
}
return status;
}),
retryWhen((errs) =>
errs.pipe(
delay(1000),
tap((err) => console.log)
)
)
)
.subscribe({
next: console.log,
error: console.error,
complete: () => {
console.log("complete", task);
}
});
},
(error) => {
console.log("error in subject subscribe");
}
);
subject.next("test1");
subject.next("test2");
subject.next("test3");
subject.next("test4");
谢谢!
const { Subject, bindNodeCallback } = rxjs;
const { tap, retryWhen, delay } = rxjs.operators;
function fakeSend(
task,
cb
) {
console.log("fakesend", task);
setTimeout(() => {
const hasError = Math.random() < 0.5;
const res = Math.random() < 0.5;
console.log(hasError ? "hasError" : `responding with ${res}`);
if (hasError) {
return cb(new Error("error"));
}
return cb(null, res);
}, 100);
}
const boundSend = bindNodeCallback(fakeSend);
const subject = new Subject();
subject.subscribe(
(task) => {
boundSend(task)
.pipe(
tap((status) => {
console.log('tap');
if (!status) {
throw new Error("Did not send");
}
return status;
}),
retryWhen((errs) =>
errs.pipe(
delay(1000),
tap((err) => console.log)
)
)
)
.subscribe({
next: console.log,
error: console.error,
complete: () => {
console.log("complete", task);
}
});
},
(error) => {
console.log("error in subject subscribe");
}
);
subject.next("test1");
subject.next("test2");
subject.next("test3");
subject.next("test4");
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.6.2/rxjs.umd.min.js"></script>
编辑
我用这个工作了:
subject.subscribe(
(task) => {
of(task).pipe(
mergeMap((v) => boundSend(v)),
tap((val) => {
if (!val) throw new Error("Did not send");
}),
retryWhen((errs) => errs.pipe(delay(300)))
).subscribe();
});
但是,我不明白为什么这不起作用:
subject
.pipe(
mergeMap((task) => boundSend(task)),
tap((val) => {
if (!val) throw new Error("Did not send");
}),
retryWhen((errs) => errs.pipe(delay(300)))
)
.subscribe();
编辑2
嗯。我想这可以解释它:https://github.com/ReactiveX/rxjs/issues/1401
这里我以为是因为我是 RX noob :/
如果你想重试 callback-based,我认为你可以尝试使用 new Observable
构造函数而不是 bindNodeCallback
通过 retryWhen
运算符在出现错误时发挥作用。下面是一些细节。
首先,您可以使用 new Observable
构造函数从 callback-based 函数创建一个 Observable
function newSend(task: string) {
return new Observable(
(subscriber: Subscriber<string>): TeardownLogic => {
fakeSend(task, (err: Error | null, result?: string) => {
if (err) {
subscriber.error(err);
} else {
subscriber.next(result);
subscriber.complete();
}
});
}
);
}
函数 newSend
returns 一个 Observable,它在内部调用 fakeSend
并根据传递给 fakeSend
的回调接收到的参数发出结果或错误。
现在我们有办法创建一个 Observable 包装我们的 callback-based 函数,我们可以使用 mergeMap
创建结果流从通过 subject 生成的通知流中回调,像这样
subject
.pipe(
mergeMap((task) => {
return newSend(task);
})
)
.subscribe(
(data) => console.log("Notification", data),
(error) => {
console.log("Error in final subscription", error);
},
() => console.log("DONE")
);
subject.next("test1");
subject.next("test2");
subject.next("test3");
subject.next("test4");
subject.complete(); // added just to show that the complete function of the subscriber can be invoked
如果我们执行上面的代码,我们会看到 subject 上的订阅记录了所有对 fakeSend
的成功调用,直到遇到错误,此时它记录错误并终止(参见 this stackblitz)。
然后我们可以添加 retryWhen
运算符以在 fakeSend
回调中出现错误时实际重试,如下所示
subject
.pipe(
mergeMap((task) => {
return newSend(task).pipe(
retryWhen((errs) =>
errs.pipe(
delay(1000),
tap((err) => console.log("Retry", err.message))
)
)
);
})
)
至此我们的逻辑就完成了。任何时候调用 fakeSend
回调时出现错误,retryWhen
运算符都会确保使用相同的参数对 fakeSend
进行新的调用。完整代码见this stackblitz.
为什么它不适用于 bindNodeCallback
原因是 bindNodeCallback
的实现在内部使用了一个 AsyncSubject
来缓存最后的结果并在后续订阅中重放它。因此,如果 callback-based 函数出错并且 retryWhen
运算符再次订阅,那么订阅会再次生成相同的错误,从而开始无限循环。如果将 newSend(task)
替换为 boundSend(task)
(第 46 和 47 行)