为什么我的工人工作分配计数不等于此 System.Threading.Channel 样本中的生产项目总数?
Why my workers work distribution count does not total the number of produced items in this System.Threading.Channel sample?
正在关注 this post, I have been playing with System.Threading.Channel
to get confident enough and use it in my production code, replacing the Threads/Monitor.Pulse/Wait
based approach I am currently using (described in the referred post)。
基本上,我创建了一个带有边界通道的示例,其中我 运行 开始时有几个生产者任务,无需等待,开始我的消费者任务,这些任务开始从通道推送元素。
等待生产者任务完成后,我会向通道发出完成信号,这样消费者任务就可以停止收听新的通道元素。
我的频道是 Channel<Action>
,在每个操作中,我都会增加 WorkDistribution
并发字典中每个给定工作人员的计数,并在示例结束时打印它以便我可以检查我消费了多少商品,以及渠道如何在消费者之间分配操作。
出于某种原因,此“工作分配页脚”打印的项目数与生产者任务生成的项目总数不同。
我错过了什么?
添加存在的一些变量的唯一目的是帮助解决问题。
完整代码如下:
public class ChannelSolution
{
object LockObject = new object();
Channel<Action<string>> channel;
int ItemsToProduce;
int WorkersCount;
int TotalItemsProduced;
ConcurrentDictionary<string, int> WorkDistribution;
CancellationToken Ct;
public ChannelSolution(int workersCount, int itemsToProduce, int maxAllowedItems,
CancellationToken ct)
{
WorkersCount = workersCount;
ItemsToProduce = itemsToProduce;
channel = Channel.CreateBounded<Action<string>>(maxAllowedItems);
Console.WriteLine($"Created channel with max {maxAllowedItems} items");
WorkDistribution = new ConcurrentDictionary<string, int>();
Ct = ct;
}
async Task ProduceItems(int cycle)
{
for (var i = 0; i < ItemsToProduce; i++)
{
var index = i + 1 + (ItemsToProduce * cycle);
bool queueHasRoom;
var stopwatch = new Stopwatch();
stopwatch.Start();
do
{
if (Ct.IsCancellationRequested)
{
Console.WriteLine("exiting read loop - cancellation requested !");
break;
}
queueHasRoom = await channel.Writer.WaitToWriteAsync();
if (!queueHasRoom)
{
if (Ct.IsCancellationRequested)
{
Console.WriteLine("exiting read loop - cancellation"
+ " requested !");
break;
}
if (stopwatch.Elapsed.Seconds % 3 == 0)
Console.WriteLine("Channel reached maximum capacity..."
+ " producer waiting for items to be freed...");
}
}
while (!queueHasRoom);
channel.Writer.TryWrite((workerName) => action($"A{index}", workerName));
Console.WriteLine($"Channel has room, item {index} added"
+ $" - channel items count: [{channel.Reader.Count}]");
Interlocked.Increment(ref TotalItemsProduced);
}
}
List<Task> GetConsumers()
{
var tasks = new List<Task>();
for (var i = 0; i < WorkersCount; i++)
{
var workerName = $"W{(i + 1).ToString("00")}";
tasks.Add(Task.Run(async () =>
{
while (await channel.Reader.WaitToReadAsync())
{
if (Ct.IsCancellationRequested)
{
Console.WriteLine("exiting write loop - cancellation"
+ "requested !");
break;
}
if (channel.Reader.TryRead(out var action))
{
Console.WriteLine($"dequed action in worker [{workerName}]");
action(workerName);
}
}
}));
}
return tasks;
}
void action(string actionNumber, string workerName)
{
Console.WriteLine($"processing {actionNumber} in worker {workerName}...");
var secondsToWait = new Random().Next(2, 5);
Thread.Sleep(TimeSpan.FromSeconds(secondsToWait));
Console.WriteLine($"action {actionNumber} completed by worker {workerName}"
+ $" after {secondsToWait} secs! channel items left:"
+ $" [{channel.Reader.Count}]");
if (WorkDistribution.ContainsKey(workerName))
{
lock (LockObject)
{
WorkDistribution[workerName]++;
}
}
else
{
var succeeded = WorkDistribution.TryAdd(workerName, 1);
if (!succeeded)
{
Console.WriteLine($"!!! failed incremeting dic value !!!");
}
}
}
public void Summarize(Stopwatch stopwatch)
{
Console.WriteLine("--------------------------- Thread Work Distribution "
+ "------------------------");
foreach (var kv in this.WorkDistribution)
Console.WriteLine($"thread: {kv.Key} items consumed: {kv.Value}");
Console.WriteLine($"Total actions consumed: "
+ $"{WorkDistribution.Sum(w => w.Value)} - Elapsed time: "
+ $"{stopwatch.Elapsed.Seconds} secs");
}
public void Run(int producerCycles)
{
var stopwatch = new Stopwatch();
stopwatch.Start();
var producerTasks = new List<Task>();
Console.WriteLine($"Started running at {DateTime.Now}...");
for (var i = 0; i < producerCycles; i++)
{
producerTasks.Add(ProduceItems(i));
}
var consumerTasks = GetConsumers();
Task.WaitAll(producerTasks.ToArray());
Console.WriteLine($"-------------- Completed waiting for PRODUCERS -"
+ " total items produced: [{TotalItemsProduced}] ------------------");
channel.Writer.Complete(); //just so I can complete this demo
Task.WaitAll(consumerTasks.ToArray());
Console.WriteLine("----------------- Completed waiting for CONSUMERS "
+ "------------------");
//Task.WaitAll(GetConsumers().Union(producerTasks/*.Union(
// new List<Task> { taskKey })*/).ToArray());
//Console.WriteLine("Completed waiting for tasks");
Summarize(stopwatch);
}
}
这里是Program.cs
中的调用代码
var workersCount = 5;
var itemsToProduce = 10;
var maxItemsInQueue = 5;
var cts = new CancellationTokenSource();
var producerConsumerTests = new ProducerConsumerTests(workersCount, itemsToProduce,
maxItemsInQueue, cts.Token);
producerConsumerTests.Run(2);
快速查看 ProduceItems
方法中的 queueHasRoom
变量周围存在竞争条件。你不需要这个变量。 channel.Writer.TryWrite
method will tell you whether there is room in the channel's buffer or not. Alternatively you could simply await
the WriteAsync
方法,而不是使用 WaitToWriteAsync
/TryWrite
组合。 AFAIK 这个组合旨在作为前一种方法的性能优化。如果您在尝试 post 值之前绝对需要知道是否有可用的 space,那么 Channel<T>
可能不适合您的用例。你需要找到在“check-for-available-space -> create-the-value -> post-the-value”的整个操作过程中可以锁定的东西,这样这个操作可以原子化。
附带说明一下,使用锁来保护 ConcurrentDictionary
的更新是多余的。 ConcurrentDictionary
提供了 AddOrUpdate
方法,可以原子地用另一个值替换它包含的值。如果字典包含可变对象,您可能必须锁定,并且您需要使用线程安全来改变这些对象。但在您的情况下,值的类型为 Int32
,这是一个不可变的结构。你不改变它,你只是用一个新的 Int32
替换它,它是根据现有值创建的:
WorkDistribution.AddOrUpdate(workerName, 1, (_, existing) => existing + 1);
正在关注 this post, I have been playing with System.Threading.Channel
to get confident enough and use it in my production code, replacing the Threads/Monitor.Pulse/Wait
based approach I am currently using (described in the referred post)。
基本上,我创建了一个带有边界通道的示例,其中我 运行 开始时有几个生产者任务,无需等待,开始我的消费者任务,这些任务开始从通道推送元素。 等待生产者任务完成后,我会向通道发出完成信号,这样消费者任务就可以停止收听新的通道元素。
我的频道是 Channel<Action>
,在每个操作中,我都会增加 WorkDistribution
并发字典中每个给定工作人员的计数,并在示例结束时打印它以便我可以检查我消费了多少商品,以及渠道如何在消费者之间分配操作。
出于某种原因,此“工作分配页脚”打印的项目数与生产者任务生成的项目总数不同。
我错过了什么? 添加存在的一些变量的唯一目的是帮助解决问题。
完整代码如下:
public class ChannelSolution
{
object LockObject = new object();
Channel<Action<string>> channel;
int ItemsToProduce;
int WorkersCount;
int TotalItemsProduced;
ConcurrentDictionary<string, int> WorkDistribution;
CancellationToken Ct;
public ChannelSolution(int workersCount, int itemsToProduce, int maxAllowedItems,
CancellationToken ct)
{
WorkersCount = workersCount;
ItemsToProduce = itemsToProduce;
channel = Channel.CreateBounded<Action<string>>(maxAllowedItems);
Console.WriteLine($"Created channel with max {maxAllowedItems} items");
WorkDistribution = new ConcurrentDictionary<string, int>();
Ct = ct;
}
async Task ProduceItems(int cycle)
{
for (var i = 0; i < ItemsToProduce; i++)
{
var index = i + 1 + (ItemsToProduce * cycle);
bool queueHasRoom;
var stopwatch = new Stopwatch();
stopwatch.Start();
do
{
if (Ct.IsCancellationRequested)
{
Console.WriteLine("exiting read loop - cancellation requested !");
break;
}
queueHasRoom = await channel.Writer.WaitToWriteAsync();
if (!queueHasRoom)
{
if (Ct.IsCancellationRequested)
{
Console.WriteLine("exiting read loop - cancellation"
+ " requested !");
break;
}
if (stopwatch.Elapsed.Seconds % 3 == 0)
Console.WriteLine("Channel reached maximum capacity..."
+ " producer waiting for items to be freed...");
}
}
while (!queueHasRoom);
channel.Writer.TryWrite((workerName) => action($"A{index}", workerName));
Console.WriteLine($"Channel has room, item {index} added"
+ $" - channel items count: [{channel.Reader.Count}]");
Interlocked.Increment(ref TotalItemsProduced);
}
}
List<Task> GetConsumers()
{
var tasks = new List<Task>();
for (var i = 0; i < WorkersCount; i++)
{
var workerName = $"W{(i + 1).ToString("00")}";
tasks.Add(Task.Run(async () =>
{
while (await channel.Reader.WaitToReadAsync())
{
if (Ct.IsCancellationRequested)
{
Console.WriteLine("exiting write loop - cancellation"
+ "requested !");
break;
}
if (channel.Reader.TryRead(out var action))
{
Console.WriteLine($"dequed action in worker [{workerName}]");
action(workerName);
}
}
}));
}
return tasks;
}
void action(string actionNumber, string workerName)
{
Console.WriteLine($"processing {actionNumber} in worker {workerName}...");
var secondsToWait = new Random().Next(2, 5);
Thread.Sleep(TimeSpan.FromSeconds(secondsToWait));
Console.WriteLine($"action {actionNumber} completed by worker {workerName}"
+ $" after {secondsToWait} secs! channel items left:"
+ $" [{channel.Reader.Count}]");
if (WorkDistribution.ContainsKey(workerName))
{
lock (LockObject)
{
WorkDistribution[workerName]++;
}
}
else
{
var succeeded = WorkDistribution.TryAdd(workerName, 1);
if (!succeeded)
{
Console.WriteLine($"!!! failed incremeting dic value !!!");
}
}
}
public void Summarize(Stopwatch stopwatch)
{
Console.WriteLine("--------------------------- Thread Work Distribution "
+ "------------------------");
foreach (var kv in this.WorkDistribution)
Console.WriteLine($"thread: {kv.Key} items consumed: {kv.Value}");
Console.WriteLine($"Total actions consumed: "
+ $"{WorkDistribution.Sum(w => w.Value)} - Elapsed time: "
+ $"{stopwatch.Elapsed.Seconds} secs");
}
public void Run(int producerCycles)
{
var stopwatch = new Stopwatch();
stopwatch.Start();
var producerTasks = new List<Task>();
Console.WriteLine($"Started running at {DateTime.Now}...");
for (var i = 0; i < producerCycles; i++)
{
producerTasks.Add(ProduceItems(i));
}
var consumerTasks = GetConsumers();
Task.WaitAll(producerTasks.ToArray());
Console.WriteLine($"-------------- Completed waiting for PRODUCERS -"
+ " total items produced: [{TotalItemsProduced}] ------------------");
channel.Writer.Complete(); //just so I can complete this demo
Task.WaitAll(consumerTasks.ToArray());
Console.WriteLine("----------------- Completed waiting for CONSUMERS "
+ "------------------");
//Task.WaitAll(GetConsumers().Union(producerTasks/*.Union(
// new List<Task> { taskKey })*/).ToArray());
//Console.WriteLine("Completed waiting for tasks");
Summarize(stopwatch);
}
}
这里是Program.cs
中的调用代码var workersCount = 5;
var itemsToProduce = 10;
var maxItemsInQueue = 5;
var cts = new CancellationTokenSource();
var producerConsumerTests = new ProducerConsumerTests(workersCount, itemsToProduce,
maxItemsInQueue, cts.Token);
producerConsumerTests.Run(2);
快速查看 ProduceItems
方法中的 queueHasRoom
变量周围存在竞争条件。你不需要这个变量。 channel.Writer.TryWrite
method will tell you whether there is room in the channel's buffer or not. Alternatively you could simply await
the WriteAsync
方法,而不是使用 WaitToWriteAsync
/TryWrite
组合。 AFAIK 这个组合旨在作为前一种方法的性能优化。如果您在尝试 post 值之前绝对需要知道是否有可用的 space,那么 Channel<T>
可能不适合您的用例。你需要找到在“check-for-available-space -> create-the-value -> post-the-value”的整个操作过程中可以锁定的东西,这样这个操作可以原子化。
附带说明一下,使用锁来保护 ConcurrentDictionary
的更新是多余的。 ConcurrentDictionary
提供了 AddOrUpdate
方法,可以原子地用另一个值替换它包含的值。如果字典包含可变对象,您可能必须锁定,并且您需要使用线程安全来改变这些对象。但在您的情况下,值的类型为 Int32
,这是一个不可变的结构。你不改变它,你只是用一个新的 Int32
替换它,它是根据现有值创建的:
WorkDistribution.AddOrUpdate(workerName, 1, (_, existing) => existing + 1);