RabbitMQ、.NET 和多线程
RabbitMQ , .NET and multithreading
我已经使用 RabbitMQ 及其 .NET 连接器 一段时间了。
经过一段时间的测试后,我的新系统使用 RabbitMQ 作为 Web 应用程序和 API 网络中的代理,投入生产。
所有应用程序都在几分钟内卡住了。
我想我达到了一些 OS 相关的阈值或搞砸了一些 TCP 堆栈(在阻塞之后甚至我的客户端的 TCP/IP 连接都不再访问网络服务器)。
关于如何处理通过数十个进程和数千个线程传播的密集流量,我还没有找到很好的 material。
我有一个系统每秒生成 10k+ 个线程,每个线程都必须通过连接丢弃一条消息,然后终止。
对于所有这些消息,我只有几个 'catchers'。
已经采取了一些反制措施。
不要为每个新线程使用新连接 -> 声明连接工厂并使用静态连接(据说连接和通道上的函数是线程安全的)-> 已解决
这是工厂的代码
public static class Factory
{
private static IConnection sharedConnection_cl;
public static IConnection SharedConnection_cl
{
get
{
if (sharedConnection_cl == null)
{
sharedConnection_cl = GetRabbitMqConnection();
}
return sharedConnection_cl;
}
private static IConnection GetRabbitMqConnection()
{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.HostName = "x.y.z.w";
connectionFactory.UserName = "notTheGuestUser";
connectionFactory.Password = "abcdef";
connectionFactory.VirtualHost = "/dedicatedHost";
return connectionFactory.CreateConnection();
}
不使用所有可用的 Erlang 进程。 Erlang 进程阈值在不到 10 分钟内就达到了(同一连接上的关闭通道不会触发服务器上相应 Erlang 进程的死亡)。->
为任何给定连接添加了最大通道数阈值,并使用信号量保护访问。每隔一段时间连接就会关闭并重新创建(相应的 Erlang 进程在连接关闭时终止)-> Solved
这是管理通道阈值的代码
public static class Factory
{
private static IConnection sharedConnection_cl;
private static int channelCount_int { get; set; }
static Semaphore connectionAccessSemaphore_sem = new Semaphore(1, 1);
public static IConnection SharedConnection_cl
{
get
{
if (sharedConnection_cl == null)
{
sharedConnection_cl = GetRabbitMqConnection();
channelCount_int = 0;
}
connectionAccessSemaphore_sem.WaitOne();
if (channelCount_int > 10000)
{
sharedConnection_cl.Close();
sharedConnection_cl = GetRabbitMqConnection();
channelCount_int = 0;
}
else
{
channelCount_int++;
}
connectionAccessSemaphore_sem.Release();
return sharedConnection_cl;
}
}
现在...超出增加 OS 标准阈值(这只会将不可避免的阻塞的痛苦从几分钟...延长到几个小时)...
是否有管理连接和通道的良好做法,以避免达到 OS 阈值和出现饱和趋势?
感谢您的支持。
好的,解决方案已经存在。只需向上移动信号量就可以了。我没有考虑到在系统重启时,当所有的 appPools 结束时,我在连接实例分配上遇到了并发问题。 -> 已解决
public static class Factory{
private static IConnection sharedConnection_cl;
private static int channelCount_int { get; set; }
static Semaphore connectionAccessSemaphore_sem = new Semaphore(1, 1);
public static IConnection SharedConnection_cl
{
get
{
connectionAccessSemaphore_sem.WaitOne();
if (sharedConnection_cl == null)
{
sharedConnection_cl = GetRabbitMqConnection();
channelCount_int = 0;
}
if (channelCount_int > 10000)
{
sharedConnection_cl.Close();
sharedConnection_cl = GetRabbitMqConnection();
channelCount_int = 0;
}
else
{
channelCount_int++;
}
connectionAccessSemaphore_sem.Release();
return sharedConnection_cl;
}
}
不确定为什么 AppPools 的锁锁定了服务器上的所有 TCP 连接。
我已经使用 RabbitMQ 及其 .NET 连接器 一段时间了。 经过一段时间的测试后,我的新系统使用 RabbitMQ 作为 Web 应用程序和 API 网络中的代理,投入生产。 所有应用程序都在几分钟内卡住了。 我想我达到了一些 OS 相关的阈值或搞砸了一些 TCP 堆栈(在阻塞之后甚至我的客户端的 TCP/IP 连接都不再访问网络服务器)。
关于如何处理通过数十个进程和数千个线程传播的密集流量,我还没有找到很好的 material。
我有一个系统每秒生成 10k+ 个线程,每个线程都必须通过连接丢弃一条消息,然后终止。 对于所有这些消息,我只有几个 'catchers'。
已经采取了一些反制措施。 不要为每个新线程使用新连接 -> 声明连接工厂并使用静态连接(据说连接和通道上的函数是线程安全的)-> 已解决
这是工厂的代码
public static class Factory
{
private static IConnection sharedConnection_cl;
public static IConnection SharedConnection_cl
{
get
{
if (sharedConnection_cl == null)
{
sharedConnection_cl = GetRabbitMqConnection();
}
return sharedConnection_cl;
}
private static IConnection GetRabbitMqConnection()
{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.HostName = "x.y.z.w";
connectionFactory.UserName = "notTheGuestUser";
connectionFactory.Password = "abcdef";
connectionFactory.VirtualHost = "/dedicatedHost";
return connectionFactory.CreateConnection();
}
不使用所有可用的 Erlang 进程。 Erlang 进程阈值在不到 10 分钟内就达到了(同一连接上的关闭通道不会触发服务器上相应 Erlang 进程的死亡)。-> 为任何给定连接添加了最大通道数阈值,并使用信号量保护访问。每隔一段时间连接就会关闭并重新创建(相应的 Erlang 进程在连接关闭时终止)-> Solved
这是管理通道阈值的代码
public static class Factory
{
private static IConnection sharedConnection_cl;
private static int channelCount_int { get; set; }
static Semaphore connectionAccessSemaphore_sem = new Semaphore(1, 1);
public static IConnection SharedConnection_cl
{
get
{
if (sharedConnection_cl == null)
{
sharedConnection_cl = GetRabbitMqConnection();
channelCount_int = 0;
}
connectionAccessSemaphore_sem.WaitOne();
if (channelCount_int > 10000)
{
sharedConnection_cl.Close();
sharedConnection_cl = GetRabbitMqConnection();
channelCount_int = 0;
}
else
{
channelCount_int++;
}
connectionAccessSemaphore_sem.Release();
return sharedConnection_cl;
}
}
现在...超出增加 OS 标准阈值(这只会将不可避免的阻塞的痛苦从几分钟...延长到几个小时)...
是否有管理连接和通道的良好做法,以避免达到 OS 阈值和出现饱和趋势?
感谢您的支持。
好的,解决方案已经存在。只需向上移动信号量就可以了。我没有考虑到在系统重启时,当所有的 appPools 结束时,我在连接实例分配上遇到了并发问题。 -> 已解决
public static class Factory{
private static IConnection sharedConnection_cl;
private static int channelCount_int { get; set; }
static Semaphore connectionAccessSemaphore_sem = new Semaphore(1, 1);
public static IConnection SharedConnection_cl
{
get
{
connectionAccessSemaphore_sem.WaitOne();
if (sharedConnection_cl == null)
{
sharedConnection_cl = GetRabbitMqConnection();
channelCount_int = 0;
}
if (channelCount_int > 10000)
{
sharedConnection_cl.Close();
sharedConnection_cl = GetRabbitMqConnection();
channelCount_int = 0;
}
else
{
channelCount_int++;
}
connectionAccessSemaphore_sem.Release();
return sharedConnection_cl;
}
}
不确定为什么 AppPools 的锁锁定了服务器上的所有 TCP 连接。