Partitioning with IEnumerable source

I have a ConcurrentQueue typed as IProducerConsumerCollection, i.e.

IProducerConsumerCollection<Job> _queue = new ConcurrentQueue<Job>();

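together with a producer method that adds jobs to _queue and a consumer method that processes jobs from _queue. In the consumer method, I would like to process the jobs concurrently. Below is the code of a sample class with the producer and consumer methods: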

public class TestQueue
{
    IProducerConsumerCollection<Job> _queue = new ConcurrentQueue<Job>();
    private static HttpClient _client = new HttpClient();

    public TestQueue()
    {
        WorkProducerThread();
        WorkConsumerThread();
    }

    public void WorkConsumerThread()
    {
        if (_queue.Count > 0)
        {
            //At this point, 4 partitions are created but all records are in 1st partition only; 2,3,4 partition are empty
            var partitioner = Partitioner.Create(_queue).GetPartitions(4);

            Task t = Task.WhenAll(
             from partition in partitioner
             select Task.Run(async () =>
             {
                 using (partition)
                 {
                     while (partition.MoveNext())
                         await CreateJobs(partition.Current);
                 }
             }));

            t.Wait();

            //At this point, queue count is still 20, how to remove item from _queue collection when processed?
        }
    }

    private async Task CreateJobs(Job job)
    {
        HttpContent bodyContent = null;
        await _client.PostAsync("job", bodyContent);
    }



    public void WorkProducerThread()
    {
        if (_queue.Count == 0)
        {
            try
            {
                for (int i = 0; i < 20; i++)
                {
                    Job job = new Job { Id = i, JobName = "j" + i.ToString(), JobCreated = DateTime.Now };
                    _queue.TryAdd(job);
                }
            }
            catch (Exception ex)
            {
                //_Log.Error("Exception while adding jobs to collection", ex);
            }
        }
    }

}

public class Job
{
    public int Id { get; set; }
    public string JobName { get; set; }
    public DateTime JobCreated { get; set; }
}

I have two questions:

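  1. Partitioner.Create(_queue).GetPartitions(4); Partitioner.GetPartitions creates 4 partitions, but all records are in the 1st partition only; partitions 2, 3 and 4 are empty. I could not find out why this happens. Ideally, each of the 4 partitions should hold 5 records (since there are 20 records in the queue in total). I read this article from MSDN on partitioning but didn't get any clue. I also checked the partitioning example from this article.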

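  2. Also, I want to remove each item from _queue after the consumer method has processed it, and the only way to remove an item is the _queue.TryTake method. I don't know how to remove items while also partitioning.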

I am open to any alternative approach that achieves the same result.

Thanks in advance.

Partitioner.Create(_queue).GetPartitions(4); Partitioner.GetPartitions creates 4 partitions but all records are in the 1st partition only; partitions 2, 3 and 4 are empty.

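That is not correct; your queue entries are being partitioned properly. To verify, change your processing logic slightly so that it logs which partition is doing the work: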

Task t = Task.WhenAll(
    from partition in partitioner.Select((jobs, i) => new { jobs, i })
    select Task.Run(async () =>
    {
        using (partition.jobs)
        {
            while (partition.jobs.MoveNext())
            {
                Console.WriteLine(partition.i);
                await CreateJobs(partition.jobs.Current);
            }
        }
    }));

You will notice that Console.WriteLine writes values from 0 to 3, which shows that the entries are spread across all four partitions.
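To see the split as numbers rather than as interleaved log lines, the same check can be condensed into a small self-contained sketch. The names PartitionTally and CountPerPartitionAsync are made up for illustration, plain ints stand in for Job, and Task.Delay stands in for CreateJobs. The sketch only tallies how many items each partition enumerator handed out; the exact split can differ from run to run and need not be an even 5/5/5/5.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public static class PartitionTally
{
    // Hypothetical helper: returns how many items each partition yielded.
    public static async Task<int[]> CountPerPartitionAsync(int itemCount, int partitionCount)
    {
        var queue = new ConcurrentQueue<int>(Enumerable.Range(0, itemCount));

        // Same call as in the question, only with int items.
        var partitions = Partitioner.Create(queue).GetPartitions(partitionCount);
        var counts = new int[partitionCount];

        var tasks = partitions.Select((partition, index) => Task.Run(async () =>
        {
            using (partition)
            {
                while (partition.MoveNext())
                {
                    counts[index]++;        // each slot is written by exactly one task
                    await Task.Delay(10);   // stand-in for the real asynchronous work
                }
            }
        })).ToArray();

        await Task.WhenAll(tasks);
        return counts;
    }
}
```

Calling `string.Join(", ", await PartitionTally.CountPerPartitionAsync(20, 4))` and printing the result shows the per-partition counts. Whatever the split is, the counts add up to the 20 items that were queued.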

Also, I want to remove the item from _queue after processing in consumer method and there is only one way _queue.TryTake method to remove item. I don't know how to remove item along with partitioning?

You can achieve this with a slight rewrite. The main changes are switching to BlockingCollection and adding this NuGet package to get access to GetConsumingPartitioner.

Give it a try:

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks;

namespace Test
{
    public class TestQueue
    {
        BlockingCollection<Job> _queue = new BlockingCollection<Job>();
        private static HttpClient _client = new HttpClient();

        public TestQueue()
        {
            WorkProducerThread();
            WorkConsumerThread();
        }

        public void WorkConsumerThread()
        {
            if (!_queue.IsCompleted)
            {
                // The consuming partitioner removes each item from _queue as a partition takes it
                var partitioner = _queue.GetConsumingPartitioner().GetPartitions(4);

                Task t = Task.WhenAll(
                 from partition in partitioner
                 select Task.Run(async () =>
                 {
                     using (partition)
                     {
                         while (partition.MoveNext())
                             await CreateJobs(partition.Current);
                     }
                 }));


                t.Wait();

                Console.WriteLine(_queue.Count);
            }
        }

        private async Task CreateJobs(Job job)
        {
            //HttpContent bodyContent = null;
            //await _client.PostAsync("job", bodyContent);
            await Task.Delay(100);
        }



        public void WorkProducerThread()
        {
            if (_queue.Count == 0)
            {
                try
                {
                    for (int i = 0; i < 20; i++)
                    {
                        Job job = new Job { Id = i, JobName = "j" + i.ToString(), JobCreated = DateTime.Now };
                        _queue.TryAdd(job);
                    }

                    _queue.CompleteAdding();
                }
                catch (Exception ex)
                {
                    //_Log.Error("Exception while adding jobs to collection", ex);
                }
            }
        }

    }

    public class Job
    {
        public int Id { get; set; }
        public string JobName { get; set; }
        public DateTime JobCreated { get; set; }
    }
    class Program
    {
        static void Main(string[] args)
        {
            var g = new TestQueue();

            Console.ReadLine();
        }
    }
}
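If adding a package is not an option, one possible alternative is to skip the partitioner and let a fixed number of worker tasks pull items directly with TryTake, which removes each item from the collection as it is handed out. The following is only a sketch under stated assumptions: TryTakeSketch, DrainAsync and ProcessAsync are hypothetical names, ints stand in for Job, and ProcessAsync merely simulates CreateJobs. It also assumes that all items have been added before draining starts, because a worker exits as soon as TryTake finds the queue empty.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class TryTakeSketch
{
    // Hypothetical stand-in for CreateJobs; it only simulates asynchronous work.
    private static async Task ProcessAsync(int jobId)
    {
        await Task.Delay(10);
    }

    // Starts workerCount tasks that each take items until the queue is empty.
    // Returns the total number of items processed.
    public static async Task<int> DrainAsync(IProducerConsumerCollection<int> queue, int workerCount)
    {
        int processed = 0;

        var workers = Enumerable.Range(0, workerCount).Select(_ => Task.Run(async () =>
        {
            // TryTake removes the item, so no two workers ever see the same one.
            while (queue.TryTake(out int jobId))
            {
                await ProcessAsync(jobId);
                Interlocked.Increment(ref processed);
            }
        })).ToArray();

        await Task.WhenAll(workers);
        return processed;
    }
}
```

For example, `await TryTakeSketch.DrainAsync(new ConcurrentQueue<int>(Enumerable.Range(0, 20)), 4)` processes all 20 items with up to four running concurrently and leaves the queue empty. Unlike the BlockingCollection version, this does not wait for items that arrive later, so it fits the fill-then-drain pattern in the question rather than a long-running consumer.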