Redis - 使用 BRPOPLPUSH 时清理处理队列的更好方法(可靠)
Redis - Better way of cleaning the processing queue(reliable) while using BRPOPLPUSH
我们目前的设计
环境 Redis 2.8.17
我们已经实现了我们的可靠队列,使用类似于 redis 文档中描述的模式的模式,在 RPOPLPUSH
但是,考虑到它的阻塞特性,我们正在使用 BRPOPLPUSH,并且 LPUSH为了保证先进先出的顺序。
生产者: 多个线程(来自多个服务器)使用 LPUSH 推送项目.
消费者: 多个线程(来自多个服务器)使用 BRPOPLPUSH 来处理项目.
BRPOPLPUSH q processing-q
如文档所述,redis 从队列 'q' 中弹出项目,同时将它们添加到 'processing-q'。
问题
由于我们应用程序的多线程(异步)性质,我们无法控制 消费者 将完成他们的处理。
因此,如果我们使用 LREM(根据文档)从 [=36 中删除已处理的元素=]processing-q,这只会移除processing-q的顶部元素。由于它无法保证是否已删除由相应的 consumer.
处理的实际元素
因此,如果我们什么都不做,processing-q 会继续增长(消耗内存),这非常恕我直言。
有什么建议或想法吗?
我会采用的方法是使用每个消费者处理-q(例如processing-q:consumer-id)。这将解决您当前的问题,但您仍然需要以某种方式处理崩溃的消费者。为此,我建议您还保留每个消费者弹出任务的最后时间并定期检查超时。如果消费者已经超时,将其任务移回主队列并删除其队列。
在一个类似的项目中,我使用主机名和工作进程的进程 ID 作为备份队列。每个worker都有自己的备份队列,如果worker死了,物品也不会丢失。
查看 README and the implementation 了解更多详情。
除了建议的解决方案之外,您还可以 ltrim
处理队列达到对您的服务有意义的数量。这将确保处理队列永远不会超出比例。
但是如果达到 trim 限制,您将开始丢失物品。对于您的用例,这可能会或可能不会被接受。
您只需在调用 LREM 时包含要删除的作业。
LREM 采用以下形式:
LREM queue count "object"
它将从 queue 中删除 count 个等于 "object" 的项目。因此,要删除您的消费者线程正在处理的特定工作,您需要做这样的事情。
LREM processing-q 1 "job_identifier"
有关更多信息,请参阅此处的文档:http://redis.io/commands/lrem
然后,为了处理崩溃的消费者和放弃的作业,您可以使用 SETEX 创建具有到期时间的锁,并定期检查没有锁的作业。
所以整个过程是这样的:
制作人
RPUSH q "job_identifier"
消费者
SETEX lock:processing-q:job_identifier 60
(先设置锁以避免竞争条件)
BRPOPLPUSH q processing-queue
- 处理作业
LREM processing-queue "job_identifier"
过期作业监视器
- 职位 =
LRANGE processing-queue 0 -1
- foreach jobs in jobs : lock =
GET lock:processing-q:job_identifier
- 如果锁定为空,则此作业超时,因此从 processing-q
LREM processing-queue "job_identifier"
中删除
- 并使用
RPUSH q "job_identifier"
重试
@NotAUser 发布了开源 java 实现,此处:https://github.com/graknlabs/redisq/tree/master/src/main/java/ai/grakn/redisq
我们目前的设计
环境 Redis 2.8.17
我们已经实现了我们的可靠队列,使用类似于 redis 文档中描述的模式的模式,在 RPOPLPUSH
但是,考虑到它的阻塞特性,我们正在使用 BRPOPLPUSH,并且 LPUSH为了保证先进先出的顺序。
生产者: 多个线程(来自多个服务器)使用 LPUSH 推送项目.
消费者: 多个线程(来自多个服务器)使用 BRPOPLPUSH 来处理项目.
BRPOPLPUSH q processing-q
如文档所述,redis 从队列 'q' 中弹出项目,同时将它们添加到 'processing-q'。
问题
由于我们应用程序的多线程(异步)性质,我们无法控制 消费者 将完成他们的处理。
因此,如果我们使用 LREM(根据文档)从 [=36 中删除已处理的元素=]processing-q,这只会移除processing-q的顶部元素。由于它无法保证是否已删除由相应的 consumer.
处理的实际元素因此,如果我们什么都不做,processing-q 会继续增长(消耗内存),这非常恕我直言。
有什么建议或想法吗?
我会采用的方法是使用每个消费者处理-q(例如processing-q:consumer-id)。这将解决您当前的问题,但您仍然需要以某种方式处理崩溃的消费者。为此,我建议您还保留每个消费者弹出任务的最后时间并定期检查超时。如果消费者已经超时,将其任务移回主队列并删除其队列。
在一个类似的项目中,我使用主机名和工作进程的进程 ID 作为备份队列。每个worker都有自己的备份队列,如果worker死了,物品也不会丢失。
查看 README and the implementation 了解更多详情。
除了建议的解决方案之外,您还可以 ltrim
处理队列达到对您的服务有意义的数量。这将确保处理队列永远不会超出比例。
但是如果达到 trim 限制,您将开始丢失物品。对于您的用例,这可能会或可能不会被接受。
您只需在调用 LREM 时包含要删除的作业。
LREM 采用以下形式:
LREM queue count "object"
它将从 queue 中删除 count 个等于 "object" 的项目。因此,要删除您的消费者线程正在处理的特定工作,您需要做这样的事情。
LREM processing-q 1 "job_identifier"
有关更多信息,请参阅此处的文档:http://redis.io/commands/lrem
然后,为了处理崩溃的消费者和放弃的作业,您可以使用 SETEX 创建具有到期时间的锁,并定期检查没有锁的作业。
所以整个过程是这样的:
制作人
RPUSH q "job_identifier"
消费者
SETEX lock:processing-q:job_identifier 60
(先设置锁以避免竞争条件)BRPOPLPUSH q processing-queue
- 处理作业
LREM processing-queue "job_identifier"
过期作业监视器
- 职位 =
LRANGE processing-queue 0 -1
- foreach jobs in jobs : lock =
GET lock:processing-q:job_identifier
- 如果锁定为空,则此作业超时,因此从 processing-q
LREM processing-queue "job_identifier"
中删除
- 并使用
RPUSH q "job_identifier"
重试
@NotAUser 发布了开源 java 实现,此处:https://github.com/graknlabs/redisq/tree/master/src/main/java/ai/grakn/redisq