多线程&信号量&事件
Multi Thread & Semaphore & Events
我正在尝试执行一些来自 RabbitMQ 的命令。大约 5 msgs/sec。消息太多,我必须发送到一个线程来执行,但我没有那么多线程,所以我限制为 10 个。
所以我的想法是消息会传给工作人员,放入队列中,10 个线程中的任何一个都会达到峰值并执行。所有这些都使用信号量。
经过一些实验,我不知道为什么,但我的线程只执行了 3 或 4 个项目,之后它就停止了,没有错误...
我认为是事件调用方法执行时的逻辑问题,想不出更好的方法...
为什么只处理前 4 条消息?
执行此操作的模式或更好的方法?
以下是我的部分代码:
const int MaxThreads = 10;
private static Semaphore sem = new Semaphore(MaxThreads, MaxThreads);
private static Queue<BasicDeliverEventArgs> queue = new Queue<BasicDeliverEventArgs>();
static void Main(string[] args)
{
consumer.Received += (sender, ea) =>
{
var m = JsonConvert.DeserializeObject<Mail>(ea.Body.GetString());
Console.WriteLine($"Sub-> {m.Subject}");
queue.Enqueue(ea);
RUN();
};
channel.BasicConsume(queueName, false, consumer);
Console.Read();
}
private static void RUN()
{
while (queue.Count > 0)
{
sem.WaitOne();
var item = queue.Dequeue();
ThreadPool.QueueUserWorkItem(sendmail, item);
}
}
private static void sendmail(Object item)
{
//.....soem processing stuff....
//tell rabbitMq that everything was OK
channel.BasicAck(deliveryTag: x.DeliveryTag, multiple: true);
//release thread
sem.Release();
}
我认为您可以在此处使用阻塞集合。它将简化代码。
所以您的电子邮件发件人看起来像这样:
public class ParallelEmailSender : IDisposable
{
private readonly BlockingCollection<string> blockingCollection;
public ParallelEmailSender(int threadsCount)
{
blockingCollection = new BlockingCollection<string>(new ConcurrentQueue<string>());
for (int i = 0; i < threadsCount; i++)
{
Task.Factory.StartNew(SendInternal);
}
}
public void Send(string message)
{
blockingCollection.Add(message);
}
private void SendInternal()
{
foreach (string message in blockingCollection.GetConsumingEnumerable())
{
// send method
}
}
public void Dispose()
{
blockingCollection.CompleteAdding();
}
}
当然你需要添加错误捕获逻辑,你也可以通过使用取消令牌来改进应用程序关闭过程。
我强烈建议阅读 Joseph Albahari 写的伟大 e-book about multithreading programming。
我正在尝试执行一些来自 RabbitMQ 的命令。大约 5 msgs/sec。消息太多,我必须发送到一个线程来执行,但我没有那么多线程,所以我限制为 10 个。
所以我的想法是消息会传给工作人员,放入队列中,10 个线程中的任何一个都会达到峰值并执行。所有这些都使用信号量。
经过一些实验,我不知道为什么,但我的线程只执行了 3 或 4 个项目,之后它就停止了,没有错误...
我认为是事件调用方法执行时的逻辑问题,想不出更好的方法...
为什么只处理前 4 条消息?
执行此操作的模式或更好的方法?
以下是我的部分代码:
const int MaxThreads = 10;
private static Semaphore sem = new Semaphore(MaxThreads, MaxThreads);
private static Queue<BasicDeliverEventArgs> queue = new Queue<BasicDeliverEventArgs>();
static void Main(string[] args)
{
consumer.Received += (sender, ea) =>
{
var m = JsonConvert.DeserializeObject<Mail>(ea.Body.GetString());
Console.WriteLine($"Sub-> {m.Subject}");
queue.Enqueue(ea);
RUN();
};
channel.BasicConsume(queueName, false, consumer);
Console.Read();
}
private static void RUN()
{
while (queue.Count > 0)
{
sem.WaitOne();
var item = queue.Dequeue();
ThreadPool.QueueUserWorkItem(sendmail, item);
}
}
private static void sendmail(Object item)
{
//.....soem processing stuff....
//tell rabbitMq that everything was OK
channel.BasicAck(deliveryTag: x.DeliveryTag, multiple: true);
//release thread
sem.Release();
}
我认为您可以在此处使用阻塞集合。它将简化代码。 所以您的电子邮件发件人看起来像这样:
public class ParallelEmailSender : IDisposable
{
private readonly BlockingCollection<string> blockingCollection;
public ParallelEmailSender(int threadsCount)
{
blockingCollection = new BlockingCollection<string>(new ConcurrentQueue<string>());
for (int i = 0; i < threadsCount; i++)
{
Task.Factory.StartNew(SendInternal);
}
}
public void Send(string message)
{
blockingCollection.Add(message);
}
private void SendInternal()
{
foreach (string message in blockingCollection.GetConsumingEnumerable())
{
// send method
}
}
public void Dispose()
{
blockingCollection.CompleteAdding();
}
}
当然你需要添加错误捕获逻辑,你也可以通过使用取消令牌来改进应用程序关闭过程。
我强烈建议阅读 Joseph Albahari 写的伟大 e-book about multithreading programming。