如何加速 spark df.write jdbc 到 postgres 数据库?

How to speed up spark df.write jdbc to postgres database?

我是 spark 的新手,我正在尝试使用 df.write:

加快将数据框的内容(可能有 200k 到 2M 行)附加到 postgres 数据库
df.write.format('jdbc').options(
      url=psql_url_spark,
      driver=spark_env['PSQL_DRIVER'],
      dbtable="{schema}.{table}".format(schema=schema, table=table),
      user=spark_env['PSQL_USER'],
      password=spark_env['PSQL_PASS'],
      batchsize=2000000,
      queryTimeout=690
      ).mode(mode).save()

我尝试增加批处理大小但没有帮助,因为完成此任务仍然需要大约 4 小时。我还在下面包含了一些来自 aws emr 的快照,显示了有关作业 运行 的更多详细信息。将数据框保存到 postgres table 的任务只分配给了一个执行者(我发现它是 st运行ge),加速这个任务是否涉及在执行者之间划分这个任务?

此外,我已阅读 spark's performance tuning docs,但增加 batchsizequeryTimeout 似乎并未提高性能。 (我尝试在 df.write 之前在我的脚本中调用 df.cache(),但脚本的运行时间仍然是 4 小时)

此外,我的 aws emr 硬件设置和 spark-submit 是:

主节点(1):m4.xlarge

核心节点(2):m5.xlarge

spark-submit --deploy-mode client --executor-cores 4 --num-executors 4 ...

要解决性能问题,一般需要解决以下2个瓶颈:

  1. 确保 spark 作业正在将数据并行写入数据库 - 要解决此问题,请确保您有一个分区数据框。使用“df.repartition(n)”对数据帧进行分区,以便每个分区都并行写入数据库。 注意 - 大量的执行者也会导致插入速度变慢。所以从 5 个分区开始,然后将分区数增加 5,直到获得最佳性能。
  2. 确保数据库具有摄取大量数据所需的足够计算、内存和存储空间。

Spark 是一个分布式数据处理引擎,因此当您处理数据或将其保存在文件系统上时,它会使用其所有执行程序来执行任务。 Spark JDBC 很慢,因为当您建立 JDBC 连接时,其中一个执行程序会建立 link 到目标数据库,因此导致速度慢和失败。

要克服此问题并加快数据写入数据库的速度,您需要使用以下方法之一:

方法一:

在这种方法中,您需要使用 postgres COPY 命令实用程序 以加快写入操作。这要求您的 EMR 集群上有 psycopg2 库。

COPY 实用程序的文档是 here

如果您想了解基准差异以及复制速度更快的原因,请访问

Postgres 还建议使用 COPY 命令进行批量插入。现在如何批量插入 spark 数据框。 现在要实现更快的写入,首先将您的 spark 数据帧以 csv 格式保存到 EMR 文件系统,并重新分区您的输出,以便没有文件包含超过 100k 行。

#Repartition your dataframe dynamically based on number of rows in df
df.repartition(10).write.option("maxRecordsPerFile", 100000).mode("overwrite").csv("path/to/save/data)

现在使用python读取文件并对每个文件执行复制命令。

import psycopg2    
#iterate over your files here and generate file object you can also get files list using os module
file = open('path/to/save/data/part-00000_0.csv')
file1 = open('path/to/save/data/part-00000_1.csv')

#define a function
def execute_copy(fileName):
    con = psycopg2.connect(database=dbname,user=user,password=password,host=host,port=port)
    cursor = con.cursor()
    cursor.copy_from(fileName, 'table_name', sep=",")
    con.commit()
    con.close()

为了获得额外的速度提升,因为您使用的是 EMR 集群,您可以利用 python 多处理一次复制多个文件。

from multiprocessing import Pool, cpu_count
with Pool(cpu_count()) as p:
        print(p.map(execute_copy, [file,file1]))

这是推荐的方法,因为由于连接限制,无法调整 spark JDBC 以获得更高的写入速度。

方法二: 由于您已经在使用 AWS EMR 集群,因此您始终可以利用 hadoop 功能来更快地执行 table 写入。 所以在这里我们将使用 sqoop export 将我们的数据从 emrfs 导出到 postgres 数据库。

#If you are using s3 as your source path
sqoop export --connect jdbc:postgresql:hostname:port/postgresDB --table target_table --export-dir s3://mybucket/myinputfiles/ --driver org.postgresql.Driver --username master --password password --input-null-string '\N' --input-null-non-string '\N' --direct -m 16

#If you are using EMRFS as your source path
sqoop export --connect jdbc:postgresql:hostname:port/postgresDB --table target_table --export-dir /path/to/save/data/ --driver org.postgresql.Driver --username master --password password --input-null-string '\N' --input-null-non-string '\N' --direct -m 16

为什么选择 sqoop? 因为sqoop是根据指定的mapper个数与数据库开启多个连接。因此,如果您将 -m 指定为 8,那么将有 8 个并发连接流,并且这些连接流会将数据写入 postgres。

此外,有关使用 sqoop 的更多信息,请阅读此 AWS Blog, SQOOP Considerations and SQOOP Documentation

如果您可以使用代码破解您的方法,那么方法 1 肯定会给您带来您所寻求的性能提升,如果您对 table 像 SQOOP 这样的 hadoop 组件感到满意,那么请使用第二种方法。

希望对您有所帮助!

Spark 端调整 => 在 Datafarme 上执行 repartition 以便有多个执行程序并行写入数据库

df
.repartition(10)        // No. of concurrent connection Spark to PostgreSQL
.write.format('jdbc').options(
      url=psql_url_spark,
      driver=spark_env['PSQL_DRIVER'],
      dbtable="{schema}.{table}".format(schema=schema, table=table),
      user=spark_env['PSQL_USER'],
      password=spark_env['PSQL_PASS'],
      batchsize=2000000,
      queryTimeout=690
      ).mode(mode).save()

Postgresql 端调优=> 在 PostgreSQL 上需要分别提高以下参数。

  1. max_connections决定最大并发数 与数据库服务器的连接。默认值通常为 100 连接。
  2. shared_buffers配置参数决定多少 内存专供 PostgreSQL 用于缓存数据。

通过重新分区数据框,您可以获得更好的写入性能,这是一个已知的答案。但是有一种重新分区数据框的最佳方法。 由于您是 运行 EMR 集群上的此过程,因此首先了解每个从属实例上 运行 的实例类型和核心数。根据该指定数据帧上的分区数。 在您的情况下,您使用的是 m5.xlarge(2 slaves),每个有 4 个 vCPU,这意味着每个实例有 4 个线程。因此,当您处理大量数据时,8 个分区将为您提供最佳结果。

注意:应根据您的数据大小增加或减少分区数。

注意:批量大小也是您在写入时应考虑的因素。批量越大性能越好