subscriber.next() 在 subscriber.error 语句后未触发

subscriber.next() is not firing after subscriber.error statement

根据输入的数据,我想判断它是否是有效数据,然后将 observable 发送到最后。

我试图创建一个可观察的和顺序的 subscribe.next() 工作正常,但是在没有语句工作之后会出现任何错误。

const observable = new Observable((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.error('failed this after second');
  subscriber.next(3);
  subscriber.next(4);
  subscriber.complete();
});

observable.subscribe({
  next(x) {
    console.log('got value ' + x);
  },
  error(err) {
    console.error('something wrong occurred: ' + err);
  },
  complete() {
    console.log('done');
  },
});

当前输出:

预期输出

这里是 stackblitz link https://stackblitz.com/edit/zlksm5?devtoolsheight=50&file=index.ts

帮我解决这个问题,或者用任何替代方法来处理这个场景。我试图找到解决方案,但没有得到确切的答案。

The Observable Contract

OnError:

  • 表示 Observable 已 终止 并出现指定的错误条件,并且它将 不再发射任何项目

强调我的。

如果您想要在完成后继续运行的东西,您不需要可观察对象。

寻求解决方案

只需将您的错误作为一个值发出即可。你可以(例如)将你的排放量包装在 RxJS Notification 对象中,这样你就可以 materializedematerialize observables 并保持一致的 API 排放量。

例如:

const observable = new Observable(subscriber => {
  subscriber.next({ kind: "N", value: 1 });
  subscriber.next({ kind: "N", value: 2 });
  subscriber.next({ kind: "E", error: new Error("failed this after second") });
  subscriber.next({ kind: "N", value: 3 });
  subscriber.next({ kind: "N", value: 4 });
  subscriber.next({ kind: "C" });
  subscriber.complete();
});

/**** Alternative written more succinctly ****/

const observable = of(
  { kind: "N", value: 1 },
  { kind: "N", value: 2 },
  { kind: "E", error: new Error("failed this after second") },
  { kind: "N", value: 3 },
  { kind: "N", value: 4 },
  { kind: "C" }
);

observable.subscribe((notification) => {
  switch (notification.kind) {
    case "N":
      console.log("got value ", notification.value);
      break;
    case "E":
      console.error("something wrong occurred: ", notification.error.message);
      break;
    case "C":
      console.log("done");
  }
});

我认为 Observable 不可能,因为如果发生任何错误,Observable 将被关闭。

我尝试了几种方法,供参考

//interval(1000)
observable
  //.pipe(switchMap(() => observable))
  .pipe(
    catchError((d) => {
      observable.subscribe((nd) => {
        console.log('--again--', nd);
      });
      return of(d);
    })
    //retry(1)
  )
  .subscribe({
    next(x) {
      console.log('got value ' + x);
    },
    error(err) {
      console.error('something wrong occurred: ' + err);
    },
    complete() {
      console.log('done');
    },
  });

// interval(1000)
//   .pipe(switchMap(_ => observable))
//   .pipe(
//     catchError((d) => {return of(d)}),
//     switchMap(_ => observable)
//   )
// .subscribe(d => {
//   console.log(d)
// })

对于您的用例,最好改用行为主题。