如何从可观察到的第一个元素和 return 一个可观察到的流的其余部分?
How to get first element from observable and return an observable for the rest of the stream?
我正在尝试从类似 CSV 文件的文件中向 return objects 写入一个 Observable。我的问题是我想使用文件的 header 作为 Observer 下一个元素的 属性 名称。例如,如果我有这个 CSV:
symbol;word
1;one
2;two
3;three
我希望这发生:
getObjects().subscribe(o => console.log(o));
// should log this:
{ symbol: '1', word: 'one' }
{ symbol: '2', word: 'two' }
{ symbol: '3', word: 'three' }
我设法用这段代码做到了这一点:
const Rx = require('rxjs/Rx');
const _zipObject = require('lodash/zipObject');
const getObjects = () => {
const observable = Rx.Observable.of(
['symbol', 'word'],
['1', 'one'],
['2', 'two'],
['3', 'three']
).share();
let header;
return Rx.Observable.concat(
observable.first().do(line => header = line).filter(() => false),
observable.skip(1).map(line => _zipObject(header, line))
);
};
getObjects().subscribe(o => console.log(o));
它有效,但感觉不太好。我也考虑过使用 filter 来过滤第一个元素并设置 header,但感觉并不会好多少,而且缺点是每个元素都会调用 filter 回调。
您可以使用 combineLatest
同时获取 header 和日期,避免从 parent 范围访问它:
const Rx = require('rxjs/Rx');
const Observable = Rx.Observable;
const getObjects = () => {
const observable = Observable.of(
['symbol', 'word'],
['1', 'one'],
['2', 'two'],
['3', 'three']
).share();
const header$ = observable.take(1);
const data$ = observable.skip(1);
return Observable.combineLatest(header$, data$, (headers, line) => {
const result = {};
headers.forEach((header, i) => {
result[header] = line[i];
});
return result;
});
};
getObjects().subscribe(o => console.log(o));
这会打印:
{ symbol: '1', word: 'one' }
{ symbol: '2', word: 'two' }
{ symbol: '3', word: 'three' }
我正在尝试从类似 CSV 文件的文件中向 return objects 写入一个 Observable。我的问题是我想使用文件的 header 作为 Observer 下一个元素的 属性 名称。例如,如果我有这个 CSV:
symbol;word
1;one
2;two
3;three
我希望这发生:
getObjects().subscribe(o => console.log(o));
// should log this:
{ symbol: '1', word: 'one' }
{ symbol: '2', word: 'two' }
{ symbol: '3', word: 'three' }
我设法用这段代码做到了这一点:
const Rx = require('rxjs/Rx');
const _zipObject = require('lodash/zipObject');
const getObjects = () => {
const observable = Rx.Observable.of(
['symbol', 'word'],
['1', 'one'],
['2', 'two'],
['3', 'three']
).share();
let header;
return Rx.Observable.concat(
observable.first().do(line => header = line).filter(() => false),
observable.skip(1).map(line => _zipObject(header, line))
);
};
getObjects().subscribe(o => console.log(o));
它有效,但感觉不太好。我也考虑过使用 filter 来过滤第一个元素并设置 header,但感觉并不会好多少,而且缺点是每个元素都会调用 filter 回调。
您可以使用 combineLatest
同时获取 header 和日期,避免从 parent 范围访问它:
const Rx = require('rxjs/Rx');
const Observable = Rx.Observable;
const getObjects = () => {
const observable = Observable.of(
['symbol', 'word'],
['1', 'one'],
['2', 'two'],
['3', 'three']
).share();
const header$ = observable.take(1);
const data$ = observable.skip(1);
return Observable.combineLatest(header$, data$, (headers, line) => {
const result = {};
headers.forEach((header, i) => {
result[header] = line[i];
});
return result;
});
};
getObjects().subscribe(o => console.log(o));
这会打印:
{ symbol: '1', word: 'one' }
{ symbol: '2', word: 'two' }
{ symbol: '3', word: 'three' }