Node.js Google PubSub 订阅者没有收到一些消息
Node.js Google PubSub Subscriber does not receive SOME messages
总结:
我的 Node.js
应用程序中有聊天功能,我想通过 socket.io 向客户发送消息。我通过 PubSub 触发发送到客户端。现在,当 运行 PubSub 订阅在大约 70% 的情况下一切正常(即打印出消息),但有时会停止而不做任何事情(特别是它也不会打印出错误)。
不过,我可以确认丢失的 30%(消息)正在发布到该主题,因为对同一主题的不同订阅会收到消息。
非常感谢任何有关调试的帮助。
详情
这是我的堆栈:
- Node.js
- socket.io
- express.js
- passport.js
- MongoDB
- react.js
进程:
- Anna 在聊天中发送消息(这会写入数据库并发布到 PubSub 主题 "messages")
- Node.js express 应用程序 运行订阅,然后根据消息内容发送给其他应该接收消息的人。
- BoB 在这种情况下,与 Anna 在同一频道的人会收到消息。
为什么我不直接从 Anna 发送给 Bob?原因是我想在消息之间有一个 AppEngine 并可能在那里添加一些逻辑,这似乎是一个很好的方法。
订阅
const pubSubClient = require('./client');
const errorHandler = function(error) {
console.error(`ERROR: ${error}`);
throw error;
};
module.exports = listenForMessages =(subscriptionName="messageReceiver",io) => {
const subscription = pubSubClient.subscription(subscriptionName);
// Listen for new messages until timeout is hit
subscription.on("message", (message) => {
console.log(`Received message ${message.id}:`);
const parsedMessage = JSON.parse(message.data)
parsedMessage._id = message.id
console.log(parsedMessage)
if (parsedMessage.to === "admin") {
io.to(`admin:${parsedMessage.from}`).emit("NewMessage", parsedMessage);
} else {
io.to(`${parsedMessage.to}`).emit("NewMessage", parsedMessage);
}
message.ack();
});
subscription.on('error', errorHandler);
}
Server.js
...
const listenForMessages = require("./message_processing/listen");
listenForMessages("messageReceiver", io);
示例控制台输出
以下控制台输出是由 运行 应用程序在本地生成的,两个浏览器 [一个处于隐身状态] 相互聊天。可以看出,只有最后一条消息才真正被侦听器接收(并打印出来)。有趣的是,由于调用的异步性质,收到的消息的打印输出出现在消息发送的日志之前(即延迟在这里肯定不是问题)。
[0] went into deserialize user
[0] Message 875007020424601 published.
[0] went into deserialize user
[0] Message 875006704834317 published.
[0] went into deserialize user
[0] Message 875006583857400 published.
[0] went into deserialize user
[0] Message 875006520104287 published.
[0] went into deserialize user
[0] Message 875006699141463 published.
[0] went into deserialize user
[0] Received message 875006881073134:
[0] {
[0] from: '5e949f73aeed81beefaf6daa',
[0] to: 'admin',
[0] content: 'i6',
[0] seenByUser: true,
[0] type: 'message',
[0] createdByUser: true,
[0] createdAt: '2020-04-20T07:44:54.280Z',
[0] _id: '875006881073134'
[0] }
[0] Message 875006881073134 published.
在其他一些情况下,较早的消息有效,然后听众似乎停止了。
您可以做几件事来检查发生了什么:
- 转到topic page、select主题查看详情,并查看发布率。确保就 Pub/Sub 而言,您认为正在发布的消息实际上已成功发布。如果发布失败,它们可能会交付给您的一个订阅者,而不是另一个。
- 转到 subscription page、select 订阅查看详细信息,并查看 "Unacked message count" 和 "Oldest unacked message age" 图表。如果这些不为零,则意味着有消息未传递给您的订阅者。如果它们为零,则意味着消息正在传递给您的订阅者并得到订阅者的确认。
如果未确认消息的数量为零,则可能的原因是流氓进程在接收消息的订阅上充当订阅者。也许您的服务的先前实例仍然 运行 出乎意料?或者另一个应该使用不同订阅的任务可能正在使用相同的订阅。
另一件需要注意的事情是,订阅者只会收到在消息发布之前创建的订阅的消息。因此,如果您启动发布者并发布了一些消息,然后创建了订阅,比如在订阅者启动时,那么订阅者将不会收到那些较早的消息。
总结:
我的 Node.js
应用程序中有聊天功能,我想通过 socket.io 向客户发送消息。我通过 PubSub 触发发送到客户端。现在,当 运行 PubSub 订阅在大约 70% 的情况下一切正常(即打印出消息),但有时会停止而不做任何事情(特别是它也不会打印出错误)。
不过,我可以确认丢失的 30%(消息)正在发布到该主题,因为对同一主题的不同订阅会收到消息。
非常感谢任何有关调试的帮助。
详情
这是我的堆栈:
- Node.js
- socket.io
- express.js
- passport.js
- MongoDB
- react.js
进程:
- Anna 在聊天中发送消息(这会写入数据库并发布到 PubSub 主题 "messages")
- Node.js express 应用程序 运行订阅,然后根据消息内容发送给其他应该接收消息的人。
- BoB 在这种情况下,与 Anna 在同一频道的人会收到消息。
为什么我不直接从 Anna 发送给 Bob?原因是我想在消息之间有一个 AppEngine 并可能在那里添加一些逻辑,这似乎是一个很好的方法。
订阅
const pubSubClient = require('./client');
const errorHandler = function(error) {
console.error(`ERROR: ${error}`);
throw error;
};
module.exports = listenForMessages =(subscriptionName="messageReceiver",io) => {
const subscription = pubSubClient.subscription(subscriptionName);
// Listen for new messages until timeout is hit
subscription.on("message", (message) => {
console.log(`Received message ${message.id}:`);
const parsedMessage = JSON.parse(message.data)
parsedMessage._id = message.id
console.log(parsedMessage)
if (parsedMessage.to === "admin") {
io.to(`admin:${parsedMessage.from}`).emit("NewMessage", parsedMessage);
} else {
io.to(`${parsedMessage.to}`).emit("NewMessage", parsedMessage);
}
message.ack();
});
subscription.on('error', errorHandler);
}
Server.js
...
const listenForMessages = require("./message_processing/listen");
listenForMessages("messageReceiver", io);
示例控制台输出
以下控制台输出是由 运行 应用程序在本地生成的,两个浏览器 [一个处于隐身状态] 相互聊天。可以看出,只有最后一条消息才真正被侦听器接收(并打印出来)。有趣的是,由于调用的异步性质,收到的消息的打印输出出现在消息发送的日志之前(即延迟在这里肯定不是问题)。
[0] went into deserialize user
[0] Message 875007020424601 published.
[0] went into deserialize user
[0] Message 875006704834317 published.
[0] went into deserialize user
[0] Message 875006583857400 published.
[0] went into deserialize user
[0] Message 875006520104287 published.
[0] went into deserialize user
[0] Message 875006699141463 published.
[0] went into deserialize user
[0] Received message 875006881073134:
[0] {
[0] from: '5e949f73aeed81beefaf6daa',
[0] to: 'admin',
[0] content: 'i6',
[0] seenByUser: true,
[0] type: 'message',
[0] createdByUser: true,
[0] createdAt: '2020-04-20T07:44:54.280Z',
[0] _id: '875006881073134'
[0] }
[0] Message 875006881073134 published.
在其他一些情况下,较早的消息有效,然后听众似乎停止了。
您可以做几件事来检查发生了什么:
- 转到topic page、select主题查看详情,并查看发布率。确保就 Pub/Sub 而言,您认为正在发布的消息实际上已成功发布。如果发布失败,它们可能会交付给您的一个订阅者,而不是另一个。
- 转到 subscription page、select 订阅查看详细信息,并查看 "Unacked message count" 和 "Oldest unacked message age" 图表。如果这些不为零,则意味着有消息未传递给您的订阅者。如果它们为零,则意味着消息正在传递给您的订阅者并得到订阅者的确认。
如果未确认消息的数量为零,则可能的原因是流氓进程在接收消息的订阅上充当订阅者。也许您的服务的先前实例仍然 运行 出乎意料?或者另一个应该使用不同订阅的任务可能正在使用相同的订阅。
另一件需要注意的事情是,订阅者只会收到在消息发布之前创建的订阅的消息。因此,如果您启动发布者并发布了一些消息,然后创建了订阅,比如在订阅者启动时,那么订阅者将不会收到那些较早的消息。