ZeroMQ + NodeJs:订阅者未收到消息
ZeroMQ + NodeJs: Messages not received by subscriber
我的发布者 (zpub.js
) 循环发布,如下所示。
async function publishLoop() {
let payload = []
_.forEach(array, (a) => {
// process a here to generate someKey and someValue
payload.push({someKey:someValue})
})
return Promise.all(payload.map(async (p) => {
await zmqp.publish({t:'topicString', m:p})
}))
}
zmqp.publish
就是下面这样
async publish(payload) {
// this.sock is just a bind to tcp://127.0.0.1:4030
await this.sock.send([payload.t, JSON.stringify(payload.m, null, null)])
return Promise.resolve()
}
我的订户 (zsub.js
) 是在 ZeroMQ 网站上看到的代码版本。
const zmq = require("zeromq")
const mom = require('moment-timezone')
async function run() {
const sock = new zmq.Subscriber
sock.connect("tcp://127.0.0.1:4030")
sock.subscribe("topicString")
for await (const [topic, msg] of sock) {
console.log(`${mom().tz('Asia/Kolkata').format('YYYY-MM-DDTHH:mm:ss.SSS')}`)
}
}
run()
- 我的订阅者是
node zsub.js > out
。
- 我以
node zpub.js
的身份启动我的发布者。所有消息均已成功接收。
zpub.js
进程结束但zsub.js
保持运行ning。当我重新运行 node zpub.js
时,订阅者没有收到一条消息。 out
中的记录数保持不变。
- 运行
zpub.js
似乎又一次或两次向订阅者传递消息(最近的消息;不是时间戳所见的较早的消息)。
因此,我不确定在 pub/sub 端应该做什么才能使消息不 'lost'。请指教
Pub-Sub 本质上是不可靠的,因为订阅者没有向发布者反馈以确认已收到消息。 The guide 对此进行了详细描述并提供了一些解决方案。解决方案之一是不使用 Pub-Sub,而是使用 Router-Dealer。这是否是一个可行的替代方案取决于您use-case。
关于您的具体问题,订阅者最终确定与发布者的连接丢失,并将尝试重新连接,直到连接 re-established。根据时间的不同,订阅者可能会错过发布者发送的初始(或所有)消息。
一般来说,如果发布者是通信的稳定部分(保持在线状态,就像服务器一样)并且订阅者可以来来去去(像客户端),Pub-Sub 效果最好。
我的发布者 (zpub.js
) 循环发布,如下所示。
async function publishLoop() {
let payload = []
_.forEach(array, (a) => {
// process a here to generate someKey and someValue
payload.push({someKey:someValue})
})
return Promise.all(payload.map(async (p) => {
await zmqp.publish({t:'topicString', m:p})
}))
}
zmqp.publish
就是下面这样
async publish(payload) {
// this.sock is just a bind to tcp://127.0.0.1:4030
await this.sock.send([payload.t, JSON.stringify(payload.m, null, null)])
return Promise.resolve()
}
我的订户 (zsub.js
) 是在 ZeroMQ 网站上看到的代码版本。
const zmq = require("zeromq")
const mom = require('moment-timezone')
async function run() {
const sock = new zmq.Subscriber
sock.connect("tcp://127.0.0.1:4030")
sock.subscribe("topicString")
for await (const [topic, msg] of sock) {
console.log(`${mom().tz('Asia/Kolkata').format('YYYY-MM-DDTHH:mm:ss.SSS')}`)
}
}
run()
- 我的订阅者是
node zsub.js > out
。 - 我以
node zpub.js
的身份启动我的发布者。所有消息均已成功接收。 zpub.js
进程结束但zsub.js
保持运行ning。当我重新运行node zpub.js
时,订阅者没有收到一条消息。out
中的记录数保持不变。- 运行
zpub.js
似乎又一次或两次向订阅者传递消息(最近的消息;不是时间戳所见的较早的消息)。
因此,我不确定在 pub/sub 端应该做什么才能使消息不 'lost'。请指教
Pub-Sub 本质上是不可靠的,因为订阅者没有向发布者反馈以确认已收到消息。 The guide 对此进行了详细描述并提供了一些解决方案。解决方案之一是不使用 Pub-Sub,而是使用 Router-Dealer。这是否是一个可行的替代方案取决于您use-case。
关于您的具体问题,订阅者最终确定与发布者的连接丢失,并将尝试重新连接,直到连接 re-established。根据时间的不同,订阅者可能会错过发布者发送的初始(或所有)消息。
一般来说,如果发布者是通信的稳定部分(保持在线状态,就像服务器一样)并且订阅者可以来来去去(像客户端),Pub-Sub 效果最好。