如何使用 Python 从 Google Pub/Sub 可靠地提取消息?

How to pull messages from Google Pub/Sub reliably using Python?

因此,拉取方法有时 returns 0 条消息,即使此主题中有大量未决消息。我做错了什么吗?

import os
from google.cloud import pubsub
import ast

PROJECT_ID = os.environ['PROJECT_ID']
subscriber = pubsub.SubscriberClient()
subscription_path = subscriber.subscription_path(PROJECT_ID, 'subscription-name')

while True:
  response = subscriber.pull(
    request={
      "subscription": subscription_path,
      "max_messages": 50,
    }
  )

  if not response.received_messages:
    print('❌ no messages in pub/sub')
    break
  
  for msg in response.received_messages:
    
    message_data = ast.literal_eval(msg.message.data.decode('utf-8'))
    # transform data and publish to another topic

  ack_ids = [msg.ack_id for msg in response.received_messages]
  subscriber.acknowledge(
    request={
      "subscription": subscription_path,
      "ack_ids": ack_ids,
    }
  )

print(' No more messages left in the queue. Shutting down...')

零消息的 return 不能很好地指示是否有可用消息。该服务会尝试 return 向用户快速发送消息,如果没有立即可用的消息,它可能会 return 响应消息少于请求的消息甚至零消息。一次发送单个拉取请求将使消息不太可能被缓存并可以快速发送给客户端。

接收消息的最佳方式是使用asynchronous pull via the client libraries。客户端库使用流式拉取,这意味着与服务器有持久连接,并且消息可以在可用时立即传递。您可以使用异步订阅者并跟踪自收到最后一条消息以来的时间量,如果已经过了足够的时间,则关闭订阅者。但是,仍然有可能无法在这样的时间范围内传递消息,因为云中没有端到端延迟保证 Pub/Sub。

或者,如果您必须使用同步拉取,那么您需要遵循 synchronous pull documentation 中的指南:“请注意,要通过同步拉取实现低消息传递延迟,重要的是同时进行多个操作未完成的拉取请求。随着主题吞吐量的增加,需要更多的拉取请求。”这样,服务器总是有未完成的请求准备好接收消息。

理想情况下,订阅者 运行 不断离开,这样消息一发布就可以快速接收。如果您想在没有要处理的消息时将订阅者扩展到零资源,请考虑使用 Google Cloud Functions with Pub/Sub.

这并不意外,pub/sub 是一个复杂的分布式系统,它不保证交货时间、订购、重复......

请注意文档 (https://cloud.google.com/pubsub/docs/pull) 中的这段相关段落,以便更好地处理消息。

Note that to achieve low message delivery latency with synchronous pull, it is important to have many simultaneously outstanding pull requests. As the throughput of the topic increases, more pull requests are necessary. In general, asynchronous pull is preferable for latency-sensitive applications.