如何使用 fromWebSocket Subject 缓冲流
How to buffer stream using fromWebSocket Subject
这个 RxJava buffer example (with marble chart!) 完美地描述了想要的结果:
collect items in buffers during the bursty periods and emit them at the end of each burst, by using the debounce operator to emit a buffer closing indicator to the buffer operator
编辑:查看 后,我的问题似乎与 使用 Subject 而不是直接 Observable 有关。
使用socket流产生window关闭事件(如下)导致2个socket打开,没有事件流出:
ws = Rx.DOM.fromWebSocket(wsURI, null, wsOpenObserver, wsCloseObserver);
var closer = ws.flatMapFirst(Rx.Observable.timer(250));
ws.buffer(closer)
.subscribe(function(e) { console.log(e, 'socket messages');});
您可以像 RxJava 版本一样将 Observable
直接传递给 buffer
运算符:
source.buffer(source.debounce(150))
有效。参见 here。
使用您展示的选择器方法的替代语法将在每次缓冲区关闭时调用该方法,然后订阅它产生的 Observable。
此外,RxJava 示例中的去抖正在发出缓冲区运算符的结果,它默认情况下不会发出累积结果。
在此处总结调查结果问题:
Rx.DOM.fromWebSocket
returns 环绕 websocket 的 Rx.subject
。该主题由一个观察者和一个可观察者组成(通过 new Rx.Subject(observer, observable)
。据我了解,该观察者允许通过其 onNext
方法写入套接字,而可观察者允许从套接字读取。
- 你总是读到主题是热源,但显然这里只意味着观察者会立即将它的值推送到主题,这里将它推送到套接字。在正常情况下(
new Rx.Subject()
),默认的observer和observable是让observable监听observer的,所以默认的observable是hot的。然而,这里的 observable 是一个冷源,然后任何订阅都将重新执行创建另一个 websocket 的回调。因此创建了两个套接字。
- 例如
Rx.dom.fromEvent
不会发生这种情况,因为创建的(冷)可观察对象是共享的(通过 publish().refCount()
)。
- 这样,重复的问题就可以解决了。这意味着在这种特殊情况下,在您的代码中使用
ws = Rx.DOM.fromWebSocket(wsURI, null, wsOpenObserver, wsCloseObserver).share();
,share
作为 publish().refCount()
. 的别名
- 我想知道
Rx.DOM.fromWebSocket
的这种行为是否应该报告为错误
两种方法的代码:
这个 RxJava buffer example (with marble chart!) 完美地描述了想要的结果:
collect items in buffers during the bursty periods and emit them at the end of each burst, by using the debounce operator to emit a buffer closing indicator to the buffer operator
编辑:查看
使用socket流产生window关闭事件(如下)导致2个socket打开,没有事件流出:
ws = Rx.DOM.fromWebSocket(wsURI, null, wsOpenObserver, wsCloseObserver);
var closer = ws.flatMapFirst(Rx.Observable.timer(250));
ws.buffer(closer)
.subscribe(function(e) { console.log(e, 'socket messages');});
您可以像 RxJava 版本一样将 Observable
直接传递给 buffer
运算符:
source.buffer(source.debounce(150))
有效。参见 here。
使用您展示的选择器方法的替代语法将在每次缓冲区关闭时调用该方法,然后订阅它产生的 Observable。
此外,RxJava 示例中的去抖正在发出缓冲区运算符的结果,它默认情况下不会发出累积结果。
在此处总结调查结果问题:
Rx.DOM.fromWebSocket
returns 环绕 websocket 的Rx.subject
。该主题由一个观察者和一个可观察者组成(通过new Rx.Subject(observer, observable)
。据我了解,该观察者允许通过其onNext
方法写入套接字,而可观察者允许从套接字读取。- 你总是读到主题是热源,但显然这里只意味着观察者会立即将它的值推送到主题,这里将它推送到套接字。在正常情况下(
new Rx.Subject()
),默认的observer和observable是让observable监听observer的,所以默认的observable是hot的。然而,这里的 observable 是一个冷源,然后任何订阅都将重新执行创建另一个 websocket 的回调。因此创建了两个套接字。 - 例如
Rx.dom.fromEvent
不会发生这种情况,因为创建的(冷)可观察对象是共享的(通过publish().refCount()
)。 - 这样,重复的问题就可以解决了。这意味着在这种特殊情况下,在您的代码中使用
ws = Rx.DOM.fromWebSocket(wsURI, null, wsOpenObserver, wsCloseObserver).share();
,share
作为publish().refCount()
. 的别名
- 我想知道
Rx.DOM.fromWebSocket
的这种行为是否应该报告为错误
两种方法的代码: