.Net 中的反应式扩展 (C#) - Subject<T> 实例仅处理一个订阅

Reactive Extensions in .Net (C#) - Subject<T> instance only processing one subscription

所以我希望我的标题足够准确,但我现在非常处于“只见树木不见森林”的状态。

我研究 C# 的反应式扩展已经有一段时间了,直到现在它的表现都非常直观和美妙,然后我遇到了一个问题:

我创建了一个主题 我创建了一个值 Observable.Create 并为主题调用订阅 然后该值愉快地弹出到我的 OnNext() lambda 中,万岁!

但是,如果我创建一个后续的 Observable.Return 并订阅...没有任何反应,主题包含 HasObservers: false

我整理了一个演示问题的最小控制台片段

    using System.Reactive.Linq;
    using System.Reactive.Subjects;

    static void Main(string[] args)
    {
        Subject<string> messenger = new Subject<string>();
        messenger.Where(o => o.Length > 0).Subscribe(file => { Console.WriteLine("got file request: " + file); });

        var pathObservable = Observable.Return<string>("File 1");
        pathObservable.Subscribe(messenger);

        var pathObservable2 = Observable.Return<string>("File 2");
        pathObservable2.Subscribe(messenger);

        Console.ReadKey();
    }

结果:

注意:在有人问为什么我在做一些愚蠢的事情之前,比如在一个序列可以做的地方使用两个可观察对象,这只是一个例子;实际上,两个单独的可观察创建调用是独立的,并且是由外部文件请求引起的,在这种情况下,问题是第一个请求成功,第二个没有。

我尝试使用 Observable.Create() 代替,并使用单个值调用 OnNext,结果相同;有趣的是,在这种情况下,在 lambda 中有一个断点表明正在为第二个可观察对象调用 OnNext,它只是没有弹出到 Subject

非常欢迎任何关于我遗漏或误解的想法, 提前致谢

通过让 messenger 订阅另一个可观察对象,您将让它订阅 OnNextOnErrorOnCompleted

如果您在创建 Subject<string> 后添加此内容,您将看到它已完成:

messenger.Subscribe(_ => {}, e => Console.WriteLine(e), () => Console.WriteLine("Completed"));

相反,如果您只添加一个关于下一步的操作,则主题将不会完成:

Subject<string> messenger = new Subject<string>();
messenger.Where(o => o.Length > 0).Subscribe(file => { Console.WriteLine("got file request: " + file); });

var pathObservable = Observable.Return<string>("File 1");
pathObservable.Subscribe(v => messenger.OnNext(v));

var pathObservable2 = Observable.Return<string>("File 2");
pathObservable2.Subscribe(v => messenger.OnNext(v));

messenger.OnNext("File 3");

Console.ReadLine();

输出:

got file request: File 1
got file request: File 2
got file request: File 3

执行此操作的最简单方法是仅订阅 OnNext 消息,正如 Paulo 所建议的那样。您还可以从可观察对象中删除或抑制 OnCompleted 消息。

但是,最好的方法可能是替换 messenger 主题。我不知道你更大的用例,但一个好的目标是尽量减少 Subject 的使用。它们非常适合学习,但不适合应用。

在这种情况下,您可以将整个代码替换为一行代码:

Observable.Merge(
        Observable.Return<string>("File 1"),
        Observable.Return<string>("File 2")
    )
    .Where(o => o.Length > 0)
    .Subscribe(file => Console.WriteLine("got file request: " + file));

如果您想模拟动态添加可观察对象,您可能希望使用 Subject<IObservable<string>> 来实现:

Subject<IObservable<string>> consoleWriter = new Subject<IObservable<string>>();
consoleWriter
    .Merge()
    .Where(o => o.Length > 0)
    .Subscribe(file => Console.WriteLine("got file request: " + file));

var pathObservable = Observable.Return<string>("File 1");
consoleWriter.OnNext(pathObservable);

var pathObservable2 = Observable.Return<string>("File 2");
consoleWriter.OnNext(pathObservable2);