如何使用 python return 从后台函数向 pubsub 确认
How to return acknowledgement from background function to pubsub using python
我正在设置一个新的 GCP 项目,以便在 CSV 文件上传到存储桶后立即读取和解析它。为此,我创建了一个发布到 pub/sub 的触发器。 Pub/Sub 本身将消息发送到后台函数。
一切似乎都运行良好,例如上传文件后,触发器就会实时向 Pubsub 发送消息,然后再向函数发送消息。我还可以看到传递给函数的消息。
但是,问题是将 Ack 发送回 pub/sub。我在某处读到发回任何 2xx 状态应该完成这项工作(从队列中删除消息),但事实并非如此。结果 pubsub "thinks" 消息没有送达,一遍又一遍地发送消息。
def parse_data(data, context):
if 'data' in data:
args = base64.b64decode(data['data']).decode('utf-8')
pubsub_message = args.replace('\n', ' ')
properties = json.loads(pubsub_message)
myBucket = validate_message(properties, 'bucket')
myFileName = validate_message(properties, "name")
fileLocation = 'gs://'+myBucket+'/'+myFileName
readAndEnhanceData(fileLocation)
return 'OK', 200
else:
return 'Something went wrong, no data received'
这里是日志文件,显示正在连续调用该函数。
D CSV_Parser_Raw_Data 518626734652287 Function execution took 72855 ms,
finished with status: 'ok' CSV_Parser_Raw_Data 518626734652287
D CSV_Parser_Raw_Data 518626708442766 Function execution took 131886 ms,
finished with status: 'ok' CSV_Parser_Raw_Data 518626708442766
D CSV_Parser_Raw_Data 518624470100006 Function execution took 65412 ms,
finished with status: 'ok' CSV_Parser_Raw_Data 518624470100006
D CSV_Parser_Raw_Data 518626734629237 Function execution took 68004 ms,
finished with status: 'ok' CSV_Parser_Raw_Data 518626734629237
D CSV_Parser_Raw_Data 518623777839079 Function execution took 131255 ms,
finished with status: 'ok' CSV_Parser_Raw_Data 518623777839079
D CSV_Parser_Raw_Data 518623548622842 Function execution took 131186 ms,
finished with status: 'ok' CSV_Parser_Raw_Data 518623548622842
D CSV_Parser_Raw_Data 518623769252453 Function execution took 133981 ms,
finished with status: 'ok' CSV_Parser_Raw_Data 518623769252453
所以我很高兴知道我在这里缺少什么! IE。我怎样才能打破这个循环?
* 问题更新 *
感谢@kamal,他强迫我睁开眼睛,要求自己重新创建 buckets/topics 等。当我在执行任务时,重新检查所有内容并意识到,我在子文件夹中使用了一个临时文件,但在与上传文件相同的存储桶!这就是问题所在。 Finalize 事件适用于在存储桶中任意位置创建的任何对象。所以 Kamal 是对的,正在进行多次上传!
如果您以同样的方式处理您的项目,请确保创建一个 tmp 文件夹并确保您没有向该文件夹添加任何触发器。
您不能只从函数中 return 200。您实际上需要 "ack" 发布订阅消息。您没有显示实际从 pubsub 获取消息的代码,但我假设在该代码的某处,您有类似的内容:
queue = Queue.Queue()
message = queue.get()
parse_data(message.data, context)
这是您需要确认消息的地方:
queue = Queue.Queue()
message = queue.get()
if parse_data(message.data, context):
message.ack()
一般来说,Google 云 Pub/Sub 保证 at least once delivery 消息。这意味着总是有可能得到重复的,尽管它们应该相对罕见。在您的情况下,并不是一遍又一遍地处理同一条消息,而是不同的消息。诸如 518626734652287 之类的数字是消息 ID。由于每次都不同,这意味着发布了多条消息。很可能发生以下两种情况之一:
- 文件被多次上传。
- 多次设置GCS触发器。您可以通过 运行
gsutil notification list gs://<bucket name>
. 检查
如果后者是问题所在,您将看到多个条目,例如:
projects/_/buckets/my-bucket/notificationConfigs/1
Cloud Pub/Sub topic: projects/cloud-pubsub-training-examples/topics/my-topic
projects/_/buckets/my-bucket/notificationConfigs/2
Cloud Pub/Sub topic: projects/cloud-pubsub-training-examples/topics/my-topic
projects/_/buckets/my-bucket/notificationConfigs/3
Cloud Pub/Sub topic: projects/cloud-pubsub-training-examples/topics/my-topic
您可以通过使用配置名称发出删除命令来删除额外的通知,例如 gsutil notification delete projects/_/buckets/my-bucket/notificationConfigs/2
。
还值得注意的是,使用 Cloud Functions 和 Pub/Sub,可以设置两种类型的订阅:用户配置的订阅和 Cloud Functions 本身配置的订阅。默认情况下,前者的确认截止时间为 10 秒。这意味着如果消息在 10 秒内未被确认,它将被重新传送。对于后者,默认值为 600 秒。如果消息的处理时间超过此时间段,则可能会发生重新投递。
您可以尝试减少处理消息所需的时间,也可以增加确认截止时间。您可以使用 gcloud
工具增加确认截止日期:
gcloud pubsub subscriptions update <subscription name> --ack-deadline=180
这会将截止时间增加到 3 分钟。您也可以在 Cloud Console Pub/Sub page 中执行此操作,方法是单击订阅,单击 "Edit,",然后将 "Acknowledgment Deadline" 更改为更大的值。
使用 Cloud Functions,您不需要 return HTTP 状态。仅当您直接使用 push subscription 时才需要这样做。
我正在设置一个新的 GCP 项目,以便在 CSV 文件上传到存储桶后立即读取和解析它。为此,我创建了一个发布到 pub/sub 的触发器。 Pub/Sub 本身将消息发送到后台函数。
一切似乎都运行良好,例如上传文件后,触发器就会实时向 Pubsub 发送消息,然后再向函数发送消息。我还可以看到传递给函数的消息。
但是,问题是将 Ack 发送回 pub/sub。我在某处读到发回任何 2xx 状态应该完成这项工作(从队列中删除消息),但事实并非如此。结果 pubsub "thinks" 消息没有送达,一遍又一遍地发送消息。
def parse_data(data, context):
if 'data' in data:
args = base64.b64decode(data['data']).decode('utf-8')
pubsub_message = args.replace('\n', ' ')
properties = json.loads(pubsub_message)
myBucket = validate_message(properties, 'bucket')
myFileName = validate_message(properties, "name")
fileLocation = 'gs://'+myBucket+'/'+myFileName
readAndEnhanceData(fileLocation)
return 'OK', 200
else:
return 'Something went wrong, no data received'
这里是日志文件,显示正在连续调用该函数。
D CSV_Parser_Raw_Data 518626734652287 Function execution took 72855 ms,
finished with status: 'ok' CSV_Parser_Raw_Data 518626734652287
D CSV_Parser_Raw_Data 518626708442766 Function execution took 131886 ms,
finished with status: 'ok' CSV_Parser_Raw_Data 518626708442766
D CSV_Parser_Raw_Data 518624470100006 Function execution took 65412 ms,
finished with status: 'ok' CSV_Parser_Raw_Data 518624470100006
D CSV_Parser_Raw_Data 518626734629237 Function execution took 68004 ms,
finished with status: 'ok' CSV_Parser_Raw_Data 518626734629237
D CSV_Parser_Raw_Data 518623777839079 Function execution took 131255 ms,
finished with status: 'ok' CSV_Parser_Raw_Data 518623777839079
D CSV_Parser_Raw_Data 518623548622842 Function execution took 131186 ms,
finished with status: 'ok' CSV_Parser_Raw_Data 518623548622842
D CSV_Parser_Raw_Data 518623769252453 Function execution took 133981 ms,
finished with status: 'ok' CSV_Parser_Raw_Data 518623769252453
所以我很高兴知道我在这里缺少什么! IE。我怎样才能打破这个循环?
* 问题更新 * 感谢@kamal,他强迫我睁开眼睛,要求自己重新创建 buckets/topics 等。当我在执行任务时,重新检查所有内容并意识到,我在子文件夹中使用了一个临时文件,但在与上传文件相同的存储桶!这就是问题所在。 Finalize 事件适用于在存储桶中任意位置创建的任何对象。所以 Kamal 是对的,正在进行多次上传!
如果您以同样的方式处理您的项目,请确保创建一个 tmp 文件夹并确保您没有向该文件夹添加任何触发器。
您不能只从函数中 return 200。您实际上需要 "ack" 发布订阅消息。您没有显示实际从 pubsub 获取消息的代码,但我假设在该代码的某处,您有类似的内容:
queue = Queue.Queue()
message = queue.get()
parse_data(message.data, context)
这是您需要确认消息的地方:
queue = Queue.Queue()
message = queue.get()
if parse_data(message.data, context):
message.ack()
一般来说,Google 云 Pub/Sub 保证 at least once delivery 消息。这意味着总是有可能得到重复的,尽管它们应该相对罕见。在您的情况下,并不是一遍又一遍地处理同一条消息,而是不同的消息。诸如 518626734652287 之类的数字是消息 ID。由于每次都不同,这意味着发布了多条消息。很可能发生以下两种情况之一:
- 文件被多次上传。
- 多次设置GCS触发器。您可以通过 运行
gsutil notification list gs://<bucket name>
. 检查
如果后者是问题所在,您将看到多个条目,例如:
projects/_/buckets/my-bucket/notificationConfigs/1
Cloud Pub/Sub topic: projects/cloud-pubsub-training-examples/topics/my-topic
projects/_/buckets/my-bucket/notificationConfigs/2
Cloud Pub/Sub topic: projects/cloud-pubsub-training-examples/topics/my-topic
projects/_/buckets/my-bucket/notificationConfigs/3
Cloud Pub/Sub topic: projects/cloud-pubsub-training-examples/topics/my-topic
您可以通过使用配置名称发出删除命令来删除额外的通知,例如 gsutil notification delete projects/_/buckets/my-bucket/notificationConfigs/2
。
还值得注意的是,使用 Cloud Functions 和 Pub/Sub,可以设置两种类型的订阅:用户配置的订阅和 Cloud Functions 本身配置的订阅。默认情况下,前者的确认截止时间为 10 秒。这意味着如果消息在 10 秒内未被确认,它将被重新传送。对于后者,默认值为 600 秒。如果消息的处理时间超过此时间段,则可能会发生重新投递。
您可以尝试减少处理消息所需的时间,也可以增加确认截止时间。您可以使用 gcloud
工具增加确认截止日期:
gcloud pubsub subscriptions update <subscription name> --ack-deadline=180
这会将截止时间增加到 3 分钟。您也可以在 Cloud Console Pub/Sub page 中执行此操作,方法是单击订阅,单击 "Edit,",然后将 "Acknowledgment Deadline" 更改为更大的值。
使用 Cloud Functions,您不需要 return HTTP 状态。仅当您直接使用 push subscription 时才需要这样做。