在 machinery(go) 中路由任务的最佳方法是什么?
What is the best way to route tasks in machinery(go)?
我正在尝试将机器用作分布式任务队列,并希望为不同的任务组部署单独的工作人员。例如。在数据库服务器旁边有一个工作人员 运行 数据库相关任务和不同服务器上的多个工作人员 运行 cpu/memory 密集型任务。只有文档并不清楚如何做到这一点。
我最初尝试 运行 工作人员没有向他们注册不需要的任务,但这导致工作人员反复使用未注册的任务并使用以下消息重新排队:
INFO: 2022/01/27 08:33:13 redis.go:342 Task not registered with this worker. Requeuing message: {"UUID":"task_7026263a-d085-4492-8fa8-e4b83b2c8d59","Name":"add","RoutingKey":"","ETA":null,"GroupUUID":"","GroupTaskCount":0,"Args":[{"Name":"","Type":"int32","Value":2},{"Name":"","Type":"int32","Value":4}],"Headers":{},"Priority":0,"Immutable":false,"RetryCount":0,"RetryTimeout":0,"OnSuccess":null,"OnError":null,"ChordCallback":null,"BrokerMessageGroupId":"","SQSReceiptHandle":"","StopTaskDeletionOnError":false,"IgnoreWhenTaskNotRegistered":false}
我怀疑这可以通过将 IgnoreWhenTaskNotRegistered
设置为 True 来解决,但这似乎不是一个非常优雅的解决方案。
任务签名也有一个 RoutingKey
字段,但文档中没有关于如何将工作人员配置为仅使用来自特定路由键的任务的信息。
此外,另一种解决方案是拥有单独的机器任务服务器,但这会剥夺在工作人员之间使用工作流和编排任务的能力。
通过反复试验找到解决方案。
将 IgnoreWhenTaskNotRegistered
设置为 true
不是正确的解决方案,因为与我最初的想法不同,worker 仍然使用未注册的任务然后将其丢弃而不是重新排队。
路由任务的正确方法是将任务签名中的 RoutingKey
设置为所需队列的名称,并使用 taskserver.NewCustomQueueWorker
获取特定于队列的工作对象而不是 taskserver.NewWorker
正在将任务发送到特定队列:
task := tasks.Signature{
Name: "<TASKNAME>",
RoutingKey: "<QUEUE>",
Args: []tasks.Arg{
// args...
},
}
res, err := taskserver.SendTask(&task)
if err != nil {
// handle error
}
并启动一个 worker 从特定队列消费:
worker := taskserver.NewCustomQueueWorker("<WORKERNAME>", concurrency, "<QUEUE>")
if err := worker.Launch(); err != nil {
// handle error
}
Still not quite sure how to tell a worker to consume from a set of queues as `NewCustomQueueWorker` only accepts a single string as it's queue name, however that's a relatively minor detail.
我正在尝试将机器用作分布式任务队列,并希望为不同的任务组部署单独的工作人员。例如。在数据库服务器旁边有一个工作人员 运行 数据库相关任务和不同服务器上的多个工作人员 运行 cpu/memory 密集型任务。只有文档并不清楚如何做到这一点。
我最初尝试 运行 工作人员没有向他们注册不需要的任务,但这导致工作人员反复使用未注册的任务并使用以下消息重新排队:
INFO: 2022/01/27 08:33:13 redis.go:342 Task not registered with this worker. Requeuing message: {"UUID":"task_7026263a-d085-4492-8fa8-e4b83b2c8d59","Name":"add","RoutingKey":"","ETA":null,"GroupUUID":"","GroupTaskCount":0,"Args":[{"Name":"","Type":"int32","Value":2},{"Name":"","Type":"int32","Value":4}],"Headers":{},"Priority":0,"Immutable":false,"RetryCount":0,"RetryTimeout":0,"OnSuccess":null,"OnError":null,"ChordCallback":null,"BrokerMessageGroupId":"","SQSReceiptHandle":"","StopTaskDeletionOnError":false,"IgnoreWhenTaskNotRegistered":false}
我怀疑这可以通过将 IgnoreWhenTaskNotRegistered
设置为 True 来解决,但这似乎不是一个非常优雅的解决方案。
任务签名也有一个 RoutingKey
字段,但文档中没有关于如何将工作人员配置为仅使用来自特定路由键的任务的信息。
此外,另一种解决方案是拥有单独的机器任务服务器,但这会剥夺在工作人员之间使用工作流和编排任务的能力。
通过反复试验找到解决方案。
将 IgnoreWhenTaskNotRegistered
设置为 true
不是正确的解决方案,因为与我最初的想法不同,worker 仍然使用未注册的任务然后将其丢弃而不是重新排队。
路由任务的正确方法是将任务签名中的 RoutingKey
设置为所需队列的名称,并使用 taskserver.NewCustomQueueWorker
获取特定于队列的工作对象而不是 taskserver.NewWorker
正在将任务发送到特定队列:
task := tasks.Signature{
Name: "<TASKNAME>",
RoutingKey: "<QUEUE>",
Args: []tasks.Arg{
// args...
},
}
res, err := taskserver.SendTask(&task)
if err != nil {
// handle error
}
并启动一个 worker 从特定队列消费:
worker := taskserver.NewCustomQueueWorker("<WORKERNAME>", concurrency, "<QUEUE>")
if err := worker.Launch(); err != nil {
// handle error
}
Still not quite sure how to tell a worker to consume from a set of queues as `NewCustomQueueWorker` only accepts a single string as it's queue name, however that's a relatively minor detail.