如何以编程方式将 SQS 队列订阅到 Go 中的 SNS 主题?

How does one programmatically subscribe an SQS queue to an SNS topic in Go?

这是我尝试过的,使用 aws-sdk-go 的版本 53eb8b070e9a5067829fd029539966181632032a。

// main.go
package main

import (
    "errors"
    "fmt"
    "log"
    "net/http"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/sns"
    "github.com/aws/aws-sdk-go/service/sqs"
)

func main() {
    if err := makeTopicAndQueue(); err != nil {
        log.Fatalf("aws-sns-sqs: %v", err)
    }
}

func makeTopicAndQueue() error {
    sess, err := session.NewSession(&aws.Config{
        HTTPClient:  &http.Client{},
        Region:      aws.String("us-east-2"),
        Credentials: nil,
        MaxRetries:  aws.Int(0),
    })

    log.Printf("Creating an SNS topic.")
    snsClient := sns.New(sess, &aws.Config{})
    topicName := "test-topic"
    out, err := snsClient.CreateTopic(&sns.CreateTopicInput{Name: aws.String(topicName)})
    if err != nil {
        return fmt.Errorf(`creating topic "%s": %v`, topicName, err)
    }
    defer snsClient.DeleteTopic(&sns.DeleteTopicInput{TopicArn: out.TopicArn})

    log.Printf("Creating an SQS queue.")
    sqsClient := sqs.New(sess, &aws.Config{})
    subName := "test-subscription"
    out2, err := sqsClient.CreateQueue(&sqs.CreateQueueInput{QueueName: aws.String(subName)})
    if err != nil {
        return fmt.Errorf(`creating subscription queue "%s": %v`, subName, err)
    }

    log.Printf("Getting queue ARN.")
    out3, err := sqsClient.GetQueueAttributes(&sqs.GetQueueAttributesInput{
        QueueUrl:       out2.QueueUrl,
        AttributeNames: []*string{aws.String("QueueArn")},
    })
    if err != nil {
        return fmt.Errorf("getting queue ARN for %s: %v", *out2.QueueUrl, err)
    }
    qARN := out3.Attributes["QueueArn"]

    log.Printf("Subscribing the queue to the topic.")
    _, err = snsClient.Subscribe(&sns.SubscribeInput{
        TopicArn: out.TopicArn,
        Endpoint: qARN,
        Protocol: aws.String("sqs"),
    })
    if err != nil {
        return fmt.Errorf("subscribing: %v", err)
    }

    log.Printf("Getting the confirmation token from the queue.")
    out4, err := sqsClient.ReceiveMessage(&sqs.ReceiveMessageInput{
        QueueUrl: out2.QueueUrl,
    })
    if err != nil {
        return fmt.Errorf("receiving subscription confirmation message from queue: %v", err)
    }
    ms := out4.Messages
    var token *string
    switch len(ms) {
    case 0:
        return errors.New("no subscription confirmation message found in queue")
    case 1:
        m := ms[0]
        token = m.Body
    default:
        return fmt.Errorf("%d messages found in queue, want exactly 1", len(ms))
    }

    log.Printf("Using the token to finish subscribing.")
    _, err = snsClient.ConfirmSubscription(&sns.ConfirmSubscriptionInput{
        TopicArn: out.TopicArn,
        Token:    token,
    })
    if err != nil {
        return fmt.Errorf("confirming subscription: %v", err)
    }
    sqsClient.DeleteQueue(&sqs.DeleteQueueInput{QueueUrl: out2.QueueUrl})

    return nil
}

我原以为它会结束,但它失败了,输出如下:

[ ~/src/aws-sqs-issue ] go run main.go
2019/01/15 09:31:19 Creating an SNS topic.
2019/01/15 09:31:19 Creating an SQS queue.
2019/01/15 09:31:20 Getting queue ARN.
2019/01/15 09:31:20 Subscribing the queue to the topic.
2019/01/15 09:31:21 Getting the confirmation token from the queue.
2019/01/15 09:31:21 aws-sns-sqs: no subscription confirmation message found in queue

我是不是做错了什么或者这是 SDK 中的错误?

关于这个我不知道还能说些什么。这里有一些额外的措辞,以某种方式获得关于 post 主要是代码消失的警告。最好在此时停止阅读,因为其余的所有内容必然会使阅读变得乏味。我不知道还能继续胡说八道多久来满足这个愚蠢的算法。为什么他们不允许包含大量代码的简单 post?我不知道。呃,好吧。我的桌子上有一个倒置的地鼠。我认为这是故意的。由于可怜的小动物的解剖结构,他做的更多是眼球倒立而不是头倒立。我的办公桌植物在假期里表现不佳。最好给它喝点水。哇,这东西真的需要很多话。好吧,我的目标是取悦。如果我继续下去,我会不会不小心输出一些莎士比亚?嗯,完了,谢谢巴德

这个问题很老了,但这里有解决它的技巧:

For an Amazon SNS topic to be able to send messages to a queue, you must set a policy on the queue that allows the Amazon SNS topic to perform the sqs:SendMessage action. See more

要使用当前版本的 V1 SDK 执行此操作,在创建队列时,您必须手动将策略定义为队列属性,以允许 SNS 主题将消息发送到您的 SQS 队列。

这里是代码示例:

    queueARN := fmt.Sprintf("arn:aws:sqs:%s:%s:%s", "us-east-1", "xxx", "my-queue")
    topicARN := fmt.Sprintf("arn:aws:sns:%s:%s:%s", "us-east-1", "xxx", "my-topic")
    _, err := b.sqs.CreateQueue(&sqs.CreateQueueInput{
        QueueName: aws.String("my-queue"),
        Attributes: map[string]*string{
            "Policy": aws.String(fmt.Sprintf(`{
                "Version": "2012-10-17",
                "Statement": [
                    {
                        "Sid": "SNSTopicSendMessage",
                        "Effect": "Allow",
                        "Principal": "*",
                        "Action": "SQS:SendMessage",
                        "Resource": "%s",
                        "Condition": {
                            "ArnEquals": {
                                "aws:SourceArn": "%s"
                            }
                        }
                    }
                ]
            }`, queueARN, b.eventsTopicARN)),
        },
    })
    if err != nil {
        return err
    }