如何在 rxjs 中使用节点的转换流?
How to use node's transform stream in rxjs?
我使用 rxjs
已经有一段时间了,我喜欢将它的运算符用于逻辑而不是命令式编程。
然而,我也喜欢节点的流,它也是高度可组合的,所以我的明显反应是同时使用它们,但除了在 rxjs 的 book.
所以,我的问题真的是,我如何在 RxJS 上使用 npm 中的所有转换流?或者,有可能吗?
示例:-
var fs = require('fs');
var csv = require('csv-parse')({delimiter:';'});
var src = fs.createReadStream('./myFile.csv');
src.pipe(csv).pipe(process.stdout);
基本上,我想这样做:-
var fs = require('fs');
var csv = require('csv-parse')({delimiter:';'});
var rx= require('rx-node');
var src = fs.createReadStream('./myFile.csv');
var obj = rx.fromReadableStream(src);
obj.pipe(csb).map(x=>console.log(x));
过去有人告诉我使用 highland
,但我在这里严格寻找 rxjs
解决方案。
您不一定要使用 rx-node
,但您可以!记住:All streams are event emitters!
.
准备:
input.txt
Hello World!
Hello World!
Hello World!
Hello World!
Hello World!
运行:
npm install through2 split2 rx rx-node
并且在 index.js
:
var Rx = require('rx');
Rx.Node = require('rx-node');
var fs = require('fs');
var th2 = require('through2');
var split2 = require('split2');
var file = fs.createReadStream('./input.txt').on('error', console.log.bind(console, 'fs err'));
var transform = th2(function(ch, en, cb) {
cb(null, ch.toString());
}).on('error', function(err) {
console.log(err, err.toString());
});
// All streams are event emitters ! (one way without using rx-node)
// var subs = Rx.Observable.fromEvent(transform, 'data').share();
// subs
// .map(value => 'Begin line: ' + value)
// .subscribe(value => console.log(value));
// rx-node has convenience functions (another way)
var subs = Rx.Node.fromTransformStream(transform).share()
.map(value => 'Begin line: ' + value)
.subscribe(value => console.log(value));
file.pipe(split2()).pipe(transform);
输出:
Begin line: Hello World!
Begin line: Hello World!
Begin line: Hello World!
Begin line: Hello World!
Begin line: Hello World!
EdinM 给出了一个将 RxJS 与节点转换流结合使用的通用示例,但您的原始问题仍未得到解答。由于几天前我有几乎相同的问题,所以我想努力为不熟悉将 RxJS 与 Node.js 结合使用的任何人回答这个问题。我不使用 csv-parse
模块,而是使用 csv-streamify
。让我们设置基本结构:
test_data.csv:
thing,name,owner,loc
chair,sitty,billy,san fran
table,setty,bryan,new oak
运行:
$ npm install rx rx-node csv-streamify
index.js:
"use strict";
const Rx = require('rx');
Rx.Node = require('rx-node');
const fs = require('fs');
const csv = require('csv-streamify');
//Setting up the transform-stream CSV parser
let config = {
delimiter: ',', // comma, semicolon, whatever
newline: '\n', // newline character (use \r\n for CRLF files)
quote: '"', // what's considered a quote
empty: '', // empty fields are replaced by this
//objectMode: true, //parses csv table into an array of objects
//columns: true //uses column headers for the object fields
};
let parseCsv = csv(config);
//Setting up the RxJS Observer
function onNext (x) {
//do your side-effects here, after the data has
//gone through the observables operator chain
console.log('Next: ' + x);
};
function onError (err) {
console.log('Error: ' + err);
};
function onComplete () {
console.log('Completed');
};
let readStream = fs.createReadStream('test_files/test_data.csv');
readStream.pipe(parseCsv);
let subscription = Rx.Node.fromTransformStream(parseCsv)
//do something with the data with an operator such as:
//.map()
.subscribe(onNext, onError, onComplete);
现在让我们运行代码:
$ node index.js
我们会得到这个输出:
Next: ["thing","name","owner","loc\r"]
Next: ["chair","sitty","billy","san fran\r"]
Next: ["table","setty","bryan","new oak"]
Completed
结语
如果您在 csv 配置对象中将 objectMode
和 columns
设置为 true,然后使用映射运算符投影此 sideEffect
函数,如下所示:
function sideEffect (v){
console.log(v)
return v;
};
let subscription = Rx.Node.fromTransformStream(parseCsv)
.map(sideEffect)
.subscribe(onNext, onError, onComplete);
你会得到这个输出:
{ thing: 'chair',
name: 'sitty',
owner: 'billy',
'loc\r': 'san fran\r' }
Next: [object Object]
{ thing: 'table',
name: 'setty',
owner: 'bryan',
'loc\r': 'new oak' }
Next: [object Object]
Completed
我使用 rxjs
已经有一段时间了,我喜欢将它的运算符用于逻辑而不是命令式编程。
然而,我也喜欢节点的流,它也是高度可组合的,所以我的明显反应是同时使用它们,但除了在 rxjs 的 book.
所以,我的问题真的是,我如何在 RxJS 上使用 npm 中的所有转换流?或者,有可能吗?
示例:-
var fs = require('fs');
var csv = require('csv-parse')({delimiter:';'});
var src = fs.createReadStream('./myFile.csv');
src.pipe(csv).pipe(process.stdout);
基本上,我想这样做:-
var fs = require('fs');
var csv = require('csv-parse')({delimiter:';'});
var rx= require('rx-node');
var src = fs.createReadStream('./myFile.csv');
var obj = rx.fromReadableStream(src);
obj.pipe(csb).map(x=>console.log(x));
过去有人告诉我使用 highland
,但我在这里严格寻找 rxjs
解决方案。
您不一定要使用 rx-node
,但您可以!记住:All streams are event emitters!
.
准备:
input.txt
Hello World!
Hello World!
Hello World!
Hello World!
Hello World!
运行:
npm install through2 split2 rx rx-node
并且在 index.js
:
var Rx = require('rx');
Rx.Node = require('rx-node');
var fs = require('fs');
var th2 = require('through2');
var split2 = require('split2');
var file = fs.createReadStream('./input.txt').on('error', console.log.bind(console, 'fs err'));
var transform = th2(function(ch, en, cb) {
cb(null, ch.toString());
}).on('error', function(err) {
console.log(err, err.toString());
});
// All streams are event emitters ! (one way without using rx-node)
// var subs = Rx.Observable.fromEvent(transform, 'data').share();
// subs
// .map(value => 'Begin line: ' + value)
// .subscribe(value => console.log(value));
// rx-node has convenience functions (another way)
var subs = Rx.Node.fromTransformStream(transform).share()
.map(value => 'Begin line: ' + value)
.subscribe(value => console.log(value));
file.pipe(split2()).pipe(transform);
输出:
Begin line: Hello World!
Begin line: Hello World!
Begin line: Hello World!
Begin line: Hello World!
Begin line: Hello World!
EdinM 给出了一个将 RxJS 与节点转换流结合使用的通用示例,但您的原始问题仍未得到解答。由于几天前我有几乎相同的问题,所以我想努力为不熟悉将 RxJS 与 Node.js 结合使用的任何人回答这个问题。我不使用 csv-parse
模块,而是使用 csv-streamify
。让我们设置基本结构:
test_data.csv:
thing,name,owner,loc
chair,sitty,billy,san fran
table,setty,bryan,new oak
运行:
$ npm install rx rx-node csv-streamify
index.js:
"use strict";
const Rx = require('rx');
Rx.Node = require('rx-node');
const fs = require('fs');
const csv = require('csv-streamify');
//Setting up the transform-stream CSV parser
let config = {
delimiter: ',', // comma, semicolon, whatever
newline: '\n', // newline character (use \r\n for CRLF files)
quote: '"', // what's considered a quote
empty: '', // empty fields are replaced by this
//objectMode: true, //parses csv table into an array of objects
//columns: true //uses column headers for the object fields
};
let parseCsv = csv(config);
//Setting up the RxJS Observer
function onNext (x) {
//do your side-effects here, after the data has
//gone through the observables operator chain
console.log('Next: ' + x);
};
function onError (err) {
console.log('Error: ' + err);
};
function onComplete () {
console.log('Completed');
};
let readStream = fs.createReadStream('test_files/test_data.csv');
readStream.pipe(parseCsv);
let subscription = Rx.Node.fromTransformStream(parseCsv)
//do something with the data with an operator such as:
//.map()
.subscribe(onNext, onError, onComplete);
现在让我们运行代码:
$ node index.js
我们会得到这个输出:
Next: ["thing","name","owner","loc\r"]
Next: ["chair","sitty","billy","san fran\r"]
Next: ["table","setty","bryan","new oak"]
Completed
结语
如果您在 csv 配置对象中将 objectMode
和 columns
设置为 true,然后使用映射运算符投影此 sideEffect
函数,如下所示:
function sideEffect (v){
console.log(v)
return v;
};
let subscription = Rx.Node.fromTransformStream(parseCsv)
.map(sideEffect)
.subscribe(onNext, onError, onComplete);
你会得到这个输出:
{ thing: 'chair',
name: 'sitty',
owner: 'billy',
'loc\r': 'san fran\r' }
Next: [object Object]
{ thing: 'table',
name: 'setty',
owner: 'bryan',
'loc\r': 'new oak' }
Next: [object Object]
Completed