Node.js 流与可观察对象
Node.js Streams vs. Observables
了解Observables, I find them quite similar to Node.js streams后。两者都有一种机制,可以在新数据到达、发生错误或没有更多数据 (EOF) 时通知消费者。
我很想了解两者之间的 conceptual/functional 区别。谢谢!
Observables 和 node.js 的 Streams 都允许您解决相同的潜在问题:异步处理一系列值。我认为,两者之间的主要区别与其出现的背景有关。该上下文反映在术语和 API.
中
在 Observables 方面,您有一个 EcmaScript 扩展,它引入了反应式编程模型。它试图用 Observer
和 Observable
.
的极简主义和可组合的概念来填补价值生成和异步性之间的空白。
在 node.js 和 Streams 方面,您想为网络流和本地文件的异步和高性能处理创建一个接口。该术语源自该初始上下文,您会得到 pipe
、chunk
、encoding
、flush
、Duplex
、Buffer
等。一种为特定用例提供明确支持的务实方法,您会失去一些组合事物的能力,因为它不那么统一。例如,您在 Readable
流上使用 push
,在 Writable
上使用 write
,尽管从概念上讲,您在做同样的事情:发布一个值。
因此,在实践中,如果您查看这些概念,并且如果您使用选项 { objectMode: true }
,则可以将 Observable
与 Readable
流匹配,并且 Observer
与 Writable
流。您甚至可以在两个模型之间创建一些简单的适配器。
var Readable = require('stream').Readable;
var Writable = require('stream').Writable;
var util = require('util');
var Observable = function(subscriber) {
this.subscribe = subscriber;
}
var Subscription = function(unsubscribe) {
this.unsubscribe = unsubscribe;
}
Observable.fromReadable = function(readable) {
return new Observable(function(observer) {
function nop() {};
var nextFn = observer.next ? observer.next.bind(observer) : nop;
var returnFn = observer.return ? observer.return.bind(observer) : nop;
var throwFn = observer.throw ? observer.throw.bind(observer) : nop;
readable.on('data', nextFn);
readable.on('end', returnFn);
readable.on('error', throwFn);
return new Subscription(function() {
readable.removeListener('data', nextFn);
readable.removeListener('end', returnFn);
readable.removeListener('error', throwFn);
});
});
}
var Observer = function(handlers) {
function nop() {};
this.next = handlers.next || nop;
this.return = handlers.return || nop;
this.throw = handlers.throw || nop;
}
Observer.fromWritable = function(writable, shouldEnd, throwFn) {
return new Observer({
next: writable.write.bind(writable),
return: shouldEnd ? writable.end.bind(writable) : function() {},
throw: throwFn
});
}
您可能已经注意到我更改了一些名称并使用了此处介绍的 Observer
和 Subscription
等更简单的概念,以避免由 Observables 完成的职责过载 在 Generator
中。基本上,Subscription
允许您取消订阅 Observable
。无论如何,使用上面的代码你可以得到一个 pipe
.
Observable.fromReadable(process.stdin).subscribe(Observer.fromWritable(process.stdout));
与 process.stdin.pipe(process.stdout)
相比,您拥有的是一种组合、过滤和转换流的方法,该方法也适用于任何其他数据序列。您可以使用 Readable
、Transform
和 Writable
流来实现它,但是 API 支持子类化而不是链接 Readable
和应用函数。例如,在 Observable
模型上,转换值对应于将转换器函数应用于流。它不需要 Transform
.
的新子类型
Observable.just = function(/*... arguments*/) {
var values = arguments;
return new Observable(function(observer) {
[].forEach.call(values, function(value) {
observer.next(value);
});
observer.return();
return new Subscription(function() {});
});
};
Observable.prototype.transform = function(transformer) {
var source = this;
return new Observable(function(observer) {
return source.subscribe({
next: function(v) {
observer.next(transformer(v));
},
return: observer.return.bind(observer),
throw: observer.throw.bind(observer)
});
});
};
Observable.just(1, 2, 3, 4, 5).transform(JSON.stringify)
.subscribe(Observer.fromWritable(process.stdout))
结论?很容易在任何地方引入反应模型和 Observable
概念。围绕该概念实施整个库更加困难。所有这些小功能都需要始终如一地协同工作。毕竟,ReactiveX 项目仍在进行中。但是如果你真的需要将文件内容发送到客户端,处理编码,然后压缩它,那么它就在 NodeJS 中,并且它工作得很好。
了解Observables, I find them quite similar to Node.js streams后。两者都有一种机制,可以在新数据到达、发生错误或没有更多数据 (EOF) 时通知消费者。
我很想了解两者之间的 conceptual/functional 区别。谢谢!
Observables 和 node.js 的 Streams 都允许您解决相同的潜在问题:异步处理一系列值。我认为,两者之间的主要区别与其出现的背景有关。该上下文反映在术语和 API.
中在 Observables 方面,您有一个 EcmaScript 扩展,它引入了反应式编程模型。它试图用 Observer
和 Observable
.
在 node.js 和 Streams 方面,您想为网络流和本地文件的异步和高性能处理创建一个接口。该术语源自该初始上下文,您会得到 pipe
、chunk
、encoding
、flush
、Duplex
、Buffer
等。一种为特定用例提供明确支持的务实方法,您会失去一些组合事物的能力,因为它不那么统一。例如,您在 Readable
流上使用 push
,在 Writable
上使用 write
,尽管从概念上讲,您在做同样的事情:发布一个值。
因此,在实践中,如果您查看这些概念,并且如果您使用选项 { objectMode: true }
,则可以将 Observable
与 Readable
流匹配,并且 Observer
与 Writable
流。您甚至可以在两个模型之间创建一些简单的适配器。
var Readable = require('stream').Readable;
var Writable = require('stream').Writable;
var util = require('util');
var Observable = function(subscriber) {
this.subscribe = subscriber;
}
var Subscription = function(unsubscribe) {
this.unsubscribe = unsubscribe;
}
Observable.fromReadable = function(readable) {
return new Observable(function(observer) {
function nop() {};
var nextFn = observer.next ? observer.next.bind(observer) : nop;
var returnFn = observer.return ? observer.return.bind(observer) : nop;
var throwFn = observer.throw ? observer.throw.bind(observer) : nop;
readable.on('data', nextFn);
readable.on('end', returnFn);
readable.on('error', throwFn);
return new Subscription(function() {
readable.removeListener('data', nextFn);
readable.removeListener('end', returnFn);
readable.removeListener('error', throwFn);
});
});
}
var Observer = function(handlers) {
function nop() {};
this.next = handlers.next || nop;
this.return = handlers.return || nop;
this.throw = handlers.throw || nop;
}
Observer.fromWritable = function(writable, shouldEnd, throwFn) {
return new Observer({
next: writable.write.bind(writable),
return: shouldEnd ? writable.end.bind(writable) : function() {},
throw: throwFn
});
}
您可能已经注意到我更改了一些名称并使用了此处介绍的 Observer
和 Subscription
等更简单的概念,以避免由 Observables 完成的职责过载 在 Generator
中。基本上,Subscription
允许您取消订阅 Observable
。无论如何,使用上面的代码你可以得到一个 pipe
.
Observable.fromReadable(process.stdin).subscribe(Observer.fromWritable(process.stdout));
与 process.stdin.pipe(process.stdout)
相比,您拥有的是一种组合、过滤和转换流的方法,该方法也适用于任何其他数据序列。您可以使用 Readable
、Transform
和 Writable
流来实现它,但是 API 支持子类化而不是链接 Readable
和应用函数。例如,在 Observable
模型上,转换值对应于将转换器函数应用于流。它不需要 Transform
.
Observable.just = function(/*... arguments*/) {
var values = arguments;
return new Observable(function(observer) {
[].forEach.call(values, function(value) {
observer.next(value);
});
observer.return();
return new Subscription(function() {});
});
};
Observable.prototype.transform = function(transformer) {
var source = this;
return new Observable(function(observer) {
return source.subscribe({
next: function(v) {
observer.next(transformer(v));
},
return: observer.return.bind(observer),
throw: observer.throw.bind(observer)
});
});
};
Observable.just(1, 2, 3, 4, 5).transform(JSON.stringify)
.subscribe(Observer.fromWritable(process.stdout))
结论?很容易在任何地方引入反应模型和 Observable
概念。围绕该概念实施整个库更加困难。所有这些小功能都需要始终如一地协同工作。毕竟,ReactiveX 项目仍在进行中。但是如果你真的需要将文件内容发送到客户端,处理编码,然后压缩它,那么它就在 NodeJS 中,并且它工作得很好。