为什么我的工人工作分配计数不等于此 System.Threading.Channel 样本中的生产项目总数?

Why my workers work distribution count does not total the number of produced items in this System.Threading.Channel sample?

正在关注 this post, I have been playing with System.Threading.Channel to get confident enough and use it in my production code, replacing the Threads/Monitor.Pulse/Wait based approach I am currently using (described in the referred post)。

基本上,我创建了一个带有边界通道的示例,其中我 运行 开始时有几个生产者任务,无需等待,开始我的消费者任务,这些任务开始从通道推送元素。 等待生产者任务完成后,我会向通道发出完成信号,这样消费者任务就可以停止收听新的通道元素。

我的频道是 Channel<Action>,在每个操作中,我都会增加 WorkDistribution 并发字典中每个给定工作人员的计数,并在示例结束时打印它以便我可以检查我消费了多少商品,以及渠道如何在消费者之间分配操作。

出于某种原因,此“工作分配页脚”打印的项目数与生产者任务生成的项目总数不同。

我错过了什么? 添加存在的一些变量的唯一目的是帮助解决问题。

完整代码如下:

public class ChannelSolution
{
    object LockObject = new object();
    Channel<Action<string>> channel;
    int ItemsToProduce;
    int WorkersCount;
    int TotalItemsProduced;
    ConcurrentDictionary<string, int> WorkDistribution;
    CancellationToken Ct;
    public ChannelSolution(int workersCount, int itemsToProduce, int maxAllowedItems,
        CancellationToken ct)
    {
        WorkersCount = workersCount;
        ItemsToProduce = itemsToProduce;
        channel = Channel.CreateBounded<Action<string>>(maxAllowedItems);
        Console.WriteLine($"Created channel with max {maxAllowedItems} items");
        WorkDistribution = new ConcurrentDictionary<string, int>();
        Ct = ct;
    }

    async Task ProduceItems(int cycle)
    {
        for (var i = 0; i < ItemsToProduce; i++)
        {
            var index = i + 1 + (ItemsToProduce * cycle);
            bool queueHasRoom;
            var stopwatch = new Stopwatch();
            stopwatch.Start();
            do
            {
                if (Ct.IsCancellationRequested)
                {
                    Console.WriteLine("exiting read loop - cancellation requested !");
                    break;
                }
                queueHasRoom = await channel.Writer.WaitToWriteAsync();
                if (!queueHasRoom)
                {
                    if (Ct.IsCancellationRequested)
                    {
                        Console.WriteLine("exiting read loop - cancellation"
                            + " requested !");
                        break;
                    }

                    if (stopwatch.Elapsed.Seconds % 3 == 0)
                        Console.WriteLine("Channel reached maximum capacity..."
                            + " producer waiting for items to be freed...");
                }
            }
            while (!queueHasRoom);
            channel.Writer.TryWrite((workerName) => action($"A{index}", workerName));
            Console.WriteLine($"Channel has room, item {index} added"
                + $" - channel items count: [{channel.Reader.Count}]");
            Interlocked.Increment(ref TotalItemsProduced);
        }
    }

    List<Task> GetConsumers()
    {
        var tasks = new List<Task>();
        for (var i = 0; i < WorkersCount; i++)
        {
            var workerName = $"W{(i + 1).ToString("00")}";
            tasks.Add(Task.Run(async () =>
            {
                while (await channel.Reader.WaitToReadAsync())
                {
                    if (Ct.IsCancellationRequested)
                    {
                        Console.WriteLine("exiting write loop - cancellation"
                            + "requested !");
                        break;
                    }

                    if (channel.Reader.TryRead(out var action))
                    {
                        Console.WriteLine($"dequed action in worker [{workerName}]");
                        action(workerName);
                    }
                }
            }));
        }

        return tasks;
    }

    void action(string actionNumber, string workerName)
    {
        Console.WriteLine($"processing {actionNumber} in worker {workerName}...");
        var secondsToWait = new Random().Next(2, 5);
        Thread.Sleep(TimeSpan.FromSeconds(secondsToWait));
        Console.WriteLine($"action {actionNumber} completed by worker {workerName}"
            + $" after {secondsToWait} secs! channel items left:"
            + $" [{channel.Reader.Count}]");

        if (WorkDistribution.ContainsKey(workerName))
        {
            lock (LockObject)
            {
                WorkDistribution[workerName]++;
            }
        }
        else
        {
            var succeeded = WorkDistribution.TryAdd(workerName, 1);
            if (!succeeded)
            {
                Console.WriteLine($"!!! failed incremeting dic value !!!");
            }

        }
    }

    public void Summarize(Stopwatch stopwatch)
    {
        Console.WriteLine("--------------------------- Thread Work Distribution "
            + "------------------------");
        foreach (var kv in this.WorkDistribution)
            Console.WriteLine($"thread: {kv.Key} items consumed: {kv.Value}");

        Console.WriteLine($"Total actions consumed: "
            + $"{WorkDistribution.Sum(w => w.Value)} - Elapsed time: "
            + $"{stopwatch.Elapsed.Seconds} secs");

    }

    public void Run(int producerCycles)
    {
        var stopwatch = new Stopwatch();
        stopwatch.Start();
        var producerTasks = new List<Task>();

        Console.WriteLine($"Started running at {DateTime.Now}...");
        for (var i = 0; i < producerCycles; i++)
        {
            producerTasks.Add(ProduceItems(i));
        }
        var consumerTasks = GetConsumers();
        Task.WaitAll(producerTasks.ToArray());
        Console.WriteLine($"-------------- Completed waiting for PRODUCERS -"
            + " total items produced: [{TotalItemsProduced}] ------------------");
        channel.Writer.Complete(); //just so I can complete this demo

        Task.WaitAll(consumerTasks.ToArray());
        Console.WriteLine("----------------- Completed waiting for CONSUMERS "
            + "------------------");
        //Task.WaitAll(GetConsumers().Union(producerTasks/*.Union(
        //    new List<Task> { taskKey })*/).ToArray());
        //Console.WriteLine("Completed waiting for tasks");

        Summarize(stopwatch);
    }
}

这里是Program.cs

中的调用代码
var workersCount = 5;
var itemsToProduce = 10;
var maxItemsInQueue = 5;
var cts = new CancellationTokenSource();
var producerConsumerTests = new ProducerConsumerTests(workersCount, itemsToProduce,
    maxItemsInQueue, cts.Token);
producerConsumerTests.Run(2);

快速查看 ProduceItems 方法中的 queueHasRoom 变量周围存在竞争条件。你不需要这个变量。 channel.Writer.TryWrite method will tell you whether there is room in the channel's buffer or not. Alternatively you could simply await the WriteAsync 方法,而不是使用 WaitToWriteAsync/TryWrite 组合。 AFAIK 这个组合旨在作为前一种方法的性能优化。如果您在尝试 post 值之前绝对需要知道是否有可用的 space,那么 Channel<T> 可能不适合您的用例。你需要找到在“check-for-available-space -> create-the-value -> post-the-value”的整个操作过程中可以锁定的东西,这样这个操作可以原子化。

附带说明一下,使用锁来保护 ConcurrentDictionary 的更新是多余的。 ConcurrentDictionary 提供了 AddOrUpdate 方法,可以原子地用另一个值替换它包含的值。如果字典包含可变对象,您可能必须锁定,并且您需要使用线程安全来改变这些对象。但在您的情况下,值的类型为 Int32,这是一个不可变的结构。你不改变它,你只是用一个新的 Int32 替换它,它是根据现有值创建的:

WorkDistribution.AddOrUpdate(workerName, 1, (_, existing) => existing + 1);