多线程&信号量&事件

Multi Thread & Semaphore & Events

我正在尝试执行一些来自 RabbitMQ 的命令。大约 5 msgs/sec。消息太多,我必须发送到一个线程来执行,但我没有那么多线程,所以我限制为 10 个。

所以我的想法是消息会传给工作人员,放入队列中,10 个线程中的任何一个都会达到峰值并执行。所有这些都使用信号量。

经过一些实验,我不知道为什么,但我的线程只执行了 3 或 4 个项目,之后它就停止了,没有错误...

我认为是事件调用方法执行时的逻辑问题,想不出更好的方法...

为什么只处理前 4 条消息?

执行此操作的模式或更好的方法?

以下是我的部分代码:

const int MaxThreads = 10;
private static Semaphore sem = new Semaphore(MaxThreads, MaxThreads);
private static Queue<BasicDeliverEventArgs> queue = new Queue<BasicDeliverEventArgs>();

static void Main(string[] args)
{
consumer.Received += (sender, ea) =>
               {
                var m = JsonConvert.DeserializeObject<Mail>(ea.Body.GetString());
                Console.WriteLine($"Sub-> {m.Subject}");
                queue.Enqueue(ea);
                RUN();
              };

            channel.BasicConsume(queueName, false, consumer);

            Console.Read();
}

private static void RUN()
{
            while (queue.Count > 0)
            {
                sem.WaitOne();
                var item = queue.Dequeue();
                ThreadPool.QueueUserWorkItem(sendmail, item);
            }
}

private static void sendmail(Object item)
{

//.....soem processing stuff....

//tell rabbitMq that everything was OK
channel.BasicAck(deliveryTag: x.DeliveryTag, multiple: true);

//release thread
sem.Release();

}

我认为您可以在此处使用阻塞集合。它将简化代码。 所以您的电子邮件发件人看起来像这样:

public class ParallelEmailSender : IDisposable
{
    private readonly BlockingCollection<string> blockingCollection;

    public ParallelEmailSender(int threadsCount)
    {
        blockingCollection = new BlockingCollection<string>(new ConcurrentQueue<string>());
        for (int i = 0; i < threadsCount; i++)
        {
            Task.Factory.StartNew(SendInternal);
        }
    }

    public void Send(string message)
    {
        blockingCollection.Add(message);
    }

    private void SendInternal()
    {
        foreach (string message in blockingCollection.GetConsumingEnumerable())
        {
            // send method
        }
    }

    public void Dispose()
    {
        blockingCollection.CompleteAdding();
    }
}

当然你需要添加错误捕获逻辑,你也可以通过使用取消令牌来改进应用程序关闭过程。

我强烈建议阅读 Joseph Albahari 写的伟大 e-book about multithreading programming