如何使用 fromWebSocket Subject 缓冲流

How to buffer stream using fromWebSocket Subject

这个 RxJava buffer example (with marble chart!) 完美地描述了想要的结果:

collect items in buffers during the bursty periods and emit them at the end of each burst, by using the debounce operator to emit a buffer closing indicator to the buffer operator

编辑:查看 后,我的问题似乎与 使用 Subject 而不是直接 Observable 有关。

使用socket流产生window关闭事件(如下)导致2个socket打开,没有事件流出:

ws = Rx.DOM.fromWebSocket(wsURI, null, wsOpenObserver, wsCloseObserver);
var closer = ws.flatMapFirst(Rx.Observable.timer(250));
ws.buffer(closer)
    .subscribe(function(e) { console.log(e, 'socket messages');});

您可以像 RxJava 版本一样将 Observable 直接传递给 buffer 运算符:

source.buffer(source.debounce(150))

有效。参见 here

使用您展示的选择器方法的替代语法将在每次缓冲区关闭时调用该方法,然后订阅它产生的 Observable。

此外,RxJava 示例中的去抖正在发出缓冲区运算符的结果,它默认情况下不会发出累积结果。

在此处总结调查结果问题:

  • Rx.DOM.fromWebSocket returns 环绕 websocket 的 Rx.subject。该主题由一个观察者和一个可观察者组成(通过 new Rx.Subject(observer, observable)。据我了解,该观察者允许通过其 onNext 方法写入套接字,而可观察者允许从套接字读取。
  • 你总是读到主题是热源,但显然这里只意味着观察者会立即将它的值推送到主题,这里将它推送到套接字。在正常情况下(new Rx.Subject()),默认的observer和observable是让observable监听observer的,所以默认的observable是hot的。然而,这里的 observable 是一个冷源,然后任何订阅都将重新执行创建另一个 websocket 的回调。因此创建了两个套接字。
  • 例如 Rx.dom.fromEvent 不会发生这种情况,因为创建的(冷)可观察对象是共享的(通过 publish().refCount())。
  • 这样,重复的问题就可以解决了。这意味着在这种特殊情况下,在您的代码中使用 ws = Rx.DOM.fromWebSocket(wsURI, null, wsOpenObserver, wsCloseObserver).share();share 作为 publish().refCount().
  • 的别名
  • 我想知道 Rx.DOM.fromWebSocket 的这种行为是否应该报告为错误

两种方法的代码: