具有作业亲和力的作业队列

Job queue with job affinity

我目前面临一个问题,我很确定有一个官方名称,但我不知道在网上搜索什么。我希望如果我描述问题和我想到的解决方案,有人能够告诉我设计模式的名称(如果有一个与我要描述的相匹配的名称)。

基本上,我想要的是一个作业队列:我有多个创建作业的客户(发布者),以及一些处理这些作业的工作人员(消费者)。现在我想将发布者创建的作业分发给各种消费者,这基本上可以使用几乎任何具有跨队列负载平衡的消息队列来实现,例如使用 RabbitMQ 甚至 MQTT 5.

但是,现在事情变得复杂了……每项工作都涉及一个外部实体,比方说一个用户。我想要的是单个用户的作业按顺序处理,但并行处理多个用户。我没有要求用户 X 的工作总是交给工作人员 Y,因为无论如何它们都应该按顺序处理。

现在我可以使用 RabbitMQ 及其一致的哈希交换来解决这个问题,但是当新工作人员进入集群时我会发生数据竞争,因为 RabbitMQ 不支持重新定位已经在队列中的作业。

MQTT 5 也不支持这个:这里这个想法被称为"sticky shared subscriptions",但这不是官方的。它可能 是 MQTT 6 的一部分,也可能不是。谁知道呢

我也看过 NSQ、NATS 和其他一些经纪商。他们中的大多数甚至不支持这种非常具体的场景,而那些支持使用一致性哈希的场景则存在前面提到的数据竞争问题。

现在,如果代理在作业到达后不将作业分类到队列中,问题就会消失,但如果它会跟踪特定用户的作业是否已经在处理中:如果是这样,它应该延迟该用户的所有其他作业,但其他用户的所有作业仍应处理。这是,AFAICS,使用 RabbitMQ 等是不可能的。

我很确定我不是唯一有这方面用例的人。我可以,例如想想用户上传视频到视频平台,虽然上传的视频是并行处理的,但是单个用户上传的所有视频都是顺序处理的。

所以,长话短说:我所描述的内容有一个通用名称吗?诸如 分布式作业队列 之类的东西? 具有任务亲和力的任务调度器?或者别的什么?我尝试了很多条款,但没有成功。这可能意味着对此没有解决方案,但如前所述,很难想象我是这个星球上唯一遇到此问题的人。

我可以寻找什么想法?并且:是否有任何工具可以实现这一点?任何协议?

PS:仅使用预定义的路由密钥不是一种选择,因为用户 ID(我在这里只是用作虚构的示例)基本上是 UUID,因此可能有数十亿个,所以我需要更有活力的东西。因此,一致性哈希基本上是正确的方法,但如前所述,分发必须逐个进行,而不是预先进行,以避免数据竞争。

通过搜索 "job queue with category ordering",我找到了 this 对您描述的行为类型的讨论。

很遗憾,他们似乎无法解决您的问题。

有一个 ,它建议不要为任何类型的 order-sensitive 或 business-logic-sensitive 任务使用任何类型的 message-broker 服务,原因可能或可能不适用于你正在做的事情。它还指出了一种技术,它似乎可以完成您正在尝试做的事情,但可能无法很好地适应手头的任务。

如果您有 stickiness 的选项,它会巧妙地解决您的问题,而且效率低下最少。当然,粘性有其自身的故障模式。没有理由认为您会找到与您所做的完全相同的实现 trade-offs。

我假设,因为你在这里问了这个问题,所以 per-user sequential-ness 重要 。在您给出的视频平台处理上传的示例中,sequential-ness 违规没什么大不了的。更广泛地说,大多数需要 massive-throughput load-balanced 工作队列的人不需要 strong 保证事情的处理顺序。

如果您最终需要自己构建,您将有很多选择。我得到的印象是您期望巨大的吞吐量、高度并行化的架构和 低 user-id 冲突率 。在这种情况下,您可以考虑维护 先决条件:
的列表 当有新任务进入时,平衡器会搜索所有 in-process、已分配和 not-yet-assigned 作业以查找与作业键 (user_id) 匹配的任何作业。
如果存在匹配项,则新作业将添加到 not-yet-assigned 列表中,前提是最早的作业共享其密钥。
每次工作完成时,工作人员都需要检查 not-yet-assigned 列表以查看它是否刚刚完成任何人的先决条件。如果是这样,工作人员可以标记该子作业以进行分配,或者只处理子作业本身。
当然,这有其自身的故障模式;你必须trade-offs。

Kafka 可以提供帮助,因为它将消息存储一段时间,因此您可以再次轮询它们

what I want to have is a job queue: I have multiple clients that create jobs (publishers), and a number of workers that process these jobs (consumers). Now I want to distribute the jobs created by the publishers to the various consumers, which is basically doable using almost any message queue with load balancing across a queue, e.g. using RabbitMQ or even MQTT 5.

However, now things get complicated... every job refers to an external entity, let's say a user. What I want is that the jobs for a single user get processed in order, but for multiple users in parallel. I do not have the requirement that the jobs for user X always go to worker Y, since they should be processed sequentially anyway.

即使不是这个特定的用例,我在几个月前对(动态)任务调度 [0] [1] 进行了调查,但没有发现类似的情况。

我读到的每个调度算法都有一些所有其他任务共有的属性,如优先级、年龄、入队时间、任务名称(以及平均处理时间)。如果您的任务都链接到一个用户,您可以 构建 一个调度程序,它考虑 user_id 从队列中选择任务。

但我想,您不想构建自己的调度程序,无论如何这都是浪费,因为根据这种需求的经验,现有的消息队列可以实现您的要求。

要总结您的要求,您需要:

A scheduler that run only one task per user at the same time.

解决方案是使用分布式锁,类似REDIS distlock,在任务开始前获取锁,并在任务执行过程中定时刷新。如果同一用户的新任务进入并尝试执行,它将无法获取锁并且将是 re-enqueued.

这是一个pseudo-code:

def my_task(user_id, *args, **kwargs):
    if app.distlock(user_id, blocking=False):
        exec_my_task(user_id, *args, **kwargs)
    else:
        raise RetryTask()

别忘了刷新释放.

采用类似的方法在爬虫中的每个请求之间实施 robots.txt 延迟。

对每个实体的处理顺序提出严格要求是一项挑战。

每个发布的任务如何long-running?如果它们总是很短,您可以通过哈希分配任务,并在每次改变形状时简单地耗尽 运行 作业的工作池,而不会损失太多生产力。

如果它们是 longer-running,可能会太慢。在那种情况下,您还可以让工作人员在执行期间为他们使用的每个任务的 user_id 从快速中央服务(如 Redis 或其他东西)中取出原子咨询锁。此服务还可以按用户 ID 范围或 what-have-you 单独扩展分区。如果在接收任务和执行任务的第一个副作用之间有足够的差距,工作人员甚至不需要阻止成功获取锁直到它即将提交,因此可能不会看到显着增加潜伏。争用*可能很少见:如果您已经在 user_id 上使用一些一致的散列方案来分配工作,它们确实很少见,并且仍然只在 worker-pool 拓扑发生变化时才会发生。您至少应该使用散列分布来保证只有两个工人竞争锁:旧的和新的。**

如果授予锁是按 first-come-first-serve 顺序服务的,并且请求锁的速度比 worker-pool 拓扑更改快(也就是说,工作人员一收到来自出版商),这甚至可以为您提供很好的订购保证,即使拓扑变化非常快。

编辑:

*我原来写的是"Failures";不完全是我的意思。这个想法是,除非拓扑发生变化,否则此锁定服务几乎永远不会遇到任何锁定争用,因为给定用户的任务将始终正常发送给同一工作人员。

**另一种可能性:您也可以仅通过 部分 工作池耗尽来提供良好的保证。如果没有 user-level 咨询锁,如果您使用一致的哈希方案来分发任务并且您可以为完成已分派的任务保持低水位线,则可以 延迟 启动任务其目标工作人员不同于当前最旧的当前执行任务开始时的目标工作人员(即,仅针对其分配的工作人员发生变化的用户排空 运行 任务)。这是相当多的额外复杂性;如果您可以有效地跟踪低水位线并且您没有 long-running 任务的长尾,那么它可能是一个不错的选择,可以让您省略锁定服务。然而,在撰写本文时,我并不清楚这是否会比锁更便宜;低水位标记的可靠实施通常并不便宜,并且工人在错误的时间死亡可能会延迟整个 1/N 队列的处理,这些队列改变了工人,而不仅仅是任务在 in-flight 上的用户它死亡时的工人。

如果我对您的场景的理解正确,我相信您描述的功能与 Message Sessions work in Azure Service Bus.

非常相似

在将消息推入队列之前,您基本上会将消息的 SessionId 属性 设置为 UserId

每个消费者将锁定一个接一个处理消息的会话,这些消息将属于同一个用户。完成后,消费者可以继续进行下一个可用会话。

此外,Azure Functions 最近发布了服务总线会话支持,目前处于预览状态,但您可以轻松实现所有这些功能。

不幸的是,我不够熟悉,不知道这个功能是否存在于一个开源替代品中,但我希望这能有所帮助。

Temporal Workflow 能够以最小的努力支持您的用例。

这里有一个稻草人设计可以满足你的要求:

  • 使用 userID 作为工作流 ID 向用户工作流发送 signalWithStart 请求。它要么将信号传递给工作流,要么首先启动工作流并将信号传递给它。
  • 对该工作流的所有请求都由它缓冲。 Temporal 提供了一种硬性保证,即只有一个具有给定 ID 的工作流才能处于打开状态。因此,所有信号(事件)都保证在属于用户的工作流中进行缓冲。在存在任何流程或基础设施故障的情况下,Temporal 会保留工作流中的所有数据(包括堆栈跟踪和局部变量)。因此无需显式保留 taskQueue 变量。
  • 内部工作流事件循环一个接一个地分派这些请求。
  • 当缓冲区为空时工作流可以完成。

这是在Java中实现它的工作流代码(也支持Go和PHP SDK,NodeJS处于alpha阶段):

@WorkflowInterface
public interface SerializedExecutionWorkflow {

    @WorkflowMethod
    void execute();

    @SignalMethod
    void addTask(Task t);
}

@ActivityInterface
public interface TaskProcessorActivity {
    void process(Task poll);
}

public class SerializedExecutionWorkflowImpl implements SerializedExecutionWorkflow {

    private final Queue<Task> taskQueue = new ArrayDeque<>();
    private final TaskProcesorActivity processor = Workflow.newActivityStub(TaskProcesorActivity.class);

    @Override
    public void execute() {
        while(!taskQueue.isEmpty()) {
            processor.process(taskQueue.poll());
        }
    }

    @Override
    public void addTask(Task t) {
        taskQueue.add(t);
    }
}

然后是通过信号方法将该任务排入工作流的代码:

private void addTask(WorkflowClient cadenceClient, Task task) {
    // Set workflowId to userId
    WorkflowOptions options = WorkflowOptions.newBuilder()
       .setTaskQueue(TASK_QUEUE)
       .setWorkflowId(task.getUserId())
       .build();
    // Use workflow interface stub to start/signal workflow instance
    SerializedExecutionWorkflow workflow = temporalClient.newWorkflowStub(SerializedExecutionWorkflow.class, options);
    BatchRequest request = temporalClient.newSignalWithStartRequest();
    request.add(workflow::execute);
    request.add(workflow::addTask, task);
    temporalClient.signalWithStart(request);
}

与使用队列进行任务处理相比,Temporal 提供了许多其他优势。

  • 构建了具有无限到期间隔的指数重试
  • 故障处理。例如,如果在配置的时间间隔内两次更新均未成功,它允许执行通知另一项服务的任务。
  • 支持长运行心跳操作
  • 能够实现复杂的任务依赖关系。例如,在发生不可恢复的故障时实施调用链或补偿逻辑 (SAGA)
  • 提供对更新当前状态的完整可见性。例如,当使用队列时,您知道队列中是否有一些消息,并且您需要额外的数据库来跟踪整体进度。使用 Temporal,每个事件都会被记录下来。
  • 能够取消更新 in-flight。
  • 分布式 CRON 支持

请参阅 the presentation,其中介绍了时间编程模型。它提到了 Cadence 项目,它是 Temporal 的前身。

只要锁冲突不经常发生,amirouche 所描述的就是一个简单的解决方案。如果是这样,您将浪费大量时间在您的工作人员获取他们必须拒绝并让消息代理重新排队的消息上。

Actor 模型/Actor 框架可以很好地解决此类问题。一些例子包括 Akka、Orleans、Protoactor 和 Cadence(上面提到过,尽管 Candence 不仅仅是一个 actor 框架)。这些框架可能会变得非常复杂,但其核心可以确保一次处理单个参与者的消息,但允许同时处理多个参与者(在您的场景中,每个用户 ID 都有一个参与者)。这些框架从您那里抽象出所有消息路由和并发性,极大地简化了实现,并且从长远来看应该更加健壮/可扩展。

Apache Qpid broker supports a feature called message groups,其中路由键和 worker 之间的关系是动态的,并且基于当前流量。

Consumption ordering means that the broker will not allow outstanding unacknowledged messages to more than one consumer for a given group.

This means that only one consumer can be processing messages from a particular group at a given time. When the consumer acknowledges all of its acquired messages, then the broker may pass the next pending message from that group to a different consumer.

这可能会更好地利用工人:

Note well that distinct message groups would not block each other from delivery. For example, assume a queue contains messages from two different message groups - say group "A" and group "B" - and they are enqueued such that "A"'s messages are in front of "B". If the first message of group "A" is in the process of being consumed by a client, then the remaining "A" messages are blocked, but the messages of the "B" group are available for consumption by other consumers - even though it is "behind" group "A" in the queue.

此功能仍然可能以显着的性能价格提供,when compared to other brokers. And there is not much interest in Qpid these days 4 5

编辑:还有其他经纪商也提供此功能:ActiveMQ and ActiveMQ Artemis. EDIT2:事实证明,ActiveMQ 和 Artemis 中的“消息组”工作方式不同——将组分配给工作人员是静态的(粘性的)而不是动态的。

Kafka 完全支持您的需求。你需要配置一个key,kafka会保证所有拥有相同key的消息都会被顺序处理。