如何在 Python 中使用 AckID 确认 Google PubSub 消息

How to Acknowledge a Google PubSub message using AckID in Python

我正在查看 PubSub 拉取文档 here

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pub/Sub subscription name"
# TODO timeout = 5.0  # "How long the subscriber should listen for
# messages in seconds"

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_name}`
subscription_path = subscriber.subscription_path(
    project_id, subscription_name
)

def callback(message):
    print("Received message: {}".format(message))
    message.ack()

streaming_pull_future = subscriber.subscribe(
    subscription_path, callback=callback
)
print("Listening for messages on {}..\n".format(subscription_path))

# result() in a future will block indefinitely if `timeout` is not set,
# unless an exception is encountered first.
try:
    streaming_pull_future.result(timeout=timeout)
except:  # noqa
    streaming_pull_future.cancel()

在上面的例子中,消息一收到就被确认。但是我只想在我的本地 celery 工作人员完成消息处理时确认,以便 PubSub 在工作人员失败时可以重新发送消息。所以我获取消息的 ack_id,并将其传递给工作人员。

params["ack_id"] = message._ack_id
start_aggregation.delay(params)

我只是不知道如何使用工作程序中的 ack_id 来确认消息。我知道您可以使用 pubsub 端点来确认给定 here 之类的消息。但我不知道如何使用服务帐户凭据来做同样的事情——他们在该文档中使用 OAuth 来做到这一点。任何指针表示赞赏。谢谢。

通过直接调用 acknowledge API 从客户端库接收到的确认消息会导致客户端出现问题。客户端有 flow control limits,它决定了未完成(已交付但未确认)的最大消息数。当有人调用 message.ack()message.nack() 时,就会从计数中删除消息。如果您直接调用 acknowledge API,则此计数不会更改,从而导致消息在达到限制后不再流动。

如果您尝试使用 celery 在您的处理中获得更多的并行性,您可能可以直接执行此操作而无需此中间步骤。一种选择是在不同进程中启动具有相同订阅的订阅者客户端实例。消息将在订阅者之间分发。或者,您可以将 scheduler 替换为基于进程而不是基于线程的 scheduler,尽管这样会多做一些工作。