Amazon SQS FIFO 是否提供消息排序?
Does Amazon SQS FIFO provide message ordering?
我正在尝试使用 spring boot(v2.2.6.RELEASE).
实现 AWS SQS FIFO 队列
创建了一个队列,aws 中的“Testing.fifo”。
在创建队列时将所有其他字段保留为默认值。
我的生产者和消费者在单个服务上排队运行。
将消息放入队列的代码
private void sendMessageToSqsFiFo(String queueName, Object obj, String messageGroupId, String messageDeduplicationId) {
// TODO Auto-generated method stub
if (messageGroupId == null) {
// setting a default group
messageGroupId = "default_group_id";
}
if (messageDeduplicationId == null) {
// setting unique deduplication id for all messages
messageDeduplicationId = "sqs-fifo-deduplication-" +UUID.randomUUID();
}
Map<String, Object> headers = new HashMap<>();
headers.put("message-group-id", messageGroupId);
headers.put("message-deduplication-id", messageDeduplicationId);
queueMessagingTemplate.convertAndSend(queueName, obj, headers);
System.out.println("Message sent to FIFO "+ queueName);
}
队列侦听器代码
@SqsListener(value = "Testing.fifo", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
private void getMessageFromgeneralQdasd(Map<String, String> msg, @Header("MessageId") String messageId, Acknowledgment ack) {
System.out.println("Message from FIFO: " + msg.toString());
ack.acknowledge();
}
我在服务启动时将一些消息放入队列
@EventListener(ApplicationReadyEvent.class)
public void doSomethingAfterStartup() {
System.out.println("hello world, I have just started up");
for(int i=0;i<25;i++) {
Map<String, String> data = new HashMap<String, String>();
data.put("message", i + "th msg");
sendMessageToSqsFiFo("Testing.fifo", data, null, null);
}
}
我期待这样的系统日志
Message from FIFO: {message=0th msg}
Message from FIFO: {message=1th msg}
Message from FIFO: {message=2th msg}
Message from FIFO: {message=3th msg}
......
Message from FIFO: {message=24th msg}
按照我推送数据到队列的顺序
但是我得到了
Message from FIFO: {message=0th msg}
Message from FIFO: {message=2th msg}
Message from FIFO: {message=3th msg}
Message from FIFO: {message=1th msg}
Message from FIFO: {message=4th msg}
Message from FIFO: {message=6th msg}
Message from FIFO: {message=7th msg}
Message from FIFO: {message=5th msg}
Message from FIFO: {message=8th msg}
Message from FIFO: {message=9th msg}
Message from FIFO: {message=10th msg}
Message from FIFO: {message=11th msg}
Message from FIFO: {message=12th msg}
Message from FIFO: {message=13th msg}
Message from FIFO: {message=14th msg}
Message from FIFO: {message=15th msg}
Message from FIFO: {message=16th msg}
Message from FIFO: {message=17th msg}
Message from FIFO: {message=19th msg}
Message from FIFO: {message=18th msg}
Message from FIFO: {message=20th msg}
Message from FIFO: {message=21th msg}
Message from FIFO: {message=22th msg}
Message from FIFO: {message=23th msg}
Message from FIFO: {message=24th msg}
我是不是做错了什么?
使用依赖 aws-java-sdk 1.11.586
这可能会有所帮助:
消息组 ID 用于按组对 SQS 消息进行排序。
因此,如果您有多个组 ID,例如 1 和 2,那么在这些组中,SQS 消息将始终按顺序排列。顺序将相对于组而不是整个队列保留。
这可以用于以下情况:如果您要为不同的用户详细信息更新发送事件,您可能希望按顺序对特定用户的数据进行分组,同时您不关心整个队列的排序。在这种情况下,您可以将组 ID 设置为唯一用户 ID。
如果没有创建群组的用例,并且只有一个群组,并且您希望将消息顺序作为一个整体保留,只需对所有消息使用相同的 groupID。
一个原因可能是您的 SQSListener 在默认创建的 thread pool 上 运行。由于不同的线程从有序队列中获取消息,但 post 这完全取决于线程调度,所以基本上你不能保证 order.If 在消费者端有多个线程处理,你可以获得以下好处水平缩放但不能保证顺序。如果线程在线程池中是单个线程,那么它将是有序的。同样,如果有多个消费者 运行ning 实例(即使每个消费者都有单个线程),也将无法保证顺序。您可以通过在程序中打印线程 id/name 来验证这一点,它可以判断是否有多个线程正在处理。
我正在尝试使用 spring boot(v2.2.6.RELEASE).
实现 AWS SQS FIFO 队列创建了一个队列,aws 中的“Testing.fifo”。 在创建队列时将所有其他字段保留为默认值。
我的生产者和消费者在单个服务上排队运行。
将消息放入队列的代码
private void sendMessageToSqsFiFo(String queueName, Object obj, String messageGroupId, String messageDeduplicationId) {
// TODO Auto-generated method stub
if (messageGroupId == null) {
// setting a default group
messageGroupId = "default_group_id";
}
if (messageDeduplicationId == null) {
// setting unique deduplication id for all messages
messageDeduplicationId = "sqs-fifo-deduplication-" +UUID.randomUUID();
}
Map<String, Object> headers = new HashMap<>();
headers.put("message-group-id", messageGroupId);
headers.put("message-deduplication-id", messageDeduplicationId);
queueMessagingTemplate.convertAndSend(queueName, obj, headers);
System.out.println("Message sent to FIFO "+ queueName);
}
队列侦听器代码
@SqsListener(value = "Testing.fifo", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
private void getMessageFromgeneralQdasd(Map<String, String> msg, @Header("MessageId") String messageId, Acknowledgment ack) {
System.out.println("Message from FIFO: " + msg.toString());
ack.acknowledge();
}
我在服务启动时将一些消息放入队列
@EventListener(ApplicationReadyEvent.class)
public void doSomethingAfterStartup() {
System.out.println("hello world, I have just started up");
for(int i=0;i<25;i++) {
Map<String, String> data = new HashMap<String, String>();
data.put("message", i + "th msg");
sendMessageToSqsFiFo("Testing.fifo", data, null, null);
}
}
我期待这样的系统日志
Message from FIFO: {message=0th msg}
Message from FIFO: {message=1th msg}
Message from FIFO: {message=2th msg}
Message from FIFO: {message=3th msg}
......
Message from FIFO: {message=24th msg}
按照我推送数据到队列的顺序
但是我得到了
Message from FIFO: {message=0th msg}
Message from FIFO: {message=2th msg}
Message from FIFO: {message=3th msg}
Message from FIFO: {message=1th msg}
Message from FIFO: {message=4th msg}
Message from FIFO: {message=6th msg}
Message from FIFO: {message=7th msg}
Message from FIFO: {message=5th msg}
Message from FIFO: {message=8th msg}
Message from FIFO: {message=9th msg}
Message from FIFO: {message=10th msg}
Message from FIFO: {message=11th msg}
Message from FIFO: {message=12th msg}
Message from FIFO: {message=13th msg}
Message from FIFO: {message=14th msg}
Message from FIFO: {message=15th msg}
Message from FIFO: {message=16th msg}
Message from FIFO: {message=17th msg}
Message from FIFO: {message=19th msg}
Message from FIFO: {message=18th msg}
Message from FIFO: {message=20th msg}
Message from FIFO: {message=21th msg}
Message from FIFO: {message=22th msg}
Message from FIFO: {message=23th msg}
Message from FIFO: {message=24th msg}
我是不是做错了什么?
使用依赖 aws-java-sdk 1.11.586
这可能会有所帮助: 消息组 ID 用于按组对 SQS 消息进行排序。
因此,如果您有多个组 ID,例如 1 和 2,那么在这些组中,SQS 消息将始终按顺序排列。顺序将相对于组而不是整个队列保留。
这可以用于以下情况:如果您要为不同的用户详细信息更新发送事件,您可能希望按顺序对特定用户的数据进行分组,同时您不关心整个队列的排序。在这种情况下,您可以将组 ID 设置为唯一用户 ID。
如果没有创建群组的用例,并且只有一个群组,并且您希望将消息顺序作为一个整体保留,只需对所有消息使用相同的 groupID。
一个原因可能是您的 SQSListener 在默认创建的 thread pool 上 运行。由于不同的线程从有序队列中获取消息,但 post 这完全取决于线程调度,所以基本上你不能保证 order.If 在消费者端有多个线程处理,你可以获得以下好处水平缩放但不能保证顺序。如果线程在线程池中是单个线程,那么它将是有序的。同样,如果有多个消费者 运行ning 实例(即使每个消费者都有单个线程),也将无法保证顺序。您可以通过在程序中打印线程 id/name 来验证这一点,它可以判断是否有多个线程正在处理。