使用 Subject.create 创建一个 Rx.Subject 允许 onNext 无需订阅

Create an Rx.Subject using Subject.create that allows onNext without subscription

使用 Subject.create(observer, observable) 创建 Rx.Subject 时,Subject 太懒了。当我尝试在没有订阅的情况下使用 subject.onNext 时,它不会继续传递消息。如果我先 subject.subscribe(),我可以在之后立即使用 onNext

假设我有一个 Observer,是这样创建的:

function createObserver(socket) {
  return Observer.create(msg => {
    socket.send(msg);
  }, err => {
    console.error(err);
  }, () => {
    socket.removeAllListeners();
    socket.close();
  });
}

然后,我创建一个接受消息的 Observable:

function createObservable(socket) {
  return Observable.fromEvent(socket, 'message')
                   .map(msg => {
                     // Trim out unnecessary data for subscribers
                     delete msg.blobs;
                     // Deep freeze the message
                     Object.freeze(msg);
                     return msg;
                   })
                   .publish()
                   .refCount();
}

主题是使用这两个函数创建的。

observer = createObserver(socket);
observable = createObservable(socket);
subject = Subject.create(observer, observable);

使用此设置,我无法立即 subject.onNext(即使我不关心订阅)。这是设计使然吗?什么是好的解决方法?

这些实际上是 TCP 套接字,这就是为什么我没有依赖超级灵活的 websocket 主题。

基本解决方案,在使用 ReplaySubject 订阅之前缓存下一个:

我想你想做的就是使用 ReplaySubject 作为你的观察者。

const { Observable, Subject, ReplaySubject } = Rx;

const replay = new ReplaySubject();

const observable = Observable.create(observer => {
  replay.subscribe(observer);
});

const mySubject = Subject.create(replay, observable);


mySubject.onNext(1);
mySubject.onNext(2);
mySubject.onNext(3);

mySubject.subscribe(x => console.log(x));

mySubject.onNext(4);
mySubject.onNext(5);

结果:

1
2
3
4
5

套接字实现(例如,不要使用)

...但是如果您正在考虑执行 Socket 实现,它会变得更加复杂。这是一个有效的套接字实现,但我不建议您使用它。相反,我建议您在 rxjs-dom (if you're an RxJS 4 or lower) or as part of RxJS 5 中使用社区支持的实现之一,这两个实现都是我帮助完成的。

function createSocketSubject(url) {
  let replay = new ReplaySubject();
  let socket;

  const observable = Observable.create(observer => {
    socket = new WebSocket(url);

    socket.onmessage = (e) => {
      observer.onNext(e);
    };

    socket.onerror = (e) => {
      observer.onError(e);
    };

    socket.onclose = (e) => {
      if (e.wasClean) {
        observer.onCompleted();
      } else {
        observer.onError(e);
      }
    }

    let sub;
    socket.onopen = () => {
      sub = replay.subscribe(x => socket.send(x));      
    };
    return () => {
      socket && socket.readyState === 1 && socket.close();
      sub && sub.dispose();
    }
  });

  return Subject.create(replay, observable);
}

const socket = createSocketSubject('ws://echo.websocket.org');

socket.onNext('one');
socket.onNext('two');
socket.subscribe(x => console.log('response: ' + x.data));
socket.onNext('three');
socket.onNext('four');

Here's the obligatory JsBin