如何使用 XREAD .NET Redis 读取实际更改

how to work with XREAD .NET Redis to read actual changes

当我将停止点设置在 XREAD 程序不执行任何操作的行时。也许我需要配置这个 XREAD 命令?

public async void ListenTask()
        {
        var readTask = Task.Run(async () =>
        {
            while (!Token.IsCancellationRequested)
            {
                var result = db.StreamRead(streamName, "$", 1);

                if (result.Any())
                {
                    var dict = ParseResult(result.Last());
                    var sb = new StringBuilder();
                    foreach (var key in dict.Keys)
                    {
                        sb.Append(dict[key]);
                    }

                    Console.WriteLine(sb.ToString());

                }

                await Task.Delay(1000);
            }
        });
    }

StackExchange.Redis

上的非法操作

由于其独特的多路复用架构,StackExchange.Redis(您似乎正在使用的库)不支持阻塞 XREAD 操作。这是因为通过交互界面的所有命令(基本上所有 non-pub/sub)都使用相同的连接。如果您阻止其中一个连接,您的应用程序中依赖于多路复用器的所有其他内容都将被备份,等待阻止完成。 StackExchange.Redis 库实际上将 $ id 视为非法 id,毕竟它的唯一目的是阻止。最有可能发生的事情(您看不到它发生是因为它被同步上下文吞没了)是 var result = db.StreamRead(streamName, "$", 1); 抛出 InvalidOperationException:System.InvalidOperationException: StreamPosition.NewMessages cannot be used with StreamRead.

解决方法

在这种情况下有 2 种可能的解决方法,首先,您可以使用带 XRANGE 命令的轮询而不是使用阻塞读取。

var readTask = Task.Run(async () =>
{
    var lastId = "-";
    while (!token.IsCancellationRequested)
    {
        var result = await db.StreamRangeAsync("a-stream", lastId, "+");
        if(result.Any()
        {
            lastId = result.Last().Id;            
        }
        await Task.Delay(1000);
    }
});

您已经有效地进行了线程休眠,因此此轮询操作可能足以满足您的需求。

如果您确实需要进行阻塞操作,则必须使用不同的库,如果您尝试使用 StackExchange.Redis(您可以使用 Execute/ExecuteAsync 命令)你会严重地降低它的性能。

关于 ServiceStack.Redis and CsRedis are available on the redis developer site 的文章(我是它们的作者)

最后一件事

可能想确保当您发出这些命令时尽可能异步,您在异步上下文中使用同步 XREAD 命令(大多数 StackExchange.Redis 中的每个命令都有一个sync/async 您可以使用的版本 - 尽可能使用异步)。