如何连接到源?
How to concat to sources?
我有以下代码:
const Cycle = require('@cycle/core');
const {Observable} = require('rx');
function main(sources) {
const B$ = sources.driverA
.concat(Observable.just('b'))
.concat(Observable.just('c'));
const C$ = sources.driverB.map(x => x.toUpperCase());
return {
driverA: Observable.just('a'),
driverB: B$,
driverC: C$
}
}
Cycle.run(main, {
driverA: (A$) => A$,
driverB: (B$) => B$,
driverC: msg$ => { msg$.subscribe(msg => console.log(msg)) }
});
我希望在控制台上得到三行:A、B 和 C,但我只得到最后一行。看起来,即使 B$.tap(console.log) 输出所有三个 ("a", "b", "c");[=11,driverC 也只会收到最后一条消息=]
对此行为的解释是什么?如何将所有三个消息传播到 driverC?
版本:
- @cycle/core@6.0.3
- rx@4.1.0
行为解释
其实这并不容易解释。这是由于 cycle.run
如何连接其循环。 trycicle 中的以下代码 运行 :
const Cycle = require('@cycle/core');
const {Observable} = require('rx');
function main(sources) {
const B$ = sources.driverA
.concat(Observable.just('b'))
.concat(Observable.just('c'))
.concat(Observable.just('d'));
const C$ = sources.driverB.map(x => x.toUpperCase());
return {
driverA: Observable.just('a'),
driverB: B$,
driverC: C$
}
}
Cycle.run(main, {
driverA: (A$) => A$.tap(msg => console.log(msg)),
driverB: (B$) => B$.tap(msg => console.log(msg)),
driverC: msg$ => { msg$.subscribe(msg => console.log(msg)) }
});
并且只显示a d D
。所以它实际上是显示的最后一个字母。
现在如果你运行这个:
const Cycle = require('@cycle/core');
const {Observable} = require('rx');
function main(sources) {
const B$ = sources.driverA
.concat(Observable.just('b').delay(1))
.concat(Observable.just('c'))
.concat(Observable.just('d'));
const C$ = sources.driverB.map(x => x.toUpperCase());
return {
driverA: Observable.just('a'),
driverB: B$,
driverC: C$
}
}
Cycle.run(main, {
driverA: (A$) => A$.tap(msg => console.log(msg)),
driverB: (B$) => B$.tap(msg => console.log(msg)),
driverC: msg$ => { msg$.subscribe(msg => console.log(msg)) }
});
你得到 a a A b B c C d D
,这是你所期望的。
发生的事情是 run
通过主题将驱动程序连接到源,并按顺序进行。哪个顺序? var x in obj
中的属性枚举顺序未指定,因此不能依赖 - 可能依赖于浏览器(参见 )). Now chrome
and firefox
latest version seem to enumerate properties in definition order for alphanumerical properties but numerical order for numerical properties (following ES2015 spec)。
所以在这里,driverA
首先连接到源,它启动相应的数据流。当 driverB
连接到源时,同样的事情。由于您的写法 B$
,该数据流是 同步的 。因此,当 subscribe
即接线完成时,所有数据 a b c d
从 B$
同步流出,而当 driverC
接线时, B$
已经完成。鉴于接线是用 replaySubject(1)
进行的,该接线会为您提供完成前的最后一个发射值,即 d
.
所以在这里,由于同步性,顺序很重要:如果先连接 B 和 C,就可以了。不幸的是你的执行顺序不合适。
为了让您相信这一点,我按拓扑顺序对流进行排序的代码按预期工作:
const Cycle = require('@cycle/core');
const {Observable} = require('rx');
function main(sources) {
const B$ = sources.driverA
.concat(Observable.just('b'))
.concat(Observable.just('c'))
.concat(Observable.just('d'));
const C$ = sources.driverB.map(x => x.toUpperCase());
return {
driverC: C$,
driverB: B$,
driverA: Observable.just('a'),
}
}
Cycle.run(main, {
driverA: (A$) => A$.tap(msg => console.log(msg)),
driverB: (B$) => B$.tap(msg => console.log(msg)),
driverC: msg$ => { msg$.subscribe(msg => console.log(msg)) }})
如何传播所有三个消息
好吧,要么按拓扑顺序排列接收器,要么删除同步性。当 driverC
已经连线以接收下一个值时,我添加了一个 delay(1)
以使数据流在下一个滴答时继续。这可能是最可靠的选项,因为拓扑顺序可能并不总是像这里那样计算起来很明显,可能会随着源的交错而改变,并且依赖于浏览器相关的对象 属性 枚举(!)。
单独说明,当无法避免数据流的同步性时,您通常通过先使用 publish
连接所有源,然后 connect
来处理连接问题,以便当数据流动时,所有源都已准备好接收它。
在driverA
后加一个delay
即可:
const B$ = sources.driverA.delay(1)
或者,您可以在列出所有可观察值的情况下唤起 concat
一次,然后延迟它。
const B$ = Observable.concat(
sources.driverA,
Observable.just('b'),
Observable.just('c'),
Observable.just('d')
).delay(1);
需要记住的一点是,main
函数只是连接管道。 run
打开水。
我有以下代码:
const Cycle = require('@cycle/core');
const {Observable} = require('rx');
function main(sources) {
const B$ = sources.driverA
.concat(Observable.just('b'))
.concat(Observable.just('c'));
const C$ = sources.driverB.map(x => x.toUpperCase());
return {
driverA: Observable.just('a'),
driverB: B$,
driverC: C$
}
}
Cycle.run(main, {
driverA: (A$) => A$,
driverB: (B$) => B$,
driverC: msg$ => { msg$.subscribe(msg => console.log(msg)) }
});
我希望在控制台上得到三行:A、B 和 C,但我只得到最后一行。看起来,即使 B$.tap(console.log) 输出所有三个 ("a", "b", "c");[=11,driverC 也只会收到最后一条消息=]
对此行为的解释是什么?如何将所有三个消息传播到 driverC?
版本:
- @cycle/core@6.0.3
- rx@4.1.0
行为解释
其实这并不容易解释。这是由于 cycle.run
如何连接其循环。 trycicle 中的以下代码 运行 :
const Cycle = require('@cycle/core');
const {Observable} = require('rx');
function main(sources) {
const B$ = sources.driverA
.concat(Observable.just('b'))
.concat(Observable.just('c'))
.concat(Observable.just('d'));
const C$ = sources.driverB.map(x => x.toUpperCase());
return {
driverA: Observable.just('a'),
driverB: B$,
driverC: C$
}
}
Cycle.run(main, {
driverA: (A$) => A$.tap(msg => console.log(msg)),
driverB: (B$) => B$.tap(msg => console.log(msg)),
driverC: msg$ => { msg$.subscribe(msg => console.log(msg)) }
});
并且只显示a d D
。所以它实际上是显示的最后一个字母。
现在如果你运行这个:
const Cycle = require('@cycle/core');
const {Observable} = require('rx');
function main(sources) {
const B$ = sources.driverA
.concat(Observable.just('b').delay(1))
.concat(Observable.just('c'))
.concat(Observable.just('d'));
const C$ = sources.driverB.map(x => x.toUpperCase());
return {
driverA: Observable.just('a'),
driverB: B$,
driverC: C$
}
}
Cycle.run(main, {
driverA: (A$) => A$.tap(msg => console.log(msg)),
driverB: (B$) => B$.tap(msg => console.log(msg)),
driverC: msg$ => { msg$.subscribe(msg => console.log(msg)) }
});
你得到 a a A b B c C d D
,这是你所期望的。
发生的事情是 run
通过主题将驱动程序连接到源,并按顺序进行。哪个顺序? var x in obj
中的属性枚举顺序未指定,因此不能依赖 - 可能依赖于浏览器(参见 chrome
and firefox
latest version seem to enumerate properties in definition order for alphanumerical properties but numerical order for numerical properties (following ES2015 spec)。
所以在这里,driverA
首先连接到源,它启动相应的数据流。当 driverB
连接到源时,同样的事情。由于您的写法 B$
,该数据流是 同步的 。因此,当 subscribe
即接线完成时,所有数据 a b c d
从 B$
同步流出,而当 driverC
接线时, B$
已经完成。鉴于接线是用 replaySubject(1)
进行的,该接线会为您提供完成前的最后一个发射值,即 d
.
所以在这里,由于同步性,顺序很重要:如果先连接 B 和 C,就可以了。不幸的是你的执行顺序不合适。
为了让您相信这一点,我按拓扑顺序对流进行排序的代码按预期工作:
const Cycle = require('@cycle/core');
const {Observable} = require('rx');
function main(sources) {
const B$ = sources.driverA
.concat(Observable.just('b'))
.concat(Observable.just('c'))
.concat(Observable.just('d'));
const C$ = sources.driverB.map(x => x.toUpperCase());
return {
driverC: C$,
driverB: B$,
driverA: Observable.just('a'),
}
}
Cycle.run(main, {
driverA: (A$) => A$.tap(msg => console.log(msg)),
driverB: (B$) => B$.tap(msg => console.log(msg)),
driverC: msg$ => { msg$.subscribe(msg => console.log(msg)) }})
如何传播所有三个消息
好吧,要么按拓扑顺序排列接收器,要么删除同步性。当 driverC
已经连线以接收下一个值时,我添加了一个 delay(1)
以使数据流在下一个滴答时继续。这可能是最可靠的选项,因为拓扑顺序可能并不总是像这里那样计算起来很明显,可能会随着源的交错而改变,并且依赖于浏览器相关的对象 属性 枚举(!)。
单独说明,当无法避免数据流的同步性时,您通常通过先使用 publish
连接所有源,然后 connect
来处理连接问题,以便当数据流动时,所有源都已准备好接收它。
在driverA
后加一个delay
即可:
const B$ = sources.driverA.delay(1)
或者,您可以在列出所有可观察值的情况下唤起 concat
一次,然后延迟它。
const B$ = Observable.concat(
sources.driverA,
Observable.just('b'),
Observable.just('c'),
Observable.just('d')
).delay(1);
需要记住的一点是,main
函数只是连接管道。 run
打开水。