嵌套GNU Parallel处理多个大文件,拆分每个文件数据作为队列处理

nesting GNU Parallel to process multiple huge files and split each file data to be processed as queue

我有一个包含将近 100 个日志文件的目录,每个日志文件的大小为 10~15 GB。需求是逐行读取每个文件(顺序无关紧要),清理json行并dump到后端elasticsearch存储进行索引

这是我做这项工作的工人

# file = worker.php

echo " -- New PHP Worker Started -- "; // to get how many times gnu-parallel initiated the worker
$dataSet = [];

while (false !== ($line = fgets(STDIN))) {

    // convert line text to json
    $l = json_decode($line);
    $dataSet[] = $l;

    if(sizeof($dataSet) >= 1000) {
        //index json to elasticsearch
        $elasticsearch->bulkIndex($dataSet);
        $dataSet = []; 
    }
}

在答案的帮助下 and here 我快到了,它正在工作(有点),但只需要确保在引擎盖下它实际上正在做我假设它正在做的事情.

只要一个文件我就可以处理如下

parallel --pipepart -a 10GB_input_file.txt  --round-robin php worker.php 

效果很好。添加 --round-robin 确保 php 工作进程只启动一次,然后它只是继续接收数据作为管道(穷人的队列)。

所以对于 4CPU 机器,它会启动 4 个 php worker 并非常快速地处理所有数据。

要对所有文件执行相同的操作,这是我的看法

find /data/directory -maxdepth 1 -type f | parallel cat | parallel --pipe -N10000 --round-robin php worker.php 

这看起来有点像工作,但我有一种直觉,这是为所有文件并行嵌套的错误方式。

其次,因为它不能使用 --pipepart,所以我认为它比较慢。

第三,作业完成后,我看到在一台 4cpu 机器上,只有 4 个 worker 被启动并且作业完成了。这是正确的行为吗?它不应该为每个文件启动 4 个工作人员吗?只是想确保我没有遗漏任何数据。

知道如何以更好的方式完成这项工作吗?

如果它们的大小大致相同,为什么不简单地给每个文件一个文件:

find /data/directory -maxdepth 1 -type f |
  parallel php worker.php '<' {}

另一种方法是对它们中的每一个使用 --pipepart

do_one() {
  parallel --pipepart -a "" --block -1 php worker.php
}
export -f do_one
find /data/directory -maxdepth 1 -type f | parallel -j1 do_one

如果启动时间不长php worker.php那么最后一个可能更可取,因为如果文件大小相差很大,它会分布得更均匀,因此如果最后一个文件很大,您最终不会等待单个进程完成处理。