.NET 排队任务(async/await)
.NET queued tasks (with async/await)
我有大量任务 (~1000) 需要执行。我 运行 使用 4 核处理器,所以我想一次并行处理 4 个任务。
为了给您一个起点,这里有一些示例代码。
class Program
{
public class LongOperation
{
private static readonly Random RandomNumberGenerator = new Random(0);
const int UpdateFrequencyMilliseconds = 100;
public int CurrentProgress { get; set; }
public int TargetProcess { get; set; }
public LongOperation()
{
TargetProcess = RandomNumberGenerator.Next(
(int)TimeSpan.FromSeconds(5).TotalMilliseconds / UpdateFrequencyMilliseconds,
(int)TimeSpan.FromSeconds(10).TotalMilliseconds / UpdateFrequencyMilliseconds);
}
public async Task Execute()
{
while (!IsCompleted)
{
await Task.Delay(UpdateFrequencyMilliseconds);
CurrentProgress++;
}
}
public bool IsCompleted => CurrentProgress >= TargetProcess;
}
static void Main(string[] args)
{
Task.Factory.StartNew(async () =>
{
var operations = new List<LongOperation>();
for(var x = 1; x <= 10; x++)
operations.Add(new LongOperation());
await ProcessOperations(4, operations);
}).Wait();
}
public static async Task ProcessOperations(int maxSimultaneous, List<LongOperation> operations)
{
await Task.WhenAll(operations.Select(x => x.Execute()));
// TODO: Process up to 4 operations at a time, until every operation is completed.
}
}
我想要一些关于 类 我将使用什么的信息,以及我如何构造 ProcessOperations
一次最多处理 4 个操作,直到所有操作完成,在一个等待中 Task
.
我正在考虑以某种方式使用 SemaphoreSlim
对象,因为它似乎适合限制 resource/process.
如前所述,您需要使用一个方便的 TPL Dataflow library,有两个块,用于在处理之前存储消息,以及对它们执行实际操作:
// storage
var operations = new BufferBlock<LongOperation>();
// no more than 4 actions at the time
var actions = new ActionBlock<LongOperation>(x => x.Execute(),
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
// consume new operations automatically
operations.LinkTo(actions);
for(var x = 1; x <= 10; ++x)
{
// blocking sending
operations.Post(new LongOperation());
// awaitable send for async operations
// await operations.SendAsync(new LongOperation());
}
您还可以通过设置缓冲区的 BoundedCapacity
选项来引入一些节流限制,比如同时不超过 30 个操作。
我有大量任务 (~1000) 需要执行。我 运行 使用 4 核处理器,所以我想一次并行处理 4 个任务。
为了给您一个起点,这里有一些示例代码。
class Program
{
public class LongOperation
{
private static readonly Random RandomNumberGenerator = new Random(0);
const int UpdateFrequencyMilliseconds = 100;
public int CurrentProgress { get; set; }
public int TargetProcess { get; set; }
public LongOperation()
{
TargetProcess = RandomNumberGenerator.Next(
(int)TimeSpan.FromSeconds(5).TotalMilliseconds / UpdateFrequencyMilliseconds,
(int)TimeSpan.FromSeconds(10).TotalMilliseconds / UpdateFrequencyMilliseconds);
}
public async Task Execute()
{
while (!IsCompleted)
{
await Task.Delay(UpdateFrequencyMilliseconds);
CurrentProgress++;
}
}
public bool IsCompleted => CurrentProgress >= TargetProcess;
}
static void Main(string[] args)
{
Task.Factory.StartNew(async () =>
{
var operations = new List<LongOperation>();
for(var x = 1; x <= 10; x++)
operations.Add(new LongOperation());
await ProcessOperations(4, operations);
}).Wait();
}
public static async Task ProcessOperations(int maxSimultaneous, List<LongOperation> operations)
{
await Task.WhenAll(operations.Select(x => x.Execute()));
// TODO: Process up to 4 operations at a time, until every operation is completed.
}
}
我想要一些关于 类 我将使用什么的信息,以及我如何构造 ProcessOperations
一次最多处理 4 个操作,直到所有操作完成,在一个等待中 Task
.
我正在考虑以某种方式使用 SemaphoreSlim
对象,因为它似乎适合限制 resource/process.
如前所述,您需要使用一个方便的 TPL Dataflow library,有两个块,用于在处理之前存储消息,以及对它们执行实际操作:
// storage
var operations = new BufferBlock<LongOperation>();
// no more than 4 actions at the time
var actions = new ActionBlock<LongOperation>(x => x.Execute(),
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
// consume new operations automatically
operations.LinkTo(actions);
for(var x = 1; x <= 10; ++x)
{
// blocking sending
operations.Post(new LongOperation());
// awaitable send for async operations
// await operations.SendAsync(new LongOperation());
}
您还可以通过设置缓冲区的 BoundedCapacity
选项来引入一些节流限制,比如同时不超过 30 个操作。