从 GCP pub/sub 订阅接收消息时获取 concurrent.futures._base.TimeoutError

Getting concurrent.futures._base.TimeoutError when receiving message from the GCP pub/sub subscription

正在尝试使用 StreamingPull 从 Pub/Sub 订阅接收消息,但回调函数因超时错误而失败 concurrent.futures._base.TimeoutError。

错误:

streaming_pull_future.result(timeout=timeout)
File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 447, in result
raise TimeoutError()" 

def callback(message):
    message.ack()
    message_dict = {}
    my_message = message.data.decode('utf8')
    message_dict = eval(my_message)
    data = json.dumps(message_dict)
    data_json = json.loads(data)
    # iterating over the json object and capturing & updating the json file 
    for i in pipeLine:
        if i['moduleName'] == 'module1':
            i['processingHistory'][0]['firstArg'] ['subArg'] = data_json[0]
            i['processingHistory'][1]['firstArg'] ['anotherArg'] = data_json[1]
        
        
        
    subscriber_r = pubsub.SubscriberClient()
    subscription_path = subscriber_r.subscription_path(project_id, cvd_subscription_id)
    streaming_pull_future = subscriber_r.subscribe(subscription_path, callback=callback)
    print(f"Listening for messages on {subscription_path}..\n")
        
    # Wrap subscriber in a 'with' block to automatically call close() when done.
    with subscriber_r:
        try:
            streaming_pull_future.result(timeout=timeout)
        except TimeoutError:
            streaming_pull_future.cancel()
            streaming_pull_future.result()

如何处理和修复此错误。

您可能正在将一个值传递给超时。设置超时后,它表示您希望订阅者接收消息的时间。超时后,对 streaming_pull_future.result(timeout=timeout) 的调用将出错。如果您希望订阅者无限期地接收消息,请不要在该语句中设置超时(或将其设置为None)。