How can I quickly distribute files equally across multiple partitions / servers?

I'm writing a method to copy files evenly across multiple partitions for processing. What I have currently seems to work fine, but I feel there's probably a better way to do it.

I have the following questions about this process:

  1. Is there a better way than the one I'm using (that you know of) to evenly distribute the files across the partitions by size?

  2. Since I'm copying to multiple partitions across multiple servers, would it benefit me to implement multithreading for the file copy, or will I still be limited by the output of the disk transferring these files?

My method for creating equal file groupings is below:

    /// <summary>
    /// Distributes a list of files into groups based on their size.
    /// </summary>
    /// <param name="files">The list of files to distribute.</param>
    /// <param name="partitionCount">The number of partitions to distribute across.</param>
    /// <returns>A balanced array of file lists for each partition.</returns>
    public List<SourceFile>[] Distribute(List<SourceFile> files, int partitionCount)
    {
        long totalSize = files.Sum(sf => sf.Size);

        long groupGoal = totalSize / partitionCount;

        List<SourceFile> sourceFilesSorted = files.OrderByDescending(sf => sf.Size).ToList();
        List<SourceFile>[] groups = Enumerable.Range(0, partitionCount).Select(l => new List<SourceFile>()).ToArray();

        int nGroup = 0, attempt = 1;
        long acceptedGoal = groupGoal;
        while (sourceFilesSorted.Count > 0)
        {
            WriteLine("Attempt {0} at initial file grouping, tolerance {1}...", attempt++, acceptedGoal);
            bool anySuccess = false;
            foreach (SourceFile sf in sourceFilesSorted.ToList())
            {
                if (groups[nGroup].Count == 0)
                {
                    groups[nGroup].Add(sf);
                    sourceFilesSorted.Remove(sf);
                    anySuccess = true;
                    continue;
                }

                bool satisfied = false;
                while (!satisfied && nGroup < groups.Length)
                {
                    if (groups[nGroup].Sum(gf => gf.Size) + sf.Size <= acceptedGoal)
                    {
                        groups[nGroup].Add(sf);
                        sourceFilesSorted.Remove(sf);
                        anySuccess = true;
                        satisfied = true;
                    }
                    if (!satisfied)
                        nGroup++;
                }

                if (++nGroup >= groups.Length)
                {
                    nGroup = 0;
                }
            }

            if (sourceFilesSorted.Count > 0)
                groups = groups.OrderBy(g => g.Sum(gf => gf.Size)).ToArray();

            if (!anySuccess)
                acceptedGoal += groupGoal;
        }

        groups = groups.OrderByDescending(g => g.Sum(gf => gf.Size)).ToArray();

        attempt = 1;
        acceptedGoal = groupGoal;

        bool hasMove = true;

        while (hasMove)
        {
            WriteLine("Attempt {0} at moving larger group files into smaller groups...", attempt);

            WriteLine("There are {0} groups above tolerance: {1}", groups.Where(g => (g.Sum(gf => gf.Size) > acceptedGoal)).Count(), acceptedGoal);

            // Begin moving files in groups where acceptable.
            List<SourceFile>[] move = Enumerable.Range(0, groups.Length).Select(l => new List<SourceFile>()).ToArray();
            for (int i = 0; i < groups.Length && i < move.Length; i++)
            {
                // WriteLine("Group {0} sum: {1}", i + 1, groups[i].Sum(sf => sf.Size));

                if (groups[i].Sum(sf => sf.Size) <= acceptedGoal)
                    continue;

                foreach (SourceFile file in groups[i])
                {
                    if (groups.Where(g => (g.Sum(gf => gf.Size) + file.Size <= acceptedGoal)).Any())
                    {
                        move[i].Add(file);
                    }
                }
            }

            long moves = move.Sum(m => m.Count);
            hasMove = move.Any(m => m.Any());
            WriteLine("Found {0} moves, {1}", moves, hasMove ? "attempting to redistribute..." : "process complete.");
            for (int i = 0; i < groups.Length; i++)
            {
                for (int j = 0; j < move.Length; j++)
                {
                    foreach (SourceFile file in move[j].ToList())
                    {
                        if (groups[i].Sum(sf => sf.Size) + file.Size <= acceptedGoal)
                        {
                            groups[i].Add(file);
                            groups[j].Remove(file);
                            move[j].Remove(file);
                        }
                    }
                }
            }

            if (!hasMove && acceptedGoal == groupGoal)
            {
                var acceptedGroups = groups.Where(g => (g.Sum(gf => gf.Size) <= acceptedGoal));
                acceptedGoal = acceptedGroups.Sum(g => g.Sum(gf => gf.Size)) / acceptedGroups.Count();
                WriteLine("Lowering tolerance to {0} for {1} groups, continue distribution...", acceptedGoal, acceptedGroups.Count());
                hasMove = true;
            }
        }

        return groups;
    }

First, I set acceptedGoal, the target size I'd like to achieve on each server. This is just the average of all the file sizes, which would create a perfect distribution.

After this, I sort the file list by size in descending order and start adding the files to each group, skipping a group whenever adding the file would push its total size over acceptedGoal.

Once an iteration makes no successful additions, acceptedGoal is increased by the initial value, which essentially just widens the tolerance each round. Before each iteration starts, the groups are always sorted from smallest to largest to make sure new files are added to the currently smallest group, keeping the overall variance as low as possible.
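
For reference, here's roughly how it's called; the `SourceFile` shape below (just a `Size` property) is an assumption, since only `Size` is used by the method:

    // Hypothetical minimal SourceFile; the real class presumably also carries a path.
    public class SourceFile
    {
        public long Size { get; set; }
    }

    // Six files totalling 610 across 3 partitions, so the per-group goal is 610 / 3 = 203.
    var files = new List<SourceFile>
    {
        new SourceFile { Size = 170 },
        new SourceFile { Size = 150 },
        new SourceFile { Size = 120 },
        new SourceFile { Size = 90 },
        new SourceFile { Size = 50 },
        new SourceFile { Size = 30 },
    };

    List<SourceFile>[] groups = Distribute(files, 3);

    // Each group's total lands close to the 203 goal (210 / 200 / 200 with these sizes).
    foreach (var group in groups)
        WriteLine("{0} = {1}", string.Join(" + ", group.Select(f => f.Size)), group.Sum(f => f.Size));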


Update: I dug into this a little deeper and the method now makes a second pass over the list. This time it works out a new, lower tolerance, which is the average of the groups that are already below the initial accepted tolerance.

The process keeps trying to move files from the groups that are above the tolerance into the groups that are below it.

So far I've gotten this down to a very low variance from the target goal, but I'm still not sure whether there's a better way.
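
In condensed form, that second pass boils down to something like this; a simplified sketch reusing the groups array and acceptedGoal from the method above, not the exact staging logic:

    // Simplified sketch of the second pass: move files out of over-tolerance groups
    // into any other group that can absorb them without itself going over.
    bool moved = true;
    while (moved)
    {
        moved = false;
        foreach (var source in groups.Where(g => g.Sum(f => f.Size) > acceptedGoal))
        {
            foreach (var file in source.OrderBy(f => f.Size).ToList())
            {
                var target = groups.FirstOrDefault(g => g != source && g.Sum(f => f.Size) + file.Size <= acceptedGoal);
                if (target != null)
                {
                    source.Remove(file);
                    target.Add(file);
                    moved = true;
                }
            }
        }
    }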


Update 2: Thanks to @Enigmativity, I was able to refactor all of this again into a very clean IEnumerable method:

    /// <summary>
    /// Distributes a list of files into groups based on their size.
    /// </summary>
    /// <param name="files">The list of files to distribute.</param>
    /// <param name="partitionCount">The number of partitions to distribute across.</param>
    /// <returns>A balanced array of file lists for each partition.</returns>
    public IEnumerable<List<SourceFile>> Distribute(List<SourceFile> files, int partitionCount)
    {
        // Calculate the max fileSize tolerance per partition (the "perfect" distribution size across each disk). 
        long tolerance = files.Sum(sf => sf.Size) / partitionCount;

        List<List<SourceFile>> groups = Enumerable.Range(0, partitionCount).Select(l => new List<SourceFile>()).ToList();

        // Process each file, large to small.
        foreach(var file in files.OrderByDescending(sf => sf.Size))
        {
            // Add file to the smallest current group.
            groups.OrderBy(g => g.Sum(f => f.Size)).First().Add(file);

            // If this group exceeds tolerance, return it now so we can begin processing immediately.
            List<List<SourceFile>> returnGroups = groups.Where(g => g.Sum(sf => sf.Size) > tolerance).ToList();
            foreach (var retGroup in returnGroups)
            {
                groups.Remove(retGroup);
                yield return retGroup;
            }
        }
        // Remember to return the rest of the groups, large to small.
        foreach(var retGroup in groups.OrderByDescending(g => g.Sum(sf => sf.Size)))
            yield return retGroup;
    }

Now I plan to iterate this and perform the copy to each partition as the lists are created.


I'm still curious whether multithreading would help this process along faster, since it's copying to multiple partitions across servers. Would the I/O still be limited by the other copy processes, given that the source disk really only needs to read the data and send it over the network to the other partitions? From there, each target partition would be using the write speed of its own disk (I think), which leads me to believe multithreading is a good idea. Does that sound right, or am I way off?
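
A rough sketch of the kind of parallel copy in question, one task per partition, consuming the groups as they're yielded; the `FullPath` property and the target root paths here are placeholders, and whether the parallelism actually pays off still depends on the source disk and the network:

    // Minimal sketch: one copy task per yielded group.
    // Assumes a hypothetical SourceFile.FullPath property and one target root per partition.
    var targetRoots = new Queue<string>(new[] { @"\\server1\d$\data", @"\\server2\e$\data", @"\\server3\f$\data" });
    var copyTasks = new List<Task>();

    foreach (var group in Distribute(files, targetRoots.Count))
    {
        string root = targetRoots.Dequeue();
        copyTasks.Add(Task.Run(() =>
        {
            foreach (var file in group)
            {
                // Every task still reads from the same source disk, so overall throughput
                // may be bounded by that read speed rather than by the number of threads.
                File.Copy(file.FullPath, Path.Combine(root, Path.GetFileName(file.FullPath)));
            }
        }));
    }

    Task.WhenAll(copyTasks).Wait();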

If anyone has any resources to look into, that would be much appreciated as well. I haven't found much on the web about this topic that I really understand.

I think this does what you're after:

    public List<SourceFile>[] Distribute(List<SourceFile> files, int partitionCount)
    {
        // Work through the files largest-first so the big files are placed early.
        List<SourceFile> sourceFilesSorted =
            files
                .OrderByDescending(sf => sf.Size)
                .ToList();

        List<SourceFile>[] groups =
            Enumerable
                .Range(0, partitionCount)
                .Select(l => new List<SourceFile>())
                .ToArray();

        foreach (var f in sourceFilesSorted)
        {
            // Always add the next file to the currently smallest group.
            groups
                .Select(grp => new { grp, size = grp.Sum(x => x.Size) })
                .OrderBy(grp => grp.size)
                .First()
                .grp
                .Add(f);
        }

        return groups;
    }

The loop is quite inefficient, but the results for 10,000 files come back in under a second, so I hope that's fast enough. It wouldn't be too hard to keep a running array of group sizes if needed, but if the efficiency isn't required it would just complicate the code.
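
If that efficiency is ever needed, the running-sizes idea could look roughly like this; the same greedy placement, just tracking each group's total instead of re-summing it for every file:

    public List<SourceFile>[] Distribute(List<SourceFile> files, int partitionCount)
    {
        var groups = Enumerable.Range(0, partitionCount).Select(_ => new List<SourceFile>()).ToArray();
        var sizes = new long[partitionCount]; // running total per group

        foreach (var f in files.OrderByDescending(sf => sf.Size))
        {
            // Find the index of the currently smallest group without re-summing it.
            int smallest = 0;
            for (int i = 1; i < partitionCount; i++)
                if (sizes[i] < sizes[smallest])
                    smallest = i;

            groups[smallest].Add(f);
            sizes[smallest] += f.Size;
        }

        return groups;
    }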