我如何通过发布和连接共享一个可观察对象?

How do I share an observable with publish and connect?

我有一个正在对其应用操作的可观察数据流,拆分为两个单独的流,对两个流中的每一个应用更多(不同的)操作,然后再次合并在一起。我正在尝试使用 PublishConnect 在两个订阅者之间共享 observable,但每个订阅者似乎都在使用单独的流。也就是说,在下面的示例中,我看到 "Doing an expensive operation" 为流中的每个项目打印一次 对于两个订阅者 。 (想象一下昂贵的操作应该在所有订阅者之间只发生一次,因此我正在尝试重用流。)我已经使用 PublishConnect 尝试与共享合并的可观察对象两个订阅者,但它似乎有错误的效果。

问题示例:

var foregroundScheduler = new NewThreadScheduler(ts => new Thread(ts) { IsBackground = false });
var timer = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10), foregroundScheduler);
var expensive = timer.Select(i =>
{
    // Converting to strings is an expensive operation
    Console.WriteLine("Doing an expensive operation");
    return string.Format("#{0}", i);
});

var a = expensive.Where(s => int.Parse(s.Substring(1)) % 2 == 0).Select(s => new { Source = "A", Value = s });
var b = expensive.Where(s => int.Parse(s.Substring(1)) % 2 != 0).Select(s => new { Source = "B", Value = s });

var connectable = Observable.Merge(a, b).Publish();
connectable.Where(x => x.Source.Equals("A")).Subscribe(s => Console.WriteLine("Subscriber A got: {0}", s));
connectable.Where(x => x.Source.Equals("B")).Subscribe(s => Console.WriteLine("Subscriber B got: {0}", s));
connectable.Connect();

我看到以下输出:

Doing expensive operation
Doing expensive operation
Subscriber A got: { Source = A, Value = #0 }
Doing expensive operation
Doing expensive operation
Subscriber B got: { Source = B, Value = #1 }

(输出继续,为简洁起见被截断。)

我如何与两个订阅者共享 observable?

一个可能的修复:

var foregroundScheduler = new NewThreadScheduler(ts => new Thread(ts) { IsBackground = false });
var timer = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10), foregroundScheduler);
var expensive = timer.Select(i =>
{
    // Converting to strings is an expensive operation
    Console.WriteLine("Doing an expensive operation");
    return string.Format("#{0}", i);
});

var subj = new ReplaySubject<string>();
expensive.Subscribe(subj);

var a = subj.Where(s => int.Parse(s.Substring(1)) % 2 == 0).Select(s => new { Source = "A", Value = s });
var b = subj.Where(s => int.Parse(s.Substring(1)) % 2 != 0).Select(s => new { Source = "B", Value = s });

var merged = Observable.Merge(a, b);
merged.Where(x => x.Source.Equals("A")).Subscribe(s => Console.WriteLine("Subscriber A got: {0}", s));
merged.Where(x => x.Source.Equals("B")).Subscribe(s => Console.WriteLine("Subscriber B got: {0}", s));

上面的示例实质上创建了一个新的中间可观察对象,它发出昂贵操作的结果。这允许您订阅昂贵操作的结果,而不是应用于计时器的昂贵转换。

有了这个你会看到:

Doing an expensive operation
Subscriber A got: { Source = A, Value = #0 }
Doing an expensive operation
Subscriber B got: { Source = B, Value = #1 }

(输出继续,为简洁起见被截断。)

或者,您可以将调用移至 PublishConnect:

var foregroundScheduler = new NewThreadScheduler(ts => new Thread(ts) {IsBackground = false});
var timer = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10), foregroundScheduler);
var expensive = timer.Select(i =>
{
    // Converting to strings is an expensive operation
    Console.WriteLine("Doing an expensive operation");
    return string.Format("#{0}", i);
}).Publish();

var a = expensive.Where(s => int.Parse(s.Substring(1)) % 2 == 0).Select(s => new { Source = "A", Value = s });
var b = expensive.Where(s => int.Parse(s.Substring(1)) % 2 != 0).Select(s => new { Source = "B", Value = s });

var merged = Observable.Merge(a, b);
merged.Where(x => x.Source.Equals("A")).Subscribe(s => Console.WriteLine("Subscriber A got: {0}", s));
merged.Where(x => x.Source.Equals("B")).Subscribe(s => Console.WriteLine("Subscriber B got: {0}", s));

expensive.Connect();

为什么 ReplaySubject,而不只是 Subject 或其他一些主题?

A Subject,在 .NET Rx 实现中默认是什么 the ReactiveX documentation calls a PublishSubject,它只向观察者发射那些在订阅之后由源 Observable 发射的项目.另一方面,ReplaySubject 向任何观察者发送源 Observable 发出的所有项目,无论观察者何时订阅。如果我们在第一个示例中使用普通主题,subj 对计时器的订阅将导致对 subj 的订阅错过在主题订阅昂贵操作的时间和他们订阅了中级主题 (subj)。

您发布了错误的 observable。

使用当前代码,您正在合并,然后像这样发布 Observable.Merge(a, b).Publish();。现在,由于 ab 是针对 expensive 定义的,您仍然可以获得对 expensive.

的两个订阅

订阅创建了这些管道:

如果你从代码中取出 .Publish(); 就可以看到这个。输出变为:

Doing an expensive operation
Doing an expensive operation
Doing an expensive operation
Doing an expensive operation
Subscriber A got: { Source = A, Value = #0 }
Doing an expensive operation
Doing an expensive operation
Doing an expensive operation
Doing an expensive operation
Subscriber B got: { Source = B, Value = #1 }

这会创建这些管道:

因此,通过将 .Publish() 移回 expensive 即可解决问题。这就是您真正需要它的地方,因为毕竟这是一项昂贵的操作。

这是您需要的代码:

var foregroundScheduler = new NewThreadScheduler(ts => new Thread(ts) { IsBackground = false });
var timer = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10), foregroundScheduler);
var expensive = timer.Select(i =>
{
    // Converting to strings is an expensive operation
    Console.WriteLine("Doing an expensive operation");
    return string.Format("#{0}", i);
});

var connectable = expensive.Publish();

var a = connectable.Where(s => int.Parse(s.Substring(1)) % 2 == 0).Select(s => new { Source = "A", Value = s });
var b = connectable.Where(s => int.Parse(s.Substring(1)) % 2 != 0).Select(s => new { Source = "B", Value = s });

var merged = Observable.Merge(a, b);

merged.Where(x => x.Source.Equals("A")).Subscribe(s => Console.WriteLine("Subscriber A got: {0}", s));
merged.Where(x => x.Source.Equals("B")).Subscribe(s => Console.WriteLine("Subscriber B got: {0}", s));

connectable.Connect();

这很好地产生了以下内容:

Doing an expensive operation
Subscriber A got: { Source = A, Value = #0 }
Doing an expensive operation
Subscriber B got: { Source = B, Value = #1 }
Doing an expensive operation
Subscriber A got: { Source = A, Value = #2 }
Doing an expensive operation
Subscriber B got: { Source = B, Value = #3 }

这为您提供了这些管道:

从这张图可以看出,还是有重复的。很好,因为这些部件并不昂贵。

重复其实很重要。管道的共享部分使它们的端点容易出错,从而提前终止。共享越少,代码的健壮性就越好。只有当你有一个昂贵的操作时,你才应该担心发布。否则你应该让管道成为它们自己。

这里有一个例子来说明。如果您没有已发布的来源,那么如果一个来源产生错误,那么它不会关闭所有管道。

但是一旦你引入了一个共享的可观察对象,那么一个错误就会导致所有的管道崩溃。