AWS Glue (Spark) 非常慢

AWS Glue (Spark) very slow

我继承了一些在 AWS Glue 上运行速度极慢的代码。

在作业中,它会创建许多动态帧,然后使用 spark.sql 将这些帧连接起来。从 MySQL 和 Postgres 数据库中读取表格,然后使用 Glue 将它们连接在一起,最终将另一个 table 写回 Postgres。

示例(注意 dbs 等已被重命名和简化,因为我无法直接粘贴我的实际代码)

jobName = args['JOB_NAME']
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(jobName, args)
    
# MySQL
glueContext.create_dynamic_frame.from_catalog(database = "db1", table_name = "trans").toDF().createOrReplaceTempView("trans")
glueContext.create_dynamic_frame.from_catalog(database = "db1", table_name = "types").toDF().createOrReplaceTempView("types")
glueContext.create_dynamic_frame.from_catalog(database = "db1", table_name = "currency").toDF().createOrReplaceTempView("currency")

# DB2 (Postgres)
glueContext.create_dynamic_frame.from_catalog(database = "db2", table_name = "watermark").toDF().createOrReplaceTempView("watermark")

# transactions
new_transactions_df = spark.sql("[SQL CODE HERE]")

# Write to DB
conf_g = glueContext.extract_jdbc_conf("My DB")
url = conf_g["url"] + "/reporting"

new_transactions_df.write.option("truncate", "true").jdbc(url, "staging.transactions", properties=conf_g, mode="overwrite")

[SQL CODE HERE] 实际上是一个简单的 select 语句,将三个 table 连接在一起以产生一个输出,然后将其写入 staging.transactions table.

当我上次 运行 时,它只写了 150 行,但花了 9 分钟才完成。有人可以告诉我如何优化这个吗?

附加信息:

通常,当 reading/writing 数据在 spark 中使用 JDBC 驱动程序时,常见的问题是操作未并行化。以下是您可能想尝试的一些优化:

指定读取时的并行度

从您提供的代码看来,所有 tables 数据都是使用一个查询和一个 spark 执行程序读取的。

如果直接使用spark dataframe reader,可以设置选项partitionColumnlowerBoundupperBoundfetchSize并行读取多个分区使用多个 worker,如 docs 中所述。示例:

spark.read.format("jdbc") \
    #...
    .option("partitionColumn", "partition_key") \
    .option("lowerBound", "<lb>") \
    .option("upperBound", "<ub>") \
    .option("numPartitions", "<np>") \
    .option("fetchsize", "<fs>")

使用读取分区时,请注意 spark 会并行发出多个查询,因此请确保数据库引擎支持它并优化索引,尤其是 partition_column 以避免整个 table 扫描.

在 AWS Glue 中,这可以通过使用参数 additional_options:

传递附加选项来完成

To use a JDBC connection that performs parallel reads, you can set the hashfield, hashexpression, or hashpartitions options:

glueContext.create_dynamic_frame.from_catalog(
    database = "db1", 
    table_name = "trans", 
    additional_options = {"hashfield": "transID", "hashpartitions": "10"}
).toDF().createOrReplaceTempView("trans")

Glue 文档对此进行了描述:Reading from JDBC Tables in Parallel

写入时使用batchsize选项:

在您的特定情况下,不确定这是否有帮助,因为您只写了 150 行,但您可以指定此选项以提高写入性能:

new_transactions_df.write.format('jdbc') \
    # ...
    .option("batchsize", "10000") \
    .save()

下推优化

您还可以通过将某些查询(筛选器、列选择)直接下推到数据库引擎来优化读取,而不是将整个 table 加载到动态框架中然后进行筛选。

在 Glue 中,这可以使用 push_down_predicate 参数来完成:

glueContext.create_dynamic_frame.from_catalog(
    database = "db1", 
    table_name = "trans", 
    push_down_predicate = "(transDate > '2021-01-01' and transStatus='OK')"
).toDF().createOrReplaceTempView("trans")

Glue programming ETL partitions pushdowns

使用数据库实用程序批量插入/导出 tables

在某些情况下,您可以考虑使用数据库引擎将 table 导出到文件中,然后从文件中读取。同样意味着写入时,首先写入文件然后使用 db bulk insert 命令。这可以避免将 Spark 与 JDBC.

一起使用的瓶颈

Glue spark 集群启动通常只需要10分钟。所以那个时间(9分钟)似乎是合理的(除非你运行 Glue2.0,但你没有指定你使用的胶水版本)。

https://aws.amazon.com/es/about-aws/whats-new/2020/08/aws-glue-version-2-featuring-10x-faster-job-start-times-1-minute-minimum-billing-duration/#:~:text=With%20Glue%20version%202.0%2C%20job,than%20a%2010%20minute%20minimum.

启用指标:

AWS Glue 提供 Amazon CloudWatch 指标,可用于提供有关执行者的信息以及每个执行者完成的工作量。您可以通过执行以下操作之一在您的 AWS Glue 作业上启用 CloudWatch 指标:

使用特殊参数:将以下参数添加到您的 AWS Glue 作业。此参数允许您为您的工作 运行 收集工作分析指标。这些指标在 AWS Glue 控制台和 CloudWatch 控制台上可用。

   Key: --enable-metrics

使用 AWS Glue 控制台:要对现有作业启用指标,请执行以下操作:

  1. 打开 AWS Glue 控制台。
  2. 在导航窗格中,选择作业。
  3. Select 您要为其启用指标的作业。
  4. 选择操作,然后选择编辑作业。
  5. 在监控选项下,select 作业 指标。
  6. 选择保存。

礼貌:https://softans.com/aws-glue-etl-job-running-for-a-long-time/