如何将 SQS 队列订阅到 Java 中的 SNS 主题
How to subscribe a SQS queue to a SNS topic in Java
当我创建一个新队列并将其订阅到 Java 中的主题时,没有收到任何消息。通过 AWS Web 控制台同样可以正常工作。
我想我必须以某种方式确认订阅,但是 sns.confirmSubscription
方法需要一个令牌 - 我应该从哪里得到它?
这是我的 Java 代码:
String queueURL = sqs.createQueue("my-queue").getQueueUrl();
sns.subscribe(myTopicARN, "sqs", queueURL);
sns.publish(myTopicARN, "{\"payload\":\"test\"}");
sqs.receiveMessage(queueURL).getMessages()
.forEach(System.out::println); // nothing
我做错了什么?
看看这个:https://aws.amazon.com/blogs/developer/subscribing-queues-to-topics/
您应该这样订阅:
Topics.subscribeQueue(sns, sqs, myTopicARN, queueURL);
这种方便的方法为订阅创建了一个策略,以允许主题将消息发送到队列。
将队列订阅到sns不会自动创建允许sns向队列发送消息的策略(根据我使用sns/sqs的经验)所以你需要自己创建策略并授予sns权限要将消息发送到您的队列,这是一个关于如何使用队列 url 、队列 arn 和主题 arn
执行此操作的示例
import static com.amazonaws.auth.policy.Principal.All;
import static com.amazonaws.auth.policy.Statement.Effect.Allow;
import static com.amazonaws.auth.policy.actions.SQSActions.SendMessage;
import static com.amazonaws.auth.policy.conditions.ArnCondition.ArnComparisonType.ArnEquals;
final Statement mainQueueStatements = new Statement(Allow) //imported above
.withActions(SendMessage) //imported above
.withPrincipals(All) //imported above
.withResources(new Resource(queueArn)) // your queue arn
.withConditions(
new Condition()
.withType(ArnEquals.name()) //imported above
.withConditionKey(SOURCE_ARN_CONDITION_KEY) //imported above
.withValues(topicArn) // your topic arn
);
final Policy mainQueuePolicy = ()
.withId("MainQueuePolicy")
.withStatements(mainQueueStatements);
final HashMap<QueueAttributeName, String> attributes = new HashMap<>();
attributes.put(QueueAttributeName.Policy.toString(), mainQueuePolicy.toJson());
amazonSQS.setQueueAttributes(new SetQueueAttributesRequest().withAttributes(attributes).withQueueUrl(queueUrl)); // your queue url
当我创建一个新队列并将其订阅到 Java 中的主题时,没有收到任何消息。通过 AWS Web 控制台同样可以正常工作。
我想我必须以某种方式确认订阅,但是 sns.confirmSubscription
方法需要一个令牌 - 我应该从哪里得到它?
这是我的 Java 代码:
String queueURL = sqs.createQueue("my-queue").getQueueUrl();
sns.subscribe(myTopicARN, "sqs", queueURL);
sns.publish(myTopicARN, "{\"payload\":\"test\"}");
sqs.receiveMessage(queueURL).getMessages()
.forEach(System.out::println); // nothing
我做错了什么?
看看这个:https://aws.amazon.com/blogs/developer/subscribing-queues-to-topics/
您应该这样订阅:
Topics.subscribeQueue(sns, sqs, myTopicARN, queueURL);
这种方便的方法为订阅创建了一个策略,以允许主题将消息发送到队列。
将队列订阅到sns不会自动创建允许sns向队列发送消息的策略(根据我使用sns/sqs的经验)所以你需要自己创建策略并授予sns权限要将消息发送到您的队列,这是一个关于如何使用队列 url 、队列 arn 和主题 arn
执行此操作的示例import static com.amazonaws.auth.policy.Principal.All;
import static com.amazonaws.auth.policy.Statement.Effect.Allow;
import static com.amazonaws.auth.policy.actions.SQSActions.SendMessage;
import static com.amazonaws.auth.policy.conditions.ArnCondition.ArnComparisonType.ArnEquals;
final Statement mainQueueStatements = new Statement(Allow) //imported above
.withActions(SendMessage) //imported above
.withPrincipals(All) //imported above
.withResources(new Resource(queueArn)) // your queue arn
.withConditions(
new Condition()
.withType(ArnEquals.name()) //imported above
.withConditionKey(SOURCE_ARN_CONDITION_KEY) //imported above
.withValues(topicArn) // your topic arn
);
final Policy mainQueuePolicy = ()
.withId("MainQueuePolicy")
.withStatements(mainQueueStatements);
final HashMap<QueueAttributeName, String> attributes = new HashMap<>();
attributes.put(QueueAttributeName.Policy.toString(), mainQueuePolicy.toJson());
amazonSQS.setQueueAttributes(new SetQueueAttributesRequest().withAttributes(attributes).withQueueUrl(queueUrl)); // your queue url