压缩两个可读流似乎没有产生任何东西
Zipping two readable streams does not seem to produce anything
我有以下功能:
const _ = require('highland');
module.exports =
(numbers /* Readable */, words /* Readable */, repeated_words /* Writable */) => {
const numberStream = _(numbers);
const wordStream = _(words);
numberStream
.zip(wordStream)
.flatMap((numberWordPair) => {
const result = [];
for (let i = 0; i < numberWordPair[0]; i++) {
result.push(numberWordPair[1]);
}
return _(result);
})
.pipe(repeated_words);
};
流参数是自动注入的,我 100% 确定流注入有效(其他流函数有效)。
当我用 _(numbers).each(xs => {console.log(xs)})
这样简单的东西替换这个有点复杂的转换时,我可以看到正在记录的数据。
但是,在这里,Highland.js 显然我遗漏了一些东西,因为根本没有产生任何东西。
我正在使用 Highland.js 的 2.13.5 版。
我错过了什么?
看来一切正常,所以错误一定出在其他地方。
作为证明,我能够 运行 这个小程序:
const {TextDecoder} = require('util');
const repeater = require('./lib/repeater');
const {PassThrough, Readable, Transform} = require('stream');
class ConstantSource extends Readable {
constructor(options, constant) {
super(options);
this.constant = constant;
}
_read(size) {
for (let i = 0; i < size; i++) {
this.push(this.constant);
}
}
}
class StdinSource extends Readable {
constructor(options) {
super(options);
this.decoder = new TextDecoder('utf8');
process.stdin.on('data', (chunk) => {
this.push(this.decoder.decode(chunk).trim());
});
}
_read(size) {
}
}
class StdoutSink extends Transform {
constructor(options) {
super(options);
this.pipe(process.stdout);
}
_transform(chunk, _, callback) {
callback(null, '\x1b[33m' + chunk + '\x1b[0m\n');
}
}
const main = () => {
if (process.argv.length < 3) {
console.error('Please specify a number.');
process.exit(1);
}
const options = {objectMode: true};
const inputStream1 = new ConstantSource(options, process.argv[2]);
const inputStream2 = new StdinSource(options);
const outputStream = new StdoutSink(options);
repeater(inputStream1, inputStream2, outputStream);
}
main();
我有以下功能:
const _ = require('highland');
module.exports =
(numbers /* Readable */, words /* Readable */, repeated_words /* Writable */) => {
const numberStream = _(numbers);
const wordStream = _(words);
numberStream
.zip(wordStream)
.flatMap((numberWordPair) => {
const result = [];
for (let i = 0; i < numberWordPair[0]; i++) {
result.push(numberWordPair[1]);
}
return _(result);
})
.pipe(repeated_words);
};
流参数是自动注入的,我 100% 确定流注入有效(其他流函数有效)。
当我用 _(numbers).each(xs => {console.log(xs)})
这样简单的东西替换这个有点复杂的转换时,我可以看到正在记录的数据。
但是,在这里,Highland.js 显然我遗漏了一些东西,因为根本没有产生任何东西。
我正在使用 Highland.js 的 2.13.5 版。
我错过了什么?
看来一切正常,所以错误一定出在其他地方。 作为证明,我能够 运行 这个小程序:
const {TextDecoder} = require('util');
const repeater = require('./lib/repeater');
const {PassThrough, Readable, Transform} = require('stream');
class ConstantSource extends Readable {
constructor(options, constant) {
super(options);
this.constant = constant;
}
_read(size) {
for (let i = 0; i < size; i++) {
this.push(this.constant);
}
}
}
class StdinSource extends Readable {
constructor(options) {
super(options);
this.decoder = new TextDecoder('utf8');
process.stdin.on('data', (chunk) => {
this.push(this.decoder.decode(chunk).trim());
});
}
_read(size) {
}
}
class StdoutSink extends Transform {
constructor(options) {
super(options);
this.pipe(process.stdout);
}
_transform(chunk, _, callback) {
callback(null, '\x1b[33m' + chunk + '\x1b[0m\n');
}
}
const main = () => {
if (process.argv.length < 3) {
console.error('Please specify a number.');
process.exit(1);
}
const options = {objectMode: true};
const inputStream1 = new ConstantSource(options, process.argv[2]);
const inputStream2 = new StdinSource(options);
const outputStream = new StdoutSink(options);
repeater(inputStream1, inputStream2, outputStream);
}
main();