Will Pub/Sub forward a message to the dead-letter topic after delivery_attempt exceeds max_delivery_attempts?
My subscriber looks like this:
import json

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.types import DeadLetterPolicy

# PROJECT, TOPIC and SUBSCRIPTION are assumed to be defined elsewhere.
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(PROJECT, SUBSCRIPTION)
topic_path = subscriber.topic_path(PROJECT, TOPIC)

dead_letter_policy = DeadLetterPolicy(
    dead_letter_topic='dead_letter_topic',
    max_delivery_attempts=5,
)
subscriber.create_subscription(subscription_path, topic_path, dead_letter_policy=dead_letter_policy)

def callback(message):
    print("Received message: {}".format(message))
    print('Attempted:', message.delivery_attempt, 'times')
    data = message.data.decode('utf-8')
    data_d = json.loads(data)
    # Nack one specific file so that it keeps being redelivered.
    if data_d["name"] == "some_file.json":
        message.nack()
    else:
        message.ack()
The received message looks like this:
Received message: Message {
  data: b'{\n "kind": "storage#object",\n "id": "...'
  attributes: {
    "bucketId": "sample_bucket",
    ...
  }
}
Attempted: 12 times
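Delivery has clearly been attempted more than 5 times, so why can I still pull this message from the Pub/Sub topic?
Here is the subscription info: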
ackDeadlineSeconds: 10
deadLetterPolicy:
  deadLetterTopic: projects/someproject/topics/dead_letter
  maxDeliveryAttempts: 5
expirationPolicy:
  ttl: 2678400s
messageRetentionDuration: 604800s
name: projects/someproject/subscriptions/new_sub
pushConfig: {}
topic: projects/someproject/topics/pubsub_sample
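This typically happens when you have not granted Pub/Sub permission to publish to your dead-letter topic or to subscribe to your subscription. Make sure you have run the following: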
PUBSUB_SERVICE_ACCOUNT="service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com"
gcloud pubsub topics add-iam-policy-binding <dead letter topic> \
    --member="serviceAccount:${PUBSUB_SERVICE_ACCOUNT}" \
    --role='roles/pubsub.publisher'
gcloud pubsub subscriptions add-iam-policy-binding <subscription with dead letter queue> \
    --member="serviceAccount:${PUBSUB_SERVICE_ACCOUNT}" \
    --role='roles/pubsub.subscriber'
If writing to the dead-letter topic fails, Cloud Pub/Sub will continue to deliver the message to your subscriber.
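To confirm that both grants are actually in place, a check along these lines might help. It is only a sketch. It assumes you have already loaded the IAM policies of the dead-letter topic and of the subscription as dicts, for example from the JSON printed by `gcloud pubsub topics get-iam-policy` and `gcloud pubsub subscriptions get-iam-policy` with `--format=json`. The function names are made up for illustration and are not part of any Google library. The check also only looks for the two exact roles named above, so a broader role that includes the same permissions, or a conditional binding, would not be recognised correctly.

```python
PUBLISHER_ROLE = "roles/pubsub.publisher"
SUBSCRIBER_ROLE = "roles/pubsub.subscriber"


def pubsub_service_account(project_number):
    """Build the IAM member string for the project's Pub/Sub service account."""
    return (
        "serviceAccount:service-{}@gcp-sa-pubsub.iam.gserviceaccount.com"
        .format(project_number)
    )


def has_binding(policy, member, role):
    """Return True if `member` holds exactly `role` in an IAM policy dict.

    `policy` is expected to look like {"bindings": [{"role": ..., "members": [...]}]}.
    A policy without a "bindings" key is treated as having no bindings.
    """
    return any(
        binding.get("role") == role and member in binding.get("members", [])
        for binding in policy.get("bindings", [])
    )


def missing_dead_letter_grants(project_number, topic_policy, subscription_policy):
    """Return the (resource, role) pairs from the answer that are not granted."""
    member = pubsub_service_account(project_number)
    missing = []
    if not has_binding(topic_policy, member, PUBLISHER_ROLE):
        missing.append(("dead-letter topic", PUBLISHER_ROLE))
    if not has_binding(subscription_policy, member, SUBSCRIBER_ROLE):
        missing.append(("subscription", SUBSCRIBER_ROLE))
    return missing


if __name__ == "__main__":
    # Hypothetical input: a project number and two policies with no bindings.
    for resource, role in missing_dead_letter_grants("123456789", {}, {}):
        print("Missing {} on the {}".format(role, resource))
```

An empty result means both bindings were found. Anything else names the resource that still needs the matching `add-iam-policy-binding` command.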