对于 Hive MAPJOIN 作业,有多少数据被视为 "too large"?

How much data is considered "too large" for a Hive MAPJOIN job?

编辑:添加了更多文件大小详细信息和一些其他会话信息。

我有一个看似简单的 Hive JOIN 查询,令人惊讶的是它需要几个小时才能 运行。

SELECT a.value1, a.value2, b.value
FROM a
JOIN b ON a.key = b.key
WHERE a.keyPart BETWEEN b.startKeyPart AND B.endKeyPart;

我正在尝试确定我的数据集和 AWS 硬件选择的执行时间是否正常,或者我是否只是想连接太多数据。

来自 A 的记录总是与 B 中的一个或多个记录相匹配,因此从逻辑上讲,我看到在使用 WHERE 子句 p运行ed 之前最多生成 5000 亿行。

为该作业分配了 4 个映射器,该作业在 6 小时 内完成。这种类型的查询和配置是否正常?如果没有,我应该如何改进?

我在 JOIN 键上对 B 进行了分区,它产生了 5 个分区,但没有发现明显的改进。

此外,日志显示 Hive 优化器启动了本地映射连接任务,大概是为了缓存或流式传输较小的 table:

2016-02-07 02:14:13 Starting to launch local task to process map join;  maximum memory = 932184064
2016-02-07 02:14:16 Dump the side-table for tag: 1 with group count: 5 into file: file:/mnt/var/lib/hive/tmp/local-hadoop/hive_2016-02-07_02-14-08_435_7052168836302267808-1/-local-10003/HashTable-Stage-4/MapJoin-mapfile01--.hashtable
2016-02-07 02:14:17 Uploaded 1 File to: file:/mnt/var/lib/hive/tmp/local-hadoop/hive_2016-02-07_02-14-08_435_7052168836302267808-1/-local-10003/HashTable-Stage-4/MapJoin-mapfile01--.hashtable (12059634 bytes)
2016-02-07 02:14:17 End of local task; Time Taken: 3.71 sec.

是什么导致此作业 运行 缓慢?数据集看起来并不太大,"small-table" 大小远低于触发禁用 MAPJOIN 优化的 "small-table" 限制 25MB。

EXPLAIN 输出的转储是 copied on PasteBin 以供参考。

我的会话启用了输出和中间存储的压缩。这会是罪魁祸首吗?

SET hive.exec.compress.output=true;
SET hive.exec.compress.intermediate=true;
SET mapred.output.compress=true;
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
SET io.compression.codecs=org.apache.hadoop.io.compress.GzipCodec;
SET io.seqfile.compression.type=BLOCK;

我对这个问题的解决方案是完全在 JOIN ON 子句中表达 JOIN 谓词,因为这是在 Hive 中执行 JOIN 的最有效方式。至于为什么原来查询慢,我相信mappers只是需要时间逐行扫描中间数据集,100+十亿次。

由于 Hive 仅支持 JOIN ON 子句中的等式表达式并拒绝使用两个 table 别名作为参数的函数调用,因此无法将原始查询的 BETWEEN 子句重写为代数表达式。例如,下面的表达式是非法的。

-- Only handles exclusive BETWEEN
JOIN b ON a.key = b.key
AND sign(a.keyPart - b.startKeyPart) = 1.0  -- keyPart > startKeyPart
AND sign(a.keyPart - b.endKeyPart) = -1.0   -- keyPart < endKeyPart

我最终修改了我的源数据,以在 Hive ARRAY<BIGINT> 数据类型中包含 startKeyPartendKeyPart 之间的每个值。

CREATE TABLE LookupTable
    key BIGINT,
    startKeyPart BIGINT,
    endKeyPart BIGINT,
    keyParts ARRAY<BIGINT>;

或者,我可以使用自定义 Java 方法在我的查询中内联生成这个值; LongStream.rangeClosed() 方法仅在 Java 8 中可用,它不是 AWS emr-4.3.0 中 Hive 1.0.0 的一部分。

现在我在数组中有了整个键 space,我可以使用 LATERAL VIEW and explode() 将数组转换为 table,重写 JOIN 如下。

WITH b AS
(
    SELECT key, keyPart, value
    FROM LookupTable
    LATERAL VIEW explode(keyParts) keyPartsTable AS keyPart
)
SELECT a.value1, a.value2, b.value
FROM a
JOIN b ON a.key = b.key AND a.keyPart = b.keyPart;

最终结果是上面的查询大约需要 3 分钟 才能完成,而原来的 6 小时硬件配置。