如何正确完成一个Channel?如何使用多个渠道?
How to complete a Channel right way? How to use multiple channels?
我有一个数据列表,想创建与列表中的元素数相对应的任务数。但我不知道如何正确完成频道。
我的代码,但是频道没有像我预期的那样关闭。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace Ding.LearningNewThings
{
public class MultipleChannel
{
public static async Task RunMiltipleChannel()
{
List<Place> listPlace = Place.InitData();
Channel<Position> _dataChannel = Channel.CreateUnbounded<Position>();
var listTask = new Task[11];
var listStatus = new bool[10];
for (int k = 0; k < listPlace.Count(); k++)
{
var place = listPlace[k];
listTask[k] = Task.Run(async () =>
{
int kk = k;
int count = 0;
Random r = new Random();
while (count < 10)
{
int id = r.Next(1, 1000);
var position = new Position()
{
ID = id,
Name = $"Postion{id}",
PlaceID = place.ID,
PlaceName = place.Name
};
Console.WriteLine($"WRITE: Position ID: {position.ID}, Postion Name: {position.Name}");
await _dataChannel.Writer.WriteAsync(position);
count++;
}
lock (listStatus)
{
if(count == 10)
{
listStatus[k] = true;
}
bool isStop = true;
foreach(var status in listStatus)
{
if (!status)
{
isStop = false;
}
}
if (isStop)
{
_dataChannel.Writer.Complete();
Console.WriteLine("Stopped");
}
}
});
}
listTask[10] = Task.Run(async () =>
{
while (await _dataChannel.Reader.WaitToReadAsync())
{
await Task.Delay(100);
var data = await _dataChannel.Reader.ReadAsync();
Console.WriteLine($"READ: Position ID: {data.ID}, Postion Name: {data.Name}");
}
});
await Task.WhenAll(listTask);
}
}
public class Place
{
public int ID { get; set; }
public string Name { get; set; }
public static List<Place> InitData()
{
var listData = new List<Place>();
for (int i = 0; i < 10; i++)
{
var data = new Place()
{
ID = i,
Name = $"Postion{i}",
};
listData.Add(data);
}
return listData;
}
}
public class Position
{
public int ID { get; set; }
public int PlaceID { get; set; }
public string PlaceName { get; set; }
public string Name { get; set; }
public static List<Position> InitData()
{
var listData = new List<Position>();
for (int i = 0; i < 10; i++)
{
var data = new Position()
{
ID = i,
Name = $"Postion{i}"
};
listData.Add(data);
}
return listData;
}
}
}
如果我想为每个任务创建单独的频道,我该如何完成它们?请举个例子。
在任务中使用迭代变量是有问题的。我在任务完成后改变
初始化。例如:
const int count = 10;
Task[] tasks = new Task[count];
for (int i = 0; i < count; i++)
{
tasks[i] = Task.Run(() => Console.WriteLine(i));
}
await Task.WhenAll(tasks);
将给出以下输出:
10
10
10
10
10
10
10
10
10
10
锁定没有帮助,因为 i 在到达锁定语句之前更改。
在循环中使用第二个变量给出了预期的结果:
const int count = 10;
Task[] tasks = new Task[count];
for (int i = 0; i < count; i++)
{
int j = i;
tasks[i] = Task.Run(() => Console.WriteLine(j));
}
await Task.WhenAll(tasks);
6
3
1
0
2
4
5
7
8
9
这是修改后的代码。
我去掉了那里的锁和复杂的逻辑。还修复了变量捕获并添加了对读取的健全性检查。
评论一致。请尝试 运行 并询问您是否有问题。
public class MultipleChannel
{
public static async Task RunMiltipleChannel()
{
List<Place> listPlace = Place.InitData();
Channel<Position> _dataChannel = Channel.CreateUnbounded<Position>();
var listTask = new Task[11];
//Count the number of writer tasks that finished
int completedTasks = 0;
for (int k = 0; k < listPlace.Count; k++)
{
var place = listPlace[k];
//Important to avoid closures
var kCapture = k;
listTask[kCapture] = Task.Run(async () =>
{
int kk = kCapture;
int count = 0;
Random r = new Random();
while (count < 10)
{
int id = r.Next(1, 1000);
var position = new Position()
{
ID = id,
Name = $"Postion{id}",
PlaceID = place.ID,
PlaceName = place.Name
};
Console.WriteLine($"WRITE: Position ID: {position.ID}, Postion Name: {position.Name}");
await _dataChannel.Writer.WriteAsync(position);
count++;
}
//Thread-safe check to see if this is the last task to complete
if (Interlocked.Increment(ref completedTasks) == 10)
{
_dataChannel.Writer.Complete();
Console.WriteLine($"Task {kk} finished, CHANNEL COMPLETED");
}
else
{
Console.WriteLine($"Task {kk} finished");
}
});
}
//Make sure we read everything
int readCount = 0;
listTask[10] = Task.Run(async () =>
{
while (await _dataChannel.Reader.WaitToReadAsync())
{
await Task.Delay(100);
var data = await _dataChannel.Reader.ReadAsync();
readCount++;
Console.WriteLine($"READ: Position ID: {data.ID}, Postion Name: {data.Name}");
}
});
await Task.WhenAll(listTask);
//Sanity check
Console.WriteLine($"Read {readCount} position data");
}
}
我可以确认频道已关闭,并且已读取 100 个项目。
我有一个数据列表,想创建与列表中的元素数相对应的任务数。但我不知道如何正确完成频道。
我的代码,但是频道没有像我预期的那样关闭。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace Ding.LearningNewThings
{
public class MultipleChannel
{
public static async Task RunMiltipleChannel()
{
List<Place> listPlace = Place.InitData();
Channel<Position> _dataChannel = Channel.CreateUnbounded<Position>();
var listTask = new Task[11];
var listStatus = new bool[10];
for (int k = 0; k < listPlace.Count(); k++)
{
var place = listPlace[k];
listTask[k] = Task.Run(async () =>
{
int kk = k;
int count = 0;
Random r = new Random();
while (count < 10)
{
int id = r.Next(1, 1000);
var position = new Position()
{
ID = id,
Name = $"Postion{id}",
PlaceID = place.ID,
PlaceName = place.Name
};
Console.WriteLine($"WRITE: Position ID: {position.ID}, Postion Name: {position.Name}");
await _dataChannel.Writer.WriteAsync(position);
count++;
}
lock (listStatus)
{
if(count == 10)
{
listStatus[k] = true;
}
bool isStop = true;
foreach(var status in listStatus)
{
if (!status)
{
isStop = false;
}
}
if (isStop)
{
_dataChannel.Writer.Complete();
Console.WriteLine("Stopped");
}
}
});
}
listTask[10] = Task.Run(async () =>
{
while (await _dataChannel.Reader.WaitToReadAsync())
{
await Task.Delay(100);
var data = await _dataChannel.Reader.ReadAsync();
Console.WriteLine($"READ: Position ID: {data.ID}, Postion Name: {data.Name}");
}
});
await Task.WhenAll(listTask);
}
}
public class Place
{
public int ID { get; set; }
public string Name { get; set; }
public static List<Place> InitData()
{
var listData = new List<Place>();
for (int i = 0; i < 10; i++)
{
var data = new Place()
{
ID = i,
Name = $"Postion{i}",
};
listData.Add(data);
}
return listData;
}
}
public class Position
{
public int ID { get; set; }
public int PlaceID { get; set; }
public string PlaceName { get; set; }
public string Name { get; set; }
public static List<Position> InitData()
{
var listData = new List<Position>();
for (int i = 0; i < 10; i++)
{
var data = new Position()
{
ID = i,
Name = $"Postion{i}"
};
listData.Add(data);
}
return listData;
}
}
}
如果我想为每个任务创建单独的频道,我该如何完成它们?请举个例子。
在任务中使用迭代变量是有问题的。我在任务完成后改变 初始化。例如:
const int count = 10;
Task[] tasks = new Task[count];
for (int i = 0; i < count; i++)
{
tasks[i] = Task.Run(() => Console.WriteLine(i));
}
await Task.WhenAll(tasks);
将给出以下输出:
10
10
10
10
10
10
10
10
10
10
锁定没有帮助,因为 i 在到达锁定语句之前更改。
在循环中使用第二个变量给出了预期的结果:
const int count = 10;
Task[] tasks = new Task[count];
for (int i = 0; i < count; i++)
{
int j = i;
tasks[i] = Task.Run(() => Console.WriteLine(j));
}
await Task.WhenAll(tasks);
6
3
1
0
2
4
5
7
8
9
这是修改后的代码。
我去掉了那里的锁和复杂的逻辑。还修复了变量捕获并添加了对读取的健全性检查。
评论一致。请尝试 运行 并询问您是否有问题。
public class MultipleChannel
{
public static async Task RunMiltipleChannel()
{
List<Place> listPlace = Place.InitData();
Channel<Position> _dataChannel = Channel.CreateUnbounded<Position>();
var listTask = new Task[11];
//Count the number of writer tasks that finished
int completedTasks = 0;
for (int k = 0; k < listPlace.Count; k++)
{
var place = listPlace[k];
//Important to avoid closures
var kCapture = k;
listTask[kCapture] = Task.Run(async () =>
{
int kk = kCapture;
int count = 0;
Random r = new Random();
while (count < 10)
{
int id = r.Next(1, 1000);
var position = new Position()
{
ID = id,
Name = $"Postion{id}",
PlaceID = place.ID,
PlaceName = place.Name
};
Console.WriteLine($"WRITE: Position ID: {position.ID}, Postion Name: {position.Name}");
await _dataChannel.Writer.WriteAsync(position);
count++;
}
//Thread-safe check to see if this is the last task to complete
if (Interlocked.Increment(ref completedTasks) == 10)
{
_dataChannel.Writer.Complete();
Console.WriteLine($"Task {kk} finished, CHANNEL COMPLETED");
}
else
{
Console.WriteLine($"Task {kk} finished");
}
});
}
//Make sure we read everything
int readCount = 0;
listTask[10] = Task.Run(async () =>
{
while (await _dataChannel.Reader.WaitToReadAsync())
{
await Task.Delay(100);
var data = await _dataChannel.Reader.ReadAsync();
readCount++;
Console.WriteLine($"READ: Position ID: {data.ID}, Postion Name: {data.Name}");
}
});
await Task.WhenAll(listTask);
//Sanity check
Console.WriteLine($"Read {readCount} position data");
}
}
我可以确认频道已关闭,并且已读取 100 个项目。