Rx Merge 防止初始值

Rx Merge prevents initial value

我是 Reactive 开发的新手,并且无法以赋予初始值以及随时间发生在主题上的变化的方式组合多个主题。

我预期的代码如下:

first - OnNext(a)  <-- initial on subscribe
first - OnNext(c)  <-- initial on subscribe
first - OnNext(b)  <-- result of OnNext
second - OnNext(b) <-- initial on subscribe
second - OnNext(c) <-- initial on subscribe
first - OnNext(d)  <-- result of OnNext
second - OnNext(d) <-- result of OnNext

但我得到:

first - OnNext(b)  <-- result of OnNext
first - OnNext(d)  <-- result of OnNext
second - OnNext(d) <-- result of OnNext

代码:

var connection1 = new BehaviorSubject<string>("a");
var connection2 = new BehaviorSubject<string>("c");
var connection = Observable.Merge(connection1, connection2).StartWith(connection1.Value, connection2.Value).Publish();
connection.Connect();
connection.SubscribeLogger("first");
connection1.OnNext("b");
connection.SubscribeLogger("second");
connection2.OnNext("d");

Here 是一个 link,它解释了 Connected Observable 背后的一般理想(通过在合并输出上调用 Connect() 创建。

当调用 Connect() 时,合并输出将其发布的值发送给订阅者(此时 none,因此初始的 ac 被发送到没有订阅者。

然后 first 订阅,随后对 connection1.OnNext("b"); 的调用导致 b 的发布,由 first 订阅者显示。

接下来,second 订阅,但是因为它是一个 Connected Observable,所以 second 只会接收订阅后发布的值。因此,对 connection2.OnNext("d"); 的调用由已连接的合并可观察对象发布给其订阅者,即 firstsecond.

理解通常用于解释反应式操作的弹珠图需要花费一些时间和精力。但是,它们提供了一种很好的图形方式来了解正在发生的事情。

如果您将代码更改为此...

var connection1 = new BehaviorSubject<string>("a");
var connection2 = new BehaviorSubject<string>("c");
var connection = Observable.Merge(connection1, connection2).StartWith(connection1.Value, connection2.Value).Publish();
connection.SubscribeLogger("first"); //Flip order, put Subscribe before connect
connection.Connect();
connection1.OnNext("b");
connection.SubscribeLogger("second");
connection2.OnNext("d");

然后你会得到这个输出:

first - OnNext(a) //From the start with
first - OnNext(c) //From the start with
first - OnNext(a) //From the behavior subject
first - OnNext(c) //From the behavior subject
first - OnNext(b)
first - OnNext(d)
second - OnNext(d)

Publish/Connect 将冷 observable 转换为热 observable。在你 Publish 之后,如果你想获得任何 "on-subscribe" 值,你需要在连接之前订阅。在您 Connect、none of the Cold 或 "on-subscribe" 之后,新订阅者的价值将达到。

这段代码似乎可以满足我的要求:

var subject1 = new BehaviorSubject<string>("a");
var subject2 = new BehaviorSubject<string>("c");
// Use separate merged observables to simulate multiple calls to getter
// under the hood which builds the new merged view of the subjects.
var connection = subject1.Merge(subject2);
var connection2 = subject1.Merge(subject2);

connection.SubscribeLogger("first");
subject1.OnNext("b");
connection2.SubscribeLogger("second");
subject2.OnNext("d");

输出:

first - OnNext(a)
first - OnNext(c)
first - OnNext(b)
second - OnNext(b)
second - OnNext(c)
first - OnNext(d)
second - OnNext(d)