如何使用 python return 从后台函数向 pubsub 确认

How to return acknowledgement from background function to pubsub using python

我正在设置一个新的 GCP 项目,以便在 CSV 文件上传到存储桶后立即读取和解析它。为此,我创建了一个发布到 pub/sub 的触发器。 Pub/Sub 本身将消息发送到后台函数。

一切似乎都运行良好,例如上传文件后,触发器就会实时向 Pubsub 发送消息,然后再向函数发送消息。我还可以看到传递给函数的消息。

但是,问题是将 Ack 发送回 pub/sub。我在某处读到发回任何 2xx 状态应该完成这项工作(从队列中删除消息),但事实并非如此。结果 pubsub "thinks" 消息没有送达,一遍又一遍地发送消息。

def parse_data(data, context):


    if 'data' in data:
        args = base64.b64decode(data['data']).decode('utf-8')
        pubsub_message = args.replace('\n', ' ')
        properties = json.loads(pubsub_message)
        myBucket = validate_message(properties, 'bucket')   
        myFileName = validate_message(properties, "name")
        fileLocation = 'gs://'+myBucket+'/'+myFileName
        readAndEnhanceData(fileLocation)
        return 'OK', 200
    else:
        return 'Something went wrong, no data received'   

这里是日志文件,显示正在连续调用该函数。

D  CSV_Parser_Raw_Data 518626734652287 Function execution took 72855 ms,
 finished with status: 'ok' CSV_Parser_Raw_Data 518626734652287

D  CSV_Parser_Raw_Data 518626708442766 Function execution took 131886 ms, 
finished with status: 'ok' CSV_Parser_Raw_Data 518626708442766 

D  CSV_Parser_Raw_Data 518624470100006 Function execution took 65412 ms, 
finished with status: 'ok' CSV_Parser_Raw_Data 518624470100006 

D  CSV_Parser_Raw_Data 518626734629237 Function execution took 68004 ms, 
finished with status: 'ok' CSV_Parser_Raw_Data 518626734629237

D  CSV_Parser_Raw_Data 518623777839079 Function execution took 131255 ms, 
finished with status: 'ok' CSV_Parser_Raw_Data 518623777839079 

D  CSV_Parser_Raw_Data 518623548622842 Function execution took 131186 ms, 
finished with status: 'ok' CSV_Parser_Raw_Data 518623548622842 

D  CSV_Parser_Raw_Data 518623769252453 Function execution took 133981 ms, 
finished with status: 'ok' CSV_Parser_Raw_Data 518623769252453 

所以我很高兴知道我在这里缺少什么! IE。我怎样才能打破这个循环?

* 问题更新 * 感谢@kamal,他强迫我睁开眼睛,要求自己重新创建 buckets/topics 等。当我在执行任务时,重新检查所有内容并意识到,我在子文件夹中使用了一个临时文件,但在与上传文件相同的存储桶!这就是问题所在。 Finalize 事件适用于在存储桶中任意位置创建的任何对象。所以 Kamal 是对的,正在进行多次上传!

如果您以同样的方式处理您的项目,请确保创建一个 tmp 文件夹并确保您没有向该文件夹添加任何触发器。


您不能只从函数中 return 200。您实际上需要 "ack" 发布订阅消息。您没有显示实际从 pubsub 获取消息的代码,但我假设在该代码的某处,您有类似的内容:

queue = Queue.Queue()
message = queue.get()
parse_data(message.data, context)

这是您需要确认消息的地方:

queue = Queue.Queue()
message = queue.get()
if parse_data(message.data, context):
    message.ack()

一般来说,Google 云 Pub/Sub 保证 at least once delivery 消息。这意味着总是有可能得到重复的,尽管它们应该相对罕见。在您的情况下,并不是一遍又一遍地处理同一条消息,而是不同的消息。诸如 518626734652287 之类的数字是消息 ID。由于每次都不同,这意味着发布了多条消息。很可能发生以下两种情况之一:

  1. 文件被多次上传。
  2. 多次设置GCS触发器。您可以通过 运行 gsutil notification list gs://<bucket name>.
  3. 检查

如果后者是问题所在,您将看到多个条目,例如:

projects/_/buckets/my-bucket/notificationConfigs/1
    Cloud Pub/Sub topic: projects/cloud-pubsub-training-examples/topics/my-topic

projects/_/buckets/my-bucket/notificationConfigs/2
    Cloud Pub/Sub topic: projects/cloud-pubsub-training-examples/topics/my-topic

projects/_/buckets/my-bucket/notificationConfigs/3
    Cloud Pub/Sub topic: projects/cloud-pubsub-training-examples/topics/my-topic

您可以通过使用配置名称发出删除命令来删除额外的通知,例如 gsutil notification delete projects/_/buckets/my-bucket/notificationConfigs/2

还值得注意的是,使用 Cloud Functions 和 Pub/Sub,可以设置两种类型的订阅:用户配置的订阅和 Cloud Functions 本身配置的订阅。默认情况下,前者的确认截止时间为 10 秒。这意味着如果消息在 10 秒内未被确认,它将被重新传送。对于后者,默认值为 600 秒。如果消息的处理时间超过此时间段,则可能会发生重新投递。

您可以尝试减少处理消息所需的时间,也可以增加确认截止时间。您可以使用 gcloud 工具增加确认截止日期:

gcloud pubsub subscriptions update <subscription name> --ack-deadline=180

这会将截止时间增加到 3 分钟。您也可以在 Cloud Console Pub/Sub page 中执行此操作,方法是单击订阅,单击 "Edit,",然后将 "Acknowledgment Deadline" 更改为更大的值。

使用 Cloud Functions,您不需要 return HTTP 状态。仅当您直接使用 push subscription 时才需要这样做。