分批循环遍历 List<T>,同时确保每批的项目都是唯一的

Looping through a List<T> in batches while ensuring items per batch are unique

上下文:我有一个应用程序允许用户处理当天收到的所有邮寄付款。有时一个信封可能包含同一个帐户的多张支票(想想两个室友各自支付他们的部分水电费)。

限制:以 10 个为一组处理所有付款,但每批帐户 ID 必须是唯一的。

非常简单的付款方式class:

public class Payment
{
    public int AccountId { get; set; }
    // ... other properties not important
}

今天通过邮件收到的假设收款。请注意,最后两个 AccountId 值是可接受的重复值:

List<Payment> payments = new List<Payment>()
{
    new Payment() {AccountId = 1 },
    new Payment() {AccountId = 2 },
    new Payment() {AccountId = 3 },
    new Payment() {AccountId = 4 },
    new Payment() {AccountId = 5 },
    new Payment() {AccountId = 1 }, // Duplicate Account
    new Payment() {AccountId = 2 }  // Duplicate Account

    // likely hundreds more unique accounts, possibly even some more duplicates...
};

我正在使用 MoreLinq 尝试 select 每批不同的帐户,但下面的代码显然不起作用。我觉得我很接近,但一直无法找到可行的解决方案。同样,目标是将所有付款分成 N 个批次,而不复制该批次中的 AccountId。重复的 AccountId 必须分布在其他批次中,这样在尝试更新客户的余额时它们就不会导致竞争条件。

为清楚起见编辑了代码注释。

int batchSize = 10;
var paymentTasks = new List<Task>(batchSize);

// This linq expression is the heart of my question: How to divide the payments
// into batches while ensuring uniqueness of a particular key(s). This expression
// is close, but the DistinctBy() is obviously excluding the duplicates that
// I just intend to be distinct for that Batch(batchSize).
foreach (IEnumerable<Payment> batchOfPayments in payments.DistinctBy(a => a.AccountId).Batch(batchSize))
{
    // The rest of this method is for context only

    paymentTasks.Clear();

    foreach (Payment payment in batchOfPayments)
    {
        // Async method implementation not important
        Task paymentTask = ProcessPaymentAsync(payment);
        paymentTasks.Add(paymentTask);
    }

    // Await all the tasks in this batch to complete before starting the next batch
    await Task.WhenAll(paymentTasks);
}

感谢您抽空查看我的问题。

如果我完全理解这个问题,那么有很多方法可以做到这一点,最好的解决方案取决于您的实际需求。

假设是:

  1. 您所描述的是记忆中的方法
  2. 不需要访问数据库
  3. 不需要是生产者消费者

然后一个非常简单(但有效)的批处理和队列模式可以使用最少的分配。

给定

public class Payment
{
   public int AccountId { get; set; }
   public Payment(int accountId) => AccountId = accountId;
}

public static IEnumerable<Payment[]> GetBatches(IEnumerable<Payment> source, int count)
{
   var hashset = new HashSet<int>(count);
   var batch = new List<Payment>(count);
   var leftOvers = new Queue<Payment>();

   while (true)
   {
      foreach (var item in source)
      {
         // check if batched
         if (hashset.Add(item.AccountId))
            batch.Add(item); // add to batch
         else
            leftOvers.Enqueue(item); // add to left overs

         // if we are at the batch size start a loop
         while (batch.Count == count)
         {
            yield return batch.ToArray(); // return the batch

            batch.Clear();
            hashset.Clear();

            // check the left overs
            while (leftOvers.Any() && batch.Count != count)
               if (hashset.Add(leftOvers.Peek().AccountId)) // check the batch
                  batch.Add(leftOvers.Dequeue());
               else break; // we still have a duplicate bail
         }
      }

      if(batch.Any()) yield return batch.ToArray();

      if (!leftOvers.Any()) break;

      source = leftOvers.ToList(); // allocation  :(
      hashset.Clear();
      batch.Clear();
      leftOvers.Clear();
   }
}

注意 :这是相当节省资源的,尽管在处理纯剩菜时它可能有额外不必要的小分配,我我确信这可以被删除,尽管我会把它留给你。您还可以通过使用可以轻松转变为消费者的渠道来提高效率

测试

var list = new List<Payment>() {new(1), new(2), new(3), new(4), new(4), new(5), new(6), new(4), new(4), new(6), new(4)};

var batches = GetBatches(list, 3);

foreach (var batch in batches)
   Console.WriteLine(string.Join(", ",batch.Select(x => x.AccountId)));

输出

1, 2, 3
4, 5, 6
4, 6
4
4
4

Full demo here to Play with

请注意,这假设这不是真实世界的示例,因为它绝对不能防止接收端出现重复!这只是一种不重复地分发前端加载的批次而只循环一次集合的方法。

它依赖于按需创建桶和跟踪整个桶列表中的位置,以及当前目标桶本身。此外,它使用自定义 EqualityComparer 在每个单独的存储桶中启用 O(1) 次付款查找。

public async Task Implementation(List<Payment> payments)
{
    int batchSize = 10;
    var paymentTasks = new List<Task>();
    // use a custom equality comparer to allow for O(1) lookups
    // of payments based on AccountId
    var buckets = new List<HashSet<Payment>>();

    var incompletedBucketPosition = 0;
    var currentBucketPosition = 0;

    foreach (var payment in payments)
    {
        var isAdded = false;
        while (!isAdded)
        {
            if (currentBucketPosition >= buckets.Count)
            {
                // if we have run out of buckets, we need to
                // create a new one.
                // You could also create buckets ahead of time but
                // without knowing how many duplicates to expect,
                // this could be more efficient.
                buckets.Add(new HashSet<Payment>(batchSize));
            }
            
            var currentBucket = buckets[currentBucketPosition];

            if (currentBucket.Count >= batchSize)
            {   
                // our batch is complete. Advance to the next incomplete bucket
                // and reset our currentBucketPosition to the incomplete position.
                currentBucketPosition = ++incompletedBucketPosition;
            }
            else
            {
                if (currentBucket.Contains(payment, new PaymentComparer()))
                {
                    // we can't use this batch since our new payment would be a dupe
                    // advance to the next bucket position.
                    currentBucketPosition++;
                }
                else
                {
                    currentBucket.Add(payment);
                    isAdded = true;
                    // since we've successfully added an element,
                    // reset the currentBucketPosition to the last 
                    // incomplete bucket.
                    currentBucketPosition = incompletedBucketPosition;
                }
            }
        }
    }
    
    paymentTasks.AddRange(buckets.Select(b => ProcessPaymentAsync(b)));
    await Task.WhenAll(paymentTasks);
}

public async Task ProcessPaymentAsync(HashSet<Payment> paymentBatch)
{
    Console.WriteLine(string.Join(',', paymentBatch.Select(b => b.AccountId)));
    await Task.FromResult(0);
}

public class Payment
{
    public int AccountId { get; set; }
}

public class PaymentComparer : IEqualityComparer<Payment>
{
    public bool Equals(Payment x, Payment y)
    {
        return x?.AccountId == y?.AccountId;
    }

    public int GetHashCode(Payment obj)
    {
        return obj?.AccountId.GetHashCode() ?? 0;
    }
}

您可以使用以下方法对其进行测试:

void Main()
{
    List<Payment> payments = new List<Payment>()
    {
        new Payment() {AccountId = 1 },
        new Payment() {AccountId = 2 },
        new Payment() {AccountId = 3 },
        new Payment() {AccountId = 4 },
        new Payment() {AccountId = 5 },
        new Payment() {AccountId = 1 }, // Duplicate Account
        new Payment() {AccountId = 2 },  // Duplicate Account
        new Payment() {AccountId = 6 },
        new Payment() {AccountId = 7 },
        new Payment() {AccountId = 8 },
        new Payment() {AccountId = 9 },
        new Payment() {AccountId = 10 },
        new Payment() {AccountId = 6 },
        new Payment() {AccountId = 11 },
        new Payment() {AccountId = 12 },
        new Payment() {AccountId = 13 },
        new Payment() {AccountId = 14 },
        new Payment() {AccountId = 15 },
        new Payment() {AccountId = 15 },
        new Payment() {AccountId = 16 },
        new Payment() {AccountId = 17 },
        new Payment() {AccountId = 18 },
        new Payment() {AccountId = 6 },
        // likely hundreds more unique accounts, possibly even some more duplicates...
    };
    
    Implementation(payments).GetAwaiter().GetResult();  
}

输出:

1,2,3,4,5,6,7,8,9,10
1,2,6,11,12,13,14,15,16,17
15,18,6

我只是来 post 我的@TheGeneral 的回答版本。编写原始解决方案的所有功劳都归于他。

我的版本略有不同,因为我已将其完全通用以在 OP 的直接用例之外使用,见下文:

public static IEnumerable<IEnumerable<TSource>> BatchDuplicate<TSource, TKey>(
    this IEnumerable<TSource> source, 
    Func<TSource, TKey> keySelector,
    int count
)
{
var hashset = new HashSet<TKey>(count);
var batch = new List<TSource>(count);
var leftOvers = new Queue<TSource>();

while (true)
{
    foreach (var item in source)
    {
        // check if batched
        if (hashset.Add(keySelector(item)))
        {
            batch.Add(item); // add to batch
        }
        else
        {
            leftOvers.Enqueue(item); // add to left overs
        }

        // if we are at the batch size start a loop
        while (batch.Count == count)
        {
            yield return batch.ToArray(); // return the batch

            batch.Clear();
            hashset.Clear();

            // check the left overs
            while (leftOvers.Any() && batch.Count != count)
            if (hashset.Add(keySelector(leftOvers.Peek())))
            {
                batch.Add(leftOvers.Dequeue());
            } // check the batch
            else break; // we still have a duplicate bail
        }
    }

    if(batch.Any()) yield return batch.ToArray();

    if (!leftOvers.Any()) break;

    source = leftOvers.ToList(); // allocation  :(
    hashset.Clear();
    batch.Clear();
    leftOvers.Clear();
}

用法:

class MyModel
{
    public string MyKey { get; set; }
}

// // IEnumerable<MyModel>
someEnumerable.BatchDuplicate(item => item.MyKey, 10);

这允许该方法用于任何模型,只要您提供模型键的选择器