Azure Redis 缓存 - ConnectionMultiplexer 对象池

Azure Redis Cache - pool of ConnectionMultiplexer objects

我们在我们的应用程序中使用 C1 Azure Redis 缓存。最近我们遇到很多 GET 操作超时。

According to this article,一种可能的解决方案是实现 ConnectionMultiplexer 对象池。

Another possible solution is to use a pool of ConnectionMultiplexer objects in your client, and choose the “least loaded” ConnectionMultiplexer when sending a new request. This should prevent a single timeout from causing other requests to also timeout.

使用 C# 实现 ConnectionMultiplexer 对象池会是什么样子?

编辑:

如果您正在使用 StackExchange.Redis,根据此 github issue,您可以在连接多路复用器对象上使用 TotalOutstanding 属性。

这是我想出的一个实现,它工作正常:

public static int POOL_SIZE = 100;
private static readonly Object lockPookRoundRobin = new Object();
private static Lazy<Context>[] lazyConnection = null;

//Static initializer to be executed once on the first call
private static void InitConnectionPool()
{
    lock (lockPookRoundRobin)
    {
        if (lazyConnection == null) {
             lazyConnection = new Lazy<Context>[POOL_SIZE];
        }


        for (int i = 0; i < POOL_SIZE; i++){
            if (lazyConnection[i] == null)
                lazyConnection[i] = new Lazy<Context>(() => new Context("YOUR_CONNECTION_STRING", new CachingFramework.Redis.Serializers.JsonSerializer()));
        }
    }
}

private static Context GetLeastLoadedConnection()
{
    //choose the least loaded connection from the pool
    /*
    var minValue = lazyConnection.Min((lazyCtx) => lazyCtx.Value.GetConnectionMultiplexer().GetCounters().TotalOutstanding);
    var lazyContext = lazyConnection.Where((lazyCtx) => lazyCtx.Value.GetConnectionMultiplexer().GetCounters().TotalOutstanding == minValue).First();
    */

    // UPDATE following @Luke Foust comment below
    Lazy<Connection> lazyContext;

    var loadedLazys = lazyConnection.Where((lazy) => lazy.IsValueCreated);
    if(loadedLazys.Count()==lazyConnection.Count()){
        var minValue = loadedLazys.Min((lazy) => lazy.Value.TotalOutstanding);
        lazyContext = loadedLazys.Where((lazy) => lazy.Value.TotalOutstanding == minValue).First();
    }else{
        lazyContext = lazyConnection[loadedLazys.Count()];
    }
    return lazyContext.Value;
}

private static Context Connection
{
    get
    {
        lock (lockPookRoundRobin)
        {
            return GetLeastLoadedConnection();
        }
    }
}

public RedisCacheService()
{
    InitConnectionPool();
}

您还可以使用 StackExchange.Redis.Extensions

以更简单的方式完成此操作

示例代码:

    using StackExchange.Redis;
    using StackExchange.Redis.Extensions.Core.Abstractions;
    using StackExchange.Redis.Extensions.Core.Configuration;
    using System;
    using System.Collections.Concurrent;
    using System.Linq;

    namespace Pool.Redis
    {
    /// <summary>
    /// Provides redis pool
    /// </summary>
    public class RedisConnectionPool : IRedisCacheConnectionPoolManager
    {
        private static ConcurrentBag<Lazy<ConnectionMultiplexer>> connections;
        private readonly RedisConfiguration redisConfiguration;

        public RedisConnectionPool(RedisConfiguration redisConfiguration)
        {
            this.redisConfiguration = redisConfiguration;
            Initialize();
        }

        public IConnectionMultiplexer GetConnection()
        {
            Lazy<ConnectionMultiplexer> response;
            var loadedLazys = connections.Where(lazy => lazy.IsValueCreated);

            if (loadedLazys.Count() == connections.Count)
            {
                response = connections.OrderBy(x => x.Value.GetCounters().TotalOutstanding).First();
            }
            else
            {
                response = connections.First(lazy => !lazy.IsValueCreated);
            }

            return response.Value;
        }

        private void Initialize()
        {
            connections = new ConcurrentBag<Lazy<ConnectionMultiplexer>>();

            for (int i = 0; i < redisConfiguration.PoolSize; i++)
            {
                connections.Add(new Lazy<ConnectionMultiplexer>(() => ConnectionMultiplexer.Connect(redisConfiguration.ConfigurationOptions)));
            }
        }

        public void Dispose()
        {
            var activeConnections = connections.Where(lazy => lazy.IsValueCreated).ToList();
            activeConnections.ForEach(connection => connection.Value.Dispose());
            Initialize();
        }
    }
}

其中 RedisConfiguration 是这样的:

            return new RedisConfiguration()
        {
            AbortOnConnectFail = true,
            Hosts = new RedisHost[] {
                                      new RedisHost() 
                                      {
                                          Host = ConfigurationManager.AppSettings["RedisCacheAddress"].ToString(),
                                          Port = 6380
                                      },
                                    },
            ConnectTimeout = Convert.ToInt32(ConfigurationManager.AppSettings["RedisTimeout"].ToString()),
            Database = 0,
            Ssl = true,
            Password = ConfigurationManager.AppSettings["RedisCachePassword"].ToString(),
            ServerEnumerationStrategy = new ServerEnumerationStrategy()
            {
                Mode = ServerEnumerationStrategy.ModeOptions.All,
                TargetRole = ServerEnumerationStrategy.TargetRoleOptions.Any,
                UnreachableServerAction = ServerEnumerationStrategy.UnreachableServerActionOptions.Throw
            },
            PoolSize = 50
        };