.Net 中的反应式扩展 (C#) - Subject<T> 实例仅处理一个订阅
Reactive Extensions in .Net (C#) - Subject<T> instance only processing one subscription
所以我希望我的标题足够准确,但我现在非常处于“只见树木不见森林”的状态。
我研究 C# 的反应式扩展已经有一段时间了,直到现在它的表现都非常直观和美妙,然后我遇到了一个问题:
我创建了一个主题
我创建了一个值 Observable.Create 并为主题调用订阅
然后该值愉快地弹出到我的 OnNext()
lambda 中,万岁!
但是,如果我创建一个后续的 Observable.Return 并订阅...没有任何反应,主题包含 HasObservers: false
我整理了一个演示问题的最小控制台片段
using System.Reactive.Linq;
using System.Reactive.Subjects;
static void Main(string[] args)
{
Subject<string> messenger = new Subject<string>();
messenger.Where(o => o.Length > 0).Subscribe(file => { Console.WriteLine("got file request: " + file); });
var pathObservable = Observable.Return<string>("File 1");
pathObservable.Subscribe(messenger);
var pathObservable2 = Observable.Return<string>("File 2");
pathObservable2.Subscribe(messenger);
Console.ReadKey();
}
结果:
注意:在有人问为什么我在做一些愚蠢的事情之前,比如在一个序列可以做的地方使用两个可观察对象,这只是一个例子;实际上,两个单独的可观察创建调用是独立的,并且是由外部文件请求引起的,在这种情况下,问题是第一个请求成功,第二个没有。
我尝试使用 Observable.Create() 代替,并使用单个值调用 OnNext,结果相同;有趣的是,在这种情况下,在 lambda 中有一个断点表明正在为第二个可观察对象调用 OnNext,它只是没有弹出到 Subject
非常欢迎任何关于我遗漏或误解的想法,
提前致谢
通过让 messenger
订阅另一个可观察对象,您将让它订阅 OnNext
、OnError
和 OnCompleted
。
如果您在创建 Subject<string>
后添加此内容,您将看到它已完成:
messenger.Subscribe(_ => {}, e => Console.WriteLine(e), () => Console.WriteLine("Completed"));
相反,如果您只添加一个关于下一步的操作,则主题将不会完成:
Subject<string> messenger = new Subject<string>();
messenger.Where(o => o.Length > 0).Subscribe(file => { Console.WriteLine("got file request: " + file); });
var pathObservable = Observable.Return<string>("File 1");
pathObservable.Subscribe(v => messenger.OnNext(v));
var pathObservable2 = Observable.Return<string>("File 2");
pathObservable2.Subscribe(v => messenger.OnNext(v));
messenger.OnNext("File 3");
Console.ReadLine();
输出:
got file request: File 1
got file request: File 2
got file request: File 3
执行此操作的最简单方法是仅订阅 OnNext
消息,正如 Paulo 所建议的那样。您还可以从可观察对象中删除或抑制 OnCompleted
消息。
但是,最好的方法可能是替换 messenger
主题。我不知道你更大的用例,但一个好的目标是尽量减少 Subject 的使用。它们非常适合学习,但不适合应用。
在这种情况下,您可以将整个代码替换为一行代码:
Observable.Merge(
Observable.Return<string>("File 1"),
Observable.Return<string>("File 2")
)
.Where(o => o.Length > 0)
.Subscribe(file => Console.WriteLine("got file request: " + file));
如果您想模拟动态添加可观察对象,您可能希望使用 Subject<IObservable<string>>
来实现:
Subject<IObservable<string>> consoleWriter = new Subject<IObservable<string>>();
consoleWriter
.Merge()
.Where(o => o.Length > 0)
.Subscribe(file => Console.WriteLine("got file request: " + file));
var pathObservable = Observable.Return<string>("File 1");
consoleWriter.OnNext(pathObservable);
var pathObservable2 = Observable.Return<string>("File 2");
consoleWriter.OnNext(pathObservable2);
所以我希望我的标题足够准确,但我现在非常处于“只见树木不见森林”的状态。
我研究 C# 的反应式扩展已经有一段时间了,直到现在它的表现都非常直观和美妙,然后我遇到了一个问题:
我创建了一个主题
我创建了一个值 Observable.Create 并为主题调用订阅
然后该值愉快地弹出到我的 OnNext()
lambda 中,万岁!
但是,如果我创建一个后续的 Observable.Return 并订阅...没有任何反应,主题包含 HasObservers: false
我整理了一个演示问题的最小控制台片段
using System.Reactive.Linq;
using System.Reactive.Subjects;
static void Main(string[] args)
{
Subject<string> messenger = new Subject<string>();
messenger.Where(o => o.Length > 0).Subscribe(file => { Console.WriteLine("got file request: " + file); });
var pathObservable = Observable.Return<string>("File 1");
pathObservable.Subscribe(messenger);
var pathObservable2 = Observable.Return<string>("File 2");
pathObservable2.Subscribe(messenger);
Console.ReadKey();
}
结果:
注意:在有人问为什么我在做一些愚蠢的事情之前,比如在一个序列可以做的地方使用两个可观察对象,这只是一个例子;实际上,两个单独的可观察创建调用是独立的,并且是由外部文件请求引起的,在这种情况下,问题是第一个请求成功,第二个没有。
我尝试使用 Observable.Create() 代替,并使用单个值调用 OnNext,结果相同;有趣的是,在这种情况下,在 lambda 中有一个断点表明正在为第二个可观察对象调用 OnNext,它只是没有弹出到 Subject
非常欢迎任何关于我遗漏或误解的想法, 提前致谢
通过让 messenger
订阅另一个可观察对象,您将让它订阅 OnNext
、OnError
和 OnCompleted
。
如果您在创建 Subject<string>
后添加此内容,您将看到它已完成:
messenger.Subscribe(_ => {}, e => Console.WriteLine(e), () => Console.WriteLine("Completed"));
相反,如果您只添加一个关于下一步的操作,则主题将不会完成:
Subject<string> messenger = new Subject<string>();
messenger.Where(o => o.Length > 0).Subscribe(file => { Console.WriteLine("got file request: " + file); });
var pathObservable = Observable.Return<string>("File 1");
pathObservable.Subscribe(v => messenger.OnNext(v));
var pathObservable2 = Observable.Return<string>("File 2");
pathObservable2.Subscribe(v => messenger.OnNext(v));
messenger.OnNext("File 3");
Console.ReadLine();
输出:
got file request: File 1
got file request: File 2
got file request: File 3
执行此操作的最简单方法是仅订阅 OnNext
消息,正如 Paulo 所建议的那样。您还可以从可观察对象中删除或抑制 OnCompleted
消息。
但是,最好的方法可能是替换 messenger
主题。我不知道你更大的用例,但一个好的目标是尽量减少 Subject 的使用。它们非常适合学习,但不适合应用。
在这种情况下,您可以将整个代码替换为一行代码:
Observable.Merge(
Observable.Return<string>("File 1"),
Observable.Return<string>("File 2")
)
.Where(o => o.Length > 0)
.Subscribe(file => Console.WriteLine("got file request: " + file));
如果您想模拟动态添加可观察对象,您可能希望使用 Subject<IObservable<string>>
来实现:
Subject<IObservable<string>> consoleWriter = new Subject<IObservable<string>>();
consoleWriter
.Merge()
.Where(o => o.Length > 0)
.Subscribe(file => Console.WriteLine("got file request: " + file));
var pathObservable = Observable.Return<string>("File 1");
consoleWriter.OnNext(pathObservable);
var pathObservable2 = Observable.Return<string>("File 2");
consoleWriter.OnNext(pathObservable2);