RxJS:我将如何 "manually" 更新一个 Observable?
RxJS: How would I "manually" update an Observable?
我想我一定是误解了一些基本的东西,因为在我看来这应该是可观察对象的最基本情况,但对于我的生活我无法从文档中弄清楚如何做到这一点。
基本上,我希望能够做到这一点:
// create a dummy observable, which I would update manually
var eventObservable = rx.Observable.create(function(observer){});
var observer = eventObservable.subscribe(
function(x){
console.log('next: ' + x);
}
...
var my_function = function(){
eventObservable.push('foo');
//'push' adds an event to the datastream, the observer gets it and prints
// next: foo
}
但是我还没有找到像push
这样的方法。我将其用于点击处理程序,并且我知道他们为此提供了 Observable.fromEvent
,但我正在尝试将其与 React 一起使用,我宁愿能够简单地在回调中更新数据流,而不是使用完全不同的事件处理系统。所以基本上我想要这个:
$( "#target" ).click(function(e) {
eventObservable.push(e.target.text());
});
我得到的最接近的是使用 observer.onNext('foo')
,但这似乎并没有真正起作用,而且它是在观察者上调用的,这似乎不正确。观察者应该对数据流做出反应,而不是改变它,对吧?
我只是不明白 observer/observable 关系吗?
在 RX 中,Observer 和 Observable 是不同的实体。一个观察者订阅一个 Observable。 Observable 通过调用观察者的方法向其观察者发射项目。如果您需要在 Observable.create()
范围之外调用观察者方法,您可以使用 Subject,它是同时充当观察者和 Observable 的代理。
你可以这样做:
var eventStream = new Rx.Subject();
var subscription = eventStream.subscribe(
function (x) {
console.log('Next: ' + x);
},
function (err) {
console.log('Error: ' + err);
},
function () {
console.log('Completed');
});
var my_function = function() {
eventStream.next('foo');
}
您可以在此处找到有关主题的更多信息:
我相信 Observable.create()
不会将 observer 作为回调参数,而是将其作为发射器。所以如果你想给你的 Observable 添加一个新值试试这个:
var emitter;
var observable = Rx.Observable.create(e => emitter = e);
var observer = {
next: function(next) {
console.log(next);
},
error: function(error) {
console.log(error);
},
complete: function() {
console.log("done");
}
}
observable.subscribe(observer);
emitter.next('foo');
emitter.next('bar');
emitter.next('baz');
emitter.complete();
//console output
//"foo"
//"bar"
//"baz"
//"done"
是的 Subject 使它更容易,在同一个对象中提供 Observable 和 Observer,但它并不完全相同,因为 Subject 允许您订阅多个观察者到同一个可观察者,而一个可观察者只向最后一个订阅的观察者发送数据,所以要有意识地使用它。
JsBin 如果你想修改它,这里有一个。
我想我一定是误解了一些基本的东西,因为在我看来这应该是可观察对象的最基本情况,但对于我的生活我无法从文档中弄清楚如何做到这一点。
基本上,我希望能够做到这一点:
// create a dummy observable, which I would update manually
var eventObservable = rx.Observable.create(function(observer){});
var observer = eventObservable.subscribe(
function(x){
console.log('next: ' + x);
}
...
var my_function = function(){
eventObservable.push('foo');
//'push' adds an event to the datastream, the observer gets it and prints
// next: foo
}
但是我还没有找到像push
这样的方法。我将其用于点击处理程序,并且我知道他们为此提供了 Observable.fromEvent
,但我正在尝试将其与 React 一起使用,我宁愿能够简单地在回调中更新数据流,而不是使用完全不同的事件处理系统。所以基本上我想要这个:
$( "#target" ).click(function(e) {
eventObservable.push(e.target.text());
});
我得到的最接近的是使用 observer.onNext('foo')
,但这似乎并没有真正起作用,而且它是在观察者上调用的,这似乎不正确。观察者应该对数据流做出反应,而不是改变它,对吧?
我只是不明白 observer/observable 关系吗?
在 RX 中,Observer 和 Observable 是不同的实体。一个观察者订阅一个 Observable。 Observable 通过调用观察者的方法向其观察者发射项目。如果您需要在 Observable.create()
范围之外调用观察者方法,您可以使用 Subject,它是同时充当观察者和 Observable 的代理。
你可以这样做:
var eventStream = new Rx.Subject();
var subscription = eventStream.subscribe(
function (x) {
console.log('Next: ' + x);
},
function (err) {
console.log('Error: ' + err);
},
function () {
console.log('Completed');
});
var my_function = function() {
eventStream.next('foo');
}
您可以在此处找到有关主题的更多信息:
我相信 Observable.create()
不会将 observer 作为回调参数,而是将其作为发射器。所以如果你想给你的 Observable 添加一个新值试试这个:
var emitter;
var observable = Rx.Observable.create(e => emitter = e);
var observer = {
next: function(next) {
console.log(next);
},
error: function(error) {
console.log(error);
},
complete: function() {
console.log("done");
}
}
observable.subscribe(observer);
emitter.next('foo');
emitter.next('bar');
emitter.next('baz');
emitter.complete();
//console output
//"foo"
//"bar"
//"baz"
//"done"
是的 Subject 使它更容易,在同一个对象中提供 Observable 和 Observer,但它并不完全相同,因为 Subject 允许您订阅多个观察者到同一个可观察者,而一个可观察者只向最后一个订阅的观察者发送数据,所以要有意识地使用它。 JsBin 如果你想修改它,这里有一个。