使用 Subject.create 创建一个 Rx.Subject 允许 onNext 无需订阅
Create an Rx.Subject using Subject.create that allows onNext without subscription
使用 Subject.create(observer, observable)
创建 Rx.Subject
时,Subject
太懒了。当我尝试在没有订阅的情况下使用 subject.onNext
时,它不会继续传递消息。如果我先 subject.subscribe()
,我可以在之后立即使用 onNext
。
假设我有一个 Observer
,是这样创建的:
function createObserver(socket) {
return Observer.create(msg => {
socket.send(msg);
}, err => {
console.error(err);
}, () => {
socket.removeAllListeners();
socket.close();
});
}
然后,我创建一个接受消息的 Observable:
function createObservable(socket) {
return Observable.fromEvent(socket, 'message')
.map(msg => {
// Trim out unnecessary data for subscribers
delete msg.blobs;
// Deep freeze the message
Object.freeze(msg);
return msg;
})
.publish()
.refCount();
}
主题是使用这两个函数创建的。
observer = createObserver(socket);
observable = createObservable(socket);
subject = Subject.create(observer, observable);
使用此设置,我无法立即 subject.onNext
(即使我不关心订阅)。这是设计使然吗?什么是好的解决方法?
这些实际上是 TCP 套接字,这就是为什么我没有依赖超级灵活的 websocket 主题。
基本解决方案,在使用 ReplaySubject 订阅之前缓存下一个:
我想你想做的就是使用 ReplaySubject
作为你的观察者。
const { Observable, Subject, ReplaySubject } = Rx;
const replay = new ReplaySubject();
const observable = Observable.create(observer => {
replay.subscribe(observer);
});
const mySubject = Subject.create(replay, observable);
mySubject.onNext(1);
mySubject.onNext(2);
mySubject.onNext(3);
mySubject.subscribe(x => console.log(x));
mySubject.onNext(4);
mySubject.onNext(5);
结果:
1
2
3
4
5
套接字实现(例如,不要使用)
...但是如果您正在考虑执行 Socket 实现,它会变得更加复杂。这是一个有效的套接字实现,但我不建议您使用它。相反,我建议您在 rxjs-dom (if you're an RxJS 4 or lower) or as part of RxJS 5 中使用社区支持的实现之一,这两个实现都是我帮助完成的。
function createSocketSubject(url) {
let replay = new ReplaySubject();
let socket;
const observable = Observable.create(observer => {
socket = new WebSocket(url);
socket.onmessage = (e) => {
observer.onNext(e);
};
socket.onerror = (e) => {
observer.onError(e);
};
socket.onclose = (e) => {
if (e.wasClean) {
observer.onCompleted();
} else {
observer.onError(e);
}
}
let sub;
socket.onopen = () => {
sub = replay.subscribe(x => socket.send(x));
};
return () => {
socket && socket.readyState === 1 && socket.close();
sub && sub.dispose();
}
});
return Subject.create(replay, observable);
}
const socket = createSocketSubject('ws://echo.websocket.org');
socket.onNext('one');
socket.onNext('two');
socket.subscribe(x => console.log('response: ' + x.data));
socket.onNext('three');
socket.onNext('four');
使用 Subject.create(observer, observable)
创建 Rx.Subject
时,Subject
太懒了。当我尝试在没有订阅的情况下使用 subject.onNext
时,它不会继续传递消息。如果我先 subject.subscribe()
,我可以在之后立即使用 onNext
。
假设我有一个 Observer
,是这样创建的:
function createObserver(socket) {
return Observer.create(msg => {
socket.send(msg);
}, err => {
console.error(err);
}, () => {
socket.removeAllListeners();
socket.close();
});
}
然后,我创建一个接受消息的 Observable:
function createObservable(socket) {
return Observable.fromEvent(socket, 'message')
.map(msg => {
// Trim out unnecessary data for subscribers
delete msg.blobs;
// Deep freeze the message
Object.freeze(msg);
return msg;
})
.publish()
.refCount();
}
主题是使用这两个函数创建的。
observer = createObserver(socket);
observable = createObservable(socket);
subject = Subject.create(observer, observable);
使用此设置,我无法立即 subject.onNext
(即使我不关心订阅)。这是设计使然吗?什么是好的解决方法?
这些实际上是 TCP 套接字,这就是为什么我没有依赖超级灵活的 websocket 主题。
基本解决方案,在使用 ReplaySubject 订阅之前缓存下一个:
我想你想做的就是使用 ReplaySubject
作为你的观察者。
const { Observable, Subject, ReplaySubject } = Rx;
const replay = new ReplaySubject();
const observable = Observable.create(observer => {
replay.subscribe(observer);
});
const mySubject = Subject.create(replay, observable);
mySubject.onNext(1);
mySubject.onNext(2);
mySubject.onNext(3);
mySubject.subscribe(x => console.log(x));
mySubject.onNext(4);
mySubject.onNext(5);
结果:
1
2
3
4
5
套接字实现(例如,不要使用)
...但是如果您正在考虑执行 Socket 实现,它会变得更加复杂。这是一个有效的套接字实现,但我不建议您使用它。相反,我建议您在 rxjs-dom (if you're an RxJS 4 or lower) or as part of RxJS 5 中使用社区支持的实现之一,这两个实现都是我帮助完成的。
function createSocketSubject(url) {
let replay = new ReplaySubject();
let socket;
const observable = Observable.create(observer => {
socket = new WebSocket(url);
socket.onmessage = (e) => {
observer.onNext(e);
};
socket.onerror = (e) => {
observer.onError(e);
};
socket.onclose = (e) => {
if (e.wasClean) {
observer.onCompleted();
} else {
observer.onError(e);
}
}
let sub;
socket.onopen = () => {
sub = replay.subscribe(x => socket.send(x));
};
return () => {
socket && socket.readyState === 1 && socket.close();
sub && sub.dispose();
}
});
return Subject.create(replay, observable);
}
const socket = createSocketSubject('ws://echo.websocket.org');
socket.onNext('one');
socket.onNext('two');
socket.subscribe(x => console.log('response: ' + x.data));
socket.onNext('three');
socket.onNext('four');