使用 AMQP 维护与远程工作者的长期连接
Using AMQP to maintain a long-term connection to a remote worker
我正在尝试为以下场景建模
服务器通过 amqp
向工作进程分派 'START' 操作,如下所示(假设先前提供了 channel
和 action
并且操作是 START
有一些有效载荷。)
channel.assertQueue('', { exclusive: true }).then(({ queue }) => {
const cId = uuid()
channel.consume(queue, (msg) => {
if (msg.properties.correlationId === cId) {
const response = JSON.parse(msg.content.toString())
console.log('response', response)
resolve(response)
}
}, { noAck: true })
const msg = JSON.stringify(action)
channel.sendToQueue(
QUEUE_NAME,
new Buffer(msg),
{ correlationId: cId, replyTo: queue }
)
}, reject)
worker 获取 START action
以及 correlationId
和 replyTo
队列名称,将负载添加到它自己的待办事项内部列表中,并响应 replyTo
队列有 'START_SUCCESS' 操作。
现在工作人员将遍历其内部待办事项列表并完成它们,并通过相同的 replyTo
队列将 'UPDATE' 操作发回服务器,因此服务器需要知道继续监听该队列以获取更新,并且需要知道哪个工作人员正在处理任何特定任务的更新。服务器足够聪明,知道特定任务已经开始,因此在这种情况下不会再次调度它。
但是当工作人员停止执行任务时,服务器需要知道向哪个工作人员发送 'STOP' 消息。工作人员是否有办法将某种直接的 amqp 通道发送回服务器,服务器可以使用该通道发送 STOP 消息?
最简单的答案似乎是让工作人员创建一个 "reply" 队列,并在 'START_SUCCESS' 消息中将该标识符发送到服务器,然后服务器将该状态存储在某处。
但是,我认为 RabbitMQ 的强大功能很大程度上来自于这样一个事实,即消息不是直接发布到队列,而是发布到交换器,并且它们的最终目的地由它们的路由键决定。 (按队列名称发布实际上是通过使用路由键作为队列名称的交换器。)如果您不熟悉不同类型的交换器,请通读 the RabbitMQ Getting Started tutorials.
在这种情况下,与其考虑服务器和工作人员需要知道彼此的身份,不如考虑他们发布和订阅彼此的更新。如果一切都发布到交易所,那么服务器和工作人员实际上不需要了解彼此的身份。
我是这样认为的:
- 服务器为特定作业生成唯一 ID。
- 服务器将 START 消息发布到交换器
jobs.new
,其中包含对作业类型进行分类的路由键以及消息中的作业 ID。
- 服务器将匿名队列绑定到直接或主题交换器
jobs.status
,绑定键设置为作业 ID。
- 工作人员启动并从
jobs.ready
(或 jobs.ready.some_type
)获取一条消息。
- worker 将匿名队列绑定到
jobs.control
交换,并将作业 ID 作为绑定键。
- worker 启动任务,并发布 START_SUCCESS 消息到 exchange
jobs.status
,以作业 ID 作为路由键。
- 服务器从它在第 3 步绑定的队列接收 START_SUCCESS 消息,并为该作业更新其状态。
- worker周期性地向
jobs.status
交易所发送UPDATE消息;同样,路由键与作业 ID 匹配,因此服务器收到消息。
- 当服务器想要停止(或修改)运行 作业时,它会向
jobs.control
交换器发布一个 STOP 消息,并将作业 ID 作为路由键。
- 工作人员在第 5 步绑定的队列上收到此消息,并停止作业。
从 RabbitMQ 端来看,您有这些元素:
- 3次交流:
jobs.new
服务器发布新职位的地方。如果所有工作人员都可以处理所有工作,这可能是一个简单的扇出交换,或者它可以是一个主题交换,将其路由到不同类型工作人员的不同工作队列中。
jobs.status
其中更新由工作人员发布。这将是直接或主题交换,其路由键是或包含作业 ID。
jobs.control
服务器发布更新以控制现有作业。同样,这将是直接或主题交换,其路由键是或包含作业 ID。
- 永久队列:
- 单个
jobs.ready
队列,或不同的 jobs.ready.some_type
队列,绑定到 jobs.new
交换。
- 匿名队列:
- 服务器为每个作业创建一个队列,并使用该作业的 ID 绑定到
jobs.status
交换。或者,服务器进程可以有一个入站流量队列,并简单地从接收到的消息中读取作业 ID。
- 每个工人创建一个队列,并使用当前正在处理的作业的 ID 绑定到
jobs.control
交换。
请注意,您可以将额外的队列附加到这些交换中的任何一个以获取流量的副本,例如用于记录或调试。对于主题交换,只需将额外队列与绑定键 #
绑定,它将获得所有消息的副本,而不会中断任何现有绑定。
我正在尝试为以下场景建模
服务器通过 amqp
向工作进程分派 'START' 操作,如下所示(假设先前提供了 channel
和 action
并且操作是 START
有一些有效载荷。)
channel.assertQueue('', { exclusive: true }).then(({ queue }) => {
const cId = uuid()
channel.consume(queue, (msg) => {
if (msg.properties.correlationId === cId) {
const response = JSON.parse(msg.content.toString())
console.log('response', response)
resolve(response)
}
}, { noAck: true })
const msg = JSON.stringify(action)
channel.sendToQueue(
QUEUE_NAME,
new Buffer(msg),
{ correlationId: cId, replyTo: queue }
)
}, reject)
worker 获取 START action
以及 correlationId
和 replyTo
队列名称,将负载添加到它自己的待办事项内部列表中,并响应 replyTo
队列有 'START_SUCCESS' 操作。
现在工作人员将遍历其内部待办事项列表并完成它们,并通过相同的 replyTo
队列将 'UPDATE' 操作发回服务器,因此服务器需要知道继续监听该队列以获取更新,并且需要知道哪个工作人员正在处理任何特定任务的更新。服务器足够聪明,知道特定任务已经开始,因此在这种情况下不会再次调度它。
但是当工作人员停止执行任务时,服务器需要知道向哪个工作人员发送 'STOP' 消息。工作人员是否有办法将某种直接的 amqp 通道发送回服务器,服务器可以使用该通道发送 STOP 消息?
最简单的答案似乎是让工作人员创建一个 "reply" 队列,并在 'START_SUCCESS' 消息中将该标识符发送到服务器,然后服务器将该状态存储在某处。
但是,我认为 RabbitMQ 的强大功能很大程度上来自于这样一个事实,即消息不是直接发布到队列,而是发布到交换器,并且它们的最终目的地由它们的路由键决定。 (按队列名称发布实际上是通过使用路由键作为队列名称的交换器。)如果您不熟悉不同类型的交换器,请通读 the RabbitMQ Getting Started tutorials.
在这种情况下,与其考虑服务器和工作人员需要知道彼此的身份,不如考虑他们发布和订阅彼此的更新。如果一切都发布到交易所,那么服务器和工作人员实际上不需要了解彼此的身份。
我是这样认为的:
- 服务器为特定作业生成唯一 ID。
- 服务器将 START 消息发布到交换器
jobs.new
,其中包含对作业类型进行分类的路由键以及消息中的作业 ID。 - 服务器将匿名队列绑定到直接或主题交换器
jobs.status
,绑定键设置为作业 ID。 - 工作人员启动并从
jobs.ready
(或jobs.ready.some_type
)获取一条消息。 - worker 将匿名队列绑定到
jobs.control
交换,并将作业 ID 作为绑定键。 - worker 启动任务,并发布 START_SUCCESS 消息到 exchange
jobs.status
,以作业 ID 作为路由键。 - 服务器从它在第 3 步绑定的队列接收 START_SUCCESS 消息,并为该作业更新其状态。
- worker周期性地向
jobs.status
交易所发送UPDATE消息;同样,路由键与作业 ID 匹配,因此服务器收到消息。 - 当服务器想要停止(或修改)运行 作业时,它会向
jobs.control
交换器发布一个 STOP 消息,并将作业 ID 作为路由键。 - 工作人员在第 5 步绑定的队列上收到此消息,并停止作业。
从 RabbitMQ 端来看,您有这些元素:
- 3次交流:
jobs.new
服务器发布新职位的地方。如果所有工作人员都可以处理所有工作,这可能是一个简单的扇出交换,或者它可以是一个主题交换,将其路由到不同类型工作人员的不同工作队列中。jobs.status
其中更新由工作人员发布。这将是直接或主题交换,其路由键是或包含作业 ID。jobs.control
服务器发布更新以控制现有作业。同样,这将是直接或主题交换,其路由键是或包含作业 ID。
- 永久队列:
- 单个
jobs.ready
队列,或不同的jobs.ready.some_type
队列,绑定到jobs.new
交换。
- 单个
- 匿名队列:
- 服务器为每个作业创建一个队列,并使用该作业的 ID 绑定到
jobs.status
交换。或者,服务器进程可以有一个入站流量队列,并简单地从接收到的消息中读取作业 ID。 - 每个工人创建一个队列,并使用当前正在处理的作业的 ID 绑定到
jobs.control
交换。
- 服务器为每个作业创建一个队列,并使用该作业的 ID 绑定到
请注意,您可以将额外的队列附加到这些交换中的任何一个以获取流量的副本,例如用于记录或调试。对于主题交换,只需将额外队列与绑定键 #
绑定,它将获得所有消息的副本,而不会中断任何现有绑定。