Rx .NET 观察函数调用的正确方法是什么

Rx .NET what is the right way to observe a function call

我正在学习在 .NET 中以正确的方式执行 rx,并且想知道从远程函数调用中生成可观察流的常用方法是什么。

目前,我只是创建一个 Subject<T> 并在回调方法中调用它的 OnNext 函数。

示例:

subject.AsObservable().Where(...).Subscribe(...);

这是我过滤和订阅 observable 的地方。

await speechService.StartListeningAsync(f => OnSpeechCommandRecognized(f), 
                  cts.Token, OnError, interimResults).ConfigureAwait(false);

这是我将回调方法传递给语音服务的地方。

private void OnSpeechCommandRecognized(StreamingRecognitionResult result)
{
    subject.OnNext(result);
}

这是我在回调方法中调用 OnNext 函数的地方。

我正在寻找的另一种方法是调用 Observable.Start 并订阅返回的可观察对象,但我认为这不是可行的方法。

我也可以围绕 Observable.Create.

进行调用

所以我的问题是:此示例是订阅发出结果的方法的正确方法吗?

如果您使用 Create 而不是 Subject 会怎样?

由于您的 API returns 一个任务可能在停止收听之前不会完成,您可以使用 FromAsync 来包装该调用,然后创建一个闭包以Create 的观察员。订阅内部 observable,并且您有一个 observable,当订阅时,它将从 StartListeningAsync 回调中产生结果,直到关联的任务完成(或错误)。

Observable.Create((observer, outerCancel) =>
    Observable.FromAsync(innerCancel =>
            speechService.StartListeningAsync(
                observer.OnNext,
                innerCancel,
                observer.OnError,
                interimResults
            ))
        .Subscribe(observer, outerCancel)
);