两个对象集合之间有争议的并行工作

Contentious paralleled work between two collections of objects

我正在尝试异步和并行地模拟两个集合之间的工作,我有一个客户的 ConcurrentQueue 和一个工人集合。我需要工作人员从队列中取走一位客户,对客户执行工作,完成后立即接走另一位客户。

我决定使用基于事件的范例,其中工作人员集合将对客户执行操作;谁拥有一个事件处理程序,该事件处理程序在客户完成时触发;这有望再次触发 DoWork 方法,这样我就可以并行化工作人员从队列中接走客户。但是我不知道如何在 OnCustomerFinished() 中将客户传递给 DoWork!工作人员显然不应该依赖客户队列

public class Worker
{
    public async Task DoWork(ConcurrentQueue<Customer> cust)
    {
        await Task.Run(() =>
        {
            if (cust.TryDequeue(out Customer temp))
            {
                Task.Delay(5000);
                temp.IsDone = true;
            }
        });
    }

    public void OnCustomerFinished()
    {
        // This is where I'm stuck
        DoWork(~HOW TO PASS THE QUEUE OF CUSTOMER HERE?~);
    }
}

// 编辑 - 这是客户 Class

 public class Customer
{
    private bool _isDone = false;

    public EventHandler<EventArgs> CustomerFinished;

    public bool IsDone
    {
        private get { return _isDone; }
        set
        {
            _isDone = value;
            if (_isDone)
            {
                OnCustomerFinished();
            }

        }
    }
    protected virtual void OnCustomerFinished()
    {
        if (CustomerFinished != null)
        {
            CustomerFinished(this, EventArgs.Empty);
        }
    }
}

.NET 已经有 pub/sub 和数据流块形式的工作机制,最近还有通道。

数据流

来自 System.Threading.Tasks.Dataflow 命名空间的数据流块是 "old" 构建工作人员和工作人员管道的方式(2012 年及以后)。每个块都有一个输入 and/or 输出缓冲区。发布到块的每条消息都由后台的一个或多个任务处理。对于有输出的块,每次迭代的输出都存储在输出缓冲区中。

块可以组合成类似于 CMD 或 Powershell 管道的管道,每个块 运行 执行自己的任务。

在最简单的情况下,ActionBlock 可以用作工人:

void ProcessCustomer(Customer customer)
{
    ....
}

var block =new ActionBlock<Customer>(cust=>ProcessCustomer(cust));

就是这样。无需手动出列或轮询。

生产者方法可以开始将客户实例发送到区块。它们中的每一个都将按照它们发布的顺序在后台处理:

foreach(var customer in bigCustomerList)
{
    block.Post(customer);
}

完成后,例如当应用程序终止时,生产者只需要在块上调用 Complete() 并等待所有剩余条目完成。

block.Complete();
await block.Completion;

块也可以使用异步方法。

频道

通道是一种新机制,内置于 .NET Core 3 中,在以前的 .NET Framework 和 .NET Core 版本中作为 NuGet 提供。生产者使用 ChannelWriter 写入通道,消费者使用 ChannelReader 从通道读取数据。这可能看起来有点奇怪,直到您意识到它允许一些强大的模式。

生产者可能是这样的,例如 "produces" 列表中的所有客户延迟 0.5 秒的生产者:

ChannelReader<Customer> Producer(IEnumerable<Customer> customers,CancellationToken token=default)
{
    //Create a channel that can buffer an infinite number of entries
    var channel=Channel.CreateUnbounded();
    var writer=channel.Writer;
    //Start a background task to produce the data
    _ = Task.Run(async ()=>{
        foreach(var customer in customers)
        {
            //Exit gracefully in case of cancellation
            if (token.IsCancellationRequested)
            {
                return;
            }
            await writer.WriteAsync(customer,token);
            await Task.Delay(500);
        }
    },token)
         //Ensure we complete the writer no matter what
         .ContinueWith(t=>writer.Complete(t.Exception);

   return channel.Reader;
}

这有点复杂,但请注意,该函数唯一需要 return 的是 ChannelReader。取消令牌对于提前终止生产者很有用,例如在超时后或应用程序关闭时。

当作者完成时,所有频道的 reader 也将完成。

消费者只需要那个 ChannelReader 就可以工作:

async Task Consumer(ChannelReader<Customer> reader,CancellationToken token=default)
{
    while(await reader.WaitToReadAsync(token))
    {
       while(reader.TryRead(out var customer))
       {
           //Process the customer
       }
    }
}

如果编写器完成,WaitToReadAsync 将 return false 并且循环将退出。

在 .NET Core 3 中,ChannelReader 通过 ReadAllAsync 方法支持 IAsyncEnumerable,使代码更加简单:

async Task Consumer(ChannelReader<Customer> reader,CancellationToken token=default)
{
    await foreach(var customer in reader.ReadAllAsync(token))
    {
           //Process the customer
    }
}

生产者创建的reader可以直接传递给消费者:

var customers=new []{......}
var reader=Producer(customers);
await Consumer(reader);

中间步骤可以从上一个通道读取数据 reader 并将数据发布到下一个通道,例如订单生成器:

ChannelReader<Order> ConsumerOrders(ChannelReader<Customer> reader,CancellationToken token=default)
{
    var channel=Channel.CreateUnbounded();
    var writer=channel.Writer;
    //Start a background task to produce the data
    _ = Task.Run(async ()=>{
        await foreach(var customer in reader.ReadAllAsync(token))
        {
           //Somehow create an order for the customer
           var order=new Order(...);
           await writer.WriteAsync(order,token);
        }
    },token)
         //Ensure we complete the writer no matter what
         .ContinueWith(t=>writer.Complete(t.Exception);

   return channel.Reader;
}

同样,我们需要做的就是将 readers 从一个方法传递到下一个

var customers=new []{......}
var customerReader=Producer(customers);
var orderReader=CustomerOrders(customerReader);
await ConsumeOrders(orderReader);