我如何通过发布和连接共享一个可观察对象?
How do I share an observable with publish and connect?
我有一个正在对其应用操作的可观察数据流,拆分为两个单独的流,对两个流中的每一个应用更多(不同的)操作,然后再次合并在一起。我正在尝试使用 Publish
和 Connect
在两个订阅者之间共享 observable,但每个订阅者似乎都在使用单独的流。也就是说,在下面的示例中,我看到 "Doing an expensive operation" 为流中的每个项目打印一次 对于两个订阅者 。 (想象一下昂贵的操作应该在所有订阅者之间只发生一次,因此我正在尝试重用流。)我已经使用 Publish
和 Connect
尝试与共享合并的可观察对象两个订阅者,但它似乎有错误的效果。
问题示例:
var foregroundScheduler = new NewThreadScheduler(ts => new Thread(ts) { IsBackground = false });
var timer = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10), foregroundScheduler);
var expensive = timer.Select(i =>
{
// Converting to strings is an expensive operation
Console.WriteLine("Doing an expensive operation");
return string.Format("#{0}", i);
});
var a = expensive.Where(s => int.Parse(s.Substring(1)) % 2 == 0).Select(s => new { Source = "A", Value = s });
var b = expensive.Where(s => int.Parse(s.Substring(1)) % 2 != 0).Select(s => new { Source = "B", Value = s });
var connectable = Observable.Merge(a, b).Publish();
connectable.Where(x => x.Source.Equals("A")).Subscribe(s => Console.WriteLine("Subscriber A got: {0}", s));
connectable.Where(x => x.Source.Equals("B")).Subscribe(s => Console.WriteLine("Subscriber B got: {0}", s));
connectable.Connect();
我看到以下输出:
Doing expensive operation
Doing expensive operation
Subscriber A got: { Source = A, Value = #0 }
Doing expensive operation
Doing expensive operation
Subscriber B got: { Source = B, Value = #1 }
(输出继续,为简洁起见被截断。)
我如何与两个订阅者共享 observable?
一个可能的修复:
var foregroundScheduler = new NewThreadScheduler(ts => new Thread(ts) { IsBackground = false });
var timer = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10), foregroundScheduler);
var expensive = timer.Select(i =>
{
// Converting to strings is an expensive operation
Console.WriteLine("Doing an expensive operation");
return string.Format("#{0}", i);
});
var subj = new ReplaySubject<string>();
expensive.Subscribe(subj);
var a = subj.Where(s => int.Parse(s.Substring(1)) % 2 == 0).Select(s => new { Source = "A", Value = s });
var b = subj.Where(s => int.Parse(s.Substring(1)) % 2 != 0).Select(s => new { Source = "B", Value = s });
var merged = Observable.Merge(a, b);
merged.Where(x => x.Source.Equals("A")).Subscribe(s => Console.WriteLine("Subscriber A got: {0}", s));
merged.Where(x => x.Source.Equals("B")).Subscribe(s => Console.WriteLine("Subscriber B got: {0}", s));
上面的示例实质上创建了一个新的中间可观察对象,它发出昂贵操作的结果。这允许您订阅昂贵操作的结果,而不是应用于计时器的昂贵转换。
有了这个你会看到:
Doing an expensive operation
Subscriber A got: { Source = A, Value = #0 }
Doing an expensive operation
Subscriber B got: { Source = B, Value = #1 }
(输出继续,为简洁起见被截断。)
或者,您可以将调用移至 Publish
和 Connect
:
var foregroundScheduler = new NewThreadScheduler(ts => new Thread(ts) {IsBackground = false});
var timer = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10), foregroundScheduler);
var expensive = timer.Select(i =>
{
// Converting to strings is an expensive operation
Console.WriteLine("Doing an expensive operation");
return string.Format("#{0}", i);
}).Publish();
var a = expensive.Where(s => int.Parse(s.Substring(1)) % 2 == 0).Select(s => new { Source = "A", Value = s });
var b = expensive.Where(s => int.Parse(s.Substring(1)) % 2 != 0).Select(s => new { Source = "B", Value = s });
var merged = Observable.Merge(a, b);
merged.Where(x => x.Source.Equals("A")).Subscribe(s => Console.WriteLine("Subscriber A got: {0}", s));
merged.Where(x => x.Source.Equals("B")).Subscribe(s => Console.WriteLine("Subscriber B got: {0}", s));
expensive.Connect();
为什么 ReplaySubject
,而不只是 Subject
或其他一些主题?
A Subject
,在 .NET Rx 实现中默认是什么 the ReactiveX documentation calls a PublishSubject
,它只向观察者发射那些在订阅之后由源 Observable 发射的项目.另一方面,ReplaySubject
向任何观察者发送源 Observable 发出的所有项目,无论观察者何时订阅。如果我们在第一个示例中使用普通主题,subj
对计时器的订阅将导致对 subj
的订阅错过在主题订阅昂贵操作的时间和他们订阅了中级主题 (subj
)。
您发布了错误的 observable。
使用当前代码,您正在合并,然后像这样发布 Observable.Merge(a, b).Publish();
。现在,由于 a
和 b
是针对 expensive
定义的,您仍然可以获得对 expensive
.
的两个订阅
订阅创建了这些管道:
如果你从代码中取出 .Publish();
就可以看到这个。输出变为:
Doing an expensive operation
Doing an expensive operation
Doing an expensive operation
Doing an expensive operation
Subscriber A got: { Source = A, Value = #0 }
Doing an expensive operation
Doing an expensive operation
Doing an expensive operation
Doing an expensive operation
Subscriber B got: { Source = B, Value = #1 }
这会创建这些管道:
因此,通过将 .Publish()
移回 expensive
即可解决问题。这就是您真正需要它的地方,因为毕竟这是一项昂贵的操作。
这是您需要的代码:
var foregroundScheduler = new NewThreadScheduler(ts => new Thread(ts) { IsBackground = false });
var timer = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10), foregroundScheduler);
var expensive = timer.Select(i =>
{
// Converting to strings is an expensive operation
Console.WriteLine("Doing an expensive operation");
return string.Format("#{0}", i);
});
var connectable = expensive.Publish();
var a = connectable.Where(s => int.Parse(s.Substring(1)) % 2 == 0).Select(s => new { Source = "A", Value = s });
var b = connectable.Where(s => int.Parse(s.Substring(1)) % 2 != 0).Select(s => new { Source = "B", Value = s });
var merged = Observable.Merge(a, b);
merged.Where(x => x.Source.Equals("A")).Subscribe(s => Console.WriteLine("Subscriber A got: {0}", s));
merged.Where(x => x.Source.Equals("B")).Subscribe(s => Console.WriteLine("Subscriber B got: {0}", s));
connectable.Connect();
这很好地产生了以下内容:
Doing an expensive operation
Subscriber A got: { Source = A, Value = #0 }
Doing an expensive operation
Subscriber B got: { Source = B, Value = #1 }
Doing an expensive operation
Subscriber A got: { Source = A, Value = #2 }
Doing an expensive operation
Subscriber B got: { Source = B, Value = #3 }
这为您提供了这些管道:
从这张图可以看出,还是有重复的。很好,因为这些部件并不昂贵。
重复其实很重要。管道的共享部分使它们的端点容易出错,从而提前终止。共享越少,代码的健壮性就越好。只有当你有一个昂贵的操作时,你才应该担心发布。否则你应该让管道成为它们自己。
这里有一个例子来说明。如果您没有已发布的来源,那么如果一个来源产生错误,那么它不会关闭所有管道。
但是一旦你引入了一个共享的可观察对象,那么一个错误就会导致所有的管道崩溃。
我有一个正在对其应用操作的可观察数据流,拆分为两个单独的流,对两个流中的每一个应用更多(不同的)操作,然后再次合并在一起。我正在尝试使用 Publish
和 Connect
在两个订阅者之间共享 observable,但每个订阅者似乎都在使用单独的流。也就是说,在下面的示例中,我看到 "Doing an expensive operation" 为流中的每个项目打印一次 对于两个订阅者 。 (想象一下昂贵的操作应该在所有订阅者之间只发生一次,因此我正在尝试重用流。)我已经使用 Publish
和 Connect
尝试与共享合并的可观察对象两个订阅者,但它似乎有错误的效果。
问题示例:
var foregroundScheduler = new NewThreadScheduler(ts => new Thread(ts) { IsBackground = false });
var timer = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10), foregroundScheduler);
var expensive = timer.Select(i =>
{
// Converting to strings is an expensive operation
Console.WriteLine("Doing an expensive operation");
return string.Format("#{0}", i);
});
var a = expensive.Where(s => int.Parse(s.Substring(1)) % 2 == 0).Select(s => new { Source = "A", Value = s });
var b = expensive.Where(s => int.Parse(s.Substring(1)) % 2 != 0).Select(s => new { Source = "B", Value = s });
var connectable = Observable.Merge(a, b).Publish();
connectable.Where(x => x.Source.Equals("A")).Subscribe(s => Console.WriteLine("Subscriber A got: {0}", s));
connectable.Where(x => x.Source.Equals("B")).Subscribe(s => Console.WriteLine("Subscriber B got: {0}", s));
connectable.Connect();
我看到以下输出:
Doing expensive operation
Doing expensive operation
Subscriber A got: { Source = A, Value = #0 }
Doing expensive operation
Doing expensive operation
Subscriber B got: { Source = B, Value = #1 }
(输出继续,为简洁起见被截断。)
我如何与两个订阅者共享 observable?
一个可能的修复:
var foregroundScheduler = new NewThreadScheduler(ts => new Thread(ts) { IsBackground = false });
var timer = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10), foregroundScheduler);
var expensive = timer.Select(i =>
{
// Converting to strings is an expensive operation
Console.WriteLine("Doing an expensive operation");
return string.Format("#{0}", i);
});
var subj = new ReplaySubject<string>();
expensive.Subscribe(subj);
var a = subj.Where(s => int.Parse(s.Substring(1)) % 2 == 0).Select(s => new { Source = "A", Value = s });
var b = subj.Where(s => int.Parse(s.Substring(1)) % 2 != 0).Select(s => new { Source = "B", Value = s });
var merged = Observable.Merge(a, b);
merged.Where(x => x.Source.Equals("A")).Subscribe(s => Console.WriteLine("Subscriber A got: {0}", s));
merged.Where(x => x.Source.Equals("B")).Subscribe(s => Console.WriteLine("Subscriber B got: {0}", s));
上面的示例实质上创建了一个新的中间可观察对象,它发出昂贵操作的结果。这允许您订阅昂贵操作的结果,而不是应用于计时器的昂贵转换。
有了这个你会看到:
Doing an expensive operation
Subscriber A got: { Source = A, Value = #0 }
Doing an expensive operation
Subscriber B got: { Source = B, Value = #1 }
(输出继续,为简洁起见被截断。)
或者,您可以将调用移至 Publish
和 Connect
:
var foregroundScheduler = new NewThreadScheduler(ts => new Thread(ts) {IsBackground = false});
var timer = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10), foregroundScheduler);
var expensive = timer.Select(i =>
{
// Converting to strings is an expensive operation
Console.WriteLine("Doing an expensive operation");
return string.Format("#{0}", i);
}).Publish();
var a = expensive.Where(s => int.Parse(s.Substring(1)) % 2 == 0).Select(s => new { Source = "A", Value = s });
var b = expensive.Where(s => int.Parse(s.Substring(1)) % 2 != 0).Select(s => new { Source = "B", Value = s });
var merged = Observable.Merge(a, b);
merged.Where(x => x.Source.Equals("A")).Subscribe(s => Console.WriteLine("Subscriber A got: {0}", s));
merged.Where(x => x.Source.Equals("B")).Subscribe(s => Console.WriteLine("Subscriber B got: {0}", s));
expensive.Connect();
为什么 ReplaySubject
,而不只是 Subject
或其他一些主题?
A Subject
,在 .NET Rx 实现中默认是什么 the ReactiveX documentation calls a PublishSubject
,它只向观察者发射那些在订阅之后由源 Observable 发射的项目.另一方面,ReplaySubject
向任何观察者发送源 Observable 发出的所有项目,无论观察者何时订阅。如果我们在第一个示例中使用普通主题,subj
对计时器的订阅将导致对 subj
的订阅错过在主题订阅昂贵操作的时间和他们订阅了中级主题 (subj
)。
您发布了错误的 observable。
使用当前代码,您正在合并,然后像这样发布 Observable.Merge(a, b).Publish();
。现在,由于 a
和 b
是针对 expensive
定义的,您仍然可以获得对 expensive
.
订阅创建了这些管道:
如果你从代码中取出 .Publish();
就可以看到这个。输出变为:
Doing an expensive operation
Doing an expensive operation
Doing an expensive operation
Doing an expensive operation
Subscriber A got: { Source = A, Value = #0 }
Doing an expensive operation
Doing an expensive operation
Doing an expensive operation
Doing an expensive operation
Subscriber B got: { Source = B, Value = #1 }
这会创建这些管道:
因此,通过将 .Publish()
移回 expensive
即可解决问题。这就是您真正需要它的地方,因为毕竟这是一项昂贵的操作。
这是您需要的代码:
var foregroundScheduler = new NewThreadScheduler(ts => new Thread(ts) { IsBackground = false });
var timer = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10), foregroundScheduler);
var expensive = timer.Select(i =>
{
// Converting to strings is an expensive operation
Console.WriteLine("Doing an expensive operation");
return string.Format("#{0}", i);
});
var connectable = expensive.Publish();
var a = connectable.Where(s => int.Parse(s.Substring(1)) % 2 == 0).Select(s => new { Source = "A", Value = s });
var b = connectable.Where(s => int.Parse(s.Substring(1)) % 2 != 0).Select(s => new { Source = "B", Value = s });
var merged = Observable.Merge(a, b);
merged.Where(x => x.Source.Equals("A")).Subscribe(s => Console.WriteLine("Subscriber A got: {0}", s));
merged.Where(x => x.Source.Equals("B")).Subscribe(s => Console.WriteLine("Subscriber B got: {0}", s));
connectable.Connect();
这很好地产生了以下内容:
Doing an expensive operation
Subscriber A got: { Source = A, Value = #0 }
Doing an expensive operation
Subscriber B got: { Source = B, Value = #1 }
Doing an expensive operation
Subscriber A got: { Source = A, Value = #2 }
Doing an expensive operation
Subscriber B got: { Source = B, Value = #3 }
这为您提供了这些管道:
从这张图可以看出,还是有重复的。很好,因为这些部件并不昂贵。
重复其实很重要。管道的共享部分使它们的端点容易出错,从而提前终止。共享越少,代码的健壮性就越好。只有当你有一个昂贵的操作时,你才应该担心发布。否则你应该让管道成为它们自己。
这里有一个例子来说明。如果您没有已发布的来源,那么如果一个来源产生错误,那么它不会关闭所有管道。
但是一旦你引入了一个共享的可观察对象,那么一个错误就会导致所有的管道崩溃。