Google 云 Pub/Sub Python SDK 一次检索一条消息

Google Cloud Pub/Sub Python SDK retrieve single message at a time

问题:我的用例是我想从 Google 云 Pub/Sub 接收消息 - 使用 [=28= 一次一条消息] Api。当前所有示例都提到使用 Async/callback 选项从 Pub/Sub 订阅中提取消息。这种方法的问题是我需要让线程保持活动状态。

是否可以只接收 1 条消息并关闭连接,即是否有一个功能可以让我将参数(类似于 max_messages)设置为 1,以便一旦它接收到 1 条消息,线程终止?

文档 here 没有列出 Python 同步拉取的任何内容,它似乎有 num_of_messages 其他语言的选项,例如 Java。

根据官方文档here

...you can achieve exactly once processing of Pub/Sub message streams, as PubsubIO de-duplicates messages based on custom message identifiers or identifiers assigned by Pub/Sub.

因此,您应该能够使用记录 ID,即消息的标识符,以允许跨 Dataflow 和其他系统之间的边界进行恰好一次处理。要使用记录 ID,您可以在构造 PubsubIO.Read 或 PubsubIO.Write 转换时调用 idLabel,并传递您选择的字符串值。在 java 中,这将是:

public PubsubIO.Read.Bound<T> idLabel(String idLabel)

这个 returns 一个类似于这个的转换,但是它从给定的消息属性中读取唯一的消息 ID。

看下面的例子in this link:

from google.cloud import pubsub_v1

client = pubsub_v1.SubscriberClient()
subscription = client.subscription_path('[PROJECT]', '[SUBSCRIPTION]')
max_messages = 1

response = client.pull(subscription, max_messages)
print(response)

我自己试过了,一次只收到一条消息。

如果出现错误,请尝试将 pubsub 库更新到最新版本:

pip install --upgrade google-cloud-pubsub

In docs here 您有更多关于代码片段中使用的 pull 方法的信息:

The Pull method relies on a request/response model:

The application sends a request for messages. The server replies with zero or more messages and closes the connection.