如何连接到源?

How to concat to sources?

我有以下代码:

const Cycle = require('@cycle/core');
const {Observable} = require('rx');

function main(sources) {
    const B$ = sources.driverA
        .concat(Observable.just('b'))
        .concat(Observable.just('c'));

    const C$ = sources.driverB.map(x => x.toUpperCase());

    return {
        driverA: Observable.just('a'),
        driverB: B$,
        driverC: C$
    }
}

Cycle.run(main, {
    driverA: (A$) => A$,
    driverB: (B$) => B$,
    driverC: msg$ => { msg$.subscribe(msg => console.log(msg)) }
});

我希望在控制台上得到三行:A、B 和 C,但我只得到最后一行。看起来,即使 B$.tap(console.log) 输出所有三个 ("a", "b", "c");[=11,driverC 也只会收到最后一条消息=]

对此行为的解释是什么?如何将所有三个消息传播到 driverC?

版本:

行为解释

其实这并不容易解释。这是由于 cycle.run 如何连接其循环。 trycicle 中的以下代码 运行 :

const Cycle = require('@cycle/core');
const {Observable} = require('rx');

function main(sources) {
    const B$ = sources.driverA
        .concat(Observable.just('b'))
        .concat(Observable.just('c'))
        .concat(Observable.just('d'));

    const C$ = sources.driverB.map(x => x.toUpperCase());

    return {
        driverA: Observable.just('a'),
        driverB: B$,
        driverC: C$
    }
}

Cycle.run(main, {
    driverA: (A$) => A$.tap(msg => console.log(msg)),
    driverB: (B$) => B$.tap(msg => console.log(msg)),
    driverC: msg$ => { msg$.subscribe(msg => console.log(msg)) }
});

并且只显示a d D。所以它实际上是显示的最后一个字母。

现在如果你运行这个:

const Cycle = require('@cycle/core');
const {Observable} = require('rx');

function main(sources) {
    const B$ = sources.driverA
        .concat(Observable.just('b').delay(1))
        .concat(Observable.just('c'))
        .concat(Observable.just('d'));

    const C$ = sources.driverB.map(x => x.toUpperCase());

    return {
        driverA: Observable.just('a'),
        driverB: B$,
        driverC: C$
    }
}

Cycle.run(main, {
    driverA: (A$) => A$.tap(msg => console.log(msg)),
    driverB: (B$) => B$.tap(msg => console.log(msg)),
    driverC: msg$ => { msg$.subscribe(msg => console.log(msg)) }
});

你得到 a a A b B c C d D,这是你所期望的。

发生的事情是 run 通过主题将驱动程序连接到源,并按顺序进行。哪个顺序? var x in obj 中的属性枚举顺序未指定,因此不能依赖 - 可能依赖于浏览器(参见 )). Now chrome and firefox latest version seem to enumerate properties in definition order for alphanumerical properties but numerical order for numerical properties (following ES2015 spec)。

所以在这里,driverA首先连接到源,它启动相应的数据流。当 driverB 连接到源时,同样的事情。由于您的写法 B$,该数据流是 同步的 。因此,当 subscribe 即接线完成时,所有数据 a b c dB$ 同步流出,而当 driverC 接线时, B$ 已经完成。鉴于接线是用 replaySubject(1) 进行的,该接线会为您提供完成前的最后一个发射值,即 d.

所以在这里,由于同步性,顺序很重要:如果先连接 B 和 C,就可以了。不幸的是你的执行顺序不合适。

为了让您相信这一点,我按拓扑顺序对流进行排序的代码按预期工作:

const Cycle = require('@cycle/core');
const {Observable} = require('rx');

function main(sources) {
    const B$ = sources.driverA
        .concat(Observable.just('b'))
        .concat(Observable.just('c'))
        .concat(Observable.just('d'));

    const C$ = sources.driverB.map(x => x.toUpperCase());

    return {
        driverC: C$,
        driverB: B$,
        driverA: Observable.just('a'),
    }
}

Cycle.run(main, {
    driverA: (A$) => A$.tap(msg => console.log(msg)),
    driverB: (B$) => B$.tap(msg => console.log(msg)),
    driverC: msg$ => { msg$.subscribe(msg => console.log(msg)) }})

如何传播所有三个消息

好吧,要么按拓扑顺序排列接收器,要么删除同步性。当 driverC 已经连线以接收下一个值时,我添加了一个 delay(1) 以使数据流在下一个滴答时继续。这可能是最可靠的选项,因为拓扑顺序可能并不总是像这里那样计算起来很明显,可能会随着源的交错而改变,并且依赖于浏览器相关的对象 属性 枚举(!)。

单独说明,当无法避免数据流的同步性时,您通常通过先使用 publish 连接所有源,然后 connect 来处理连接问题,以便当数据流动时,所有源都已准备好接收它。

driverA后加一个delay即可:

const B$ = sources.driverA.delay(1)

WebpackBin example.

或者,您可以在列出所有可观察值的情况下唤起 concat 一次,然后延迟它。

const B$ = Observable.concat(
    sources.driverA,
    Observable.just('b'),
    Observable.just('c'),
    Observable.just('d')
).delay(1);

WebpackBin example #2.

需要记住的一点是,main 函数只是连接管道。 run 打开水。