简单的生产者-消费者示例 C# 字典
Simple producer-consumer example C# Dictionary
我迈出了并行编程的第一步。我写了一个最简单的代码,但结果很混乱。此代码采用生产者-消费者模式获取前 10 个最近的项目。只有 1 个消费者和 1 个生产者。有两个消费者时效果更好,但也不正确。
public static void ProducerConsumer(string path)
{
var capacity = 50000;
var collection = new BlockingCollection<string>(capacity);
var dict = new Dictionary<string, int>();
var tasks = new List<Task<Dictionary<string, int>>>();
var producer = Task.Factory.StartNew(() =>
{
Parallel.ForEach(File.ReadLines(path), (line) =>
{
collection.Add(line);
});
collection.CompleteAdding();
});
for (int i = 0; i < 1; i++)
{
var consumer = Task.Factory.StartNew<Dictionary<string, int>>(() =>
{
var localDict = new Dictionary<string, int>();
while (!collection.IsCompleted)
{
string line;
if (collection.TryTake(out line))
{
Map(line, localDict);
}
}
return localDict;
});
tasks.Add(consumer);
}
int count = 0;
while (tasks.Count > 0)
{
var id = Task.WaitAny(tasks.ToArray());
var res = tasks[id].Result;
count += res.Sum(k => k.Value);
Aggregate(res, dict);
tasks.RemoveAt(id);
}
Console.WriteLine($"Values sum : {count}");
ShowResult(dict);
ShowTotal(dict, "End");
}
public static void Map(string line, Dictionary<string, int> dict)
{
var parts = line.Split(new string[] { ";" }, StringSplitOptions.RemoveEmptyEntries);
var streetName = parts[3];
if (dict.Keys.Contains(streetName))
{
dict[streetName]++;
}
else
{
dict.Add(streetName, 1);
}
}
public static void ShowResult(Dictionary<string, int> dict)
{
var res = dict.OrderByDescending(r => r.Value).Take(10).ToList();
foreach (var key in res)
{
Console.WriteLine($"{key.Key} - {key.Value}");
}
}
public static void Aggregate(Dictionary<string, int> part, Dictionary<string, int> main)
{
foreach (var key in part.Keys)
{
if (main.Keys.Contains(key))
{
main[key] += part[key];
}
else
{
main.Add(key, 1);
}
}
}
public static void ShowTotal(Dictionary<string, int> dict, string mark = null)
{
Console.WriteLine($"{mark ?? ""} Keys: {dict.Keys.Count} - Values:{dict.Sum(s => s.Value)}");
}
调试时它显示了正确的步骤,但结果显示每个项目只有一个命中,总数不正确。
就像KooKiz说的,应该是
main.Add(key, part[key])
如果我理解你的算法,应该是:
main.Add(key, part[key])
我迈出了并行编程的第一步。我写了一个最简单的代码,但结果很混乱。此代码采用生产者-消费者模式获取前 10 个最近的项目。只有 1 个消费者和 1 个生产者。有两个消费者时效果更好,但也不正确。
public static void ProducerConsumer(string path)
{
var capacity = 50000;
var collection = new BlockingCollection<string>(capacity);
var dict = new Dictionary<string, int>();
var tasks = new List<Task<Dictionary<string, int>>>();
var producer = Task.Factory.StartNew(() =>
{
Parallel.ForEach(File.ReadLines(path), (line) =>
{
collection.Add(line);
});
collection.CompleteAdding();
});
for (int i = 0; i < 1; i++)
{
var consumer = Task.Factory.StartNew<Dictionary<string, int>>(() =>
{
var localDict = new Dictionary<string, int>();
while (!collection.IsCompleted)
{
string line;
if (collection.TryTake(out line))
{
Map(line, localDict);
}
}
return localDict;
});
tasks.Add(consumer);
}
int count = 0;
while (tasks.Count > 0)
{
var id = Task.WaitAny(tasks.ToArray());
var res = tasks[id].Result;
count += res.Sum(k => k.Value);
Aggregate(res, dict);
tasks.RemoveAt(id);
}
Console.WriteLine($"Values sum : {count}");
ShowResult(dict);
ShowTotal(dict, "End");
}
public static void Map(string line, Dictionary<string, int> dict)
{
var parts = line.Split(new string[] { ";" }, StringSplitOptions.RemoveEmptyEntries);
var streetName = parts[3];
if (dict.Keys.Contains(streetName))
{
dict[streetName]++;
}
else
{
dict.Add(streetName, 1);
}
}
public static void ShowResult(Dictionary<string, int> dict)
{
var res = dict.OrderByDescending(r => r.Value).Take(10).ToList();
foreach (var key in res)
{
Console.WriteLine($"{key.Key} - {key.Value}");
}
}
public static void Aggregate(Dictionary<string, int> part, Dictionary<string, int> main)
{
foreach (var key in part.Keys)
{
if (main.Keys.Contains(key))
{
main[key] += part[key];
}
else
{
main.Add(key, 1);
}
}
}
public static void ShowTotal(Dictionary<string, int> dict, string mark = null)
{
Console.WriteLine($"{mark ?? ""} Keys: {dict.Keys.Count} - Values:{dict.Sum(s => s.Value)}");
}
调试时它显示了正确的步骤,但结果显示每个项目只有一个命中,总数不正确。
就像KooKiz说的,应该是
main.Add(key, part[key])
如果我理解你的算法,应该是:
main.Add(key, part[key])