如何使用 highland.js 分叉流?

How to fork a stream using highland.js?

我有一个 sourceStreamBaseData 个对象组成。

我想将此流分成 n 数量的不同流,然后根据自己的喜好过滤和转换每个 BaseData 对象。

最后,我希望 n 流仅包含特定类型,并且分叉流的长度可以不同,因为将来可能会删除或添加数据。

我想我可以通过 fork:

进行设置
import * as _ from 'highland';

interface BaseData {
    id: string;
    data: string;
}

const sourceStream = _([
    {id: 'foo', data: 'poit'},
    {id: 'foo', data: 'fnord'},
    {id: 'bar', data: 'narf'}]);

const partners = [
    'foo',
    'bar',
];

partners.forEach((partner: string) => {
    const partnerStream = sourceStream.fork();

    partnerStream.filter((baseData: BaseData) => {
        return baseData.id === partner;
    });

    partnerStream.each(console.log);
});

我希望现在有两个流,foo-stream 包含两个元素:

{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }

bar-stream 包含一个元素:

{ id: 'bar', data: 'narf' }

然而我却得到一个错误:

/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1338
        throw new Error(
        ^

Error: Stream already being consumed, you must either fork() or observe()
    at Stream._addConsumer (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1338:15)
    at Stream.consume (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1500:10)
    at Stream.each (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1774:18)
    at partners.forEach (/usr/src/marketing-tasks/dist/bin/example.js:17:19)
    at Array.forEach (native)
    at Object.<anonymous> (/usr/src/marketing-tasks/dist/bin/example.js:12:10)
    at Module._compile (module.js:570:32)
    at Object.Module._extensions..js (module.js:579:10)
    at Module.load (module.js:487:32)
    at tryModuleLoad (module.js:446:12)

如何将一个流分成多个流?


我也试过链接调用,但我只得到一个流的结果:

partners.forEach((partner: string) => {
    console.log(partner);
    const partnerStream = sourceStream
        .fork()
        .filter((item: BaseData) => {
            return item.id === partner;
        });

    partnerStream.each((item: BaseData) => {
        console.log(item);
    });
});

仅打印:

foo
{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }
bar

而不是预期的:

foo
{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }
bar
{id: 'bar', data: 'narf'}

也可能是我误会是fork的情况。根据 its doc entry:

Stream.fork() Forks a stream, allowing you to add additional consumers with shared back-pressure. A stream forked to multiple consumers will only pull values from its source as fast as the slowest consumer can handle them.

NOTE: Do not depend on a consistent execution order between the forks. This transform only guarantees that all forks will process a value foo before any will process a second value bar. It does not guarantee the order in which the forks process foo.

TIP: Be careful about modifying stream values within the forks (or using a library that does so). Since the same value will be passed to every fork, changes made in one fork will be visible in any fork that executes after it. Add to that the inconsistent execution order, and you can end up with subtle data corruption bugs. If you need to modify any values, you should make a copy and modify the copy instead.

Deprecation warning: It is currently possible to fork a stream after consuming it (e.g., via a transform). This will no longer be possible in the next major release. If you are going to fork a stream, always call fork on it.

所以我的实际问题可能不是 "How to fork a stream?":如何将高地流动态复制到不同的流中?

partnerStream.filter()return一个流。然后,您将使用 partnerStream.each() 再次消耗 partnerStream,而无需调用 fork()observe()。因此,要么链接 partnerStream.filter().each() 调用,要么将 partnerStream.filter() 的 return 值分配给一个变量,然后在该变量上调用 .each()

必须记住,在创建所有分叉之前不要使用分叉流。就好像一个人消费一个分叉的流,它和它的 "parent" 将被消费,使得任何后续的分叉都从一个空流中分叉出来。

const partnerStreams: Array<Stream<BaseData>> = [];

partners.forEach((partner: string) => {
    const partnerStream = sourceStream
        .fork()
        .filter((item: BaseData) => {
            return item.id === partner;
         }
    );

    partnerStreams.push(partnerStream);
});

partnerStreams.forEach((stream, index) => {
    console.log(index, stream);
    stream.toArray((foo) => {
        console.log(index, foo);
    });
});

它打印:

0 [ { id: 'foo', data: 'poit' }, { id: 'foo', data: 'fnord' } ]
1 [ { id: 'bar', data: 'narf' } ]