AWS Glue (Spark) 非常慢
AWS Glue (Spark) very slow
我继承了一些在 AWS Glue 上运行速度极慢的代码。
在作业中,它会创建许多动态帧,然后使用 spark.sql
将这些帧连接起来。从 MySQL 和 Postgres 数据库中读取表格,然后使用 Glue 将它们连接在一起,最终将另一个 table 写回 Postgres。
示例(注意 dbs 等已被重命名和简化,因为我无法直接粘贴我的实际代码)
jobName = args['JOB_NAME']
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(jobName, args)
# MySQL
glueContext.create_dynamic_frame.from_catalog(database = "db1", table_name = "trans").toDF().createOrReplaceTempView("trans")
glueContext.create_dynamic_frame.from_catalog(database = "db1", table_name = "types").toDF().createOrReplaceTempView("types")
glueContext.create_dynamic_frame.from_catalog(database = "db1", table_name = "currency").toDF().createOrReplaceTempView("currency")
# DB2 (Postgres)
glueContext.create_dynamic_frame.from_catalog(database = "db2", table_name = "watermark").toDF().createOrReplaceTempView("watermark")
# transactions
new_transactions_df = spark.sql("[SQL CODE HERE]")
# Write to DB
conf_g = glueContext.extract_jdbc_conf("My DB")
url = conf_g["url"] + "/reporting"
new_transactions_df.write.option("truncate", "true").jdbc(url, "staging.transactions", properties=conf_g, mode="overwrite")
[SQL CODE HERE]
实际上是一个简单的 select 语句,将三个 table 连接在一起以产生一个输出,然后将其写入 staging.transactions table.
当我上次 运行 时,它只写了 150 行,但花了 9 分钟才完成。有人可以告诉我如何优化这个吗?
附加信息:
- 最大容量:6
- 工人类型:G.1X
- 工人人数:6
通常,当 reading/writing 数据在 spark 中使用 JDBC 驱动程序时,常见的问题是操作未并行化。以下是您可能想尝试的一些优化:
指定读取时的并行度
从您提供的代码看来,所有 tables 数据都是使用一个查询和一个 spark 执行程序读取的。
如果直接使用spark dataframe reader,可以设置选项partitionColumn
、lowerBound
、upperBound
、fetchSize
并行读取多个分区使用多个 worker,如 docs 中所述。示例:
spark.read.format("jdbc") \
#...
.option("partitionColumn", "partition_key") \
.option("lowerBound", "<lb>") \
.option("upperBound", "<ub>") \
.option("numPartitions", "<np>") \
.option("fetchsize", "<fs>")
使用读取分区时,请注意 spark 会并行发出多个查询,因此请确保数据库引擎支持它并优化索引,尤其是 partition_column
以避免整个 table 扫描.
在 AWS Glue 中,这可以通过使用参数 additional_options
:
传递附加选项来完成
To use a JDBC connection that performs parallel reads, you can set the
hashfield
, hashexpression
, or hashpartitions
options:
glueContext.create_dynamic_frame.from_catalog(
database = "db1",
table_name = "trans",
additional_options = {"hashfield": "transID", "hashpartitions": "10"}
).toDF().createOrReplaceTempView("trans")
Glue 文档对此进行了描述:Reading from JDBC Tables in Parallel
写入时使用batchsize
选项:
在您的特定情况下,不确定这是否有帮助,因为您只写了 150 行,但您可以指定此选项以提高写入性能:
new_transactions_df.write.format('jdbc') \
# ...
.option("batchsize", "10000") \
.save()
下推优化
您还可以通过将某些查询(筛选器、列选择)直接下推到数据库引擎来优化读取,而不是将整个 table 加载到动态框架中然后进行筛选。
在 Glue 中,这可以使用 push_down_predicate
参数来完成:
glueContext.create_dynamic_frame.from_catalog(
database = "db1",
table_name = "trans",
push_down_predicate = "(transDate > '2021-01-01' and transStatus='OK')"
).toDF().createOrReplaceTempView("trans")
见Glue programming ETL partitions pushdowns
使用数据库实用程序批量插入/导出 tables
在某些情况下,您可以考虑使用数据库引擎将 table 导出到文件中,然后从文件中读取。同样意味着写入时,首先写入文件然后使用 db bulk insert 命令。这可以避免将 Spark 与 JDBC.
一起使用的瓶颈
Glue spark 集群启动通常只需要10分钟。所以那个时间(9分钟)似乎是合理的(除非你运行 Glue2.0,但你没有指定你使用的胶水版本)。
启用指标:
AWS Glue 提供 Amazon CloudWatch 指标,可用于提供有关执行者的信息以及每个执行者完成的工作量。您可以通过执行以下操作之一在您的 AWS Glue 作业上启用 CloudWatch 指标:
使用特殊参数:将以下参数添加到您的 AWS Glue 作业。此参数允许您为您的工作 运行 收集工作分析指标。这些指标在 AWS Glue 控制台和 CloudWatch 控制台上可用。
Key: --enable-metrics
使用 AWS Glue 控制台:要对现有作业启用指标,请执行以下操作:
- 打开 AWS Glue 控制台。
- 在导航窗格中,选择作业。
- Select 您要为其启用指标的作业。
- 选择操作,然后选择编辑作业。
- 在监控选项下,select 作业
指标。
- 选择保存。
礼貌:https://softans.com/aws-glue-etl-job-running-for-a-long-time/
我继承了一些在 AWS Glue 上运行速度极慢的代码。
在作业中,它会创建许多动态帧,然后使用 spark.sql
将这些帧连接起来。从 MySQL 和 Postgres 数据库中读取表格,然后使用 Glue 将它们连接在一起,最终将另一个 table 写回 Postgres。
示例(注意 dbs 等已被重命名和简化,因为我无法直接粘贴我的实际代码)
jobName = args['JOB_NAME']
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(jobName, args)
# MySQL
glueContext.create_dynamic_frame.from_catalog(database = "db1", table_name = "trans").toDF().createOrReplaceTempView("trans")
glueContext.create_dynamic_frame.from_catalog(database = "db1", table_name = "types").toDF().createOrReplaceTempView("types")
glueContext.create_dynamic_frame.from_catalog(database = "db1", table_name = "currency").toDF().createOrReplaceTempView("currency")
# DB2 (Postgres)
glueContext.create_dynamic_frame.from_catalog(database = "db2", table_name = "watermark").toDF().createOrReplaceTempView("watermark")
# transactions
new_transactions_df = spark.sql("[SQL CODE HERE]")
# Write to DB
conf_g = glueContext.extract_jdbc_conf("My DB")
url = conf_g["url"] + "/reporting"
new_transactions_df.write.option("truncate", "true").jdbc(url, "staging.transactions", properties=conf_g, mode="overwrite")
[SQL CODE HERE]
实际上是一个简单的 select 语句,将三个 table 连接在一起以产生一个输出,然后将其写入 staging.transactions table.
当我上次 运行 时,它只写了 150 行,但花了 9 分钟才完成。有人可以告诉我如何优化这个吗?
附加信息:
- 最大容量:6
- 工人类型:G.1X
- 工人人数:6
通常,当 reading/writing 数据在 spark 中使用 JDBC 驱动程序时,常见的问题是操作未并行化。以下是您可能想尝试的一些优化:
指定读取时的并行度
从您提供的代码看来,所有 tables 数据都是使用一个查询和一个 spark 执行程序读取的。
如果直接使用spark dataframe reader,可以设置选项partitionColumn
、lowerBound
、upperBound
、fetchSize
并行读取多个分区使用多个 worker,如 docs 中所述。示例:
spark.read.format("jdbc") \
#...
.option("partitionColumn", "partition_key") \
.option("lowerBound", "<lb>") \
.option("upperBound", "<ub>") \
.option("numPartitions", "<np>") \
.option("fetchsize", "<fs>")
使用读取分区时,请注意 spark 会并行发出多个查询,因此请确保数据库引擎支持它并优化索引,尤其是 partition_column
以避免整个 table 扫描.
在 AWS Glue 中,这可以通过使用参数 additional_options
:
To use a JDBC connection that performs parallel reads, you can set the
hashfield
,hashexpression
, orhashpartitions
options:
glueContext.create_dynamic_frame.from_catalog(
database = "db1",
table_name = "trans",
additional_options = {"hashfield": "transID", "hashpartitions": "10"}
).toDF().createOrReplaceTempView("trans")
Glue 文档对此进行了描述:Reading from JDBC Tables in Parallel
写入时使用batchsize
选项:
在您的特定情况下,不确定这是否有帮助,因为您只写了 150 行,但您可以指定此选项以提高写入性能:
new_transactions_df.write.format('jdbc') \
# ...
.option("batchsize", "10000") \
.save()
下推优化
您还可以通过将某些查询(筛选器、列选择)直接下推到数据库引擎来优化读取,而不是将整个 table 加载到动态框架中然后进行筛选。
在 Glue 中,这可以使用 push_down_predicate
参数来完成:
glueContext.create_dynamic_frame.from_catalog(
database = "db1",
table_name = "trans",
push_down_predicate = "(transDate > '2021-01-01' and transStatus='OK')"
).toDF().createOrReplaceTempView("trans")
见Glue programming ETL partitions pushdowns
使用数据库实用程序批量插入/导出 tables
在某些情况下,您可以考虑使用数据库引擎将 table 导出到文件中,然后从文件中读取。同样意味着写入时,首先写入文件然后使用 db bulk insert 命令。这可以避免将 Spark 与 JDBC.
一起使用的瓶颈Glue spark 集群启动通常只需要10分钟。所以那个时间(9分钟)似乎是合理的(除非你运行 Glue2.0,但你没有指定你使用的胶水版本)。
启用指标:
AWS Glue 提供 Amazon CloudWatch 指标,可用于提供有关执行者的信息以及每个执行者完成的工作量。您可以通过执行以下操作之一在您的 AWS Glue 作业上启用 CloudWatch 指标:
使用特殊参数:将以下参数添加到您的 AWS Glue 作业。此参数允许您为您的工作 运行 收集工作分析指标。这些指标在 AWS Glue 控制台和 CloudWatch 控制台上可用。
Key: --enable-metrics
使用 AWS Glue 控制台:要对现有作业启用指标,请执行以下操作:
- 打开 AWS Glue 控制台。
- 在导航窗格中,选择作业。
- Select 您要为其启用指标的作业。
- 选择操作,然后选择编辑作业。
- 在监控选项下,select 作业 指标。
- 选择保存。
礼貌:https://softans.com/aws-glue-etl-job-running-for-a-long-time/