如何在 RxJs 中异步抛出错误?

How do I throw an error asynchronously in RxJs?

我有一个跟踪命令行界面问题和答案的观察器。我想做的是在我的代码中给定某个事件的情况下向观察者注入一个错误,以终止观察者及其下游订阅。不知道什么时候运行。

我试过从主题和可观察对象的合并中抛出错误,但我似乎无法从中得到任何东西。

相关代码如下:

        this.errorInjector$ = new Subject<[discord.Message, MessageWrapper.Response]>();
        ....
        this.nextQa$ = merge(
            nextQa$,
            this.errorInjector$.pipe(
                tap((): void => {
                    throw (new Error('Stop Conversation called'));
                }),
            ),
        );

        // start conversation
        Utils.logger.trace(`Starting a new conversation id '${this.uuid}' with '${this.opMessage.author.username}'`);
    }

    getNextQa$(): Observable<[discord.Message, MessageWrapper.Response]> {
        return this.nextQa$;
    }

    stopConversation(): void {
        this.errorInjector$.next(
            null as any
        );
    }

this.nextQa$与本地nextQa$和errorInjector$合并。我可以确认正在调用停止对话并且下游正在接收 this.nextQa$ 但是当我尝试注入错误时我没有看到任何错误传播到下游。我还尝试了 this.errorInjector.error() 方法和 map() 运算符而不是 tap()。无论出于何种原因,我都无法让这两个流合并并抛出我的错误。注意:this.nextQa$ 确实会向下游传播错误。

我觉得我遗漏了一些有关合并或主题如何工作的信息,因此我们将不胜感激任何帮助或解释。

编辑

好吧,我刚刚发现我需要一个 BehaviorSubject 而不是常规主题。我想我现在的问题是为什么我需要一个 BehaviorSubject 而不是一个普通的 Subject 来抛出一个错误?

编辑 2

BehaviorSubject 总是抛出这个错误,这不是我想要的。这是由于其初始发射的性质,但我仍然不明白为什么我不能在这段代码中对常规主题做任何事情。

问题是从我拥有的字典对象中获取带有 stopConversation(): void 的对象。 this 对象已定义并显示 errorInjector$ 已定义,但当我将鼠标悬停在该值上时,调试器告诉我 errorInjector$ 已变为未定义。至少这就是问题所在,我可能需要就此提出另一个问题。

首先,如果你想工作,你必须在任何错误发出之前订阅。因此,您的代码中存在订阅序列问题。如果您在创建 this.nextQa$ 后立即订阅,则不应错过该错误。

this.nextQa$ = merge(
            nextQa$,
            this.errorInjector$.pipe(
                tap((): void => {
                    throw (new Error('Stop Conversation called'));
                }),
            ),
        );
this.nextQa$.subscribe(console.log,console.error)