在 RXJS 中,如何使用 retryWhen 强制重新执行使用 bindNodeCallback 创建的 Observable?

In RXJS, how do I force re-execution of an Observable created with bindNodeCallback using retryWhen?

我有一个 Subject 接收对象,然后调用我用 bindNodeCallback 包装的函数。如果由于错误或结果返回错误而失败,我想使用 retryWhen 重试该函数的执行。

我尝试了几种不同的方法,但未能成功使绑定函数再次触发。

我有一个codesandbox setup here

function fakeSend(
  task: string,
  cb: (err: Error | null, result?: boolean) => void
) {
  console.log("fakesend", task);

  setTimeout(() => {
    const hasError = Math.random() < 0.5;
    const res = Math.random() < 0.5;
    console.log(hasError ? "hasError" : `responding with ${res}`);
    if (hasError) {
      return cb(new Error("error"));
    }
    return cb(null, res);
  }, 100);
}

const boundSend = bindNodeCallback(fakeSend);
const subject = new Subject<string>();

subject.subscribe(
  (task) => {
    boundSend(task)
      .pipe(
        tap((status) => {
          if (!status) {
            throw new Error("Did not send");
          }
          return status;
        }),
        retryWhen((errs) =>
          errs.pipe(
            delay(1000),
            tap((err) => console.log)
          )
        )
      )
      .subscribe({
        next: console.log,
        error: console.error,
        complete: () => {
          console.log("complete", task);
        }
      });
  },
  (error) => {
    console.log("error in subject subscribe");
  }
);

subject.next("test1");
subject.next("test2");
subject.next("test3");
subject.next("test4");

  

谢谢!

const { Subject, bindNodeCallback } = rxjs;
const { tap, retryWhen, delay } = rxjs.operators;

function fakeSend(
  task,
  cb
) {
  console.log("fakesend", task);

  setTimeout(() => {
    const hasError = Math.random() < 0.5;
    const res = Math.random() < 0.5;
    console.log(hasError ? "hasError" : `responding with ${res}`);
    if (hasError) {
      return cb(new Error("error"));
    }
    return cb(null, res);
  }, 100);
}

const boundSend = bindNodeCallback(fakeSend);
const subject = new Subject();

subject.subscribe(
  (task) => {
    boundSend(task)
      .pipe(
        tap((status) => {
          console.log('tap');
          if (!status) {
            throw new Error("Did not send");
          }
          return status;
        }),
        retryWhen((errs) =>
          errs.pipe(
            delay(1000),
            tap((err) => console.log)
          )
        )
      )
      .subscribe({
        next: console.log,
        error: console.error,
        complete: () => {
          console.log("complete", task);
        }
      });
  },
  (error) => {
    console.log("error in subject subscribe");
  }
);

subject.next("test1");
subject.next("test2");
subject.next("test3");
subject.next("test4");
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.6.2/rxjs.umd.min.js"></script>

编辑

我用这个工作了:

subject.subscribe(
  (task) => {
    of(task).pipe(
      mergeMap((v) => boundSend(v)),
      tap((val) => {
        if (!val) throw new Error("Did not send");
      }),
      retryWhen((errs) => errs.pipe(delay(300)))
    ).subscribe();
  });

但是,我不明白为什么这不起作用:

subject
  .pipe(
    mergeMap((task) => boundSend(task)),
    tap((val) => {
      if (!val) throw new Error("Did not send");
    }),
    retryWhen((errs) => errs.pipe(delay(300)))
  )
  .subscribe();

编辑2

嗯。我想这可以解释它:https://github.com/ReactiveX/rxjs/issues/1401

这里我以为是因为我是 RX noob :/

如果你想重试 callback-based,我认为你可以尝试使用 new Observable 构造函数而不是 bindNodeCallback通过 retryWhen 运算符在出现错误时发挥作用。下面是一些细节。

首先,您可以使用 new Observable 构造函数从 callback-based 函数创建一个 Observable

function newSend(task: string) {
  return new Observable(
    (subscriber: Subscriber<string>): TeardownLogic => {
      fakeSend(task, (err: Error | null, result?: string) => {
        if (err) {
          subscriber.error(err);
        } else {
          subscriber.next(result);
          subscriber.complete();
        }
      });
    }
  );
}

函数 newSend returns 一个 Observable,它在内部调用 fakeSend 并根据传递给 fakeSend 的回调接收到的参数发出结果或错误。

现在我们有办法创建一个 Observable 包装我们的 callback-based 函数,我们可以使用 mergeMap 创建结果流从通过 subject 生成的通知流中回调,像这样

subject
  .pipe(
    mergeMap((task) => {
      return newSend(task);
    })
  )
  .subscribe(
    (data) => console.log("Notification", data),
    (error) => {
      console.log("Error in final subscription", error);
    },
    () => console.log("DONE")
  );

subject.next("test1");
subject.next("test2");
subject.next("test3");
subject.next("test4");

subject.complete();  // added just to show that the complete function of the subscriber can be invoked

如果我们执行上面的代码,我们会看到 subject 上的订阅记录了所有对 fakeSend 的成功调用,直到遇到错误,此时它记录错误并终止(参见 this stackblitz)。

然后我们可以添加 retryWhen 运算符以在 fakeSend 回调中出现错误时实际重试,如下所示

subject
  .pipe(
    mergeMap((task) => {
      return newSend(task).pipe(
        retryWhen((errs) =>
          errs.pipe(
            delay(1000),
            tap((err) => console.log("Retry", err.message))
          )
        )
      );
    })
  )

至此我们的逻辑就完成了。任何时候调用 fakeSend 回调时出现错误,retryWhen 运算符都会确保使用相同的参数对 fakeSend 进行新的调用。完整代码见this stackblitz.

为什么它不适用于 bindNodeCallback

原因是 bindNodeCallback 的实现在内部使用了一个 AsyncSubject 来缓存最后的结果并在后续订阅中重放它。因此,如果 callback-based 函数出错并且 retryWhen 运算符再次订阅,那么订阅会再次生成相同的错误,从而开始无限循环。如果将 newSend(task) 替换为 boundSend(task)(第 46 和 47 行)

,则可以看到 in the stackblitz