Google Cloud PubSub 在异步拉取中监听消息时抛出 504 Deadline Exceeded 错误

Google Cloud PubSub throws 504 Deadline Exceeded error while listening for messages in asynchronous pull

我有一个订阅 PubSub 主题并使用异步拉取的服务。空闲 10 分钟后未收到任何消息,PubSub 抛出 504 Deadline exceeded 错误。

错误总是在大约 10 分钟后出现。我发现的每个类似问题都与同步拉取有关,而不是我使用的异步拉取。

错误信息:

INFO:google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager:Observed non-terminating stream error 504 Deadline Exceeded                                                                                                           
INFO:google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager:Observed recoverable stream error 504 Deadline Exceeded                                                                                                                 
INFO:google.api_core.bidi:Re-established stream                                                                         
INFO:google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager:Observed non-terminating stream error 504 Deadline Exceeded                                                                                                             
INFO:google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager:Observed recoverable stream error 504 Deadline Exceeded

订户Class:

from google.cloud import pubsub_v1
class Subscriber():

    def __init__(self):
        self.subscriber = pubsub_v1.SubscriberClient()
        self.project_id = "my-project-id"
        self.subscription_id = "SUBSCRIPTION_MAIL_RECIEVE"
        self.subscription_path = self.subscriber.subscription_path(self.project_id,
                                                                   self.subscription_id)

    def subscribe(self, callback):
        """
        Parameters:
            callback (callable): function to be called for incoming messages on given topic
        """
        streaming_pull_future = self.subscriber.subscribe(self.subscription_path,
                                                          callback=callback)
        return streaming_pull_future

正在侦听消息:

subscriber = Subscriber()
pull_future = subscriber.subscribe(my_callback_function(message))

with subscriber.subscriber:
        try:
            print("Waiting for messages")
            pull_future.result()
        except TimeoutError:
            pull_future.cancel()

这是正常现象,在您的流重新建立时可以忽略这些消息。如果没有重新建立流并且没有发送心跳,那么您可能需要将 google 核心 API 和 google 云 Pub/Sub 升级到最新版本版本。

pip install google-cloud-pubsub --upgrade

pip install google-api-core --upgrade

您还可以按照 link https://github.com/googleapis/google-cloud-python/issues/5800

中的建议过滤这些错误消息