RxJS:将 Node.js 套接字转换为 Observables 并将它们合并为一个流
RxJS: Converting Node.js sockets into Observables and merge them into one stream
我正在尝试使用 RxJS 将 Node 的套接字转换为流。目标是让每个套接字创建自己的流并将所有流合并为一个。当新套接字连接时,将创建一个流 socketStream = Rx.Observable.fromEvent(socket, 'message')
.
然后流被合并到主流中,类似
mainStream = mainStream.merge(socketStream)
这似乎工作正常,问题是在 200-250 个客户端连接后,服务器抛出 RangeError: Maximum call stack size exceeded
。
我有示例服务器和客户端代码,在此处的要点上演示了此行为:
Sample Server and Client
我怀疑作为客户端 connect/disconnect,主流没有得到正确清理。
问题是您正在递归合并 Observable
。每次你做
cmdStream = cmdStream.merge(socketStream);
您正在创建新的 MergeObservable/MergeObserver
对。
看看 source,您可以看到您对每个订阅所做的基本上是按顺序订阅您之前的每个流,因此不难看出250 个连接,您的调用堆栈可能至少有 1000 个调用深度。
解决此问题的更好方法是使用 flatMap
运算符进行转换,并将您的连接视为创建 Observables
的 Observable
。
//Turn the connections themselves into an Observable
var connections = Rx.Observable.fromEvent(server, 'connection',
socket => new JsonSocket(socket));
connections
//flatten the messages into their own Observable
.flatMap(socket => {
return Rx.Observable.fromEvent(socket.__socket, 'message')
//Handle the socket closing as well
.takeUntil(Rx.Observable.fromEvent(socket.__socket, 'close'));
}, (socket, msg) => {
//Transform each message to include the socket as well.
return { socket : socket.__socket, data : msg};
})
.subscribe(processData, handleError);
以上内容我没有测试过,但应该可以解决您的 SO 错误。
我可能还会质疑这个的整体设计。将所有 Observable
合并在一起到底能得到什么?您仍然通过将套接字对象与消息一起传递来区分它们,因此看起来这些可能都是不同的流。
我正在尝试使用 RxJS 将 Node 的套接字转换为流。目标是让每个套接字创建自己的流并将所有流合并为一个。当新套接字连接时,将创建一个流 socketStream = Rx.Observable.fromEvent(socket, 'message')
.
然后流被合并到主流中,类似
mainStream = mainStream.merge(socketStream)
这似乎工作正常,问题是在 200-250 个客户端连接后,服务器抛出 RangeError: Maximum call stack size exceeded
。
我有示例服务器和客户端代码,在此处的要点上演示了此行为: Sample Server and Client
我怀疑作为客户端 connect/disconnect,主流没有得到正确清理。
问题是您正在递归合并 Observable
。每次你做
cmdStream = cmdStream.merge(socketStream);
您正在创建新的 MergeObservable/MergeObserver
对。
看看 source,您可以看到您对每个订阅所做的基本上是按顺序订阅您之前的每个流,因此不难看出250 个连接,您的调用堆栈可能至少有 1000 个调用深度。
解决此问题的更好方法是使用 flatMap
运算符进行转换,并将您的连接视为创建 Observables
的 Observable
。
//Turn the connections themselves into an Observable
var connections = Rx.Observable.fromEvent(server, 'connection',
socket => new JsonSocket(socket));
connections
//flatten the messages into their own Observable
.flatMap(socket => {
return Rx.Observable.fromEvent(socket.__socket, 'message')
//Handle the socket closing as well
.takeUntil(Rx.Observable.fromEvent(socket.__socket, 'close'));
}, (socket, msg) => {
//Transform each message to include the socket as well.
return { socket : socket.__socket, data : msg};
})
.subscribe(processData, handleError);
以上内容我没有测试过,但应该可以解决您的 SO 错误。
我可能还会质疑这个的整体设计。将所有 Observable
合并在一起到底能得到什么?您仍然通过将套接字对象与消息一起传递来区分它们,因此看起来这些可能都是不同的流。