自定义可观察对象无法链接
Custom observable not able to be chained
我正在尝试创建一个自定义可观察运算符 "write",它将链接到一个可观察流并输出可观察流的内容。
以下代码有效:
RX.Observable.write = function() {
return RX.Observable.create(function (observer) {
try{
observer.onNext("this is a test message");
observer.onCompleted();
} catch(exception){
observer.onError(exception);
}
});
};
var observable = RX.Observable.write();
var subscription = observable.subscribe( function (x) {
console.log(x);
} );
但是添加了 "range(0, 5)" 的以下代码不起作用并引发异常:
var observable = RX.Observable.range(0,5).write();
^
TypeError: Object #<RangeObservable> has no method 'write'
at Object.<anonymous> (/Users/ericbroda/Development/rxstream/index.js:15:43)
at Module._compile (module.js:456:26)
at Object.Module._extensions..js (module.js:474:10)
at Module.load (module.js:356:32)
at Function.Module._load (module.js:312:12)
at Function.Module.runMain (module.js:497:10)
at startup (node.js:119:16)
at node.js:906:3
可能有一件我想念的简单的事情,但我现在却忘记了。
感谢任何帮助。
您正在 Rx.Observable 对象上放置 write,本质上是创建一个静态方法。如果你想让它在链的中间工作,你需要将它添加到原型 'Rx.Observable.prototype.write'.
尽管如此,这可能不会如您所愿。它将忽略传入的 RangeObservable
并简单地 return "this is a test message" 一旦完成。所以在实例方法中你需要处理传入的 Observable,即 this
.
Rx.Observable.prototype.write = function() {
var source = this;
return source.map(/*Do something with incoming data*/);
};
为了完整起见,我想 post 我制定的最终解决方案——我现在可以使用 RX 可观察模式将可观察对象写入节点流:
var RX = require("rx");
var util = require('util');
RX.Observable.prototype.write = function(stream, encoding, cb) {
var source = this;
return new RX.AnonymousObservable(function(observer) {
return source.subscribe (
function(x) {
observer.onNext(x);
stream.write(x, encoding, cb(data));
},
function(e) {
observer.onError(e);
},
function() {
observer.onCompleted();
},
source);
return function() {
// console.log('disposed');
};
});
};
上面的"writable" observable 可以如下使用。该示例使用节点应用程序将可观察流写入节点的标准输出以进行说明,但可以使用任何流,例如文件(注意:您需要以适当的方式处理编码)。这使得使用常规 RX 可观察模式编写可观察对象变得非常简单。
var RX = require("rx");
var util = require('util');
var observable = RX.Observable.range(0,5).write(process.stdout, 'utf8', function(x){
console.log("stream completion callback called, x: %s", x);
});
var subscription = observable.subscribe( function (x) {
console.dir(x);
} );
我正在尝试创建一个自定义可观察运算符 "write",它将链接到一个可观察流并输出可观察流的内容。
以下代码有效:
RX.Observable.write = function() {
return RX.Observable.create(function (observer) {
try{
observer.onNext("this is a test message");
observer.onCompleted();
} catch(exception){
observer.onError(exception);
}
});
};
var observable = RX.Observable.write();
var subscription = observable.subscribe( function (x) {
console.log(x);
} );
但是添加了 "range(0, 5)" 的以下代码不起作用并引发异常:
var observable = RX.Observable.range(0,5).write();
^
TypeError: Object #<RangeObservable> has no method 'write'
at Object.<anonymous> (/Users/ericbroda/Development/rxstream/index.js:15:43)
at Module._compile (module.js:456:26)
at Object.Module._extensions..js (module.js:474:10)
at Module.load (module.js:356:32)
at Function.Module._load (module.js:312:12)
at Function.Module.runMain (module.js:497:10)
at startup (node.js:119:16)
at node.js:906:3
可能有一件我想念的简单的事情,但我现在却忘记了。
感谢任何帮助。
您正在 Rx.Observable 对象上放置 write,本质上是创建一个静态方法。如果你想让它在链的中间工作,你需要将它添加到原型 'Rx.Observable.prototype.write'.
尽管如此,这可能不会如您所愿。它将忽略传入的 RangeObservable
并简单地 return "this is a test message" 一旦完成。所以在实例方法中你需要处理传入的 Observable,即 this
.
Rx.Observable.prototype.write = function() {
var source = this;
return source.map(/*Do something with incoming data*/);
};
为了完整起见,我想 post 我制定的最终解决方案——我现在可以使用 RX 可观察模式将可观察对象写入节点流:
var RX = require("rx");
var util = require('util');
RX.Observable.prototype.write = function(stream, encoding, cb) {
var source = this;
return new RX.AnonymousObservable(function(observer) {
return source.subscribe (
function(x) {
observer.onNext(x);
stream.write(x, encoding, cb(data));
},
function(e) {
observer.onError(e);
},
function() {
observer.onCompleted();
},
source);
return function() {
// console.log('disposed');
};
});
};
上面的"writable" observable 可以如下使用。该示例使用节点应用程序将可观察流写入节点的标准输出以进行说明,但可以使用任何流,例如文件(注意:您需要以适当的方式处理编码)。这使得使用常规 RX 可观察模式编写可观察对象变得非常简单。
var RX = require("rx");
var util = require('util');
var observable = RX.Observable.range(0,5).write(process.stdout, 'utf8', function(x){
console.log("stream completion callback called, x: %s", x);
});
var subscription = observable.subscribe( function (x) {
console.dir(x);
} );