多生产者一消费者互斥

Mutual Exclusion with Multiple Producers and One Consumer

我有一个有趣的问题需要在一些生产代码上解决。我们目前正在开发一种 Web 服务,该服务将从许多不同的应用程序中调用,并且主要用于发送电子邮件。每当发送一封新电子邮件时,我们最终都需要将该电子邮件的收据添加到数据库中,但理想情况下我们不想立即执行此操作,因此我们将随着时间的推移建立一个缓冲区。一旦缓冲区达到一定长度,或者经过足够长的时间后,缓冲区的内容将被刷新到数据库中。

这样想,当一个线程发送一封电子邮件时,它会锁定缓冲区,以便在不受干扰的情况下添加它的日志并维护线程安全。如果它发现缓冲区有一定大小(在这个例子中我们会说 1000),那么线程有责任将它全部写入数据库(我认为这是低效的,但我使用 Service Stack 作为我们的 web框架,所以如果有一种方法可以委派此任务,我宁愿采用这种方法。

现在,由于写入数据库可能很耗时,我们想添加一个辅助缓冲区以供使用。因此,一旦一个缓冲区已满,所有新请求都会在刷新第一个缓冲区时将其工作记录到第二个缓冲区中。同样,一旦第二个缓冲区已满,所有线程将移回第一个缓冲区,第二个缓冲区将被刷新。

我们需要解决的首要问题:

我更关心第二个要点。重新唤醒所有阻塞线程的最佳方法是什么,而不是让它们进入第一个缓冲区的临界区,而是让它们尝试为空线程获取锁?

编辑

根据下面的评论,我想出了一些我认为可行的方法。我不知道存在线程安全的数据结构。

    private readonly ConcurrentQueue<EmailResponse> _logBuffer = new ConcurrentQueue<EmailResponse>();
    private readonly object _lockobject = new object();
    private const int BufferThreshold = 1000;

    public void AddToBuffer(EmailResponse email)
    {
        _logBuffer.Enqueue(email);

        Monitor.Enter(_lockobject);
        if (_logBuffer.Count >= BufferThreshold)
            Task.Run(async () =>
            {
                EmailResponse response;
                for (var i = 0; i < BufferThreshold; i++)
                    if (_logBuffer.TryDequeue(out response))
                        await AddMail(response);
                Monitor.Exit(_lockobject);
            });
        else Monitor.Exit(_lockobject);
    }

我不确定您是否需要第二个缓冲区; ConcurrentQueue 我认为这是解决您问题的好方法。每个线程都可以在没有冲突的情况下入队,并且如果任何线程注意到队列的计数高于魔法阈值,您可以安全地出队最多那么多对象,即使其他线程入队更多。

我制作的一个(非常快速和肮脏的)工作示例:

static class Buffer
{
    private const int c_MagicThreshold = 10;
    private static ConcurrentQueue<string> s_Messages = new ConcurrentQueue<string>();
    private static object s_LockObj = new object();

    public static void Enqueue(string message)
    {
        s_Messages.Enqueue(message);
        // try to flush every time; spawn on a non-blocking thread and immediately return
        new Task(Flush).Start();
    }

    public static void Flush()
    {
        // do we flush at all?
        if (s_Messages.Count >= c_MagicThreshold)
        {
            lock (s_LockObj)
            {
                // make sure another thread didn't flush while we were waiting
                if (s_Messages.Count >= c_MagicThreshold)
                {
                    List<string> messages = new List<string>();
                    Console.WriteLine("Flushing " + c_MagicThreshold + " messages...");
                    for (int i = 0; i < c_MagicThreshold; i++)
                    {
                        string message;
                        if (!s_Messages.TryDequeue(out message))
                        {
                            throw new InvalidOperationException("How the hell did you manage that?");
                            // or just break from the loop if you don't care much, you spaz
                        }
                        messages.Add(message);
                    }
                    Console.WriteLine("[ " + String.Join(", ", messages) + " ]");

                    // number of new messages enqueued between threshold pass and now
                    Console.WriteLine(s_Messages.Count + " messages remaining in queue");
                }
            }
        }
    }
}

测试通话:

Parallel.For(0, 30, (i) =>
{
    Thread.Sleep(100);  // do other things
    Buffer.Enqueue(i.ToString());
});

测试的控制台输出 运行:

Flushing 10 messages...

[ 28, 21, 14, 0, 7, 29, 8, 15, 1, 22 ]

5 messages remaining in queue

Flushing 10 messages...

[ 16, 3, 9, 2, 23, 17, 10, 4, 24, 5 ]

1 messages remaining in queue

Flushing 10 messages...

[ 11, 18, 25, 19, 26, 12, 6, 20, 13, 27 ]

0 messages remaining in queue

你能给每个线程一个对象来保存两个缓冲区并让线程记录到这个对象吗?当每个线程要求它记录一些东西时,这个对象将决定写入哪个缓冲区。该对象还可能负责将完整缓冲区清空到数据库,而不是阻止线程写入。