StackExchange.Redis 挡风设计

StackExchange.Redis Blocking pop design

所以我们有一个使用 ServiceStack.Redis 的现有帮助程序库,目前正在尝试将其与 StackExchange.Redis 交换。我们使用的是 BlockingPop (BLPOP),但由于 StackExchange.Redis 不支持它。我们实现如下

public static void Push(string Qname, string val)
{
    IDatabase db = redis.GetDatabase();
    db.ListLeftPush(Qname, val);
    ISubscriber sub = redis.GetSubscriber();
    sub.Publish(Qname + "_msg", "1");
}

和带有阻止选项的 Pop 如下:

    public static string Pop(string Qname, 
    bool block_until_available = false,int timeout_secs=0)
{   
    IDatabase db = redis.GetDatabase();            
    var popped = db.ListRightPop(Qname);
    if (popped.IsNull)
    {
        if (block_until_available == false)
            return null;
    }
    else
        return popped;

    //wait for an item to be pushed in.
    ISubscriber sub = redis.GetSubscriber();
    AutoResetEvent autoEvent = new AutoResetEvent(false);
    string obj = null;
    Task.Run(() =>
    {
        sub.Subscribe(Qname + "_msg", (channel, message) =>
        {
            popped = db.ListRightPop(Qname);
            if (!popped.IsNull)
            {
                obj = popped;
                sub.Unsubscribe(Qname + "_msg");
                autoEvent.Set();
            }
        });
    });
    if (timeout_secs > 0)
        autoEvent.WaitOne(timeout_secs * 1000);
    else
        autoEvent.WaitOne();
    return obj;
}

你们都看到这种方法有什么明显的问题吗?

此外,我很快 运行 进入以下错误。我增加了 syncTimeout。希望这会解决它?

System.TimeoutException: Timeout performing RPOP DL_PROD, 
inst: 0, mgr: ProcessReadQueue, err: never, queue: 0, qu: 0, qs: 0, qc: 0, 
wr: 0, wq: 0, in: 0, ar: 1, IOCP: (Busy=0,Free=1000,Min=8,Max=1000), 
WORKER: (Busy=2,Free=32765,Min=8,Max=32767), 
clientName: CD147RE1 at 
StackExchange.Redis.ConnectionMultiplexer.ExecuteSyncImpl[T]
(Message message, ResultProcessor`1 processor, ServerEndPoint server) 

RPOP 在这里超时没有具体原因,除非它与带宽相关(巨大的有效负载)。该错误看起来与 IMO 问题无关。这种方法很丑陋,但是……好吧,它可能会奏效。不过,这里没有必要使用 Task.Run。我认为,该方法的一个问题是它不能同时正确工作。它似乎取消订阅该频道/连接的所有代表,而不仅仅是一个。可以自行取消订阅,但坦率地说,我想知道只有一个自动重置事件和一个订阅是否更容易,如果一条消息在您等待时解锁大门:很好.


自行退订受托人的一般模式本质上是:

YourDelegateType handler = null;
handler = (args) => {
    DoTheThing();
    UnSubscribe(handler);
};
Subscribe(handler);

这利用词法捕获变量的乐趣来提供对委托实例inside的访问,这意味着委托可以传递自身 取消订阅方法。上面显示的模式应该适用于所有基于委托的回调场景——包括调节事件和诸如 SE.Redis pub/sub 处理程序之类的东西。