分区输入并并行执行查询
Partition input and execute queries in parallel
我有下面这段代码,用来根据给定的员工 ID 列表检索员工信息。如果 ID 数量超过 100 万,我会在校验时抛出异常。大多数情况下,请求的 ID 数量少于 20 万,因此我把请求拆成 4 个分区,每个分区包含数量相同的员工 ID。这 4 个分区并行执行,并在 Task.WhenAll 之后把结果合并在一起。
有人能给我一些进一步改进的建议吗?我参考过 "ParallelForEachAsync" 和 "Parallel Foreach async in C#" 这两个问题,但没能让它正常工作。下面的代码可以运行,但分区数被硬编码为 4。怎样才能改成动态分区、提高并行程度,并把最大并行度设为 50?例如,输入是 10 万个 ID 时,我希望拆成 10 个分区,并让这 10 个分区全部并行执行。
/// <summary>
/// Loads employee rows for a list of employee ids by splitting the list into
/// partitions and querying the partitions concurrently.
/// </summary>
public class Service
{
    // Number of partitions requested from Split. This is the single place to
    // change the fan-out; each partition opens its own MyAppDBContext.
    private const int PartitionCount = 4;

    /// <summary>
    /// Queries every partition of <paramref name="input"/> concurrently and merges the results.
    /// </summary>
    /// <param name="input">Employee ids to look up.</param>
    /// <returns>The distinct employee entities returned by all partition queries.</returns>
    private async Task<List<EmployeeEntity>> GetInfo(List<long> input)
    {
        // Split returns fewer than PartitionCount groups when the input has fewer
        // items than that, so run one query per group actually returned instead of
        // indexing four fixed slots (which threw on short inputs).
        var partitionQueries = input
            .Split(PartitionCount)
            .Select(partition => GetResult(partition))
            .ToList();

        var partitionResults = await Task.WhenAll(partitionQueries);

        // Distinct keeps the set semantics of the Union chain this replaces.
        return partitionResults.SelectMany(rows => rows).Distinct().ToList();
    }

    /// <summary>
    /// Runs the three id-filtered queries for one partition on a fresh context.
    /// </summary>
    /// <param name="employees">Employee ids belonging to this partition.</param>
    /// <returns>The employee rows whose id is in <paramref name="employees"/>.</returns>
    private async Task<List<EmployeeEntity>> GetResult(List<long> employees)
    {
        using var context = new MyAppDBContext();

        // The band and client rows are queried first on the same context and their
        // lists are not used directly. NOTE(review): presumably this is so the context
        // links them to the employee entities returned below - confirm.
        _ = await context.EmployeeBand.Where(x => employees.Contains(x.EmployeeId)).ToListAsync();
        _ = await context.EmployeeClient.Where(x => employees.Contains(x.EmployeeId)).ToListAsync();

        return await context.Employee.Where(x => employees.Contains(x.EmployeeId)).ToListAsync();
    }
}
/// <summary>
/// List helpers used to partition work.
/// </summary>
public static class ExtensionMethods
{
    /// <summary>
    /// Distributes the items of <paramref name="myList"/> round-robin: the item at
    /// position p goes to the group keyed by p % <paramref name="parts"/>.
    /// </summary>
    /// <param name="myList">Items to distribute.</param>
    /// <param name="parts">Divisor used to compute each item's group key.</param>
    /// <returns>
    /// One list per group key that actually occurred, in order of first occurrence;
    /// a list shorter than <paramref name="parts"/> therefore yields fewer groups.
    /// </returns>
    public static List<List<T>> Split<T>(this List<T> myList, int parts)
    {
        return myList
            .Select((element, position) => new { element, position })
            .GroupBy(entry => entry.position % parts, entry => entry.element)
            .Select(bucket => bucket.ToList())
            .ToList();
    }
}
/// <summary>
/// An employee together with its band and client collections.
/// </summary>
public class EmployeeEntity
{
    /// <summary>Identifier of the employee; the value the id-list queries filter on.</summary>
    public long EmployeeId { get; set; }

    /// <summary>Band records for this employee; starts out as an empty set.</summary>
    public ICollection<EmployeeBandEntity> EmployeeBands { get; set; } = new HashSet<EmployeeBandEntity>();

    /// <summary>Client records for this employee; starts out as an empty set.</summary>
    public ICollection<EmployeeClientEntity> EmployeeClients { get; set; } = new HashSet<EmployeeClientEntity>();
}
/// <summary>
/// A band record that refers to one employee.
/// </summary>
public class EmployeeBandEntity
{
/// <summary>Identifier of this band record (presumably the primary key; confirm against the model configuration).</summary>
public long EmployeeBandId { get; set; }
/// <summary>Id of the employee this record refers to; the id-list queries filter on this value.</summary>
public long EmployeeId { get; set; }
/// <summary>Reference to the employee entity this record refers to.</summary>
public EmployeeEntity EmployeeEntity { get; set; }
}
/// <summary>
/// A client record that refers to one employee.
/// </summary>
public class EmployeeClientEntity
{
/// <summary>Identifier of this client record (presumably the primary key; confirm against the model configuration).</summary>
public long EmployeeClientId { get; set; }
/// <summary>Id of the employee this record refers to; the id-list queries filter on this value.</summary>
public long EmployeeId { get; set; }
/// <summary>Reference to the employee entity this record refers to.</summary>
public EmployeeEntity EmployeeEntity { get; set; }
}
/// <summary>
/// Database context exposing the employee, employee-band and employee-client sets.
/// Declared partial, so further members may live in another file.
/// </summary>
public partial class MyAppDBContext : DbContext
{
/// <summary>Set of employee entities.</summary>
public virtual DbSet<EmployeeEntity> Employee { get; set; }
/// <summary>Set of employee band entities.</summary>
public virtual DbSet<EmployeeBandEntity> EmployeeBand { get; set; }
/// <summary>Set of employee client entities.</summary>
public virtual DbSet<EmployeeClientEntity> EmployeeClient { get; set; }
}
我相信您可以在 GetResult 上多花些心思,用更好的方式重写它,让查询条件变成类似 where id greater than(和/或 less than)的范围比较,而不是 ids in (... list)。不过,假设您的 GetResult 已经是最优实现,您只需要一种让并行度最大化的执行方式,那么下面是我的方案。
/// <summary>
/// Looks up employees in batches of ids, running the batch queries concurrently
/// but never more than a fixed number at a time.
/// </summary>
/// <param name="input">Employee ids to look up.</param>
/// <returns>
/// <c>null</c> when <paramref name="input"/> is null, an empty list when it is empty,
/// otherwise the concatenated results of all batch queries in batch order.
/// </returns>
private async Task<List<EmployeeEntity>> GetInfo2(List<long> input)
{
    if (input == null)
    {
        return null;
    }
    if (input.Count == 0)
    {
        return new List<EmployeeEntity>();
    }

    const int batchSize = 100;
    // Upper bound on queries in flight. Without it a large input starts one query
    // (and one context) per batch all at once, e.g. 10,000 for a million ids.
    const int maxConcurrentQueries = 50;

    using (var gate = new System.Threading.SemaphoreSlim(maxConcurrentQueries))
    {
        // Waits for a free slot, runs one batch query, and always frees the slot.
        async Task<List<EmployeeEntity>> RunThrottled(List<long> ids)
        {
            await gate.WaitAsync();
            try
            {
                return await GetResult(ids);
            }
            finally
            {
                gate.Release();
            }
        }

        var taskList = new List<Task<List<EmployeeEntity>>>();
        foreach (var batch in input.Batch(batchSize))
        {
            taskList.Add(RunThrottled(batch.ToList()));
        }

        // WhenAll completes only after every task has finished, so the semaphore
        // is not disposed while a batch is still holding or awaiting it.
        var batchResults = await Task.WhenAll(taskList);
        return batchResults.SelectMany(a => a).ToList();
    }
}
这需要用到下面的 Batch 扩展方法。
/// <summary>
/// Sequence helpers for processing items in fixed-size groups.
/// </summary>
public static class Extensions
{
    /// <summary>
    /// Splits <paramref name="source"/> into consecutive batches of <paramref name="size"/>
    /// items; the last batch holds the remainder and may be shorter.
    /// </summary>
    /// <param name="source">Sequence to split; enumerated lazily, once.</param>
    /// <param name="size">Number of items per batch; must be greater than zero.</param>
    /// <returns>The batches in source order. An empty source yields no batches.</returns>
    /// <exception cref="System.ArgumentNullException"><paramref name="source"/> is null.</exception>
    /// <exception cref="System.ArgumentOutOfRangeException"><paramref name="size"/> is zero or negative.</exception>
    public static IEnumerable<IEnumerable<T>> Batch<T>(this IEnumerable<T> source, int size)
    {
        // Validate here, outside the iterator, so bad arguments fail at the call
        // site rather than on first enumeration.
        if (source == null)
        {
            throw new System.ArgumentNullException(nameof(source));
        }
        if (size <= 0)
        {
            throw new System.ArgumentOutOfRangeException(nameof(size), size, "Batch size must be greater than zero.");
        }

        return BatchIterator(source, size);
    }

    // Lazily fills one array per batch and yields it when full.
    private static IEnumerable<IEnumerable<T>> BatchIterator<T>(IEnumerable<T> source, int size)
    {
        T[] bucket = null;
        var count = 0;

        foreach (var item in source)
        {
            if (bucket == null)
            {
                bucket = new T[size];
            }

            bucket[count++] = item;
            if (count != size)
            {
                continue;
            }

            // Wrapped in Select so callers get a read-only view, not the array itself.
            yield return bucket.Select(x => x);

            // A fresh array is allocated for the next batch, so the batch just
            // yielded is never overwritten.
            bucket = null;
            count = 0;
        }

        // A non-null bucket here always holds at least one item: the partial tail.
        if (bucket != null)
        {
            yield return bucket.Take(count);
        }
    }
}
您可以自行找出批量大小的最佳值。我把它硬编码为 100,但您可以根据输入列表的大小或其他任何逻辑来推导它。
下面是管理任务的另一种方案,它不需要编写我在之前的回答(评论中)建议的 SemaphoreSlim 相关代码。我发现了这个 .NET 自带的方案:使用 System.Threading.Tasks.Dataflow 命名空间中的 ActionBlock。它无需借助 SemaphoreSlim 之类的手段,就能把并发数控制在设定的最大并行度之内。
Batch 扩展方法与我之前回答中的相同;但第一部分要改用如下所示的 ActionBlock(而不是 Task.WhenAll)。
/// <summary>
/// Looks up employees in batches of ids, using a dataflow ActionBlock to cap how
/// many batch queries run at once.
/// </summary>
/// <param name="input">Employee ids to look up.</param>
/// <returns>
/// <c>null</c> when <paramref name="input"/> is null, an empty list when it is empty,
/// otherwise every entity returned by the batch queries. Batches finish in no fixed
/// order, so the order of the result is not defined.
/// </returns>
private async Task<List<EmployeeEntity>> GetInfo2(List<long> input)
{
    if (input == null)
    {
        return null;
    }
    if (input.Count == 0)
    {
        return new List<EmployeeEntity>();
    }

    // Thread-safe sink for results from concurrently running batches. Unlike
    // BlockingCollection it holds nothing that needs disposing.
    var employeeEntities = new ConcurrentQueue<EmployeeEntity>();

    var actionBlock = new ActionBlock<List<long>>(async employeeIds =>
    {
        var employees = await GetResult(employeeIds);
        foreach (var employee in employees)
        {
            employeeEntities.Enqueue(employee);
        }
    }, new ExecutionDataflowBlockOptions
    {
        //config this to whatever works best in your situation
        MaxDegreeOfParallelism = 20
    });

    foreach (var batch in input.Batch(100))
    {
        // SendAsync returns false once the block declines input, which happens after
        // a batch has faulted. Stop queueing; awaiting Completion surfaces the error.
        if (!await actionBlock.SendAsync(batch.ToList()))
        {
            break;
        }
    }

    actionBlock.Complete();
    await actionBlock.Completion;
    return employeeEntities.ToList();
}
我有下面这段代码,用来根据给定的员工 ID 列表检索员工信息。如果 ID 数量超过 100 万,我会在校验时抛出异常。大多数情况下,请求的 ID 数量少于 20 万,因此我把请求拆成 4 个分区,每个分区包含数量相同的员工 ID。这 4 个分区并行执行,并在 Task.WhenAll 之后把结果合并在一起。
有人能给我一些进一步改进的建议吗?我参考过 "ParallelForEachAsync" 和 "Parallel Foreach async in C#" 这两个问题,但没能让它正常工作。下面的代码可以运行,但分区数被硬编码为 4。怎样才能改成动态分区、提高并行程度,并把最大并行度设为 50?例如,输入是 10 万个 ID 时,我希望拆成 10 个分区,并让这 10 个分区全部并行执行。
/// <summary>
/// Loads employee rows for a list of employee ids by splitting the list into
/// partitions and querying the partitions concurrently.
/// </summary>
public class Service
{
    // Number of partitions requested from Split. This is the single place to
    // change the fan-out; each partition opens its own MyAppDBContext.
    private const int PartitionCount = 4;

    /// <summary>
    /// Queries every partition of <paramref name="input"/> concurrently and merges the results.
    /// </summary>
    /// <param name="input">Employee ids to look up.</param>
    /// <returns>The distinct employee entities returned by all partition queries.</returns>
    private async Task<List<EmployeeEntity>> GetInfo(List<long> input)
    {
        // Split returns fewer than PartitionCount groups when the input has fewer
        // items than that, so run one query per group actually returned instead of
        // indexing four fixed slots (which threw on short inputs).
        var partitionQueries = input
            .Split(PartitionCount)
            .Select(partition => GetResult(partition))
            .ToList();

        var partitionResults = await Task.WhenAll(partitionQueries);

        // Distinct keeps the set semantics of the Union chain this replaces.
        return partitionResults.SelectMany(rows => rows).Distinct().ToList();
    }

    /// <summary>
    /// Runs the three id-filtered queries for one partition on a fresh context.
    /// </summary>
    /// <param name="employees">Employee ids belonging to this partition.</param>
    /// <returns>The employee rows whose id is in <paramref name="employees"/>.</returns>
    private async Task<List<EmployeeEntity>> GetResult(List<long> employees)
    {
        using var context = new MyAppDBContext();

        // The band and client rows are queried first on the same context and their
        // lists are not used directly. NOTE(review): presumably this is so the context
        // links them to the employee entities returned below - confirm.
        _ = await context.EmployeeBand.Where(x => employees.Contains(x.EmployeeId)).ToListAsync();
        _ = await context.EmployeeClient.Where(x => employees.Contains(x.EmployeeId)).ToListAsync();

        return await context.Employee.Where(x => employees.Contains(x.EmployeeId)).ToListAsync();
    }
}
/// <summary>
/// List helpers used to partition work.
/// </summary>
public static class ExtensionMethods
{
    /// <summary>
    /// Distributes the items of <paramref name="myList"/> round-robin: the item at
    /// position p goes to the group keyed by p % <paramref name="parts"/>.
    /// </summary>
    /// <param name="myList">Items to distribute.</param>
    /// <param name="parts">Divisor used to compute each item's group key.</param>
    /// <returns>
    /// One list per group key that actually occurred, in order of first occurrence;
    /// a list shorter than <paramref name="parts"/> therefore yields fewer groups.
    /// </returns>
    public static List<List<T>> Split<T>(this List<T> myList, int parts)
    {
        return myList
            .Select((element, position) => new { element, position })
            .GroupBy(entry => entry.position % parts, entry => entry.element)
            .Select(bucket => bucket.ToList())
            .ToList();
    }
}
/// <summary>
/// An employee together with its band and client collections.
/// </summary>
public class EmployeeEntity
{
    /// <summary>Identifier of the employee; the value the id-list queries filter on.</summary>
    public long EmployeeId { get; set; }

    /// <summary>Band records for this employee; starts out as an empty set.</summary>
    public ICollection<EmployeeBandEntity> EmployeeBands { get; set; } = new HashSet<EmployeeBandEntity>();

    /// <summary>Client records for this employee; starts out as an empty set.</summary>
    public ICollection<EmployeeClientEntity> EmployeeClients { get; set; } = new HashSet<EmployeeClientEntity>();
}
/// <summary>
/// A band record that refers to one employee.
/// </summary>
public class EmployeeBandEntity
{
/// <summary>Identifier of this band record (presumably the primary key; confirm against the model configuration).</summary>
public long EmployeeBandId { get; set; }
/// <summary>Id of the employee this record refers to; the id-list queries filter on this value.</summary>
public long EmployeeId { get; set; }
/// <summary>Reference to the employee entity this record refers to.</summary>
public EmployeeEntity EmployeeEntity { get; set; }
}
/// <summary>
/// A client record that refers to one employee.
/// </summary>
public class EmployeeClientEntity
{
/// <summary>Identifier of this client record (presumably the primary key; confirm against the model configuration).</summary>
public long EmployeeClientId { get; set; }
/// <summary>Id of the employee this record refers to; the id-list queries filter on this value.</summary>
public long EmployeeId { get; set; }
/// <summary>Reference to the employee entity this record refers to.</summary>
public EmployeeEntity EmployeeEntity { get; set; }
}
/// <summary>
/// Database context exposing the employee, employee-band and employee-client sets.
/// Declared partial, so further members may live in another file.
/// </summary>
public partial class MyAppDBContext : DbContext
{
/// <summary>Set of employee entities.</summary>
public virtual DbSet<EmployeeEntity> Employee { get; set; }
/// <summary>Set of employee band entities.</summary>
public virtual DbSet<EmployeeBandEntity> EmployeeBand { get; set; }
/// <summary>Set of employee client entities.</summary>
public virtual DbSet<EmployeeClientEntity> EmployeeClient { get; set; }
}
我相信您可以在 GetResult 上多花些心思,用更好的方式重写它,让查询条件变成类似 where id greater than(和/或 less than)的范围比较,而不是 ids in (... list)。不过,假设您的 GetResult 已经是最优实现,您只需要一种让并行度最大化的执行方式,那么下面是我的方案。
/// <summary>
/// Looks up employees in batches of ids, running the batch queries concurrently
/// but never more than a fixed number at a time.
/// </summary>
/// <param name="input">Employee ids to look up.</param>
/// <returns>
/// <c>null</c> when <paramref name="input"/> is null, an empty list when it is empty,
/// otherwise the concatenated results of all batch queries in batch order.
/// </returns>
private async Task<List<EmployeeEntity>> GetInfo2(List<long> input)
{
    if (input == null)
    {
        return null;
    }
    if (input.Count == 0)
    {
        return new List<EmployeeEntity>();
    }

    const int batchSize = 100;
    // Upper bound on queries in flight. Without it a large input starts one query
    // (and one context) per batch all at once, e.g. 10,000 for a million ids.
    const int maxConcurrentQueries = 50;

    using (var gate = new System.Threading.SemaphoreSlim(maxConcurrentQueries))
    {
        // Waits for a free slot, runs one batch query, and always frees the slot.
        async Task<List<EmployeeEntity>> RunThrottled(List<long> ids)
        {
            await gate.WaitAsync();
            try
            {
                return await GetResult(ids);
            }
            finally
            {
                gate.Release();
            }
        }

        var taskList = new List<Task<List<EmployeeEntity>>>();
        foreach (var batch in input.Batch(batchSize))
        {
            taskList.Add(RunThrottled(batch.ToList()));
        }

        // WhenAll completes only after every task has finished, so the semaphore
        // is not disposed while a batch is still holding or awaiting it.
        var batchResults = await Task.WhenAll(taskList);
        return batchResults.SelectMany(a => a).ToList();
    }
}
这需要用到下面的 Batch 扩展方法。
/// <summary>
/// Sequence helpers for processing items in fixed-size groups.
/// </summary>
public static class Extensions
{
    /// <summary>
    /// Splits <paramref name="source"/> into consecutive batches of <paramref name="size"/>
    /// items; the last batch holds the remainder and may be shorter.
    /// </summary>
    /// <param name="source">Sequence to split; enumerated lazily, once.</param>
    /// <param name="size">Number of items per batch; must be greater than zero.</param>
    /// <returns>The batches in source order. An empty source yields no batches.</returns>
    /// <exception cref="System.ArgumentNullException"><paramref name="source"/> is null.</exception>
    /// <exception cref="System.ArgumentOutOfRangeException"><paramref name="size"/> is zero or negative.</exception>
    public static IEnumerable<IEnumerable<T>> Batch<T>(this IEnumerable<T> source, int size)
    {
        // Validate here, outside the iterator, so bad arguments fail at the call
        // site rather than on first enumeration.
        if (source == null)
        {
            throw new System.ArgumentNullException(nameof(source));
        }
        if (size <= 0)
        {
            throw new System.ArgumentOutOfRangeException(nameof(size), size, "Batch size must be greater than zero.");
        }

        return BatchIterator(source, size);
    }

    // Lazily fills one array per batch and yields it when full.
    private static IEnumerable<IEnumerable<T>> BatchIterator<T>(IEnumerable<T> source, int size)
    {
        T[] bucket = null;
        var count = 0;

        foreach (var item in source)
        {
            if (bucket == null)
            {
                bucket = new T[size];
            }

            bucket[count++] = item;
            if (count != size)
            {
                continue;
            }

            // Wrapped in Select so callers get a read-only view, not the array itself.
            yield return bucket.Select(x => x);

            // A fresh array is allocated for the next batch, so the batch just
            // yielded is never overwritten.
            bucket = null;
            count = 0;
        }

        // A non-null bucket here always holds at least one item: the partial tail.
        if (bucket != null)
        {
            yield return bucket.Take(count);
        }
    }
}
您可以自行找出批量大小的最佳值。我把它硬编码为 100,但您可以根据输入列表的大小或其他任何逻辑来推导它。
下面是管理任务的另一种方案,它不需要编写我在之前的回答(评论中)建议的 SemaphoreSlim 相关代码。我发现了这个 .NET 自带的方案:使用 System.Threading.Tasks.Dataflow 命名空间中的 ActionBlock。它无需借助 SemaphoreSlim 之类的手段,就能把并发数控制在设定的最大并行度之内。
Batch 扩展方法与我之前回答中的相同;但第一部分要改用如下所示的 ActionBlock(而不是 Task.WhenAll)。
/// <summary>
/// Looks up employees in batches of ids, using a dataflow ActionBlock to cap how
/// many batch queries run at once.
/// </summary>
/// <param name="input">Employee ids to look up.</param>
/// <returns>
/// <c>null</c> when <paramref name="input"/> is null, an empty list when it is empty,
/// otherwise every entity returned by the batch queries. Batches finish in no fixed
/// order, so the order of the result is not defined.
/// </returns>
private async Task<List<EmployeeEntity>> GetInfo2(List<long> input)
{
    if (input == null)
    {
        return null;
    }
    if (input.Count == 0)
    {
        return new List<EmployeeEntity>();
    }

    // Thread-safe sink for results from concurrently running batches. Unlike
    // BlockingCollection it holds nothing that needs disposing.
    var employeeEntities = new ConcurrentQueue<EmployeeEntity>();

    var actionBlock = new ActionBlock<List<long>>(async employeeIds =>
    {
        var employees = await GetResult(employeeIds);
        foreach (var employee in employees)
        {
            employeeEntities.Enqueue(employee);
        }
    }, new ExecutionDataflowBlockOptions
    {
        //config this to whatever works best in your situation
        MaxDegreeOfParallelism = 20
    });

    foreach (var batch in input.Batch(100))
    {
        // SendAsync returns false once the block declines input, which happens after
        // a batch has faulted. Stop queueing; awaiting Completion surfaces the error.
        if (!await actionBlock.SendAsync(batch.ToList()))
        {
            break;
        }
    }

    actionBlock.Complete();
    await actionBlock.Completion;
    return employeeEntities.ToList();
}