How to search a vast code base for multiple literal strings efficiently?

This question is a follow-up to an earlier question of mine.

The source code is here - https://github.com/MarkKharitonov/LearningTPLDataFlow

Given:

I want to produce a JSON file listing all the occurrences of the literals in the source code. For each matching line, I want to have the following information:

All the records should be arranged in a dictionary, keyed by the respective literal.

So the challenge is to do it as efficiently as possible (in C#, of course).

The DataFlow pipeline can be found in this file - https://github.com/MarkKharitonov/LearningTPLDataFlow/blob/master/FindStringCmd.cs

Here it is:

private void Run(string workspaceRoot, string outFilePath, string[] literals, bool searchAllFiles, int workSize, int maxDOP1, int maxDOP2, int maxDOP3, int maxDOP4)
{
    var res = new SortedDictionary<string, List<MatchingLine>>();
    var projects = (workspaceRoot + "build\\projects.yml").YieldAllProjects();
    var progress = new Progress();

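    // Cap the combined parallelism of the produceWorkItems and produceMatchingLines
    // blocks below, which share this pair's ConcurrentScheduler.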
    var taskSchedulerPair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, Environment.ProcessorCount);

    var produceCSFiles = new TransformManyBlock<ProjectEx, CSFile>(p => YieldCSFiles(p, searchAllFiles), new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = maxDOP1
    });
    var produceCSFileContent = new TransformBlock<CSFile, CSFile>(CSFile.PopulateContentAsync, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = maxDOP2
    });
    var produceWorkItems = new TransformManyBlock<CSFile, (CSFile CSFile, int Pos, int Length)>(csFile => csFile.YieldWorkItems(literals, workSize, progress), new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = maxDOP3,
        TaskScheduler = taskSchedulerPair.ConcurrentScheduler
    });
    var produceMatchingLines = new TransformManyBlock<(CSFile CSFile, int Pos, int Length), MatchingLine>(o => o.CSFile.YieldMatchingLines(literals, o.Pos, o.Length, progress), new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = maxDOP4,
        TaskScheduler = taskSchedulerPair.ConcurrentScheduler
    });
    var getMatchingLines = new ActionBlock<MatchingLine>(o => AddResult(res, o));

    var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

    produceCSFiles.LinkTo(produceCSFileContent, linkOptions);
    produceCSFileContent.LinkTo(produceWorkItems, linkOptions);
    produceWorkItems.LinkTo(produceMatchingLines, linkOptions);
    produceMatchingLines.LinkTo(getMatchingLines, linkOptions);

    var progressTask = Task.Factory.StartNew(() =>
    {
        var delay = literals.Length < 10 ? 1000 : 10000;
        for (; ; )
        {
            var current = Interlocked.Read(ref progress.Current);
            var total = Interlocked.Read(ref progress.Total);
            Console.Write("Total = {0:n0}, Current = {1:n0}, Percents = {2:P}   \r", total, current, ((double)current) / total);
            if (progress.Done)
            {
                break;
            }
            Thread.Sleep(delay);
        }
        Console.WriteLine();
    }, TaskCreationOptions.LongRunning);

    projects.ForEach(p => produceCSFiles.Post(p));
    produceCSFiles.Complete();
    getMatchingLines.Completion.GetAwaiter().GetResult();
    progress.Done = true;
    progressTask.GetAwaiter().GetResult();

    res.SaveAsJson(outFilePath);
}

The default parameters are (https://github.com/MarkKharitonov/LearningTPLDataFlow/blob/master/FindStringCmd.cs#L24-L28):

private int m_maxDOP1 = 3;
private int m_maxDOP2 = 20;
private int m_maxDOP3 = Environment.ProcessorCount;
private int m_maxDOP4 = Environment.ProcessorCount;
private int m_workSize = 1_000_000;

My idea is to divide the work into work items, where the size of a work item is the number of lines in the respective file multiplied by the count of string literals. So, if a C# file contains 500 lines, then searching it for all of the 3401 literals represents work of size 3401 * 500 = 1,700,500.

A work item is capped at 1,000,000 of these line × literal units by default, so in the example above the file would yield 2 work items:

  1. Literals 0..1999
  2. Literals 2000..3400

The produceWorkItems block is the one that generates these work items from the files; a minimal sketch of the slicing logic is shown below.
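To make the slicing concrete, here is a hypothetical sketch (the real YieldWorkItems lives in the repository and may differ; YieldWorkItemsSketch, lineCount and literalCount are illustrative names):

public static IEnumerable<(CSFile CSFile, int Pos, int Length)> YieldWorkItemsSketch(
    CSFile csFile, int lineCount, int literalCount, int workSize)
{
    // Each work item covers at most workSize / lineCount literals,
    // so that lines * literals stays within the workSize budget.
    int literalsPerItem = Math.Max(1, workSize / Math.Max(1, lineCount));
    for (int pos = 0; pos < literalCount; pos += literalsPerItem)
    {
        yield return (csFile, pos, Math.Min(literalsPerItem, literalCount - pos));
    }
}

With lineCount = 500, literalCount = 3401 and workSize = 1,000,000 this produces exactly the two work items above: (Pos = 0, Length = 2000) and (Pos = 2000, Length = 1401).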

Sample run:

C:\work\TPLDataFlow [master ≡]> .\bin\Debug\net5.0\TPLDataFlow.exe find-string -d C:\xyz\tip -o c:\temp -l C:\temp.txt
Locating all the instances of the 4 literals found in the file C:\temp.txt in the C# code ...
Total = 49,844,516, Current = 49,702,532, Percents = 99.72%
Elapsed: 00:00:18.4320676
C:\work\TPLDataFlow [master ≡]> .\bin\Debug\net5.0\TPLDataFlow.exe find-string -d C:\xyz\tip -o c:\temp -l C:\temp.txt
Locating all the instances of the 3401 literals found in the file c:\temp.txt in the C# code ...
Total = 42,379,095,775, Current = 42,164,259,870, Percents = 99.49%
Elapsed: 01:44:13.4289270

The problem

Many work items are too small. If I have 3 C# files of 20 lines each, my current code produces 3 work items, because in my current implementation work items never cross file boundaries. This is inefficient. Ideally they would be batched into a single work item, because 60 * 3401 = 204,060 < 1,000,000. But the BatchBlock cannot be used here, because it expects me to provide the batch size, which I do not know - it depends on the work items in the pipeline.

How would you implement such batching?

Regarding the first question (configuring the pipeline), I can't really offer any guidance. Optimizing the parameters of a dataflow pipeline seems like black magic to me!

Regarding the second question (how to batch a workload consisting of work items whose size is not known at compile time), you could use the custom BatchBlock<T> below. It uses the DataflowBlock.Encapsulate method to combine two dataflow blocks into one. The first block is an ActionBlock<T> that receives the input and stores it in a buffer, and the second is a BufferBlock<T[]> that holds the batched items and propagates them downstream. The weightSelector is a lambda that returns the weight of each received item. When the accumulated weight surpasses the batchWeight threshold, a batch is emitted.

public static IPropagatorBlock<T, T[]> CreateDynamicBatchBlock<T>(
    int batchWeight, Func<T, int> weightSelector,
    DataflowBlockOptions options = null)
{
    // Arguments validation omitted
    options ??= new DataflowBlockOptions();
    var outputBlock = new BufferBlock<T[]>(options);
    List<T> buffer = new List<T>();
    int sumWeight = 0;

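    // Input side: buffer incoming items, emitting the current batch before an item
    // that would overshoot the weight threshold, and again once the threshold is reached.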
    var inputBlock = new ActionBlock<T>(async item =>
    {
        checked
        {
            int weight = weightSelector(item);
            if (weight + sumWeight > batchWeight && buffer.Count > 0)
                await SendBatchAsync();
            buffer.Add(item);
            sumWeight += weight;
            if (sumWeight >= batchWeight) await SendBatchAsync();
        }
    }, new()
    {
        BoundedCapacity = options.BoundedCapacity,
        CancellationToken = options.CancellationToken,
        TaskScheduler = options.TaskScheduler,
        MaxMessagesPerTask = options.MaxMessagesPerTask,
        NameFormat = options.NameFormat
    });

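    // When the input side completes, flush any leftover items as a final batch,
    // then complete (or fault) the output side.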
    PropagateCompletion(inputBlock, outputBlock, async () =>
    {
        if (buffer.Count > 0) await SendBatchAsync();
    });

    Task SendBatchAsync()
    {
        var batch = buffer.ToArray();
        buffer.Clear();
        sumWeight = 0;
        return outputBlock.SendAsync(batch);
    }

    static async void PropagateCompletion(IDataflowBlock source,
        IDataflowBlock target, Func<Task> postCompletionAction)
    {
        try { await source.Completion.ConfigureAwait(false); } catch { }
        Exception ex =
            source.Completion.IsFaulted ? source.Completion.Exception : null;
        try { await postCompletionAction(); }
        catch (Exception actionError) { ex = actionError; }
        if (ex != null) target.Fault(ex); else target.Complete();
    }

    return DataflowBlock.Encapsulate(inputBlock, outputBlock);
}

Usage example:

var batchBlock = CreateDynamicBatchBlock<WorkItem>(1_000_000, wi => wi.Size);

If the int type does not have enough range for the accumulated weight and could overflow, you can switch to long or double.
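For context, the batchBlock above could be wired between the existing blocks of the question's pipeline roughly like this (a hypothetical sketch: WorkItem stands for the (CSFile, Pos, Length) tuples with Size as their lines * literals weight, and processBatches is a made-up downstream block):

var processBatches = new ActionBlock<WorkItem[]>(batch =>
{
    // Handle one weight-bounded batch of work items.
});

produceWorkItems.LinkTo(batchBlock, linkOptions);
batchBlock.LinkTo(processBatches, linkOptions);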

I realized something. Maybe it is obvious, but I have just figured it out. The TPL DataFlow library is of no use if all the items can be buffered up front. And in my case they can. So I can buffer the items and sort them from the largest to the smallest, and then a simple Parallel.ForEach would do the job just fine. Having realized that, I changed my implementation to use Rx like this:

Phase 1 - fetch all the items. This is where all the IO happens:

var input = (workspaceRoot + "build\\projects.yml")
    .YieldAllProjects()
    .ToObservable()
    .Select(project => Observable.FromAsync(() => Task.Run(() => YieldFiles(project, searchAllFiles))))
    .Merge(2)
    .SelectMany(files => files)
    .Select(file => Observable.FromAsync(file.PopulateContentAsync))
    .Merge(10)
    .ToList()
    .GetAwaiter().GetResult()
    .AsList();

input.Sort((x, y) => y.EstimatedLineCount - x.EstimatedLineCount);

Phase 2 - find all the matching lines (CPU only):

var res = new SortedDictionary<string, List<MatchingLine>>();
input
    .ToObservable()
    .Select(file => Observable.FromAsync(() => Task.Run(() => file.YieldMatchingLines(literals, 0, literals.Count, progress).ToList())))
    .Merge(maxDOP.Value)
    .ToList()
    .GetAwaiter().GetResult()
    .SelectMany(m => m)
    .ForEach(m => AddResult(res, m));

So, even though I have hundreds of projects, thousands of files and millions of lines of code - it is not the scale for TPL DataFlow, because my tool can read all the files into memory, rearrange them in a favorable order and then process them.
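For completeness, here is a minimal sketch of the plain Parallel.ForEach alternative mentioned above for phase 2 (assuming the same input, literals, progress, maxDOP and AddResult as in the snippets above; the lock is a simple way to serialize access to the shared dictionary):

var res = new SortedDictionary<string, List<MatchingLine>>();
var resLock = new object();
Parallel.ForEach(
    input, // already sorted from the largest file to the smallest
    new ParallelOptions { MaxDegreeOfParallelism = maxDOP.Value },
    file =>
    {
        foreach (var m in file.YieldMatchingLines(literals, 0, literals.Count, progress))
        {
            lock (resLock) AddResult(res, m);
        }
    });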