这是 .NET 中 StackExchange.Redis 中流水线的正确实现吗?

Is this correct implementation of pipelining in StackExchange.Redis in .NET?

我正在尝试提高从 Redis 读取哈希值的性能,并且我已经尝试实施流水线,这是执行此操作的正确方法吗?

public Task FetchHashesFromRedis(List<string> redisKeys, CancellationToken cancellationToken) 
{
      var parallel = Environment.ProcessorCount;
      var semafore = new SemaphoreSlim(initialCount: parallel, maxCount: parallel);
      var tasks = new List<Task>();

      for (int i = 0; i < redisKeys.Count; i++)
      {
          var currentKey = redisKeys.ElementAt(i);
          var task = FetchFromRedis(currentKey, semafore, cancellationToken)
          tasks.Add(task);
      }

      return Task.WhenAll(tasks);
}

这段代码可以改进什么?

如果您使用异步 API 和一系列任务,

StackExchange.Redis 将自动为您管道化所有内容。您还需要确保在哈希的情况下,您利用 HSET/HMGET 命令的可变性来减少需要向 Redis 发出的原始命令的数量。

我不确定 FetchFromRedis 在您提供的代码的上下文中做了什么。假设它异步执行 HGET/HMGET/HGETALL 操作之一,并且 returns 从中执行任务,我怀疑你离得太远了。但是我还没有看到你是如何得出结果的?

以下代码在 Redis 中创建并从 10k Hashes 中获取数据。这有点微不足道,但展示了您正在寻找的行为。在我的机器上(当然,我要去本地主机),执行时间约为 75 毫秒 - 或大约 3.8 微秒/操作。完成后,结果将在 getTasks.

的任务中全部可用

你会想要确保(用一个现实的 data-load)你不会把它推得太重,因为粉碎 StackExchange.Redis 会导致它抛出超时异常(基本上,如果你排队太多东西,操作很可能会卡在多路复用器的 message-queue 中并失败。

var stopwatch = Stopwatch.StartNew();

var setTasks = new List<Task>();

for (var i = 1; i < 10000; i++)
{
    setTasks.Add(db.HashSetAsync($"hash:{i}", new HashEntry[]{new HashEntry("foo", "bar"), new HashEntry("baz", 3)}));
}

await Task.WhenAll(setTasks);

var getTasks = new List<Task<RedisValue[]>>(); 

for (var i = 1; i < 10000; i++)
{
    getTasks.Add(db.HashGetAsync($"hash:{i}", new RedisValue[]{"foo","baz"}));
}

await Task.WhenAll(getTasks);

stopwatch.Stop();

Console.Write($"total execution time:{stopwatch.ElapsedMilliseconds}");