如何并行优化 spark sql 到 运行
How to optimize spark sql to run it in parallel
我是一个 spark 新手,我有一个简单的 spark 应用程序使用 Spark SQL/hiveContext 到:
- select 来自配置单元的数据 table(10 亿行)
- 做一些过滤,聚合包括 row_number over window function to select first row, group by, count() and max(), etc.
- 将结果写入HBase(亿行)
我在 yarn 集群(100 个执行程序)上将作业提交给 运行,它很慢,当我查看 Spark UI 中的 DAG 可视化时,似乎只有配置单元 table 扫描任务是 运行ning 并行的,其余步骤 #2 和上面的 #3 仅在一个实例中 运行ning 可能应该能够优化并行化?
应用程序看起来像:
第 1 步:
val input = hiveContext
.sql(
SELECT
user_id
, address
, age
, phone_number
, first_name
, last_name
, server_ts
FROM
(
SELECT
user_id
, address
, age
, phone_number
, first_name
, last_name
, server_ts
, row_number() over
(partition by user_id, address, phone_number, first_name, last_name order by user_id, address, phone_number, first_name, last_name, server_ts desc, age) AS rn
FROM
(
SELECT
user_id
, address
, age
, phone_number
, first_name
, last_name
, server_ts
FROM
table
WHERE
phone_number <> '911' AND
server_date >= '2015-12-01' and server_date < '2016-01-01' AND
user_id IS NOT NULL AND
first_name IS NOT NULL AND
last_name IS NOT NULL AND
address IS NOT NULL AND
phone_number IS NOT NULL AND
) all_rows
) all_rows_with_row_number
WHERE rn = 1)
val input_tbl = input.registerTempTable(input_tbl)
第 2 步:
val result = hiveContext.sql(
SELECT state,
phone_number,
address,
COUNT(*) as hash_count,
MAX(server_ts) as latest_ts
FROM
( SELECT
udf_getState(address) as state
, user_id
, address
, age
, phone_number
, first_name
, last_name
, server_ts
FROM
input_tbl ) input
WHERE state IS NOT NULL AND state != ''
GROUP BY state, phone_number, address)
第 3 步:
result.cache()
result.map(x => ...).saveAsNewAPIHadoopDataset(conf)
DAG 可视化如下所示:
如您所见,stage 0 中的 "Filter"、"Project" 和 "Exchange" 仅 运行ning 在一个实例中,stage1 和 stage2 也是如此,所以有几个问题,如果问题很愚蠢,我们深表歉意:
- "Filter"、"Project"和"Exchange"是否在每个执行器的数据混洗后发生在Driver中?
- 什么代码映射到 "Filter"、"Project" 和 "Exchange"?
- 如何并行 运行 "Filter"、"Project" 和 "Exchange" 来优化性能?
- 是否可以运行 stage1 和 stage2 并行?
这不是很明显,所以我会做以下事情来解决问题。
- 计算每个步骤的执行时间。
- 如果您的 table 是文本格式,第一步可能会很慢,如果数据以 parquet 格式存储在 Hive 中,spark 通常会工作得更好。
- 查看您的 table 是否按 where 子句中使用的列进行分区。
- 如果将数据保存到 Hbase 很慢,那么您可能需要预先拆分 hbase table,因为默认情况下数据存储在单个区域中。
- 查看 spark 中的阶段选项卡 ui 以查看每个阶段启动了多少任务,并按照描述查找本地级别的数据 here
希望您能够将问题归零。
您没有正确阅读 DAG 图 - 每个步骤都是使用单个框可视化的事实 并不意味着它没有使用多个 tasks(因此 cores)来计算该步骤。
您可以通过向下钻取到显示该阶段所有任务的阶段视图来查看每个步骤使用了多少任务。
例如,这是一个类似于您的 DAG 可视化示例:
您可以看到每个阶段都由 "single" 列步骤描述。
但是如果我们查看下面的 table,我们可以看到每个阶段的任务数:
其中一个只使用 2 个任务,而另一个使用 220 个,这意味着数据被拆分为 220 个分区,并且在提供足够的可用资源的情况下并行处理分区。
如果深入到该阶段,您会再次看到它使用了 220 个任务以及所有任务的详细信息。
只有任务从磁盘读取数据在图表中显示为具有这些"multiple dots"以帮助您了解读取了多少文件。
所以 - 正如 Rashid 的回答所建议的那样,检查每个阶段的任务数。
我是一个 spark 新手,我有一个简单的 spark 应用程序使用 Spark SQL/hiveContext 到:
- select 来自配置单元的数据 table(10 亿行)
- 做一些过滤,聚合包括 row_number over window function to select first row, group by, count() and max(), etc.
- 将结果写入HBase(亿行)
我在 yarn 集群(100 个执行程序)上将作业提交给 运行,它很慢,当我查看 Spark UI 中的 DAG 可视化时,似乎只有配置单元 table 扫描任务是 运行ning 并行的,其余步骤 #2 和上面的 #3 仅在一个实例中 运行ning 可能应该能够优化并行化?
应用程序看起来像:
第 1 步:
val input = hiveContext
.sql(
SELECT
user_id
, address
, age
, phone_number
, first_name
, last_name
, server_ts
FROM
(
SELECT
user_id
, address
, age
, phone_number
, first_name
, last_name
, server_ts
, row_number() over
(partition by user_id, address, phone_number, first_name, last_name order by user_id, address, phone_number, first_name, last_name, server_ts desc, age) AS rn
FROM
(
SELECT
user_id
, address
, age
, phone_number
, first_name
, last_name
, server_ts
FROM
table
WHERE
phone_number <> '911' AND
server_date >= '2015-12-01' and server_date < '2016-01-01' AND
user_id IS NOT NULL AND
first_name IS NOT NULL AND
last_name IS NOT NULL AND
address IS NOT NULL AND
phone_number IS NOT NULL AND
) all_rows
) all_rows_with_row_number
WHERE rn = 1)
val input_tbl = input.registerTempTable(input_tbl)
第 2 步:
val result = hiveContext.sql(
SELECT state,
phone_number,
address,
COUNT(*) as hash_count,
MAX(server_ts) as latest_ts
FROM
( SELECT
udf_getState(address) as state
, user_id
, address
, age
, phone_number
, first_name
, last_name
, server_ts
FROM
input_tbl ) input
WHERE state IS NOT NULL AND state != ''
GROUP BY state, phone_number, address)
第 3 步:
result.cache()
result.map(x => ...).saveAsNewAPIHadoopDataset(conf)
DAG 可视化如下所示:
如您所见,stage 0 中的 "Filter"、"Project" 和 "Exchange" 仅 运行ning 在一个实例中,stage1 和 stage2 也是如此,所以有几个问题,如果问题很愚蠢,我们深表歉意:
- "Filter"、"Project"和"Exchange"是否在每个执行器的数据混洗后发生在Driver中?
- 什么代码映射到 "Filter"、"Project" 和 "Exchange"?
- 如何并行 运行 "Filter"、"Project" 和 "Exchange" 来优化性能?
- 是否可以运行 stage1 和 stage2 并行?
这不是很明显,所以我会做以下事情来解决问题。
- 计算每个步骤的执行时间。
- 如果您的 table 是文本格式,第一步可能会很慢,如果数据以 parquet 格式存储在 Hive 中,spark 通常会工作得更好。
- 查看您的 table 是否按 where 子句中使用的列进行分区。
- 如果将数据保存到 Hbase 很慢,那么您可能需要预先拆分 hbase table,因为默认情况下数据存储在单个区域中。
- 查看 spark 中的阶段选项卡 ui 以查看每个阶段启动了多少任务,并按照描述查找本地级别的数据 here
希望您能够将问题归零。
您没有正确阅读 DAG 图 - 每个步骤都是使用单个框可视化的事实 并不意味着它没有使用多个 tasks(因此 cores)来计算该步骤。
您可以通过向下钻取到显示该阶段所有任务的阶段视图来查看每个步骤使用了多少任务。
例如,这是一个类似于您的 DAG 可视化示例:
您可以看到每个阶段都由 "single" 列步骤描述。
但是如果我们查看下面的 table,我们可以看到每个阶段的任务数:
其中一个只使用 2 个任务,而另一个使用 220 个,这意味着数据被拆分为 220 个分区,并且在提供足够的可用资源的情况下并行处理分区。
如果深入到该阶段,您会再次看到它使用了 220 个任务以及所有任务的详细信息。
只有任务从磁盘读取数据在图表中显示为具有这些"multiple dots"以帮助您了解读取了多少文件。
所以 - 正如 Rashid 的回答所建议的那样,检查每个阶段的任务数。