RabbitMQ、.NET 和多线程

RabbitMQ , .NET and multithreading

我已经使用 RabbitMQ 及其 .NET 连接器 一段时间了。 经过一段时间的测试后,我的新系统使用 RabbitMQ 作为 Web 应用程序和 API 网络中的代理,投入生产。 所有应用程序都在几分钟内卡住了。 我想我达到了一些 OS 相关的阈值或搞砸了一些 TCP 堆栈(在阻塞之后甚至我的客户端的 TCP/IP 连接都不再访问网络服务器)。

关于如何处理通过数十个进程和数千个线程传播的密集流量,我还没有找到很好的 material。

我有一个系统每秒生成 10k+ 个线程,每个线程都必须通过连接丢弃一条消息,然后终止。 对于所有这些消息,我只有几个 'catchers'。

已经采取了一些反制措施。 不要为每个新线程使用新连接 -> 声明连接工厂并使用静态连接(据说连接和通道上的函数是线程安全的)-> 已解决

这是工厂的代码

    public static class Factory
    {
        private static IConnection sharedConnection_cl;
        public static IConnection SharedConnection_cl
        {
            get
            {
                if (sharedConnection_cl == null)
                {
                    sharedConnection_cl = GetRabbitMqConnection();
                }
               return sharedConnection_cl;
         }

    private static IConnection GetRabbitMqConnection()
            {
                ConnectionFactory connectionFactory = new             ConnectionFactory();
                connectionFactory.HostName = "x.y.z.w";
                connectionFactory.UserName = "notTheGuestUser";
                connectionFactory.Password = "abcdef";
                connectionFactory.VirtualHost = "/dedicatedHost";
                return connectionFactory.CreateConnection();
          }

不使用所有可用的 Erlang 进程。 Erlang 进程阈值在不到 10 分钟内就达到了(同一连接上的关闭通道不会触发服务器上相应 Erlang 进程的死亡)。-> 为任何给定连接添加了最大通道数阈值,并使用信号量保护访问。每隔一段时间连接就会关闭并重新创建(相应的 Erlang 进程在连接关闭时终止)-> Solved

这是管理通道阈值的代码

public static class Factory
{
    private static IConnection sharedConnection_cl;
    private static int channelCount_int { get; set; }
    static Semaphore connectionAccessSemaphore_sem = new Semaphore(1, 1);
    public static IConnection SharedConnection_cl
    {
        get
        {
            if (sharedConnection_cl == null)
            {
                sharedConnection_cl = GetRabbitMqConnection();
                channelCount_int = 0;
            }
            connectionAccessSemaphore_sem.WaitOne();
            if (channelCount_int > 10000)
            {

                sharedConnection_cl.Close();
                sharedConnection_cl = GetRabbitMqConnection();
                channelCount_int = 0;
            }
            else
            {
                channelCount_int++;
            }
            connectionAccessSemaphore_sem.Release();
            return sharedConnection_cl;
        }
    }

现在...超出增加 OS 标准阈值(这只会将不可避免的阻塞的痛苦从几分钟...延长到几个小时)...

是否有管理连接和通道的良好做法,以避免达到 OS 阈值和出现饱和趋势?

感谢您的支持。

好的,解决方案已经存在。只需向上移动信号量就可以了。我没有考虑到在系统重启时,当所有的 appPools 结束时,我在连接实例分配上遇到了并发问题。 -> 已解决

public static class Factory{
private static IConnection sharedConnection_cl;
private static int channelCount_int { get; set; }
static Semaphore connectionAccessSemaphore_sem = new Semaphore(1, 1);
public static IConnection SharedConnection_cl
{
    get
    {
        connectionAccessSemaphore_sem.WaitOne();

        if (sharedConnection_cl == null)
        {
            sharedConnection_cl = GetRabbitMqConnection();
            channelCount_int = 0;
        }

        if (channelCount_int > 10000)
        {

            sharedConnection_cl.Close();
            sharedConnection_cl = GetRabbitMqConnection();
            channelCount_int = 0;
        }
        else
        {
            channelCount_int++;
        }
        connectionAccessSemaphore_sem.Release();
        return sharedConnection_cl;
    }
}

不确定为什么 AppPools 的锁锁定了服务器上的所有 TCP 连接。