以并行和顺序方式执行 N 个线程
Executing N number of threads in parallel and in a sequential manner
我有一个应用程序,其中有 1 个大文件的 1000 多个小部分。
我一次最多只能上传 16 个部分。
我用的是.Net的Thread并行库
我用Parallel.For划分了多个部分,并分配了1个方法,每个部分都应该执行,并将DegreeOfParallelism设置为16。
我需要使用由不同部分上传生成的校验和值执行 1 种方法,因此我必须设置某种机制,我必须等待所有部分上传完成,比如 1000。
在 TPL 库中,我面临 1 个问题,它随机执行 1000 个中的 16 个线程中的任何一个。
我想要一些机制,如果第 1 个或第 2 个或 16 个线程中的任何一个完成其任务,我可以使用它来 运行 最初的第 16 个线程,接下来应该启动第 17 部分。
我怎样才能做到这一点?
var workitems = ... /*e.g. Enumerable.Range(0, 1000000)*/;
SingleItemPartitioner.Create(workitems)
.AsParallel()
.AsOrdered()
.WithDegreeOfParallelism(16)
.WithMergeOptions(ParallelMergeOptions.NotBuffered)
.ForAll(i => { Thread.Slee(1000); Console.WriteLine(i); });
这应该就是您所需要的。我忘记了这些方法的确切命名方式...查看文档。
通过在休眠 1 秒后打印到控制台来对此进行测试(此示例代码就是这样做的)。
一个可能的候选者可以是 TPL Dataflow。这是一个接收整数流并将它们打印到控制台的演示。您将 MaxDegreeOfParallelism
设置为您希望并行旋转的任意多个线程:
void Main()
{
var actionBlock = new ActionBlock<int>(
i => Console.WriteLine(i),
new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = 16});
foreach (var i in Enumerable.Range(0, 200))
{
actionBlock.Post(i);
}
}
如果你想拥有多个 producer/consumers。
这是执行此操作的手动方法。
你需要排队。队列是待处理任务的序列。您必须出列并将它们放入工作任务列表中。当任务完成时,将其从工作任务列表中删除并从队列中取出另一个。主线程控制这个过程。这是如何执行此操作的示例。
为了测试,我使用了整数列表,但它应该适用于其他类型,因为它使用泛型。
private static void Main()
{
Random r = new Random();
var items = Enumerable.Range(0, 100).Select(x => r.Next(100, 200)).ToList();
ParallelQueue(items, DoWork);
}
private static void ParallelQueue<T>(List<T> items, Action<T> action)
{
Queue pending = new Queue(items);
List<Task> working = new List<Task>();
while (pending.Count + working.Count != 0)
{
if (pending.Count != 0 && working.Count < 16) // Maximum tasks
{
var item = pending.Dequeue(); // get item from queue
working.Add(Task.Run(() => action((T)item))); // run task
}
else
{
Task.WaitAny(working.ToArray());
working.RemoveAll(x => x.IsCompleted); // remove finished tasks
}
}
}
private static void DoWork(int i) // do your work here.
{
// this is just an example
Task.Delay(i).Wait();
Console.WriteLine(i);
}
如果您遇到如何为自己实现 DoWork 的问题,请告诉我。因为如果您更改方法签名,您可能需要做一些更改。
更新
您也可以在不阻塞主线程的情况下使用 async await 执行此操作。
private static void Main()
{
Random r = new Random();
var items = Enumerable.Range(0, 100).Select(x => r.Next(100, 200)).ToList();
Task t = ParallelQueue(items, DoWork);
// able to do other things.
t.Wait();
}
private static async Task ParallelQueue<T>(List<T> items, Func<T, Task> func)
{
Queue pending = new Queue(items);
List<Task> working = new List<Task>();
while (pending.Count + working.Count != 0)
{
if (working.Count < 16 && pending.Count != 0)
{
var item = pending.Dequeue();
working.Add(Task.Run(async () => await func((T)item)));
}
else
{
await Task.WhenAny(working);
working.RemoveAll(x => x.IsCompleted);
}
}
}
private static async Task DoWork(int i)
{
await Task.Delay(i);
}
另一种选择是使用 BlockingCollection<T>
作为文件 reader 线程和 16 个上传线程之间的队列。每个上传程序线程只会循环使用阻塞集合,直到它完成。
并且,如果您想限制队列中的内存消耗,您可以为阻塞收集设置上限,这样 file-reader 线程将在缓冲区达到容量时暂停。这在您可能需要限制每次 user/API 调用使用的内存的服务器环境中特别有用。
// Create a buffer of 4 chunks between the file reader and the senders
BlockingCollection<Chunk> queue = new BlockingCollection<Chunk>(4);
// Create a cancellation token source so you can stop this gracefully
CancellationTokenSource cts = ...
文件reader线程
...
queue.Add(chunk, cts.Token);
...
queue.CompleteAdding();
发送线程
for(int i = 0; i < 16; i++)
{
Task.Run(() => {
foreach (var chunk in queue.GetConsumingEnumerable(cts.Token))
{
.. do the upload
}
});
}
我有一个应用程序,其中有 1 个大文件的 1000 多个小部分。
我一次最多只能上传 16 个部分。
我用的是.Net的Thread并行库
我用Parallel.For划分了多个部分,并分配了1个方法,每个部分都应该执行,并将DegreeOfParallelism设置为16。
我需要使用由不同部分上传生成的校验和值执行 1 种方法,因此我必须设置某种机制,我必须等待所有部分上传完成,比如 1000。 在 TPL 库中,我面临 1 个问题,它随机执行 1000 个中的 16 个线程中的任何一个。
我想要一些机制,如果第 1 个或第 2 个或 16 个线程中的任何一个完成其任务,我可以使用它来 运行 最初的第 16 个线程,接下来应该启动第 17 部分。
我怎样才能做到这一点?
var workitems = ... /*e.g. Enumerable.Range(0, 1000000)*/;
SingleItemPartitioner.Create(workitems)
.AsParallel()
.AsOrdered()
.WithDegreeOfParallelism(16)
.WithMergeOptions(ParallelMergeOptions.NotBuffered)
.ForAll(i => { Thread.Slee(1000); Console.WriteLine(i); });
这应该就是您所需要的。我忘记了这些方法的确切命名方式...查看文档。
通过在休眠 1 秒后打印到控制台来对此进行测试(此示例代码就是这样做的)。
一个可能的候选者可以是 TPL Dataflow。这是一个接收整数流并将它们打印到控制台的演示。您将 MaxDegreeOfParallelism
设置为您希望并行旋转的任意多个线程:
void Main()
{
var actionBlock = new ActionBlock<int>(
i => Console.WriteLine(i),
new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = 16});
foreach (var i in Enumerable.Range(0, 200))
{
actionBlock.Post(i);
}
}
如果你想拥有多个 producer/consumers。
这是执行此操作的手动方法。
你需要排队。队列是待处理任务的序列。您必须出列并将它们放入工作任务列表中。当任务完成时,将其从工作任务列表中删除并从队列中取出另一个。主线程控制这个过程。这是如何执行此操作的示例。
为了测试,我使用了整数列表,但它应该适用于其他类型,因为它使用泛型。
private static void Main()
{
Random r = new Random();
var items = Enumerable.Range(0, 100).Select(x => r.Next(100, 200)).ToList();
ParallelQueue(items, DoWork);
}
private static void ParallelQueue<T>(List<T> items, Action<T> action)
{
Queue pending = new Queue(items);
List<Task> working = new List<Task>();
while (pending.Count + working.Count != 0)
{
if (pending.Count != 0 && working.Count < 16) // Maximum tasks
{
var item = pending.Dequeue(); // get item from queue
working.Add(Task.Run(() => action((T)item))); // run task
}
else
{
Task.WaitAny(working.ToArray());
working.RemoveAll(x => x.IsCompleted); // remove finished tasks
}
}
}
private static void DoWork(int i) // do your work here.
{
// this is just an example
Task.Delay(i).Wait();
Console.WriteLine(i);
}
如果您遇到如何为自己实现 DoWork 的问题,请告诉我。因为如果您更改方法签名,您可能需要做一些更改。
更新
您也可以在不阻塞主线程的情况下使用 async await 执行此操作。
private static void Main()
{
Random r = new Random();
var items = Enumerable.Range(0, 100).Select(x => r.Next(100, 200)).ToList();
Task t = ParallelQueue(items, DoWork);
// able to do other things.
t.Wait();
}
private static async Task ParallelQueue<T>(List<T> items, Func<T, Task> func)
{
Queue pending = new Queue(items);
List<Task> working = new List<Task>();
while (pending.Count + working.Count != 0)
{
if (working.Count < 16 && pending.Count != 0)
{
var item = pending.Dequeue();
working.Add(Task.Run(async () => await func((T)item)));
}
else
{
await Task.WhenAny(working);
working.RemoveAll(x => x.IsCompleted);
}
}
}
private static async Task DoWork(int i)
{
await Task.Delay(i);
}
另一种选择是使用 BlockingCollection<T>
作为文件 reader 线程和 16 个上传线程之间的队列。每个上传程序线程只会循环使用阻塞集合,直到它完成。
并且,如果您想限制队列中的内存消耗,您可以为阻塞收集设置上限,这样 file-reader 线程将在缓冲区达到容量时暂停。这在您可能需要限制每次 user/API 调用使用的内存的服务器环境中特别有用。
// Create a buffer of 4 chunks between the file reader and the senders
BlockingCollection<Chunk> queue = new BlockingCollection<Chunk>(4);
// Create a cancellation token source so you can stop this gracefully
CancellationTokenSource cts = ...
文件reader线程
...
queue.Add(chunk, cts.Token);
...
queue.CompleteAdding();
发送线程
for(int i = 0; i < 16; i++)
{
Task.Run(() => {
foreach (var chunk in queue.GetConsumingEnumerable(cts.Token))
{
.. do the upload
}
});
}