为什么 Rx.Observable.throw(x) 实际上没有抛出?

Why Rx.Observable.throw(x) does not actually throw?

我正在研究 Rxjs、可观察对象和地图,我发现 Observable.throw(error) 有一个我无法解释的奇怪行为。

如果我有一个使用 map 运算符的 Rx 流,并且我想中断该过程,我会期望方法 Observable.throw 是合适的,但是,情况似乎并非如此.

考虑以下示例:

Rx.Observable.just("test 1").subscribe(
    x => console.log(x),
  err => console.log(err),
  () => console.log("done, no errors")
);

现在让我们引入一个错误,如果我使用 Javascript 中的常规 throw 它会按预期工作:

Rx.Observable.just("test 2").map(
    x => { throw new Error(x) }
).subscribe(
    x => console.log(x),
  err => console.log(err),
  () => console.log("done - error was thrown, as expected")
);

输出:

Error: test 2 at Rx.Observable.just.map.x ((index):58) at c (rx.all.compat.js:61) at e.onNext (rx.all.compat.js:5169) (...)

但是如果我使用 Rx.Observable.throw(...),来自以下订阅者的 error 回调将永远不会被调用,并且 next 回调将被调用,而不是一些奇怪的似乎是 Rx 错误对象的对象。

Rx.Observable.just("test 3").map(
    x => Rx.Observable.throw(x)
).subscribe(
    x => console.log(x),
  err => console.log(err),
  () => console.log("done - it does not work... why?")
);

输出:

b_subscribe: f(a)error: "test 3" scheduler: a__proto__: g

正如@Whymarrh 指出的那样,如果我改为使用 flatMap 运算符,它似乎工作正常。

Documentation for map:

The Map operator applies a function of your choosing to each item emitted by the source Observable, and returns an Observable that emits the results of these function applications.

Documentation for Observable.throw:

Returns an observable sequence that terminates with an exception, using the specified scheduler to send out the single onError message.

有谁知道为什么在地图运算符中使用Observable.throw时没有调用error回调,为什么进程没有中断?

Example in jsfiddle

我知道我可以只使用常规的 throw 并继续,我已经有了一个可行的解决方案,出于好奇我发布这个问题是为了更好地了解框架的工作原理。

快速提醒:Whosebug 有一个 be nice policy.

其中一条评论正确回答了这个问题:

throw new Error(x) 抛出一个异常,它只是冒泡到进程的顶部。

x => Rx.Observable.throw(x) 只是创建一个表示错误的结构。对于 map 运算符,此结构是一个与任何其他结构一样的值,用于调用成功处理程序。 flatMap 另一方面将获取结构并将其解包,然后调用错误处理程序。