如何在 RxJS 中使用 node-walk?
How to use node-walk with RxJS?
Node walk 展示了一个 API,其中包含一些这样的事件。
walker.on('file', (root, filestats, next) => {
// next should be called to goto next file
next();
});
walker.on('end', () => {
// the end of the stream
});
如果您从 subscriber
调用一个函数来通知 source
转到流中的下一个项目,它是反应式的吗?事件不会等待订阅者对其做出反应,对吗?
如何将其转换为 Rx Observable?
尝试 Rx.Observable.fromCallback
:
var walkerOn = Rx.Observable.fromCallback(walker.on, walker)
var source = walkerOn('file');
var subscription = source.subscribe(observer);
最好的办法是围绕它创建一个包装器:
Rx.Observable.fromWalk = function(root, options, scheduler) {
scheduler = scheduler || Rx.Scheduler.currentThread;
return Rx.Observable.create(function(observer) {
var walker = walk.walk(root, options);
function fileHandler(x) {
observer.onNext({stats : x.stats, root : x.root});
scheduler.scheduleWithState(x, function(s, i) {
i.next();
});
}
var files = Rx.Observable.fromEvent(walker, 'file',
function(arr) {
return { root : arr[0], stats : arr[1], next : arr[2] };
});
var ended = Rx.Observable.fromEvent(walker, 'end');
return new Rx.CompositeDisposable(
files.subscribe(fileHandler),
ended.subscribe(observer.onCompleted.bind(observer))
);
});
};
我相应地更新了你的example
Node walk 展示了一个 API,其中包含一些这样的事件。
walker.on('file', (root, filestats, next) => {
// next should be called to goto next file
next();
});
walker.on('end', () => {
// the end of the stream
});
如果您从 subscriber
调用一个函数来通知 source
转到流中的下一个项目,它是反应式的吗?事件不会等待订阅者对其做出反应,对吗?
如何将其转换为 Rx Observable?
尝试 Rx.Observable.fromCallback
:
var walkerOn = Rx.Observable.fromCallback(walker.on, walker)
var source = walkerOn('file');
var subscription = source.subscribe(observer);
最好的办法是围绕它创建一个包装器:
Rx.Observable.fromWalk = function(root, options, scheduler) {
scheduler = scheduler || Rx.Scheduler.currentThread;
return Rx.Observable.create(function(observer) {
var walker = walk.walk(root, options);
function fileHandler(x) {
observer.onNext({stats : x.stats, root : x.root});
scheduler.scheduleWithState(x, function(s, i) {
i.next();
});
}
var files = Rx.Observable.fromEvent(walker, 'file',
function(arr) {
return { root : arr[0], stats : arr[1], next : arr[2] };
});
var ended = Rx.Observable.fromEvent(walker, 'end');
return new Rx.CompositeDisposable(
files.subscribe(fileHandler),
ended.subscribe(observer.onCompleted.bind(observer))
);
});
};
我相应地更新了你的example