Redis - 使用 BRPOPLPUSH 时清理处理队列的更好方法(可靠)

Redis - Better way of cleaning the processing queue(reliable) while using BRPOPLPUSH

我们目前的设计

环境 Redis 2.8.17

我们已经实现了我们的可靠队列,使用类似于 redis 文档中描述的模式的模式,在 RPOPLPUSH

但是,考虑到它的阻塞特性,我们正在使用 BRPOPLPUSH,并且 LPUSH为了保证先进先出的顺序。

生产者: 多个线程(来自多个服务器)使用 LPUSH 推送项目.

消费者: 多个线程(来自多个服务器)使用 BRPOPLPUSH 来处理项目.

BRPOPLPUSH q processing-q

如文档所述,redis 从队列 'q' 中弹出项目,同时将它们添加到 'processing-q'。

问题

由于我们应用程序的多线程(异步)性质,我们无法控制 消费者 将完成他们的处理。

因此,如果我们使用 LREM(根据文档)从 [=36 中删除已处理的元素=]processing-q,这只会移除processing-q的顶部元素。由于它无法保证是否已删除由相应的 consumer.

处理的实际元素

因此,如果我们什么都不做,processing-q 会继续增长(消耗内存),这非常恕我直言。

有什么建议或想法吗?

我会采用的方法是使用每个消费者处理-q(例如processing-q:consumer-id)。这将解决您当前的问题,但您仍然需要以某种方式处理崩溃的消费者。为此,我建议您还保留每个消费者弹出任务的最后时间并定期检查超时。如果消费者已经超时,将其任务移回主队列并删除其队列。

在一个类似的项目中,我使用主机名和工作进程的进程 ID 作为备份队列。每个worker都有自己的备份队列,如果worker死了,物品也不会丢失。

查看 README and the implementation 了解更多详情。

除了建议的解决方案之外,您还可以 ltrim 处理队列达到对您的服务有意义的数量。这将确保处理队列永远不会超出比例。

但是如果达到 trim 限制,您将开始丢失物品。对于您的用例,这可能会或可能不会被接受。

http://redis.io/commands/ltrim

您只需在调用 LREM 时包含要删除的作业。

LREM 采用以下形式:

LREM queue count "object"

它将从 queue 中删除 count 个等于 "object" 的项目。因此,要删除您的消费者线程正在处理的特定工作,您需要做这样的事情。

LREM processing-q 1 "job_identifier"

有关更多信息,请参阅此处的文档:http://redis.io/commands/lrem

然后,为了处理崩溃的消费者和放弃的作业,您可以使用 SETEX 创建具有到期时间的锁,并定期检查没有锁的作业。

所以整个过程是这样的:

制作人

  1. RPUSH q "job_identifier"

消费者

  1. SETEX lock:processing-q:job_identifier 60(先设置锁以避免竞争条件)
  2. BRPOPLPUSH q processing-queue
  3. 处理作业
  4. LREM processing-queue "job_identifier"

过期作业监视器

  1. 职位 = LRANGE processing-queue 0 -1
  2. foreach jobs in jobs : lock = GET lock:processing-q:job_identifier
  3. 如果锁定为空,则此作业超时,因此从 processing-q LREM processing-queue "job_identifier"
  4. 中删除
  5. 并使用 RPUSH q "job_identifier"
  6. 重试

@NotAUser 发布了开源 java 实现,此处:https://github.com/graknlabs/redisq/tree/master/src/main/java/ai/grakn/redisq