google 云发布订阅 node.js 客户端与 google 云功能不兼容
google cloud pubsub node.js client not compatible with google cloud functions
架构:
我们有一个使用 2 个 pubsub topic/subscription 对的架构:
- 主题
T1
由 cronjob 定期触发(例如每 5 分钟一次)。订阅 S1
是我们云功能的触发器。
- 主题
T2
用作我们的一项服务发布的后台作业的队列。云函数在每次执行时读取订阅 S2
以服务排队的后台作业。
这使我们能够控制后台作业的服务频率,而不管它们何时被添加到队列中。
云函数(由 S1
触发)通过 pulling 读取来自 S2
的消息。它决定哪些后台作业已就绪,并在成功为该作业提供服务后,确认相关消息。未准备好或失败的作业不会被确认以稍后提供服务。
问题:
我们在使用来自 google 的官方 node.js pubusb client 时遇到问题:
- 有时已确认的消息会重新出现(似乎无限)。我们已验证消息在 ACK 截止日期之前已被确认,并且通过调查我们的日志确定我们正在调用
ack()
。
- 有时在第一次执行后(重新部署函数后),后续执行永远不会收到新消息。我们可以通过验证 stackdriver 中未确认的消息计数或通过重新部署函数并查看消息得到服务来验证消息在订阅中排队
S2
。
我们认为这是 google 的 node.js pubsub 客户端的问题。云函数文档明确指出 not start background activities。但是,查看 node.js pubsub 客户端源代码,它显然在后台使用超时服务确认。
google 的 node.js pubsub 客户端与 google 云功能不兼容吗? Googlerecommends accessing the service API's only when a client library does not exist or does not meet other needs。 运行 客户端是否在云功能 "other needs" 中,需要我们使用服务 API 编写自己的客户端?
尝试解决方法:
作为 "workaround" 我们尝试延迟 cloudfunction 执行的结束,以允许 node.js pubsub 客户端中的任何 "background" 进程完成,但这并没有始终如一地消除我们的问题。似乎 pubsub 客户端对云功能不友好,无法从云功能执行之间的停止中恢复。
2018 年 2 月 22 日更新
我写了 an article on our blog 详细描述了 为什么 我们以这种方式使用 PubSub 以及我们如何解决 node.js pubsub 客户端是与云功能不兼容。
你是如何触发你的功能的?
根据 docs,如果您的函数正在使用 pubsub 消息,那么您应该使用 pubsub 触发器。使用 pubsub 触发器时,不需要该库。只需在函数末尾调用 callback()
,pubsub 消息就会得到正确确认。
对于您打算做的事情,我认为您当前的架构不是正确的选择。
我会把你的第一步移动到 Google App Engine with a cron task,让这个任务简单地将消息从 T2
移动到 T1
,留下函数拥有触发器 S2
并处理消息。
因此,您的工作将在 T2
上发布,并且您将拥有一个 GAE 应用程序,该应用程序具有由 cron 任务触发的拉取订阅 S2
,并且该应用程序将重新发布消息给 T1
。然后,您的函数将由订阅 S1
主题 T1
触发,并将 运行 消息中的作业,避免导入 pubsub 库和使用产品的额外处理不出所料。
此外,我不确定您最初是如何将作业发布到主题的,但是 Task Queues are a good GAE (and product-agnostic in Alpha) 限速任务的选项。
仅用于此(设置 1 个最大实例)的 GAE 应用程序将在 always free limit 内,因此成本不会显着增加。
来自 node.js pubsub 客户端的开发人员 confirmed 不支持使用客户端从 Cloud Functions 拉取消息的用例。
另一种方法是在尝试从订阅中提取 所有 消息时使用 service APIs. However, the REST APIs have their own caveats。
我运行遇到了同样的问题,我想更好地控制.ack()
。从 google 查看 nodejs 库,可以选择将 ack()
重构为 return 一个承诺,这样函数就可以等待 ack()
完成。
Subscriber.prototype.ack_ = function(message) {
var breakLease = this.breakLease_.bind(this, message);
this.histogram.add(Date.now() - message.received);
if (this.writeToStreams_ && this.isConnected_()) {
this.acknowledge_(message.ackId, message.connectionId).then(breakLease);
return;
}
this.inventory_.ack.push(message.ackId);
this.setFlushTimeout_().then(breakLease);
};
架构:
我们有一个使用 2 个 pubsub topic/subscription 对的架构:
- 主题
T1
由 cronjob 定期触发(例如每 5 分钟一次)。订阅S1
是我们云功能的触发器。 - 主题
T2
用作我们的一项服务发布的后台作业的队列。云函数在每次执行时读取订阅S2
以服务排队的后台作业。
这使我们能够控制后台作业的服务频率,而不管它们何时被添加到队列中。
云函数(由 S1
触发)通过 pulling 读取来自 S2
的消息。它决定哪些后台作业已就绪,并在成功为该作业提供服务后,确认相关消息。未准备好或失败的作业不会被确认以稍后提供服务。
问题:
我们在使用来自 google 的官方 node.js pubusb client 时遇到问题:
- 有时已确认的消息会重新出现(似乎无限)。我们已验证消息在 ACK 截止日期之前已被确认,并且通过调查我们的日志确定我们正在调用
ack()
。 - 有时在第一次执行后(重新部署函数后),后续执行永远不会收到新消息。我们可以通过验证 stackdriver 中未确认的消息计数或通过重新部署函数并查看消息得到服务来验证消息在订阅中排队
S2
。
我们认为这是 google 的 node.js pubsub 客户端的问题。云函数文档明确指出 not start background activities。但是,查看 node.js pubsub 客户端源代码,它显然在后台使用超时服务确认。
google 的 node.js pubsub 客户端与 google 云功能不兼容吗? Googlerecommends accessing the service API's only when a client library does not exist or does not meet other needs。 运行 客户端是否在云功能 "other needs" 中,需要我们使用服务 API 编写自己的客户端?
尝试解决方法:
作为 "workaround" 我们尝试延迟 cloudfunction 执行的结束,以允许 node.js pubsub 客户端中的任何 "background" 进程完成,但这并没有始终如一地消除我们的问题。似乎 pubsub 客户端对云功能不友好,无法从云功能执行之间的停止中恢复。
2018 年 2 月 22 日更新
我写了 an article on our blog 详细描述了 为什么 我们以这种方式使用 PubSub 以及我们如何解决 node.js pubsub 客户端是与云功能不兼容。
你是如何触发你的功能的?
根据 docs,如果您的函数正在使用 pubsub 消息,那么您应该使用 pubsub 触发器。使用 pubsub 触发器时,不需要该库。只需在函数末尾调用 callback()
,pubsub 消息就会得到正确确认。
对于您打算做的事情,我认为您当前的架构不是正确的选择。
我会把你的第一步移动到 Google App Engine with a cron task,让这个任务简单地将消息从 T2
移动到 T1
,留下函数拥有触发器 S2
并处理消息。
因此,您的工作将在 T2
上发布,并且您将拥有一个 GAE 应用程序,该应用程序具有由 cron 任务触发的拉取订阅 S2
,并且该应用程序将重新发布消息给 T1
。然后,您的函数将由订阅 S1
主题 T1
触发,并将 运行 消息中的作业,避免导入 pubsub 库和使用产品的额外处理不出所料。
此外,我不确定您最初是如何将作业发布到主题的,但是 Task Queues are a good GAE (and product-agnostic in Alpha) 限速任务的选项。
仅用于此(设置 1 个最大实例)的 GAE 应用程序将在 always free limit 内,因此成本不会显着增加。
来自 node.js pubsub 客户端的开发人员 confirmed 不支持使用客户端从 Cloud Functions 拉取消息的用例。
另一种方法是在尝试从订阅中提取 所有 消息时使用 service APIs. However, the REST APIs have their own caveats。
我运行遇到了同样的问题,我想更好地控制.ack()
。从 google 查看 nodejs 库,可以选择将 ack()
重构为 return 一个承诺,这样函数就可以等待 ack()
完成。
Subscriber.prototype.ack_ = function(message) {
var breakLease = this.breakLease_.bind(this, message);
this.histogram.add(Date.now() - message.received);
if (this.writeToStreams_ && this.isConnected_()) {
this.acknowledge_(message.ackId, message.connectionId).then(breakLease);
return;
}
this.inventory_.ack.push(message.ackId);
this.setFlushTimeout_().then(breakLease);
};