在 Hive 上使用 Parquet 提高写入并行度
Increasing write parallelism with Parquet on Hive
tl;dr - 我正在 Hive 上将大量数据写入新的 Parquet 格式 table,但该作业使用的 reducer 比指定的要少得多,这使得写入花费的时间比我想要的长得多.
我正在构建一个数据湖 table 旨在使用 Spark 创建快速读取,但我正在使用 hive 写入数据,因此 a) bucketed tables 可以由 hive 读取并且b) 这样我就可以将统计信息写入配置单元 Metastore。
我从 python 创建 table 像这样:
hivecur.execute("set hive.cbo.enable=true")
hivecur.execute("set hive.compute.query.using.stats=true")
hivecur.execute("set hive.stats.fetch.column.stats=true")
hivecur.execute("set hive.stats.fetch.partition.stats=true")
hivecur.execute("set hive.vectorized.execution.enabled=true")
hivecur.execute("set hive.vectorized.execution.reduce.enabled=true")
hivecur.execute('set mapred.reduce.tasks=100')
hivecur.execute(f"set dfs.block.size={1024*1024*128}")
hivecur.execute(f"set parquet.block.size={1024*1024*128}")
hivecur.execute(f"drop table if exists {TABLE_NAME}")
table_create_qry = f"""
create table {TABLE_NAME} (
{schema.dice}
)
partitioned by (process_date_z int, dataset string)
clustered by (id) sorted by (source_id, type, id) into 200 buckets
stored as parquet
TBLPROPERTIES ("comment" = "{git_hash()}",
"parquet.compress" = "snappy")
然后当我插入时:
qry = f"""
insert overwrite table {TABLE_NAME} partition (process_date_z, dataset)
select ...
source_id,
process_date_z,
'{dataset}' as dataset
from {source_table}
where process_date_z = {d}
and pmod(hash(id),100) in ({",".join([str(x) for x in id_filters])})"""
通过设置mapred.reduce.tasks=100
,我希望我能强制每个分区包含 100 个文件。相反,虽然创建了 100 个任务,但 92 个任务很快完成,8 个 reduce 任务 运行 更长,写入低十个(但不是 100 个)大致相等大小的文件。
这个问题是减少是写入过程中的一个重要瓶颈。我可以设置什么参数来提高性能?
我认为我的问题来自对哈希函数的愚蠢选择。
我怀疑用于按 ID 存储桶的算法与我用于对 ID 进行子集化的散列相同,因此它为所有可能的输入 ID 创建了一个存储桶,但 pmod WHERE 只让它填充了几个。
为了解决这个问题,我用 brickhouse 的 Murmurhash3 UDF 切换了 pmod 中的哈希。
tl;dr - 我正在 Hive 上将大量数据写入新的 Parquet 格式 table,但该作业使用的 reducer 比指定的要少得多,这使得写入花费的时间比我想要的长得多.
我正在构建一个数据湖 table 旨在使用 Spark 创建快速读取,但我正在使用 hive 写入数据,因此 a) bucketed tables 可以由 hive 读取并且b) 这样我就可以将统计信息写入配置单元 Metastore。
我从 python 创建 table 像这样:
hivecur.execute("set hive.cbo.enable=true")
hivecur.execute("set hive.compute.query.using.stats=true")
hivecur.execute("set hive.stats.fetch.column.stats=true")
hivecur.execute("set hive.stats.fetch.partition.stats=true")
hivecur.execute("set hive.vectorized.execution.enabled=true")
hivecur.execute("set hive.vectorized.execution.reduce.enabled=true")
hivecur.execute('set mapred.reduce.tasks=100')
hivecur.execute(f"set dfs.block.size={1024*1024*128}")
hivecur.execute(f"set parquet.block.size={1024*1024*128}")
hivecur.execute(f"drop table if exists {TABLE_NAME}")
table_create_qry = f"""
create table {TABLE_NAME} (
{schema.dice}
)
partitioned by (process_date_z int, dataset string)
clustered by (id) sorted by (source_id, type, id) into 200 buckets
stored as parquet
TBLPROPERTIES ("comment" = "{git_hash()}",
"parquet.compress" = "snappy")
然后当我插入时:
qry = f"""
insert overwrite table {TABLE_NAME} partition (process_date_z, dataset)
select ...
source_id,
process_date_z,
'{dataset}' as dataset
from {source_table}
where process_date_z = {d}
and pmod(hash(id),100) in ({",".join([str(x) for x in id_filters])})"""
通过设置mapred.reduce.tasks=100
,我希望我能强制每个分区包含 100 个文件。相反,虽然创建了 100 个任务,但 92 个任务很快完成,8 个 reduce 任务 运行 更长,写入低十个(但不是 100 个)大致相等大小的文件。
这个问题是减少是写入过程中的一个重要瓶颈。我可以设置什么参数来提高性能?
我认为我的问题来自对哈希函数的愚蠢选择。
我怀疑用于按 ID 存储桶的算法与我用于对 ID 进行子集化的散列相同,因此它为所有可能的输入 ID 创建了一个存储桶,但 pmod WHERE 只让它填充了几个。
为了解决这个问题,我用 brickhouse 的 Murmurhash3 UDF 切换了 pmod 中的哈希。