redis 集合原子 lpop 全部

redis collection atomic lpop all

redis链表,生产者保持lpush。在另一个线程中,消费者定期从列表中取出所有元素,并对元素进行分类。因为producer一直在push,所以take-all-out必须是原子的。那么有没有一种有效的方法来做到这一点? spring-data-redis可以用

// producer
getOpsForList.push(k, v);

// consumer
alist = range(k,0,-1); // take all out
alist.parallelStream() // during which a producer thread could push but I hope it is "blocked".
delete(k);  // list is now empty and push from producer is unblocked.

multiexec 没有达到我的目的,因为它实际上只在一个 t[=46 中提交了 lrangelpushdelete =]行动。到目前为止,我能想到的唯一方法是保留 lpop 并添加返回到 alist 直到列表为空。

编辑,这是我的想法: 当你想确定一个操作只有一次 运行 时,使用 watch:

watch key
val = get key
val = val + 1
multi
set key val
exec

当你想不"interrupted"(不是多线程中断),并且不关心它运行多少次时,t运行saction(multiexec) 就够了。

multi
val = lrange key 0 -1
delete key
exec

val 完成后仍然是一个列表,就像official-doc

中所说的

All the commands in a transaction are serialized and executed sequentially. It can never happen that a request issued by another client is served in the middle of the execution of a Redis transaction.

在redis之外,我把数据操作list.stream.parallelism去掉了,现在函数只关注数据getter,和上一段代码一模一样。 ;)

A good example to illustrate how WATCH can be used to create new atomic operations otherwise not supported by Redis is to implement ZPOP, that is a command that pops the element with the lower score from a sorted set in an atomic way.

文档中有 ZPOP 的实现,如下所示:

WATCH zset
element = ZRANGE zset 0 0
MULTI
ZREM zset element
EXEC

您需要做的是如果 EXEC 失败(即 returns 一个 Null 回复)重复上面的操作。生产者操作 lpush 是原子的,所以它不需要使用 watch 命令。例如:

// consumer pesudo code
do {
  watch(k);
  transaction = multi();
  alist = transaction.range(k,0,-1); 
  transaction.delete(k);  
  status = get status of transaction.exec();
} while(status == null);

alist.parallelStream()