使用 ZeroMQ.js 在一个线程中处理多个套接字
Working with multiple sockets in one thread using ZeroMQ.js
我正在尝试使用 ZeroMQ
和 Node.js
实施服务。
我正在使用 the ZeroMQ.js bindings for it. The ZeroMQ Guide shows how to process multiple sockets in one thread: using the zmq_poller.
我正在尝试在 Node.js 中实现类似的东西,但我找不到如何做到这一点的好方法。
我试过像这样使用 Promise.race():
while(true) {
const message = await Promise.race([socket1.receive(), socket2.receive()]);
// process message, start over again
}
但是这会在第二次进入 while 循环时引发错误,因为在第一次迭代中较慢的套接字已经处于接收模式。
第二次调用 receive()
将导致错误。
所以我的问题是:有没有办法用 ZeroMQ.js 模仿 zmq_poller
或者是否有另一种明智的方法来处理多个套接字。
我想避免分叉,因为我尝试实现的用例要求我为一个用例使用多个套接字。
如果您只是想在没有任何同步的情况下读取两个套接字,那么两个等待数据的异步循环可能会有所帮助:
async function sub(){
const sock = new zmq.Subscriber
sock.connect('tcp://127.0.0.1:3000')
sock.subscribe('rabbits')
for await (const [topic, msg] of sock) {
console.log('received [%s]', topic, msg)
}
}
async function pull(){
const sock = new zmq.Pull
sock.connect('tcp://127.0.0.1:3001')
for await (const [msg] of sock) {
console.log('pulled', msg)
}
}
pull().catch(e => {console.error(e); process.exit(1)})
sub().catch(e => {console.error(e); process.exit(1)})
console.log('waiting for data')
zeromq 套接字上的 await
是非阻塞操作,因此 JS 事件循环继续进行。不用担心底层线程实现。
我正在尝试使用 ZeroMQ
和 Node.js
实施服务。
我正在使用 the ZeroMQ.js bindings for it. The ZeroMQ Guide shows how to process multiple sockets in one thread: using the zmq_poller.
我正在尝试在 Node.js 中实现类似的东西,但我找不到如何做到这一点的好方法。
我试过像这样使用 Promise.race():
while(true) {
const message = await Promise.race([socket1.receive(), socket2.receive()]);
// process message, start over again
}
但是这会在第二次进入 while 循环时引发错误,因为在第一次迭代中较慢的套接字已经处于接收模式。
第二次调用 receive()
将导致错误。
所以我的问题是:有没有办法用 ZeroMQ.js 模仿 zmq_poller
或者是否有另一种明智的方法来处理多个套接字。
我想避免分叉,因为我尝试实现的用例要求我为一个用例使用多个套接字。
如果您只是想在没有任何同步的情况下读取两个套接字,那么两个等待数据的异步循环可能会有所帮助:
async function sub(){
const sock = new zmq.Subscriber
sock.connect('tcp://127.0.0.1:3000')
sock.subscribe('rabbits')
for await (const [topic, msg] of sock) {
console.log('received [%s]', topic, msg)
}
}
async function pull(){
const sock = new zmq.Pull
sock.connect('tcp://127.0.0.1:3001')
for await (const [msg] of sock) {
console.log('pulled', msg)
}
}
pull().catch(e => {console.error(e); process.exit(1)})
sub().catch(e => {console.error(e); process.exit(1)})
console.log('waiting for data')
zeromq 套接字上的 await
是非阻塞操作,因此 JS 事件循环继续进行。不用担心底层线程实现。