Semaphoreslim.Wait(0)(防止多次执行)导致不执行的可能性

Possibility of Semaphoreslim.Wait(0) (to prevent multiple execution) causing non execution

我不确定的情况涉及 "threadsafe" PipeStream 的使用,其中多个线程可以添加要写入的消息。如果没有要写入的消息队列,则当前线程将开始写入读取方。如果有队列,并且队列在管道写入时增长,我希望开始写入的线程耗尽队列。

我 "hope" 这种设计(如下所示)阻碍了 SemaphoreSlim 的连续 entering/releasing 并减少了计划任务的数量。我说 "hope" 是因为我应该测试这个并发症是否有任何积极的性能影响。然而,在测试之前,我应该首先了解代码是否按照我的预期进行,因此请考虑以下 class,下面是一系列事件;

注意:我知道任务的执行与任何特定线程无关,但我发现这是最简单的解释方式。

class SemaphoreExample
{
    // Wrapper around a NamedPipeClientStream
    private readonly MessagePipeClient m_pipe =
        new MessagePipeClient("somePipe");

    private readonly SemaphoreSlim m_semaphore =
        new SemaphoreSlim(1, 1);

    private readonly BlockingCollection<Message> m_messages =
        new BlockingCollection<Message>(new ConcurrentQueue<Message>());

    public Task Send<T>(T content)
        where T : class
    {
        if (!this.m_messages.TryAdd(new Message<T>(content)))
            throw new InvalidOperationException("No more requests!");

        Task dequeue = TryDequeue();

        return Task.FromResult(true);
        // In reality this class (and method) is more complex.
        // There is a similiar pipe (and wrkr) in the other direction.
        // The "sent jobs" is kept in a dictionary and this method
        // returns a task belonging to a completionsource tied
        // to the "sent job". The wrkr responsible for the other
        // pipe reads a response and sets the corresponding
        // completionsource.
    }

    private async Task TryDequeue()
    {
        if (!this.m_semaphore.Wait(0))
            return; // someone else is already here

        try
        {
            Message message;
            while (this.m_messages.TryTake(out message))
            {
                await this.m_pipe.WriteAsync(message);
            }
        }
        finally { this.m_semaphore.Release(); }
    }
}
  1. Wrkr1 完成写入管道。 (在 TryDequeue 中)
  2. Wrkr1 确定队列为空。 (在 TryDequeue 中)
  3. Wrkr2 将项目添加到队列中。 (发送中)
  4. Wrkr2判断Wrkr1占用Semaphore,returns。 (发送中)
  5. Wrkr1 释放信号量。 (在 TryDequeue 中)
  6. 队列中有 1 个项目在 x 时间内不会被处理。

这一系列事件可能吗?我是否应该完全忘记这个想法,让每个对 "Send" 的调用都等待 "TryDeque" 和其中的信号量?也许每个方法调用调度另一个任务的潜在性能影响可以忽略不计,即使在 "high" 频率下也是如此。

更新:

根据 Alex 的建议,我正在执行以下操作; 让 "Send" 的调用者指定一个 "maxWorkload" 整数,该整数指定在将工作委托给另一个线程以处理任何额外工作之前,调用者准备执行多少项(对于其他调用者,在最坏的情况下)。但是,在创建新线程之前,"Send" 的其他调用者有机会进入信号量,从而可能阻止使用其他线程。

为了不让任何工作在队列中徘徊,任何成功进入信号量并完成工作的工作人员必须在退出信号量后检查是否有新的工作加入。如果这是真的,同一个工作人员将尝试重新进入(如果未达到 "maxWorkload")或如上所述委派工作。

示例如下:Send 现在将 "TryPool" 设置为 "TryDequeue" 的延续。 "TryPool" 只有在 "TryDequeue" returns 为真时才开始(即在输入信号量时做了一些工作)。

    // maxWorkload cannot be -1 for this method
    private async Task<bool> TryDequeue(int maxWorkload)
    {
        int currWorkload = 0;
        while (this.m_messages.Count != 0 && this.m_semaphore.Wait(0))
        {
            try
            {
                currWorkload = await Dequeue(currWorkload, maxWorkload);
                if (currWorkload >= maxWorkload)
                    return true;
            }
            finally
            {
                this.m_semaphore.Release();
            }
        }
        return false;
    }

    private Task TryPool()
    {
        if (this.m_messages.Count == 0 || !this.m_semaphore.Wait(0))
            return Task<bool>.FromResult(false);

        return Task.Run(async () =>
        {
            do
            {
                try
                {
                    await Dequeue(0, -1);
                }
                finally
                {
                    this.m_semaphore.Release();
                }
            }
            while (this.m_messages.Count != 0 && this.m_semaphore.Wait(0));
        });
    }

    private async Task<int> Dequeue(int currWorkload, int maxWorkload)
    {
        while (currWorkload < maxWorkload || maxWorkload == -1)
        {
            Message message;
            if (!this.m_messages.TryTake(out message))
                return currWorkload;

            await this.m_pipe.WriteAsync(message);

            currWorkload++;
        }
        return maxWorkload;
    }

我倾向于称这种模式为 "GatedBatchWriter",即第一个通过门的线程处理一批任务;它自己和其他一些人代表其他作家,直到它完成了足够的工作。

此模式主要用于批处理工作效率更高时,因为与该工作相关的开销。例如。一次将较大的块写入磁盘,而不是多个小块。

是的,这个特定的模式有一个特定的竞争条件需要注意:"responsible writer",即通过门的那个,确定队列中没有更多消息并在释放之前停止信号量(即它的写责任)。第二位作家到了,在这两个决策点之间未能获得写作责任。现在队列中有一条消息将不会被传递(或延迟传递,当下一个写入者到达时)。

此外,就日程安排而言,您现在所做的是不公平的。如果有很多消息,队列可能永远不会为空,通过门的作者将永远忙于代表其他人写消息。您需要限制负责作者的批量大小。

您可能想要更改的其他一些内容是:

  1. 让您的 Message 包含任务完成令牌。
  2. 让无法获得写责任的作者将他们的消息排入队列并等待两个任务完成中的任何一个:与他们的消息相关的任务完成,写责任的释放。
  3. 让负责的作者为其处理的消息设置完成。
  4. 让负责的编写者在完成足够的工作后解除其编写责任。
  5. 当等待的作者被两个任务完成之一唤醒时:
    • 如果是由于其消息上的完成标记,它可以顺利进行。
    • 否则,尝试获取写入权限,冲洗,重复...

另外注意:如果有很多消息,即平均消息负载高,专用线程/长运行任务处理队列通常会有更好的性能。