两个对象集合之间有争议的并行工作
Contentious paralleled work between two collections of objects
我正在尝试异步和并行地模拟两个集合之间的工作,我有一个客户的 ConcurrentQueue 和一个工人集合。我需要工作人员从队列中取走一位客户,对客户执行工作,完成后立即接走另一位客户。
我决定使用基于事件的范例,其中工作人员集合将对客户执行操作;谁拥有一个事件处理程序,该事件处理程序在客户完成时触发;这有望再次触发 DoWork 方法,这样我就可以并行化工作人员从队列中接走客户。但是我不知道如何在 OnCustomerFinished() 中将客户传递给 DoWork!工作人员显然不应该依赖客户队列
public class Worker
{
public async Task DoWork(ConcurrentQueue<Customer> cust)
{
await Task.Run(() =>
{
if (cust.TryDequeue(out Customer temp))
{
Task.Delay(5000);
temp.IsDone = true;
}
});
}
public void OnCustomerFinished()
{
// This is where I'm stuck
DoWork(~HOW TO PASS THE QUEUE OF CUSTOMER HERE?~);
}
}
// 编辑 - 这是客户 Class
public class Customer
{
private bool _isDone = false;
public EventHandler<EventArgs> CustomerFinished;
public bool IsDone
{
private get { return _isDone; }
set
{
_isDone = value;
if (_isDone)
{
OnCustomerFinished();
}
}
}
protected virtual void OnCustomerFinished()
{
if (CustomerFinished != null)
{
CustomerFinished(this, EventArgs.Empty);
}
}
}
.NET 已经有 pub/sub 和数据流块形式的工作机制,最近还有通道。
数据流
来自 System.Threading.Tasks.Dataflow 命名空间的数据流块是 "old" 构建工作人员和工作人员管道的方式(2012 年及以后)。每个块都有一个输入 and/or 输出缓冲区。发布到块的每条消息都由后台的一个或多个任务处理。对于有输出的块,每次迭代的输出都存储在输出缓冲区中。
块可以组合成类似于 CMD 或 Powershell 管道的管道,每个块 运行 执行自己的任务。
在最简单的情况下,ActionBlock 可以用作工人:
void ProcessCustomer(Customer customer)
{
....
}
var block =new ActionBlock<Customer>(cust=>ProcessCustomer(cust));
就是这样。无需手动出列或轮询。
生产者方法可以开始将客户实例发送到区块。它们中的每一个都将按照它们发布的顺序在后台处理:
foreach(var customer in bigCustomerList)
{
block.Post(customer);
}
完成后,例如当应用程序终止时,生产者只需要在块上调用 Complete()
并等待所有剩余条目完成。
block.Complete();
await block.Completion;
块也可以使用异步方法。
频道
通道是一种新机制,内置于 .NET Core 3 中,在以前的 .NET Framework 和 .NET Core 版本中作为 NuGet 提供。生产者使用 ChannelWriter 写入通道,消费者使用 ChannelReader 从通道读取数据。这可能看起来有点奇怪,直到您意识到它允许一些强大的模式。
生产者可能是这样的,例如 "produces" 列表中的所有客户延迟 0.5 秒的生产者:
ChannelReader<Customer> Producer(IEnumerable<Customer> customers,CancellationToken token=default)
{
//Create a channel that can buffer an infinite number of entries
var channel=Channel.CreateUnbounded();
var writer=channel.Writer;
//Start a background task to produce the data
_ = Task.Run(async ()=>{
foreach(var customer in customers)
{
//Exit gracefully in case of cancellation
if (token.IsCancellationRequested)
{
return;
}
await writer.WriteAsync(customer,token);
await Task.Delay(500);
}
},token)
//Ensure we complete the writer no matter what
.ContinueWith(t=>writer.Complete(t.Exception);
return channel.Reader;
}
这有点复杂,但请注意,该函数唯一需要 return 的是 ChannelReader。取消令牌对于提前终止生产者很有用,例如在超时后或应用程序关闭时。
当作者完成时,所有频道的 reader 也将完成。
消费者只需要那个 ChannelReader 就可以工作:
async Task Consumer(ChannelReader<Customer> reader,CancellationToken token=default)
{
while(await reader.WaitToReadAsync(token))
{
while(reader.TryRead(out var customer))
{
//Process the customer
}
}
}
如果编写器完成,WaitToReadAsync
将 return false
并且循环将退出。
在 .NET Core 3 中,ChannelReader 通过 ReadAllAsync 方法支持 IAsyncEnumerable,使代码更加简单:
async Task Consumer(ChannelReader<Customer> reader,CancellationToken token=default)
{
await foreach(var customer in reader.ReadAllAsync(token))
{
//Process the customer
}
}
生产者创建的reader可以直接传递给消费者:
var customers=new []{......}
var reader=Producer(customers);
await Consumer(reader);
中间步骤可以从上一个通道读取数据 reader 并将数据发布到下一个通道,例如订单生成器:
ChannelReader<Order> ConsumerOrders(ChannelReader<Customer> reader,CancellationToken token=default)
{
var channel=Channel.CreateUnbounded();
var writer=channel.Writer;
//Start a background task to produce the data
_ = Task.Run(async ()=>{
await foreach(var customer in reader.ReadAllAsync(token))
{
//Somehow create an order for the customer
var order=new Order(...);
await writer.WriteAsync(order,token);
}
},token)
//Ensure we complete the writer no matter what
.ContinueWith(t=>writer.Complete(t.Exception);
return channel.Reader;
}
同样,我们需要做的就是将 readers 从一个方法传递到下一个
var customers=new []{......}
var customerReader=Producer(customers);
var orderReader=CustomerOrders(customerReader);
await ConsumeOrders(orderReader);
我正在尝试异步和并行地模拟两个集合之间的工作,我有一个客户的 ConcurrentQueue 和一个工人集合。我需要工作人员从队列中取走一位客户,对客户执行工作,完成后立即接走另一位客户。
我决定使用基于事件的范例,其中工作人员集合将对客户执行操作;谁拥有一个事件处理程序,该事件处理程序在客户完成时触发;这有望再次触发 DoWork 方法,这样我就可以并行化工作人员从队列中接走客户。但是我不知道如何在 OnCustomerFinished() 中将客户传递给 DoWork!工作人员显然不应该依赖客户队列
public class Worker
{
public async Task DoWork(ConcurrentQueue<Customer> cust)
{
await Task.Run(() =>
{
if (cust.TryDequeue(out Customer temp))
{
Task.Delay(5000);
temp.IsDone = true;
}
});
}
public void OnCustomerFinished()
{
// This is where I'm stuck
DoWork(~HOW TO PASS THE QUEUE OF CUSTOMER HERE?~);
}
}
// 编辑 - 这是客户 Class
public class Customer
{
private bool _isDone = false;
public EventHandler<EventArgs> CustomerFinished;
public bool IsDone
{
private get { return _isDone; }
set
{
_isDone = value;
if (_isDone)
{
OnCustomerFinished();
}
}
}
protected virtual void OnCustomerFinished()
{
if (CustomerFinished != null)
{
CustomerFinished(this, EventArgs.Empty);
}
}
}
.NET 已经有 pub/sub 和数据流块形式的工作机制,最近还有通道。
数据流
来自 System.Threading.Tasks.Dataflow 命名空间的数据流块是 "old" 构建工作人员和工作人员管道的方式(2012 年及以后)。每个块都有一个输入 and/or 输出缓冲区。发布到块的每条消息都由后台的一个或多个任务处理。对于有输出的块,每次迭代的输出都存储在输出缓冲区中。
块可以组合成类似于 CMD 或 Powershell 管道的管道,每个块 运行 执行自己的任务。
在最简单的情况下,ActionBlock 可以用作工人:
void ProcessCustomer(Customer customer)
{
....
}
var block =new ActionBlock<Customer>(cust=>ProcessCustomer(cust));
就是这样。无需手动出列或轮询。
生产者方法可以开始将客户实例发送到区块。它们中的每一个都将按照它们发布的顺序在后台处理:
foreach(var customer in bigCustomerList)
{
block.Post(customer);
}
完成后,例如当应用程序终止时,生产者只需要在块上调用 Complete()
并等待所有剩余条目完成。
block.Complete();
await block.Completion;
块也可以使用异步方法。
频道
通道是一种新机制,内置于 .NET Core 3 中,在以前的 .NET Framework 和 .NET Core 版本中作为 NuGet 提供。生产者使用 ChannelWriter 写入通道,消费者使用 ChannelReader 从通道读取数据。这可能看起来有点奇怪,直到您意识到它允许一些强大的模式。
生产者可能是这样的,例如 "produces" 列表中的所有客户延迟 0.5 秒的生产者:
ChannelReader<Customer> Producer(IEnumerable<Customer> customers,CancellationToken token=default)
{
//Create a channel that can buffer an infinite number of entries
var channel=Channel.CreateUnbounded();
var writer=channel.Writer;
//Start a background task to produce the data
_ = Task.Run(async ()=>{
foreach(var customer in customers)
{
//Exit gracefully in case of cancellation
if (token.IsCancellationRequested)
{
return;
}
await writer.WriteAsync(customer,token);
await Task.Delay(500);
}
},token)
//Ensure we complete the writer no matter what
.ContinueWith(t=>writer.Complete(t.Exception);
return channel.Reader;
}
这有点复杂,但请注意,该函数唯一需要 return 的是 ChannelReader。取消令牌对于提前终止生产者很有用,例如在超时后或应用程序关闭时。
当作者完成时,所有频道的 reader 也将完成。
消费者只需要那个 ChannelReader 就可以工作:
async Task Consumer(ChannelReader<Customer> reader,CancellationToken token=default)
{
while(await reader.WaitToReadAsync(token))
{
while(reader.TryRead(out var customer))
{
//Process the customer
}
}
}
如果编写器完成,WaitToReadAsync
将 return false
并且循环将退出。
在 .NET Core 3 中,ChannelReader 通过 ReadAllAsync 方法支持 IAsyncEnumerable,使代码更加简单:
async Task Consumer(ChannelReader<Customer> reader,CancellationToken token=default)
{
await foreach(var customer in reader.ReadAllAsync(token))
{
//Process the customer
}
}
生产者创建的reader可以直接传递给消费者:
var customers=new []{......}
var reader=Producer(customers);
await Consumer(reader);
中间步骤可以从上一个通道读取数据 reader 并将数据发布到下一个通道,例如订单生成器:
ChannelReader<Order> ConsumerOrders(ChannelReader<Customer> reader,CancellationToken token=default)
{
var channel=Channel.CreateUnbounded();
var writer=channel.Writer;
//Start a background task to produce the data
_ = Task.Run(async ()=>{
await foreach(var customer in reader.ReadAllAsync(token))
{
//Somehow create an order for the customer
var order=new Order(...);
await writer.WriteAsync(order,token);
}
},token)
//Ensure we complete the writer no matter what
.ContinueWith(t=>writer.Complete(t.Exception);
return channel.Reader;
}
同样,我们需要做的就是将 readers 从一个方法传递到下一个
var customers=new []{......}
var customerReader=Producer(customers);
var orderReader=CustomerOrders(customerReader);
await ConsumeOrders(orderReader);