如何在 apache pig 输出文件中均匀分布数据?

How to evenly distribute data in apache pig output files?

我有一个 pig-latin 脚本,它接受一些 xml,使用 XPath UDF 提取一些字段,然后存储结果字段:

REGISTER udf-lib-1.0-SNAPSHOT.jar;
DEFINE XPath com.blah.udfs.XPath();

docs = LOAD '$input' USING com.blah.storage.XMLLoader('root') as (content:chararray);

results = FOREACH docs GENERATE XPath(content, 'root/id'), XPath(content, 'root/otherField'), content;

store results into '$output';

请注意,我们在集群上使用的是 pig-0.12.0,因此我从 pig-0.14.0 中删除了 XPath/XMLLoader 类 并将它们放入我自己的 jar 中,以便我可以在 0.12 中使用它们。

上面的脚本运行良好,并生成了我正在寻找的数据。但是,它会生成超过 1,900 个部分文件,每个文件只有几 mb。我了解了 default_parallel 选项,所以我将其设置为 128 以尝试获取 128 个零件文件。我最终不得不添加一块来强制减少阶段来实现这一点。我的脚本现在看起来像:

set default_parallel 128;
REGISTER udf-lib-1.0-SNAPSHOT.jar;
DEFINE XPath com.blah.udfs.XPath();

docs = LOAD '$input' USING com.blah.storage.XMLLoader('root') as (content:chararray);

results = FOREACH docs GENERATE XPath(content, 'root/id'), XPath(content, 'root/otherField'), content;

forced_reduce = FOREACH (GROUP results BY RANDOM()) GENERATE FLATTEN(results);
store forced_reduce into '$output';

同样,这会产生预期的数据。另外,我现在得到 128 个零件文件。我现在的问题是数据在零件文件中分布不均。有些有 8 场演出,有些有 100 MB。当按 RANDOM() :).

对它们进行分组时,我应该预料到这一点

我的问题是什么是限制部分文件数量但仍使它们大小均匀的首选方法?我是 pig/pig 拉丁语的新手,我假设我的做法完全错误。

p.s。我关心部分文件数量的原因是因为我想用 spark 处理输出,而我们的 spark 集群似乎在文件数量较少的情况下做得更好。

我仍在寻找直接从 pig 脚本执行此操作的方法,但目前我的 "solution" 是在处理 pig 脚本输出的 spark 进程中重新分区数据。我使用 RDD.coalesce 函数重新平衡数据。

从第一个代码片段开始,我假设它只是地图作业,因为您没有使用任何聚合。

不使用减速器,设置 属性 pig.maxCombinedSplitSize

    REGISTER udf-lib-1.0-SNAPSHOT.jar;
    DEFINE XPath com.blah.udfs.XPath();

    docs = LOAD '$input' USING com.blah.storage.XMLLoader('root') as (content:chararray);

    results = FOREACH docs GENERATE XPath(content, 'root/id'), XPath(content, 'root/otherField'), content;

    store results into '$output';        
exec;
        set pig.maxCombinedSplitSize 1000000000; -- 1 GB(given size in bytes)
        x = load '$output' using PigStorage();
        store x into '$output2' using PigStorage();

pig.maxCombinedSplitSize - 设置此 属性 将确保每个映射器读取大约 1 GB 的数据,并且以上代码用作身份映射器作业,这有助于您在 1GB 的部分文件块中写入数据。