PubSub 确认截止日期
PubSub acknowledge deadline
我有一个云功能,可以向 PubSub 发布消息并触发云 运行 执行存档文件过程。当有大文件时,我的云 运行 python 代码需要一些时间来处理数据,看起来 PubSub 在 20 秒(默认确认截止时间)后重试消息,这会触发我的另一个实例云运行。我已将确认截止日期增加到 600 秒并重新部署所有内容,但它仍在 20 秒后重试消息。我遗漏了什么?
云函数发布消息代码:
# Publishes a message
try:
publish_future = publisher.publish(topic_path, data=message_bytes)
publish_future.result() # Verify the publish succeeded
return 'Message published.'
except Exception as e:
print(e)
return (e, 500)
这是 PubSub 订阅配置:
记录显示 20 秒后触发了第二个实例:
云运行代码:
@app.route("/", methods=["POST"])
def index():
envelope = request.get_json()
if not envelope:
msg = "no Pub/Sub message received"
print(f"error: {msg}")
return f"Bad Request: {msg}", 400
if not isinstance(envelope, dict) or "message" not in envelope:
msg = "invalid Pub/Sub message format"
print(f"error: {msg}")
return f"Bad Request: {msg}", 400
pubsub_message = envelope["message"]
if isinstance(pubsub_message, dict) and "data" in pubsub_message:
#Decode base64 event['data']
event_data = base64.b64decode(pubsub_message['data']).decode('utf-8')
message = json.loads(event_data)
#logic to process data/archive
return ("", 204)
您应该能够通过设置 minimumBackoff retrypolicy 来控制重试。您可以将 minimumBackoff 时间设置为最大值 600 秒,就像您的 ack 截止时间一样,这样重新传递的消息将超过 600 秒。这应该会减少您看到的出现次数。
要处理重复项,建议使您的订阅者幂等。您需要应用某种代码检查以查看之前是否处理过 messageId。
您可以在下面的文档中找到 at-least-once-delivery :
通常,Pub/Sub 每条消息按发布顺序发送一次。但是,消息有时可能会乱序发送或多次发送。一般来说,适应多次传递需要您的订阅者在处理消息时是幂等的。您可以使用 Apache Beam 编程模型实现 Pub/Sub 消息流的精确一次处理。 Apache Beam I/O 连接器可让您通过受控的源和接收器与 Cloud Dataflow 交互。您可以使用 Apache Beam PubSubIO 连接器(用于 Java 和 Python)从云端读取 Pub/Sub。您还可以使用 Cloud Dataflow 服务的标准排序 API 实现有序处理。或者,为了实现排序,您订阅的主题的发布者可以在消息中包含一个序列标记。
我有一个云功能,可以向 PubSub 发布消息并触发云 运行 执行存档文件过程。当有大文件时,我的云 运行 python 代码需要一些时间来处理数据,看起来 PubSub 在 20 秒(默认确认截止时间)后重试消息,这会触发我的另一个实例云运行。我已将确认截止日期增加到 600 秒并重新部署所有内容,但它仍在 20 秒后重试消息。我遗漏了什么?
云函数发布消息代码:
# Publishes a message
try:
publish_future = publisher.publish(topic_path, data=message_bytes)
publish_future.result() # Verify the publish succeeded
return 'Message published.'
except Exception as e:
print(e)
return (e, 500)
这是 PubSub 订阅配置:
记录显示 20 秒后触发了第二个实例:
云运行代码:
@app.route("/", methods=["POST"])
def index():
envelope = request.get_json()
if not envelope:
msg = "no Pub/Sub message received"
print(f"error: {msg}")
return f"Bad Request: {msg}", 400
if not isinstance(envelope, dict) or "message" not in envelope:
msg = "invalid Pub/Sub message format"
print(f"error: {msg}")
return f"Bad Request: {msg}", 400
pubsub_message = envelope["message"]
if isinstance(pubsub_message, dict) and "data" in pubsub_message:
#Decode base64 event['data']
event_data = base64.b64decode(pubsub_message['data']).decode('utf-8')
message = json.loads(event_data)
#logic to process data/archive
return ("", 204)
您应该能够通过设置 minimumBackoff retrypolicy 来控制重试。您可以将 minimumBackoff 时间设置为最大值 600 秒,就像您的 ack 截止时间一样,这样重新传递的消息将超过 600 秒。这应该会减少您看到的出现次数。
要处理重复项,建议使您的订阅者幂等。您需要应用某种代码检查以查看之前是否处理过 messageId。
您可以在下面的文档中找到 at-least-once-delivery :
通常,Pub/Sub 每条消息按发布顺序发送一次。但是,消息有时可能会乱序发送或多次发送。一般来说,适应多次传递需要您的订阅者在处理消息时是幂等的。您可以使用 Apache Beam 编程模型实现 Pub/Sub 消息流的精确一次处理。 Apache Beam I/O 连接器可让您通过受控的源和接收器与 Cloud Dataflow 交互。您可以使用 Apache Beam PubSubIO 连接器(用于 Java 和 Python)从云端读取 Pub/Sub。您还可以使用 Cloud Dataflow 服务的标准排序 API 实现有序处理。或者,为了实现排序,您订阅的主题的发布者可以在消息中包含一个序列标记。