使用队列时跳过 SQS 消息
Skip SQS messages while consuming a queue
我有一个消息生成器,将 {"user_id": 1, "message": "Ciao"}
之类的消息发送到 SQS 队列。
我有三个 Websocket 实例 API,它们的名字分别是 A、B 和 C。
假设有五个用户连接到 API,他们的 ID 分别是 1、2、3、4 和 5。
每个用户都连接到 API,我的平衡器(sticky ofc)使客户端以这种方式连接到 API:
A: 1, 3
B: 5
C: 2, 4
现在WebsocketAPI就是上面提到的SQS队列的消费者
当我为用户 1 排队消息时,任何消费者都可以先将其出队,比方说 C。
实例 C
无法对 user_id 1 的消息执行任何操作。A
确实保持与该用户的实际连接。
在我看来,这会像这样工作:任何消息都会被所有三个 API 实例接收。
- C 将读取它,并将其留在队列中
- B 将读取它,并将其留在队列中
- A 将读取它、处理它并将其从队列中移除
我的问题是:
上述工作流程用SQS实现可行吗?
是否可以从队列中读取消息并将其留在那里?或者如果当前消费者无法处理,我是否必须将其出队并重新入队?
这是发布者-订阅者架构的用例。要在 AWS 基础设施上实现这一点,首先假设 SQS 作为订阅者。在使用订阅的消息时,不应进行消息过滤。这是 Publisher 的一部分。 AWS 提供可以作为发布者的 SNS。
假设以下设置。使用 SNS 作为发布者。在此,您可以进行消息过滤 (https://docs.aws.amazon.com/sns/latest/dg/sns-message-filtering.html)。这允许 SNS 有选择地发送消息。
在您的用例中,
[Mesage for user 1/3]
-> SNS -> A 的 SQS -> Websocket A only gets messages for user 1/3
[Mesage for user 5]
-> SNS -> B 的 SQS -> Websocket A only gets messages for user 5
请注意,两种情况下的 SNS 主题保持不变。但是订阅者过滤消息,你会有多个 SQS 队列。
仅使用 SQS 是可行的(但需要在消费者中自定义代码)。
示例:
SQS 根据消费者发送的确认删除消息。如果消费者发送确认,SQS 将删除该条目。因此,一种设计方法是:
a) 消费消息
b) 检查它是否是想要的消息
c) 如果是,则向SQS发送ACK,SQS将删除该条目
d) 如果消息不是想要的消息,不发送并确认,SQS会保留记录
其他方法:
可以结合SQS和SNS来实现。
您可以为 3 个 Websocket 使用 3 个单独的队列 API 例如SQS_A, SQS_B, SQS_C
。你可以说一个 SNS 主题,MyTopic
。然后,可以让Queues根据订阅过滤订阅SNS主题。
示例,
如果事件过滤器中有 user_id 1 and 3
,SQS_A
将订阅 SNS 主题。
如果事件过滤器中有 user_id 5
,SQS_B
将订阅 SNS 主题。
如果事件过滤器中包含 user_id 2
,SQS_C
将订阅 SNS 主题。
SQS_A 的事件过滤看起来像这样:
{
"EventType": [
1,
3"
]
}
所以...现在当发布者发布消息时,它会将消息与事件类型一起发布到 SNS 主题。
示例:
sns.PublishInput{
Message: msg,
TopicArn: topic,
MessageAttributes: {
EventType: {
DataType: "String",
StringValue: <listOfUserId>
}
}
}
现在,SNS 只会将此消息发送到 SQS_A
而不会发送到 SQS 的其余部分。因此,只有 Websocket A 会使用该消息。
更多:https://docs.aws.amazon.com/sns/latest/dg/sns-subscription-filter-policies.html
注意:这不是一个健壮的、易于扩展的、可扩展的设计
我有一个消息生成器,将 {"user_id": 1, "message": "Ciao"}
之类的消息发送到 SQS 队列。
我有三个 Websocket 实例 API,它们的名字分别是 A、B 和 C。
假设有五个用户连接到 API,他们的 ID 分别是 1、2、3、4 和 5。
每个用户都连接到 API,我的平衡器(sticky ofc)使客户端以这种方式连接到 API:
A: 1, 3
B: 5
C: 2, 4
现在WebsocketAPI就是上面提到的SQS队列的消费者
当我为用户 1 排队消息时,任何消费者都可以先将其出队,比方说 C。
实例 C
无法对 user_id 1 的消息执行任何操作。A
确实保持与该用户的实际连接。
在我看来,这会像这样工作:任何消息都会被所有三个 API 实例接收。
- C 将读取它,并将其留在队列中
- B 将读取它,并将其留在队列中
- A 将读取它、处理它并将其从队列中移除
我的问题是:
上述工作流程用SQS实现可行吗?
是否可以从队列中读取消息并将其留在那里?或者如果当前消费者无法处理,我是否必须将其出队并重新入队?
这是发布者-订阅者架构的用例。要在 AWS 基础设施上实现这一点,首先假设 SQS 作为订阅者。在使用订阅的消息时,不应进行消息过滤。这是 Publisher 的一部分。 AWS 提供可以作为发布者的 SNS。
假设以下设置。使用 SNS 作为发布者。在此,您可以进行消息过滤 (https://docs.aws.amazon.com/sns/latest/dg/sns-message-filtering.html)。这允许 SNS 有选择地发送消息。
在您的用例中,
[Mesage for user 1/3]
-> SNS -> A 的 SQS -> Websocket A only gets messages for user 1/3
[Mesage for user 5]
-> SNS -> B 的 SQS -> Websocket A only gets messages for user 5
请注意,两种情况下的 SNS 主题保持不变。但是订阅者过滤消息,你会有多个 SQS 队列。
仅使用 SQS 是可行的(但需要在消费者中自定义代码)。
示例: SQS 根据消费者发送的确认删除消息。如果消费者发送确认,SQS 将删除该条目。因此,一种设计方法是:
a) 消费消息 b) 检查它是否是想要的消息 c) 如果是,则向SQS发送ACK,SQS将删除该条目 d) 如果消息不是想要的消息,不发送并确认,SQS会保留记录
其他方法:
可以结合SQS和SNS来实现。
您可以为 3 个 Websocket 使用 3 个单独的队列 API 例如SQS_A, SQS_B, SQS_C
。你可以说一个 SNS 主题,MyTopic
。然后,可以让Queues根据订阅过滤订阅SNS主题。
示例,
-
如果事件过滤器中有
SQS_A
将订阅 SNS 主题。
如果事件过滤器中有 SQS_B
将订阅 SNS 主题。
如果事件过滤器中包含 SQS_C
将订阅 SNS 主题。
user_id 1 and 3
,user_id 5
,user_id 2
,SQS_A 的事件过滤看起来像这样:
{
"EventType": [
1,
3"
]
}
所以...现在当发布者发布消息时,它会将消息与事件类型一起发布到 SNS 主题。
示例:
sns.PublishInput{
Message: msg,
TopicArn: topic,
MessageAttributes: {
EventType: {
DataType: "String",
StringValue: <listOfUserId>
}
}
}
现在,SNS 只会将此消息发送到 SQS_A
而不会发送到 SQS 的其余部分。因此,只有 Websocket A 会使用该消息。
更多:https://docs.aws.amazon.com/sns/latest/dg/sns-subscription-filter-policies.html
注意:这不是一个健壮的、易于扩展的、可扩展的设计