Avoid single file with hive.optimize.sort.dynamic.partition option

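I am using Hive.

When I write to dynamic partitions with an INSERT query and turn on the hive.optimize.sort.dynamic.partition option (SET hive.optimize.sort.dynamic.partition=true), there is always exactly one file in each partition.

But if I turn that option off (SET hive.optimize.sort.dynamic.partition=false), I get an out-of-memory exception like this: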

TaskAttempt 3 failed, info=[Error: Error while running task ( failure ) : attempt_1534502930145_6994_1_01_000008_3:java.lang.RuntimeException: java.lang.OutOfMemoryError: Java heap space
        at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:194)
        at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:168)
        at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:370)
        at org.apache.tez.runtime.task.TaskRunner2Callable.run(TaskRunner2Callable.java:73)
        at org.apache.tez.runtime.task.TaskRunner2Callable.run(TaskRunner2Callable.java:61)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
        at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:61)
        at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:37)
        at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError: Java heap space
        at org.apache.parquet.column.values.dictionary.IntList.initSlab(IntList.java:90)
        at org.apache.parquet.column.values.dictionary.IntList.<init>(IntList.java:86)
        at org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.<init>(DictionaryValuesWriter.java:93)
        at org.apache.parquet.column.values.dictionary.DictionaryValuesWriter$PlainBinaryDictionaryValuesWriter.<init>(DictionaryValuesWriter.java:229)
        at org.apache.parquet.column.ParquetProperties.dictionaryWriter(ParquetProperties.java:131)
        at org.apache.parquet.column.ParquetProperties.dictWriterWithFallBack(ParquetProperties.java:178)
        at org.apache.parquet.column.ParquetProperties.getValuesWriter(ParquetProperties.java:203)
        at org.apache.parquet.column.impl.ColumnWriterV1.<init>(ColumnWriterV1.java:83)
        at org.apache.parquet.column.impl.ColumnWriteStoreV1.newMemColumn(ColumnWriteStoreV1.java:68)
        at org.apache.parquet.column.impl.ColumnWriteStoreV1.getColumnWriter(ColumnWriteStoreV1.java:56)
        at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.<init>(MessageColumnIO.java:184)
        at org.apache.parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:376)
        at org.apache.parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:109)
        at org.apache.parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:99)
        at org.apache.parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:100)
        at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:327)
        at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:288)
        at org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.<init>(ParquetRecordWriterWrapper.java:67)
        at org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat.getParquerRecordWriterWrapper(MapredParquetOutputFormat.java:128)
        at org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat.getHiveRecordWriter(MapredParquetOutputFormat.java:117)
        at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getRecordWriter(HiveFileFormatUtils.java:286)
        at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getHiveRecordWriter(HiveFileFormatUtils.java:271)
        at org.apache.hadoop.hive.ql.exec.FileSinkOperator.createBucketForFileIdx(FileSinkOperator.java:619)
        at org.apache.hadoop.hive.ql.exec.FileSinkOperator.createBucketFiles(FileSinkOperator.java:563)
        at org.apache.hadoop.hive.ql.exec.FileSinkOperator.createNewPaths(FileSinkOperator.java:867)
        at org.apache.hadoop.hive.ql.exec.FileSinkOperator.getDynOutPaths(FileSinkOperator.java:975)
        at org.apache.hadoop.hive.ql.exec.FileSinkOperator.process(FileSinkOperator.java:715)
        at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:897)
        at org.apache.hadoop.hive.ql.exec.SelectOperator.process(SelectOperator.java:95)
        at org.apache.hadoop.hive.ql.exec.tez.ReduceRecordSource$GroupIterator.next(ReduceRecordSource.java:356)
        at org.apache.hadoop.hive.ql.exec.tez.ReduceRecordSource.pushRecord(ReduceRecordSource.java:287)
        at org.apache.hadoop.hive.ql.exec.tez.ReduceRecordProcessor.run(ReduceRecordProcessor.java:317)
]], Vertex did not succeed due to OWN_TASK_FAILURE, failedTasks:1 killedTasks:299, Vertex vertex_1534502930145_6994_1_01 [Reducer 2] killed/failed due to:OWN_TASK_FAILURE]Vertex killed, vertexName=Map 1, vertexId=vertex_1534502930145_6994_1_00, diagnostics=[Vertex received Kill while in RUNNING state., Vertex did not succeed due to OTHER_VERTEX_FAILURE, failedTasks:0 killedTasks:27, Vertex vertex_1534502930145_6994_1_00 [Map 1] killed/failed due to:OTHER_VERTEX_FAILURE]DAG did not succeed due to VERTEX_FAILURE. failedVertices:1 killedVertices:1

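I suspect this exception is raised because a reducer writes to many partitions at the same time, but I cannot find a way to control that. I followed this article, but it did not help.

My environment is as follows:

Here is my sample query.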

SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.optimize.sort.dynamic.partition=true;
SET hive.exec.reducers.bytes.per.reducer=1048576;
SET mapred.reduce.tasks=300;
FROM raw_data
INSERT OVERWRITE TABLE idw_data
  PARTITION(event_timestamp_date)
  SELECT
    *
  WHERE 
    event_timestamp_date BETWEEN '2018-09-09' AND '2018-10-09' 
DISTRIBUTE BY event_timestamp_date
;

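Distributing by the partition key helps with the OOM problem, but this configuration may cause each reducer to write an entire partition, depending on the hive.exec.reducers.bytes.per.reducer setting, which by default may be set to a very high value such as 1 GB. Distributing by the partition key may also add an extra reduce stage, and so does hive.optimize.sort.dynamic.partition.

So, to avoid OOM and get the best performance:

  1. Add distribute by partition key at the end of the insert query. This causes rows with the same partition key to be processed by the same reducer. Alternatively, or in addition to this, you can use hive.optimize.sort.dynamic.partition=true.
  2. Set hive.exec.reducers.bytes.per.reducer to a value that will trigger more reducers when a single partition holds too much data. Check the current value of hive.exec.reducers.bytes.per.reducer and lower or raise it accordingly to get suitable reducer parallelism. This setting determines how much data a single reducer processes and how many files are created per partition.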
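
To check the current value before changing it, running SET with only the property name prints what the session is using. This is a small sketch for the Hive CLI or Beeline; nothing in it is specific to the tables in the question.

```sql
-- Print the value currently in effect for this session.
SET hive.exec.reducers.bytes.per.reducer;
```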

Example:

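-- 32 MB of input per reducer
set hive.exec.reducers.bytes.per.reducer=33554432;

-- target_table is a placeholder; substitute the real destination table name
insert overwrite table target_table partition (load_date)
select * from src_table
distribute by load_date;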
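
A possible variation on the example above, not taken from the answer itself: adding a deterministic bucket expression to the distribute by clause spreads each partition over several reducers, so a partition can end up with more than one file while each reducer still sees only a subset of the partition keys. This is a sketch under assumptions: `target_table` and `user_id` are hypothetical names, and the bucket count of 4 is arbitrary.

```sql
set hive.exec.dynamic.partition.mode=nonstrict;

-- target_table and user_id are hypothetical; 4 is an arbitrary bucket count.
-- Each (load_date, bucket) pair is routed to a single reducer, so a partition
-- is written by at most 4 reducers and gets at most 4 files.
insert overwrite table target_table partition (load_date)
select * from src_table
distribute by load_date, pmod(hash(user_id), 4);
```

A hash of an existing column is used here rather than rand() so that a retried task routes rows the same way as the original attempt.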

See also the answer about controlling the number of mappers and reducers.

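I finally found the problem.

First of all, the execution engine is Tez, so the mapreduce.reduce.memory.mb option does not help. You should use the hive.tez.container.size option instead. When writing to dynamic partitions, a reducer opens multiple record writers, so it needs enough memory to write to several partitions at the same time.

If you use the hive.optimize.sort.dynamic.partition option, a global sort on the partition key is run, and sorting means there is a reduce stage. In that case, unless there is another reducer task, each partition is handled by a single reducer. That is why there is only one file per partition. DISTRIBUTE BY creates more reduce tasks, so it can produce more files in each partition, but then there is the memory problem.

So the container memory size really matters! Do not forget to use the hive.tez.container.size option to change the Tez container memory size!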
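
As a rough sketch of what that looks like in a session: the property names below are real Hive settings, but the values are illustrative assumptions rather than the ones used to solve this case, and they must fit within the cluster's YARN container limits.

```sql
-- Illustrative values only; tune them to the cluster.
SET hive.execution.engine=tez;

-- Memory for each Tez container, in MB.
SET hive.tez.container.size=4096;

-- JVM heap for tasks in that container; assumed here to be about 80% of the container.
SET hive.tez.java.opts=-Xmx3276m;
```

The heap is kept below the container size so that non-heap memory still fits inside the container.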